XRootD
Loading...
Searching...
No Matches
XrdCl::XRootDMsgHandler Class Reference

Handle/Process/Forward XRootD messages. More...

#include <XrdClXRootDMsgHandler.hh>

+ Inheritance diagram for XrdCl::XRootDMsgHandler:
+ Collaboration diagram for XrdCl::XRootDMsgHandler:

Public Member Functions

 XRootDMsgHandler (Message *msg, ResponseHandler *respHandler, const URL *url, std::shared_ptr< SIDManager > sidMgr, LocalFileHandler *lFileHandler)
 
 ~XRootDMsgHandler ()
 Destructor.
 
virtual uint16_t Examine (std::shared_ptr< Message > &msg) override
 
time_t GetExpiration () override
 Get a timestamp after which we give up.
 
const Message * GetRequest () const
 Get the request pointer.
 
virtual uint16_t GetSid () const override
 
virtual uint16_t InspectStatusRsp () override
 
virtual bool IsRaw () const override
 Are we a raw writer or not?
 
void OnReadyToSend (Message *msg) override
 
virtual void OnStatusReady (const Message *message, XRootDStatus status) override
 The requested action has been performed and the status is available.
 
virtual uint8_t OnStreamEvent (StreamEvent event, XRootDStatus status) override
 
void PartialReceived ()
 
virtual void Process () override
 Process the message if it was "taken" by the examine action.
 
virtual XRootDStatus ReadMessageBody (Message *msg, Socket *socket, uint32_t &bytesRead) override
 
void SetChunkList (ChunkList *chunkList)
 Set the chunk list.
 
void SetCrc32cDigests (std::vector< uint32_t > &&crc32cDigests)
 
void SetExpiration (time_t expiration)
 Set a timestamp after which we give up.
 
void SetFollowMetalink (bool followMetalink)
 
void SetHostList (HostList *hostList)
 Set host list.
 
void SetKernelBuffer (XrdSys::KernelBuffer *kbuff)
 Set the kernel buffer.
 
void SetLoadBalancer (const HostInfo &loadBalancer)
 Set the load balancer.
 
void SetOksofarAsAnswer (bool oksofarAsAnswer)
 
void SetRedirectAsAnswer (bool redirectAsAnswer)
 
void SetRedirectCounter (uint16_t redirectCounter)
 Set the redirect counter.
 
void SetStateful (bool stateful)
 
void WaitDone (time_t now)
 
XRootDStatus WriteMessageBody (Socket *socket, uint32_t &bytesWritten) override
 
- Public Member Functions inherited from XrdCl::MsgHandler
virtual ~MsgHandler ()
 Event types that the message handler may receive.
 

Friends

class HandleRspJob
 

Additional Inherited Members

- Public Types inherited from XrdCl::MsgHandler
enum  Action {
  None = 0x0000 ,
  Nop = 0x0001 ,
  Ignore = 0x0002 ,
  RemoveHandler = 0x0004 ,
  Raw = 0x0008 ,
  NoProcess = 0x0010 ,
  Corrupted = 0x0020 ,
  More = 0x0040
}
 Actions to be taken after a message is processed by the handler. More...
 
enum  StreamEvent {
  Ready = 1 ,
  Broken = 2 ,
  Timeout = 3 ,
  FatalError = 4
}
 Events that may have occurred to the stream. More...
 

Detailed Description

Handle/Process/Forward XRootD messages.

Definition at line 119 of file XrdClXRootDMsgHandler.hh.

Constructor & Destructor Documentation

◆ XRootDMsgHandler()

XrdCl::XRootDMsgHandler::XRootDMsgHandler ( Message * msg,
ResponseHandler * respHandler,
const URL * url,
std::shared_ptr< SIDManager > sidMgr,
LocalFileHandler * lFileHandler 
)
inline

Constructor

Parameters
msg - message that has been sent out
respHandler - response handler to be called when the final response arrives
url - the url the message has been sent to
sidMgr - the sid manager used to allocate SID for the initial message

Definition at line 134 of file XrdClXRootDMsgHandler.hh.

138 :
139 pRequest( msg ),
140 pResponseHandler( respHandler ),
141 pUrl( *url ),
142 pEffectiveDataServerUrl( 0 ),
143 pSidMgr( sidMgr ),
144 pLFileHandler( lFileHandler ),
145 pExpiration( 0 ),
146 pRedirectAsAnswer( false ),
147 pOksofarAsAnswer( false ),
148 pHasLoadBalancer( false ),
149 pHasSessionId( false ),
150 pChunkList( 0 ),
151 pKBuff( 0 ),
152 pRedirectCounter( 0 ),
153 pNotAuthorizedCounter( 0 ),
154
155 pAsyncOffset( 0 ),
156 pAsyncChunkIndex( 0 ),
157
158 pPgWrtCksumBuff( 4 ),
159 pPgWrtCurrentPageOffset( 0 ),
160 pPgWrtCurrentPageNb( 0 ),
161
162 pOtherRawStarted( false ),
163
164 pFollowMetalink( false ),
165
166 pStateful( false ),
167
168 pAggregatedWaitTime( 0 ),
169
170 pMsgInFly( false ),
171 pSendingState( 0 ),
172
173 pTimeoutFence( false ),
174
175 pDirListStarted( false ),
176 pDirListWithStat( false ),
177
178 pCV( 0 ),
179
180 pSslErrCnt( 0 )
181 {
182 pPostMaster = DefaultEnv::GetPostMaster();
183 if( msg->GetSessionId() )
184 pHasSessionId = true;
185
186 Log *log = DefaultEnv::GetLog();
187 log->Debug( ExDbgMsg, "[%s] MsgHandler created: %p (message: %s ).",
188 pUrl.GetHostId().c_str(), this,
189 pRequest->GetObfuscatedDescription().c_str() );
190
191 ClientRequestHdr *hdr = (ClientRequestHdr*)pRequest->GetBuffer();
192 if( ntohs( hdr->requestid ) == kXR_pgread )
193 {
194 ClientPgReadRequest *pgrdreq = (ClientPgReadRequest*)pRequest->GetBuffer();
195 pCrc32cDigests.reserve( XrdOucPgrwUtils::csNum( ntohll( pgrdreq->offset ),
196 ntohl( pgrdreq->rlen ) ) );
197 }
198
199 if( ntohs( hdr->requestid ) == kXR_readv )
200 pBodyReader.reset( new AsyncVectorReader( *url, *pRequest ) );
201 else if( ntohs( hdr->requestid ) == kXR_read )
202 pBodyReader.reset( new AsyncRawReader( *url, *pRequest ) );
203 else
204 pBodyReader.reset( new AsyncDiscardReader( *url, *pRequest ) );
205 }
kXR_unt16 requestid
Definition XProtocol.hh:157
@ kXR_read
Definition XProtocol.hh:125
@ kXR_readv
Definition XProtocol.hh:137
@ kXR_pgread
Definition XProtocol.hh:142
const char * GetBuffer(uint32_t offset=0) const
Get the message buffer.
static Log * GetLog()
Get default log.
static PostMaster * GetPostMaster()
Get default post master.
const std::string & GetObfuscatedDescription() const
Get the description of the message with authz parameter obfuscated.
std::string GetHostId() const
Get the host part of the URL (user:password@host:port)
Definition XrdClURL.hh:99
static int csNum(off_t offs, int count)
Compute the required size of a checksum vector based on offset & length.
const uint64_t ExDbgMsg
XrdSysError Log
Definition XrdConfig.cc:113

References XrdOucPgrwUtils::csNum(), XrdCl::Log::Debug(), XrdCl::ExDbgMsg, XrdCl::Buffer::GetBuffer(), XrdCl::URL::GetHostId(), XrdCl::DefaultEnv::GetLog(), XrdCl::Message::GetObfuscatedDescription(), XrdCl::DefaultEnv::GetPostMaster(), XrdCl::Message::GetSessionId(), kXR_pgread, kXR_read, kXR_readv, ClientPgReadRequest::offset, ClientRequestHdr::requestid, and ClientPgReadRequest::rlen.

+ Here is the call graph for this function:

◆ ~XRootDMsgHandler()

XrdCl::XRootDMsgHandler::~XRootDMsgHandler ( )
inline

Destructor.

Definition at line 210 of file XrdClXRootDMsgHandler.hh.

211 {
212 DumpRedirectTraceBack();
213
214 if( !pHasSessionId )
215 delete pRequest;
216 delete pEffectiveDataServerUrl;
217
218 pRequest = reinterpret_cast<Message*>( 0xDEADBEEF );
219 pResponseHandler = reinterpret_cast<ResponseHandler*>( 0xDEADBEEF );
220 pPostMaster = reinterpret_cast<PostMaster*>( 0xDEADBEEF );
221 pLFileHandler = reinterpret_cast<LocalFileHandler*>( 0xDEADBEEF );
222 pChunkList = reinterpret_cast<ChunkList*>( 0xDEADBEEF );
223 pEffectiveDataServerUrl = reinterpret_cast<URL*>( 0xDEADBEEF );
224
225 Log *log = DefaultEnv::GetLog();
226 log->Debug( ExDbgMsg, "[%s] Destroying MsgHandler: %p.",
227 pUrl.GetHostId().c_str(), this );
228 }
std::vector< ChunkInfo > ChunkList
List of chunks.

References XrdCl::Log::Debug(), XrdCl::ExDbgMsg, XrdCl::URL::GetHostId(), and XrdCl::DefaultEnv::GetLog().

+ Here is the call graph for this function:

Member Function Documentation

◆ Examine()

uint16_t XrdCl::XRootDMsgHandler::Examine ( std::shared_ptr< Message > &  msg)
override virtual

Examine an incoming message, and decide on the action to be taken

Parameters
msg - the message, may be zero if receive failed
Returns
action type that needs to be taken wrt the message and the handler

Implements XrdCl::MsgHandler.

Definition at line 109 of file XrdClXRootDMsgHandler.cc.

110 {
111 const int sst = pSendingState.fetch_or( kSawResp );
112
113 if( !( sst & kSendDone ) && !( sst & kSawResp ) )
114 {
115 // we must have been sent although we haven't got the OnStatusReady
116 // notification yet. Set the inflight notice.
117
118 Log *log = DefaultEnv::GetLog();
119 log->Dump( XRootDMsg, "[%s] Message %s reply received before notification "
120 "that it was sent, assuming it was sent ok.",
121 pUrl.GetHostId().c_str(),
122 pRequest->GetObfuscatedDescription().c_str() );
123
124 pMsgInFly = true;
125 }
126
127 //--------------------------------------------------------------------------
128 // if the MsgHandler is already being used to process another request
129 // (kXR_oksofar) we need to wait
130 //--------------------------------------------------------------------------
131 if( pOksofarAsAnswer )
132 {
133 XrdSysCondVarHelper lck( pCV );
134 while( pResponse ) pCV.Wait();
135 }
136 else
137 {
138 if( pResponse )
139 {
140 Log *log = DefaultEnv::GetLog();
141 log->Warning( ExDbgMsg, "[%s] MsgHandler is examining a response although "
142 "it already owns a response: %p (message: %s ).",
143 pUrl.GetHostId().c_str(), this,
144 pRequest->GetObfuscatedDescription().c_str() );
145 }
146 }
147
148 if( msg->GetSize() < 8 )
149 return Ignore;
150
151 ServerResponse *rsp = (ServerResponse *)msg->GetBuffer();
152 ClientRequest *req = (ClientRequest *)pRequest->GetBuffer();
153 uint16_t status = 0;
154 uint32_t dlen = 0;
155
156 //--------------------------------------------------------------------------
157 // We only care about async responses, but those are extracted now
158 // in the SocketHandler.
159 //--------------------------------------------------------------------------
160 if( rsp->hdr.status == kXR_attn )
161 {
162 return Ignore;
163 }
164 //--------------------------------------------------------------------------
165 // We got a sync message - check if it belongs to us
166 //--------------------------------------------------------------------------
167 else
168 {
169 if( rsp->hdr.streamid[0] != req->header.streamid[0] ||
170 rsp->hdr.streamid[1] != req->header.streamid[1] )
171 return Ignore;
172
173 status = rsp->hdr.status;
174 dlen = rsp->hdr.dlen;
175 }
176
177 //--------------------------------------------------------------------------
178 // We take the ownership of the message and decide what we will do
179 // with the handler itself, the options are:
180 // 1) we want to either read in raw mode (the Raw flag) or have the message
181 // body reconstructed for us by the TransportHandler by the time
182 // Process() is called (default, no extra flag)
183 // 2) we either got a full response in which case we don't want to be
184 // notified about anything anymore (RemoveHandler) or we got a partial
185 // answer and we need to wait for more (default, no extra flag)
186 //--------------------------------------------------------------------------
187 pResponse = msg;
188 pBodyReader->SetDataLength( dlen );
189
190 Log *log = DefaultEnv::GetLog();
191 switch( status )
192 {
193 //------------------------------------------------------------------------
194 // Handle the cached cases
195 //------------------------------------------------------------------------
196 case kXR_error:
197 case kXR_redirect:
198 case kXR_wait:
199 return RemoveHandler;
200
201 case kXR_waitresp:
202 {
203 log->Dump( XRootDMsg, "[%s] Got kXR_waitresp response to "
204 "message %s", pUrl.GetHostId().c_str(),
205 pRequest->GetObfuscatedDescription().c_str() );
206
207 pResponse.reset();
208 return Ignore; // This must be handled synchronously!
209 }
210
211 //------------------------------------------------------------------------
212 // Handle the potential raw cases
213 //------------------------------------------------------------------------
214 case kXR_ok:
215 {
216 //----------------------------------------------------------------------
217 // For kXR_read we read in raw mode
218 //----------------------------------------------------------------------
219 uint16_t reqId = ntohs( req->header.requestid );
220 if( reqId == kXR_read )
221 {
222 return Raw | RemoveHandler;
223 }
224
225 //----------------------------------------------------------------------
226 // kXR_readv is the same as kXR_read
227 //----------------------------------------------------------------------
228 if( reqId == kXR_readv )
229 {
230 return Raw | RemoveHandler;
231 }
232
233 //----------------------------------------------------------------------
234 // For everything else we just take what we got
235 //----------------------------------------------------------------------
236 return RemoveHandler;
237 }
238
239 //------------------------------------------------------------------------
240 // kXR_oksofars are special, they are not full responses, so we reset
241 // the response pointer to 0 and add the message to the partial list
242 //------------------------------------------------------------------------
243 case kXR_oksofar:
244 {
245 log->Dump( XRootDMsg, "[%s] Got a kXR_oksofar response to request "
246 "%s", pUrl.GetHostId().c_str(),
247 pRequest->GetObfuscatedDescription().c_str() );
248
249 if( !pOksofarAsAnswer )
250 {
251 pPartialResps.emplace_back( std::move( pResponse ) );
252 }
253
254 //----------------------------------------------------------------------
255 // For kXR_read we either read in raw mode if the message has not
256 // been fully reconstructed already, if it has, we adjust
257 // the buffer offset to prepare for the next one
258 //----------------------------------------------------------------------
259 uint16_t reqId = ntohs( req->header.requestid );
260 if( reqId == kXR_read )
261 {
262 pTimeoutFence.store( true, std::memory_order_relaxed );
263 return Raw | ( pOksofarAsAnswer ? None : NoProcess );
264 }
265
266 //----------------------------------------------------------------------
267 // kXR_readv is similar to read, except that the payload is different
268 //----------------------------------------------------------------------
269 if( reqId == kXR_readv )
270 {
271 pTimeoutFence.store( true, std::memory_order_relaxed );
272 return Raw | ( pOksofarAsAnswer ? None : NoProcess );
273 }
274
275 return ( pOksofarAsAnswer ? None : NoProcess );
276 }
277
278 case kXR_status:
279 {
280 log->Dump( XRootDMsg, "[%s] Got a kXR_status response to request "
281 "%s", pUrl.GetHostId().c_str(),
282 pRequest->GetObfuscatedDescription().c_str() );
283
284 uint16_t reqId = ntohs( req->header.requestid );
285 if( reqId == kXR_pgwrite )
286 {
287 //--------------------------------------------------------------------
288 // In case of pgwrite by definition this wont be a partial response
289 // so we can already remove the handler from the in-queue
290 //--------------------------------------------------------------------
291 return RemoveHandler;
292 }
293
294 //----------------------------------------------------------------------
295 // Otherwise (pgread), first of all we need to read the body of the
296 // kXR_status response, we can handle the raw data (if any) only after
297 // we have the whole kXR_status body
298 //----------------------------------------------------------------------
299 pTimeoutFence.store( true, std::memory_order_relaxed );
300 return None;
301 }
302
303 //------------------------------------------------------------------------
304 // Default
305 //------------------------------------------------------------------------
306 default:
307 return RemoveHandler;
308 }
309 return RemoveHandler;
310 }
kXR_char streamid[2]
Definition XProtocol.hh:156
kXR_char streamid[2]
Definition XProtocol.hh:914
@ kXR_waitresp
Definition XProtocol.hh:906
@ kXR_redirect
Definition XProtocol.hh:904
@ kXR_oksofar
Definition XProtocol.hh:900
@ kXR_status
Definition XProtocol.hh:907
@ kXR_ok
Definition XProtocol.hh:899
@ kXR_attn
Definition XProtocol.hh:901
@ kXR_wait
Definition XProtocol.hh:905
@ kXR_error
Definition XProtocol.hh:903
struct ClientRequestHdr header
Definition XProtocol.hh:846
@ kXR_pgwrite
Definition XProtocol.hh:138
ServerResponseHeader hdr
@ Ignore
Ignore the message.
const uint64_t XRootDMsg

References ServerResponseHeader::dlen, XrdCl::Log::Dump(), XrdCl::ExDbgMsg, XrdCl::Buffer::GetBuffer(), XrdCl::URL::GetHostId(), XrdCl::DefaultEnv::GetLog(), XrdCl::Message::GetObfuscatedDescription(), ServerResponse::hdr, ClientRequest::header, XrdCl::MsgHandler::Ignore, kXR_attn, kXR_error, kXR_ok, kXR_oksofar, kXR_pgwrite, kXR_read, kXR_readv, kXR_redirect, kXR_status, kXR_wait, kXR_waitresp, XrdCl::MsgHandler::None, XrdCl::MsgHandler::NoProcess, XrdCl::MsgHandler::Raw, XrdCl::MsgHandler::RemoveHandler, ClientRequestHdr::requestid, ServerResponseHeader::status, ClientRequestHdr::streamid, ServerResponseHeader::streamid, XrdSysCondVar::Wait(), XrdCl::Log::Warning(), and XrdCl::XRootDMsg.

+ Here is the call graph for this function:

◆ GetExpiration()

time_t XrdCl::XRootDMsgHandler::GetExpiration ( )
inline override virtual

Get a timestamp after which we give up.

Implements XrdCl::MsgHandler.

Definition at line 331 of file XrdClXRootDMsgHandler.hh.

332 {
333 return pExpiration;
334 }

◆ GetRequest()

const Message * XrdCl::XRootDMsgHandler::GetRequest ( ) const
inline

Get the request pointer.

Definition at line 357 of file XrdClXRootDMsgHandler.hh.

358 {
359 return pRequest;
360 }

◆ GetSid()

uint16_t XrdCl::XRootDMsgHandler::GetSid ( ) const
override virtual

Get handler sid

return sid of the corresponding request, otherwise 0

Implements XrdCl::MsgHandler.

Definition at line 405 of file XrdClXRootDMsgHandler.cc.

406 {
407 ClientRequest* req = (ClientRequest*) pRequest->GetBuffer();
408 return ((uint16_t)req->header.streamid[1] << 8) | (uint16_t)req->header.streamid[0];
409 }

References XrdCl::Buffer::GetBuffer(), ClientRequest::header, and ClientRequestHdr::streamid.

+ Here is the call graph for this function:

◆ InspectStatusRsp()

uint16_t XrdCl::XRootDMsgHandler::InspectStatusRsp ( )
override virtual

Reexamine the incoming message, and decide on the action to be taken

In case of kXR_status the message can be only fully examined after reading the whole body (without raw data).

Parameters
msg - the message, may be zero if receive failed
Returns
action type that needs to be taken wrt the message and the handler

Implements XrdCl::MsgHandler.

Definition at line 315 of file XrdClXRootDMsgHandler.cc.

316 {
317 if( !pResponse )
318 return 0;
319
320 Log *log = DefaultEnv::GetLog();
321 ServerResponse *rsp = (ServerResponse *)pResponse->GetBuffer();
322
323 //--------------------------------------------------------------------------
324 // Additional action is only required for kXR_status
325 //--------------------------------------------------------------------------
326 if( rsp->hdr.status != kXR_status ) return 0;
327
328 //--------------------------------------------------------------------------
329 // Ignore malformed status response
330 //--------------------------------------------------------------------------
331 if( pResponse->GetSize() < sizeof( ServerResponseStatus ) )
332 {
333 log->Error( XRootDMsg, "[%s] kXR_status: invalid message size.", pUrl.GetHostId().c_str() );
334 return Corrupted;
335 }
336
337 ClientRequest *req = (ClientRequest *)pRequest->GetBuffer();
338 uint16_t reqId = ntohs( req->header.requestid );
339 //--------------------------------------------------------------------------
340 // Unmarshal the status body
341 //--------------------------------------------------------------------------
342 XRootDStatus st = XRootDTransport::UnMarshalStatusBody( *pResponse, reqId );
343
344 if( !st.IsOK() && st.code == errDataError )
345 {
346 log->Error( XRootDMsg, "[%s] %s", pUrl.GetHostId().c_str(),
347 st.GetErrorMessage().c_str() );
348 return Corrupted;
349 }
350
351 if( !st.IsOK() )
352 {
353 log->Error( XRootDMsg, "[%s] Failed to unmarshall status body.",
354 pUrl.GetHostId().c_str() );
355 pStatus = st;
356 HandleRspOrQueue();
357 return Ignore;
358 }
359
360 //--------------------------------------------------------------------------
361 // Common handling for partial results
362 //--------------------------------------------------------------------------
363 ServerResponseV2 *rspst = (ServerResponseV2*)pResponse->GetBuffer();
364 if( rspst->status.bdy.resptype == XrdProto::kXR_PartialResult )
365 {
366 pPartialResps.push_back( std::move( pResponse ) );
367 }
368
369 //--------------------------------------------------------------------------
370 // Decide the actions that we need to take
371 //--------------------------------------------------------------------------
372 uint16_t action = 0;
373 if( reqId == kXR_pgread )
374 {
375 //----------------------------------------------------------------------
376 // The message contains only Status header and body but no raw data
377 //----------------------------------------------------------------------
378 if( !pPageReader )
379 pPageReader.reset( new AsyncPageReader( *pChunkList, pCrc32cDigests ) );
380 pPageReader->SetRsp( rspst );
381
382 action |= Raw;
383
384 if( rspst->status.bdy.resptype == XrdProto::kXR_PartialResult )
385 action |= NoProcess;
386 else
387 action |= RemoveHandler;
388 }
389 else if( reqId == kXR_pgwrite )
390 {
391 // if data corruption has been detected on the server side we will
392 // send some additional data pointing to the pages that need to be
393 // retransmitted
394 if( size_t( sizeof( ServerResponseHeader ) + rspst->status.hdr.dlen + rspst->status.bdy.dlen ) >
395 pResponse->GetCursor() )
396 action |= More;
397 }
398
399 return action;
400 }
ServerResponseStatus status
struct ServerResponseBody_Status bdy
struct ServerResponseHeader hdr
@ More
there are more (non-raw) data to be read
static XRootDStatus UnMarshalStatusBody(Message &msg, uint16_t reqType)
Unmarshall the body of the status response.
const uint16_t errDataError
data is corrupted
@ kXR_PartialResult

References ServerResponseStatus::bdy, XrdCl::Status::code, XrdCl::MsgHandler::Corrupted, ServerResponseHeader::dlen, ServerResponseBody_Status::dlen, XrdCl::errDataError, XrdCl::Log::Error(), XrdCl::Buffer::GetBuffer(), XrdCl::XRootDStatus::GetErrorMessage(), XrdCl::URL::GetHostId(), XrdCl::DefaultEnv::GetLog(), ServerResponseStatus::hdr, ServerResponse::hdr, ClientRequest::header, XrdCl::MsgHandler::Ignore, XrdCl::Status::IsOK(), XrdProto::kXR_PartialResult, kXR_pgread, kXR_pgwrite, kXR_status, XrdCl::MsgHandler::More, XrdCl::MsgHandler::NoProcess, XrdCl::MsgHandler::Raw, XrdCl::MsgHandler::RemoveHandler, ClientRequestHdr::requestid, ServerResponseBody_Status::resptype, ServerResponseHeader::status, ServerResponseV2::status, XrdCl::XRootDTransport::UnMarshalStatusBody(), and XrdCl::XRootDMsg.

+ Here is the call graph for this function:

◆ IsRaw()

bool XrdCl::XRootDMsgHandler::IsRaw ( ) const
override virtual

Are we a raw writer or not?

Reimplemented from XrdCl::MsgHandler.

Definition at line 967 of file XrdClXRootDMsgHandler.cc.

968 {
969 ClientRequest *req = (ClientRequest *)pRequest->GetBuffer();
970 uint16_t reqId = ntohs( req->header.requestid );
971 if( reqId == kXR_write || reqId == kXR_writev || reqId == kXR_pgwrite )
972 return true;
973 // checkpoint + execute
974 if( reqId == kXR_chkpoint && req->chkpoint.opcode == kXR_ckpXeq )
975 {
976 ClientRequest *xeq = (ClientRequest*)pRequest->GetBuffer( sizeof( ClientRequest ) );
977 reqId = ntohs( xeq->header.requestid );
978 return reqId != kXR_truncate; // only checkpointed truncate does not have raw data
979 }
980
981 return false;
982 }
static const int kXR_ckpXeq
Definition XProtocol.hh:216
@ kXR_writev
Definition XProtocol.hh:143
@ kXR_write
Definition XProtocol.hh:131
@ kXR_truncate
Definition XProtocol.hh:140
@ kXR_chkpoint
Definition XProtocol.hh:124
struct ClientChkPointRequest chkpoint
Definition XProtocol.hh:849

References ClientRequest::chkpoint, XrdCl::Buffer::GetBuffer(), ClientRequest::header, kXR_chkpoint, kXR_ckpXeq, kXR_pgwrite, kXR_truncate, kXR_write, kXR_writev, ClientChkPointRequest::opcode, and ClientRequestHdr::requestid.

+ Here is the call graph for this function:

◆ OnReadyToSend()

void XrdCl::XRootDMsgHandler::OnReadyToSend ( Message * msg)
inline override virtual

Called just before the message is going to be sent through a valid connection, so that the user can still make some modifications that were impossible before (e.g. protocol-version-dependent adjustments)

Parameters
msg - message concerned

Reimplemented from XrdCl::MsgHandler.

Definition at line 433 of file XrdClXRootDMsgHandler.hh.

434 {
435 pSendingState = 0;
436 }

◆ OnStatusReady()

void XrdCl::XRootDMsgHandler::OnStatusReady ( const Message * message,
XRootDStatus  status 
)
override virtual

The requested action has been performed and the status is available.

Implements XrdCl::MsgHandler.

Definition at line 919 of file XrdClXRootDMsgHandler.cc.

921 {
922 Log *log = DefaultEnv::GetLog();
923
924 const int sst = pSendingState.fetch_or( kSendDone );
925
926 if( sst & kFinalResp )
927 {
928 log->Dump( XRootDMsg, "[%s] Got late notification that outgoing message %s was "
929 "sent, already have final response, queuing handler callback.",
930 pUrl.GetHostId().c_str(), message->GetObfuscatedDescription().c_str() );
931 HandleRspOrQueue();
932 return;
933 }
934
935 if( sst & kSawResp )
936 {
937 log->Dump( XRootDMsg, "[%s] Got late notification that message %s has "
938 "been successfully sent.",
939 pUrl.GetHostId().c_str(), message->GetObfuscatedDescription().c_str() );
940 return;
941 }
942
943 //--------------------------------------------------------------------------
944 // We were successful, so we now need to listen for a response
945 //--------------------------------------------------------------------------
946 if( status.IsOK() )
947 {
948 log->Dump( XRootDMsg, "[%s] Message %s has been successfully sent.",
949 pUrl.GetHostId().c_str(), message->GetObfuscatedDescription().c_str() );
950
951 pMsgInFly = true;
952 return;
953 }
954
955 //--------------------------------------------------------------------------
956 // We have failed, recover if possible
957 //--------------------------------------------------------------------------
958 log->Error( XRootDMsg, "[%s] Impossible to send message %s. Trying to "
959 "recover.", pUrl.GetHostId().c_str(),
960 message->GetObfuscatedDescription().c_str() );
961 HandleError( status );
962 }

References XrdCl::Log::Dump(), XrdCl::Log::Error(), XrdCl::URL::GetHostId(), XrdCl::DefaultEnv::GetLog(), XrdCl::Message::GetObfuscatedDescription(), XrdCl::Status::IsOK(), and XrdCl::XRootDMsg.

+ Here is the call graph for this function:

◆ OnStreamEvent()

uint8_t XrdCl::XRootDMsgHandler::OnStreamEvent ( StreamEvent  event,
XRootDStatus  status 
)
override virtual

Handle an event other than a message arrival

Parameters
event - type of the event
status - status info

Reimplemented from XrdCl::MsgHandler.

Definition at line 882 of file XrdClXRootDMsgHandler.cc.

884 {
885 Log *log = DefaultEnv::GetLog();
886 log->Dump( XRootDMsg, "[%s] Stream event reported for msg %s",
887 pUrl.GetHostId().c_str(), pRequest->GetObfuscatedDescription().c_str() );
888
889 if( event == Ready )
890 return 0;
891
892 if( pTimeoutFence.load( std::memory_order_relaxed ) )
893 return 0;
894
895 HandleError( status );
896 return RemoveHandler;
897 }
@ Ready
The stream has become connected.

References XrdCl::Log::Dump(), XrdCl::URL::GetHostId(), XrdCl::DefaultEnv::GetLog(), XrdCl::Message::GetObfuscatedDescription(), XrdCl::MsgHandler::Ready, XrdCl::MsgHandler::RemoveHandler, and XrdCl::XRootDMsg.

+ Here is the call graph for this function:

◆ PartialReceived()

void XrdCl::XRootDMsgHandler::PartialReceived ( )

Bookkeeping after partial response has been received:

  • take down the timeout fence after oksofar response has been handled
  • reset status-response-body marshaled flag

Definition at line 1144 of file XrdClXRootDMsgHandler.cc.

1145 {
1146 pTimeoutFence.store( false, std::memory_order_relaxed ); // Take down the timeout fence
1147 }

Referenced by XrdCl::Stream::ForceError(), XrdCl::Stream::OnError(), and XrdCl::Stream::OnIncoming().

+ Here is the caller graph for this function:

◆ Process()

void XrdCl::XRootDMsgHandler::Process ( )
overridevirtual

Process the message if it was "taken" by the examine action.

Process the message if it was "taken" by the examine action

Parameters
msg: the message to be processed (note: the current signature takes no argument; the response is read from the pResponse member)

Reimplemented from XrdCl::MsgHandler.

Definition at line 414 of file XrdClXRootDMsgHandler.cc.

415 {
416 Log *log = DefaultEnv::GetLog();
417
418 ServerResponse *rsp = (ServerResponse *)pResponse->GetBuffer();
419
420 ClientRequest *req = (ClientRequest *)pRequest->GetBuffer();
421
422 //--------------------------------------------------------------------------
423 // If it is a local file, it can be only a metalink redirector
424 //--------------------------------------------------------------------------
425 if( pUrl.IsLocalFile() && pUrl.IsMetalink() )
426 pHosts->back().protocol = kXR_PROTOCOLVERSION;
427
428 //--------------------------------------------------------------------------
429 // We got an answer, check who we were talking to
430 //--------------------------------------------------------------------------
431 else
432 {
433 AnyObject qryResult;
434 int *qryResponse = nullptr;
435 pPostMaster->QueryTransport( pUrl, XRootDQuery::ServerFlags, qryResult );
436 qryResult.Get( qryResponse );
437 if (qryResponse) {
438 pHosts->back().flags = *qryResponse;
439 delete qryResponse;
440 qryResponse = nullptr;
441 }
442 pPostMaster->QueryTransport( pUrl, XRootDQuery::ProtocolVersion, qryResult );
443 qryResult.Get( qryResponse );
444 if (qryResponse) {
445 pHosts->back().protocol = *qryResponse;
446 delete qryResponse;
447 }
448 }
449
450 //--------------------------------------------------------------------------
451 // Process the message
452 //--------------------------------------------------------------------------
453 Status st = XRootDTransport::UnMarshallBody( pResponse.get(), req->header.requestid );
454 if( !st.IsOK() )
455 {
456 pStatus = Status( stFatal, errInvalidMessage );
457 HandleResponse();
458 return;
459 }
460
461 //--------------------------------------------------------------------------
462 // we have an response for the message so it's not in fly anymore
463 //--------------------------------------------------------------------------
464 pMsgInFly = false;
465
466 //--------------------------------------------------------------------------
467 // Reset the aggregated wait (used to omit wait response in case of Metalink
468 // redirector)
469 //--------------------------------------------------------------------------
470 if( rsp->hdr.status != kXR_wait )
471 pAggregatedWaitTime = 0;
472
473 switch( rsp->hdr.status )
474 {
475 //------------------------------------------------------------------------
476 // kXR_ok - we're done here
477 //------------------------------------------------------------------------
478 case kXR_ok:
479 {
480 log->Dump( XRootDMsg, "[%s] Got a kXR_ok response to request %s",
481 pUrl.GetHostId().c_str(),
482 pRequest->GetObfuscatedDescription().c_str() );
483 pStatus = Status();
484 HandleResponse();
485 return;
486 }
487
488 case kXR_status:
489 {
490 log->Dump( XRootDMsg, "[%s] Got a kXR_status response to request %s",
491 pUrl.GetHostId().c_str(),
492 pRequest->GetObfuscatedDescription().c_str() );
493 pStatus = Status();
494 HandleResponse();
495 return;
496 }
497
498 //------------------------------------------------------------------------
499 // kXR_oksofar - we're serving partial result to the user
500 //------------------------------------------------------------------------
501 case kXR_oksofar:
502 {
503 log->Dump( XRootDMsg, "[%s] Got a kXR_oksofar response to request %s",
504 pUrl.GetHostId().c_str(),
505 pRequest->GetObfuscatedDescription().c_str() );
506 pStatus = Status( stOK, suContinue );
507 HandleResponse();
508 return;
509 }
510
511 //------------------------------------------------------------------------
512 // kXR_error - we've got a problem
513 //------------------------------------------------------------------------
514 case kXR_error:
515 {
516 char *errmsg = new char[rsp->hdr.dlen-3]; errmsg[rsp->hdr.dlen-4] = 0;
517 memcpy( errmsg, rsp->body.error.errmsg, rsp->hdr.dlen-4 );
518 log->Dump( XRootDMsg, "[%s] Got a kXR_error response to request %s "
519 "[%d] %s", pUrl.GetHostId().c_str(),
520 pRequest->GetObfuscatedDescription().c_str(), rsp->body.error.errnum,
521 errmsg );
522 delete [] errmsg;
523
524 HandleError( Status(stError, errErrorResponse, rsp->body.error.errnum) );
525 return;
526 }
527
528 //------------------------------------------------------------------------
529 // kXR_redirect - they tell us to go elsewhere
530 //------------------------------------------------------------------------
531 case kXR_redirect:
532 {
533 if( rsp->hdr.dlen <= 4 )
534 {
535 log->Error( XRootDMsg, "[%s] Got invalid redirect response.",
536 pUrl.GetHostId().c_str() );
537 pStatus = Status( stError, errInvalidResponse );
538 HandleResponse();
539 return;
540 }
541
542 char *urlInfoBuff = new char[rsp->hdr.dlen-3];
543 urlInfoBuff[rsp->hdr.dlen-4] = 0;
544 memcpy( urlInfoBuff, rsp->body.redirect.host, rsp->hdr.dlen-4 );
545 std::string urlInfo = urlInfoBuff;
546 delete [] urlInfoBuff;
547 log->Dump( XRootDMsg, "[%s] Got kXR_redirect response to "
548 "message %s: %s, port %d", pUrl.GetHostId().c_str(),
549 pRequest->GetObfuscatedDescription().c_str(), urlInfo.c_str(),
550 rsp->body.redirect.port );
551
552 //----------------------------------------------------------------------
553 // Check if we can proceed
554 //----------------------------------------------------------------------
555 if( !pRedirectCounter )
556 {
557 log->Warning( XRootDMsg, "[%s] Redirect limit has been reached for "
558 "message %s, the last known error is: %s",
559 pUrl.GetHostId().c_str(),
560 pRequest->GetObfuscatedDescription().c_str(),
561 pLastError.ToString().c_str() );
562
563
564 pStatus = Status( stFatal, errRedirectLimit );
565 HandleResponse();
566 return;
567 }
568 --pRedirectCounter;
569
570 //----------------------------------------------------------------------
571 // Keep the info about this server if we still need to find a load
572 // balancer
573 //----------------------------------------------------------------------
574 uint32_t flags = pHosts->back().flags;
575 if( !pHasLoadBalancer )
576 {
577 if( flags & kXR_isManager )
578 {
579 //------------------------------------------------------------------
580 // If the current server is a meta manager then it supersedes
581 // any existing load balancer, otherwise we assign a load-balancer
582 // only if it has not been already assigned
583 //------------------------------------------------------------------
584 if( ( flags & kXR_attrMeta ) || !pLoadBalancer.url.IsValid() )
585 {
586 pLoadBalancer = pHosts->back();
587 log->Dump( XRootDMsg, "[%s] Current server has been assigned "
588 "as a load-balancer for message %s",
589 pUrl.GetHostId().c_str(),
590 pRequest->GetObfuscatedDescription().c_str() );
591 HostList::iterator it;
592 for( it = pHosts->begin(); it != pHosts->end(); ++it )
593 it->loadBalancer = false;
594 pHosts->back().loadBalancer = true;
595 }
596 }
597 }
598
599 //----------------------------------------------------------------------
600 // If the redirect comes from a data server save the URL because
601 // in case of a failure we will use it as the effective data server URL
602 // for the tried CGI opaque info
603 //----------------------------------------------------------------------
604 if( flags & kXR_isServer )
605 pEffectiveDataServerUrl = new URL( pHosts->back().url );
606
607 //----------------------------------------------------------------------
608 // Build the URL and check its validity
609 //----------------------------------------------------------------------
610 std::vector<std::string> urlComponents;
611 std::string newCgi;
612 Utils::splitString( urlComponents, urlInfo, "?" );
613
614 std::ostringstream o;
615
616 o << urlComponents[0];
617 if( rsp->body.redirect.port > 0 )
618 o << ":" << rsp->body.redirect.port << "/";
619 else if( rsp->body.redirect.port < 0 )
620 {
621 //--------------------------------------------------------------------
622 // check if the manager wants to enforce write recovery at himself
623 // (beware we are dealing here with negative flags)
624 //--------------------------------------------------------------------
625 if( ~uint32_t( rsp->body.redirect.port ) & kXR_recoverWrts )
626 pHosts->back().flags |= kXR_recoverWrts;
627
628 //--------------------------------------------------------------------
629 // check if the manager wants to collapse the communication channel
630 // (the redirect host is to replace the current host)
631 //--------------------------------------------------------------------
632 if( ~uint32_t( rsp->body.redirect.port ) & kXR_collapseRedir )
633 {
634 std::string url( rsp->body.redirect.host, rsp->hdr.dlen-4 );
635 pPostMaster->CollapseRedirect( pUrl, url );
636 }
637
638 if( ~uint32_t( rsp->body.redirect.port ) & kXR_ecRedir )
639 {
640 std::string url( rsp->body.redirect.host, rsp->hdr.dlen-4 );
641 if( Utils::CheckEC( pRequest, url ) )
642 pRedirectAsAnswer = true;
643 }
644 }
645
646 URL newUrl = URL( o.str() );
647 if( !newUrl.IsValid() )
648 {
649 pStatus = Status( stError, errInvalidRedirectURL );
650 log->Error( XRootDMsg, "[%s] Got invalid redirection URL: %s",
651 pUrl.GetHostId().c_str(), urlInfo.c_str() );
652 HandleResponse();
653 return;
654 }
655
656 if( pUrl.GetUserName() != "" && newUrl.GetUserName() == "" )
657 newUrl.SetUserName( pUrl.GetUserName() );
658
659 if( pUrl.GetPassword() != "" && newUrl.GetPassword() == "" )
660 newUrl.SetPassword( pUrl.GetPassword() );
661
662 //----------------------------------------------------------------------
663 // Forward any "xrd.*" params from the original client request also to
664 // the new redirection url
665 // Also, we need to preserve any "xrdcl.*' as they are important for
666 // our internal workflows.
667 //----------------------------------------------------------------------
668 std::ostringstream ossXrd;
669 const URL::ParamsMap &urlParams = pUrl.GetParams();
670
671 for(URL::ParamsMap::const_iterator it = urlParams.begin();
672 it != urlParams.end(); ++it )
673 {
674 if( it->first.compare( 0, 4, "xrd." ) &&
675 it->first.compare( 0, 6, "xrdcl." ) )
676 continue;
677
678 ossXrd << it->first << '=' << it->second << '&';
679 }
680
681 std::string xrdCgi = ossXrd.str();
682 pRedirectUrl = newUrl.GetURL();
683
684 URL cgiURL;
685 if( urlComponents.size() > 1 )
686 {
687 pRedirectUrl += "?";
688 pRedirectUrl += urlComponents[1];
689 std::ostringstream o;
690 o << "fake://fake:111//fake?";
691 o << urlComponents[1];
692
693 if( urlComponents.size() == 3 )
694 o << '?' << urlComponents[2];
695
696 if (!xrdCgi.empty())
697 {
698 o << '&' << xrdCgi;
699 pRedirectUrl += '&';
700 pRedirectUrl += xrdCgi;
701 }
702
703 cgiURL = URL( o.str() );
704 }
705 else {
706 if (!xrdCgi.empty())
707 {
708 std::ostringstream o;
709 o << "fake://fake:111//fake?";
710 o << xrdCgi;
711 cgiURL = URL( o.str() );
712 pRedirectUrl += '?';
713 pRedirectUrl += xrdCgi;
714 }
715 }
716
717 //----------------------------------------------------------------------
718 // Check if we need to return the URL as a response
719 //----------------------------------------------------------------------
720 if( newUrl.GetProtocol() != "root" && newUrl.GetProtocol() != "xroot" &&
721 newUrl.GetProtocol() != "roots" && newUrl.GetProtocol() != "xroots" &&
722 !newUrl.IsLocalFile() )
723 pRedirectAsAnswer = true;
724
725 if( pRedirectAsAnswer )
726 {
727 pStatus = Status( stError, errRedirect );
728 HandleResponse();
729 return;
730 }
731
732 //----------------------------------------------------------------------
733 // Rewrite the message in a way required to send it to another server
734 //----------------------------------------------------------------------
735 newUrl.SetParams( cgiURL.GetParams() );
736 Status st = RewriteRequestRedirect( newUrl );
737 if( !st.IsOK() )
738 {
739 pStatus = st;
740 HandleResponse();
741 return;
742 }
743
744 //----------------------------------------------------------------------
745 // Make sure we don't change the protocol by accident (root vs roots)
746 //----------------------------------------------------------------------
747 if( ( pUrl.GetProtocol() == "roots" || pUrl.GetProtocol() == "xroots" ) &&
748 ( newUrl.GetProtocol() == "root" || newUrl.GetProtocol() == "xroot" ) )
749 newUrl.SetProtocol( "roots" );
750
751 //----------------------------------------------------------------------
752 // Send the request to the new location
753 //----------------------------------------------------------------------
754 HandleError( RetryAtServer( newUrl, RedirectEntry::EntryRedirect ) );
755 return;
756 }
757
758 //------------------------------------------------------------------------
759 // kXR_wait - we wait, and re-issue the request later
760 //------------------------------------------------------------------------
761 case kXR_wait:
762 {
763 uint32_t waitSeconds = 0;
764
765 if( rsp->hdr.dlen >= 4 )
766 {
767 char *infoMsg = new char[rsp->hdr.dlen-3];
768 infoMsg[rsp->hdr.dlen-4] = 0;
769 memcpy( infoMsg, rsp->body.wait.infomsg, rsp->hdr.dlen-4 );
770 log->Dump( XRootDMsg, "[%s] Got kXR_wait response of %d seconds to "
771 "message %s: %s", pUrl.GetHostId().c_str(),
772 rsp->body.wait.seconds, pRequest->GetObfuscatedDescription().c_str(),
773 infoMsg );
774 delete [] infoMsg;
775 waitSeconds = rsp->body.wait.seconds;
776 }
777 else
778 {
779 log->Dump( XRootDMsg, "[%s] Got kXR_wait response of 0 seconds to "
780 "message %s", pUrl.GetHostId().c_str(),
781 pRequest->GetObfuscatedDescription().c_str() );
782 }
783
784 pAggregatedWaitTime += waitSeconds;
785
786 // We need a special case if the data node comes from metalink
787 // redirector. In this case it might make more sense to try the
788 // next entry in the Metalink than wait.
789 if( OmitWait( *pRequest, pLoadBalancer.url ) )
790 {
791 int maxWait = DefaultMaxMetalinkWait;
792 DefaultEnv::GetEnv()->GetInt( "MaxMetalinkWait", maxWait );
793 if( pAggregatedWaitTime > maxWait )
794 {
795 UpdateTriedCGI();
796 HandleError( RetryAtServer( pLoadBalancer.url, RedirectEntry::EntryRedirectOnWait ) );
797 return;
798 }
799 }
800
801 //----------------------------------------------------------------------
802 // Some messages require rewriting before they can be sent again
803 // after wait
804 //----------------------------------------------------------------------
805 Status st = RewriteRequestWait();
806 if( !st.IsOK() )
807 {
808 pStatus = st;
809 HandleResponse();
810 return;
811 }
812
813 //----------------------------------------------------------------------
814 // Register a task to resend the message in some seconds, if we still
815 // have time to do that, and report a timeout otherwise
816 //----------------------------------------------------------------------
817 time_t resendTime = ::time(0)+waitSeconds;
818
819 if( resendTime < pExpiration )
820 {
821 log->Debug( ExDbgMsg, "[%s] Scheduling WaitTask for MsgHandler: %p (message: %s ).",
822 pUrl.GetHostId().c_str(), this,
823 pRequest->GetObfuscatedDescription().c_str() );
824
825 TaskManager *taskMgr = pPostMaster->GetTaskManager();
826 taskMgr->RegisterTask( new WaitTask( this ), resendTime );
827 }
828 else
829 {
830 log->Debug( XRootDMsg, "[%s] Wait time is too long, timing out %s",
831 pUrl.GetHostId().c_str(),
832 pRequest->GetObfuscatedDescription().c_str() );
833 HandleError( Status( stError, errOperationExpired) );
834 }
835 return;
836 }
837
838 //------------------------------------------------------------------------
839 // kXR_waitresp - the response will be returned in some seconds as an
840 // unsolicited message. Currently all messages of this type are handled
841 // one step before in the XrdClStream::OnIncoming as they need to be
842 // processed synchronously.
843 //------------------------------------------------------------------------
844 case kXR_waitresp:
845 {
846 if( rsp->hdr.dlen < 4 )
847 {
848 log->Error( XRootDMsg, "[%s] Got invalid waitresp response.",
849 pUrl.GetHostId().c_str() );
850 pStatus = Status( stError, errInvalidResponse );
851 HandleResponse();
852 return;
853 }
854
855 log->Dump( XRootDMsg, "[%s] Got kXR_waitresp response of %d seconds to "
856 "message %s", pUrl.GetHostId().c_str(),
857 rsp->body.waitresp.seconds,
858 pRequest->GetObfuscatedDescription().c_str() );
859 return;
860 }
861
862 //------------------------------------------------------------------------
863 // Default - unrecognized/unsupported response, declare an error
864 //------------------------------------------------------------------------
865 default:
866 {
867 log->Dump( XRootDMsg, "[%s] Got unrecognized response %d to "
868 "message %s", pUrl.GetHostId().c_str(),
869 rsp->hdr.status, pRequest->GetObfuscatedDescription().c_str() );
870 pStatus = Status( stError, errInvalidResponse );
871 HandleResponse();
872 return;
873 }
874 }
875
876 return;
877 }
#define kXR_isManager
union ServerResponse::@0 body
#define kXR_collapseRedir
#define kXR_attrMeta
#define kXR_recoverWrts
#define kXR_isServer
#define kXR_PROTOCOLVERSION
Definition XProtocol.hh:70
#define kXR_ecRedir
static Env * GetEnv()
Get default client environment.
bool GetInt(const std::string &key, int &value)
Definition XrdClEnv.cc:89
void CollapseRedirect(const URL &oldurl, const URL &newURL)
Collapse channel URL - replace the URL of the channel.
TaskManager * GetTaskManager()
Get the task manager object used by the post master.
Status QueryTransport(const URL &url, uint16_t query, AnyObject &result)
void RegisterTask(Task *task, time_t time, bool own=true)
bool IsMetalink() const
Is it a URL to a metalink.
Definition XrdClURL.cc:465
const std::string & GetPassword() const
Get the password.
Definition XrdClURL.hh:153
std::map< std::string, std::string > ParamsMap
Definition XrdClURL.hh:33
const std::string & GetUserName() const
Get the username.
Definition XrdClURL.hh:135
bool IsLocalFile() const
Definition XrdClURL.cc:474
const ParamsMap & GetParams() const
Get the URL params.
Definition XrdClURL.hh:244
const std::string & GetProtocol() const
Get the protocol.
Definition XrdClURL.hh:118
bool IsValid() const
Is the url valid.
Definition XrdClURL.cc:452
static void splitString(Container &result, const std::string &input, const std::string &delimiter)
Split a string.
Definition XrdClUtils.hh:56
static bool CheckEC(const Message *req, const URL &url)
Check if this client can support given EC redirect.
static XRootDStatus UnMarshallBody(Message *msg, uint16_t reqType)
Unmarshall the body of the incoming message.
const uint16_t errRedirectLimit
const int DefaultMaxMetalinkWait
const uint16_t errErrorResponse
const uint16_t errOperationExpired
const uint16_t stFatal
Fatal error, it's still an error.
const uint16_t stError
An error occurred that could potentially be retried.
const uint16_t stOK
Everything went OK.
const uint16_t errInvalidResponse
const uint16_t errInvalidRedirectURL
const uint16_t suContinue
const uint16_t errRedirect
const uint16_t errInvalidMessage
URL url
URL of the host.
std::string ToString() const
Create a string representation.
static const uint16_t ServerFlags
returns server flags
static const uint16_t ProtocolVersion
returns the protocol version

References ServerResponse::body, XrdCl::Utils::CheckEC(), XrdCl::PostMaster::CollapseRedirect(), XrdCl::Log::Debug(), XrdCl::DefaultMaxMetalinkWait, ServerResponseHeader::dlen, XrdCl::Log::Dump(), XrdCl::RedirectEntry::EntryRedirect, XrdCl::RedirectEntry::EntryRedirectOnWait, XrdCl::errErrorResponse, XrdCl::errInvalidMessage, XrdCl::errInvalidRedirectURL, XrdCl::errInvalidResponse, XrdCl::errOperationExpired, XrdCl::Log::Error(), XrdCl::errRedirect, XrdCl::errRedirectLimit, XrdCl::ExDbgMsg, XrdCl::AnyObject::Get(), XrdCl::Buffer::GetBuffer(), XrdCl::DefaultEnv::GetEnv(), XrdCl::URL::GetHostId(), XrdCl::Env::GetInt(), XrdCl::DefaultEnv::GetLog(), XrdCl::Message::GetObfuscatedDescription(), XrdCl::URL::GetParams(), XrdCl::URL::GetPassword(), XrdCl::URL::GetProtocol(), XrdCl::PostMaster::GetTaskManager(), XrdCl::URL::GetURL(), XrdCl::URL::GetUserName(), ServerResponse::hdr, ClientRequest::header, XrdCl::URL::IsLocalFile(), XrdCl::URL::IsMetalink(), XrdCl::Status::IsOK(), XrdCl::URL::IsValid(), kXR_attrMeta, kXR_collapseRedir, kXR_ecRedir, kXR_error, kXR_isManager, kXR_isServer, kXR_ok, kXR_oksofar, kXR_PROTOCOLVERSION, kXR_recoverWrts, kXR_redirect, kXR_status, kXR_wait, kXR_waitresp, XrdCl::XRootDQuery::ProtocolVersion, XrdCl::PostMaster::QueryTransport(), XrdCl::TaskManager::RegisterTask(), ClientRequestHdr::requestid, XrdCl::XRootDQuery::ServerFlags, XrdCl::URL::SetParams(), XrdCl::URL::SetPassword(), XrdCl::URL::SetProtocol(), XrdCl::URL::SetUserName(), XrdCl::Utils::splitString(), ServerResponseHeader::status, XrdCl::stError, XrdCl::stFatal, XrdCl::stOK, XrdCl::suContinue, XrdCl::Status::ToString(), XrdCl::XRootDTransport::UnMarshallBody(), XrdCl::HostInfo::url, XrdCl::Log::Warning(), and XrdCl::XRootDMsg.

+ Here is the call graph for this function:

◆ ReadMessageBody()

XRootDStatus XrdCl::XRootDMsgHandler::ReadMessageBody ( Message msg,
Socket socket,
uint32_t &  bytesRead 
)
overridevirtual

Read message body directly from a socket - called if Examine returns Raw flag - only socket related errors may be returned here

Parameters
msg: the corresponding message header
socket: the socket to read from
bytesRead: number of bytes read by the method
Returns
stOK & suDone if the whole body has been processed; stOK & suRetry if more data is needed; stError on failure.

Reimplemented from XrdCl::MsgHandler.

Definition at line 902 of file XrdClXRootDMsgHandler.cc.

905 {
906 ClientRequest *req = (ClientRequest *)pRequest->GetBuffer();
907 uint16_t reqId = ntohs( req->header.requestid );
908
909 if( reqId == kXR_pgread )
910 return pPageReader->Read( *socket, bytesRead );
911
912 return pBodyReader->Read( *socket, bytesRead );
913 }

References XrdCl::Buffer::GetBuffer(), ClientRequest::header, kXR_pgread, and ClientRequestHdr::requestid.

+ Here is the call graph for this function:

◆ SetChunkList()

void XrdCl::XRootDMsgHandler::SetChunkList ( ChunkList chunkList)
inline

Set the chunk list.

Definition at line 384 of file XrdClXRootDMsgHandler.hh.

385 {
386 pChunkList = chunkList;
387 if( pBodyReader )
388 pBodyReader->SetChunkList( chunkList );
389 if( chunkList )
390 pChunkStatus.resize( chunkList->size() );
391 else
392 pChunkStatus.clear();
393 }

Referenced by XrdCl::MessageUtils::RedirectMessage(), and XrdCl::MessageUtils::SendMessage().

+ Here is the caller graph for this function:

◆ SetCrc32cDigests()

void XrdCl::XRootDMsgHandler::SetCrc32cDigests ( std::vector< uint32_t > &&  crc32cDigests)
inline

Definition at line 395 of file XrdClXRootDMsgHandler.hh.

396 {
397 pCrc32cDigests = std::move( crc32cDigests );
398 }

Referenced by XrdCl::MessageUtils::SendMessage().

+ Here is the caller graph for this function:

◆ SetExpiration()

void XrdCl::XRootDMsgHandler::SetExpiration ( time_t  expiration)
inline

Set a timestamp after which we give up.

Definition at line 323 of file XrdClXRootDMsgHandler.hh.

324 {
325 pExpiration = expiration;
326 }

Referenced by XrdCl::MessageUtils::RedirectMessage(), and XrdCl::MessageUtils::SendMessage().

+ Here is the caller graph for this function:

◆ SetFollowMetalink()

void XrdCl::XRootDMsgHandler::SetFollowMetalink ( bool  followMetalink)
inline

Definition at line 416 of file XrdClXRootDMsgHandler.hh.

417 {
418 pFollowMetalink = followMetalink;
419 }

Referenced by XrdCl::MessageUtils::RedirectMessage().

+ Here is the caller graph for this function:

◆ SetHostList()

void XrdCl::XRootDMsgHandler::SetHostList ( HostList hostList)
inline

Set host list.

Definition at line 376 of file XrdClXRootDMsgHandler.hh.

377 {
378 pHosts.reset( hostList );
379 }

Referenced by XrdCl::MessageUtils::RedirectMessage(), and XrdCl::MessageUtils::SendMessage().

+ Here is the caller graph for this function:

◆ SetKernelBuffer()

void XrdCl::XRootDMsgHandler::SetKernelBuffer ( XrdSys::KernelBuffer kbuff)
inline

Set the kernel buffer.

Definition at line 403 of file XrdClXRootDMsgHandler.hh.

404 {
405 pKBuff = kbuff;
406 }

Referenced by XrdCl::MessageUtils::SendMessage().

+ Here is the caller graph for this function:

◆ SetLoadBalancer()

void XrdCl::XRootDMsgHandler::SetLoadBalancer ( const HostInfo loadBalancer)
inline

Set the load balancer.

Definition at line 365 of file XrdClXRootDMsgHandler.hh.

366 {
367 if( !loadBalancer.url.IsValid() )
368 return;
369 pLoadBalancer = loadBalancer;
370 pHasLoadBalancer = true;
371 }

References XrdCl::URL::IsValid(), and XrdCl::HostInfo::url.

Referenced by XrdCl::MessageUtils::RedirectMessage(), and XrdCl::MessageUtils::SendMessage().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ SetOksofarAsAnswer()

void XrdCl::XRootDMsgHandler::SetOksofarAsAnswer ( bool  oksofarAsAnswer)
inline

Treat the kXR_oksofar response as a valid answer to the message and notify the handler with the partial response

Definition at line 349 of file XrdClXRootDMsgHandler.hh.

350 {
351 pOksofarAsAnswer = oksofarAsAnswer;
352 }

Referenced by XrdCl::MessageUtils::RedirectMessage(), and XrdCl::MessageUtils::SendMessage().

+ Here is the caller graph for this function:

◆ SetRedirectAsAnswer()

void XrdCl::XRootDMsgHandler::SetRedirectAsAnswer ( bool  redirectAsAnswer)
inline

Treat the kXR_redirect response as a valid answer to the message and notify the handler with the URL as a response

Definition at line 340 of file XrdClXRootDMsgHandler.hh.

341 {
342 pRedirectAsAnswer = redirectAsAnswer;
343 }

Referenced by XrdCl::MessageUtils::RedirectMessage(), and XrdCl::MessageUtils::SendMessage().

+ Here is the caller graph for this function:

◆ SetRedirectCounter()

void XrdCl::XRootDMsgHandler::SetRedirectCounter ( uint16_t  redirectCounter)
inline

Set the redirect counter.

Definition at line 411 of file XrdClXRootDMsgHandler.hh.

412 {
413 pRedirectCounter = redirectCounter;
414 }

Referenced by XrdCl::MessageUtils::RedirectMessage(), and XrdCl::MessageUtils::SendMessage().

+ Here is the caller graph for this function:

◆ SetStateful()

void XrdCl::XRootDMsgHandler::SetStateful ( bool  stateful)
inline

Definition at line 421 of file XrdClXRootDMsgHandler.hh.

422 {
423 pStateful = stateful;
424 }

Referenced by XrdCl::MessageUtils::SendMessage().

+ Here is the caller graph for this function:

◆ WaitDone()

void XrdCl::XRootDMsgHandler::WaitDone ( time_t  now)

Called after the wait time for kXR_wait has elapsed

Parameters
now: current timestamp

Definition at line 1136 of file XrdClXRootDMsgHandler.cc.

1137 {
1138 HandleError( RetryAtServer( pUrl, RedirectEntry::EntryWait ) );
1139 }

References XrdCl::RedirectEntry::EntryWait.

◆ WriteMessageBody()

XRootDStatus XrdCl::XRootDMsgHandler::WriteMessageBody ( Socket socket,
uint32_t &  bytesWritten 
)
overridevirtual

Write message body directly to a socket - called if IsRaw returns true - only socket related errors may be returned here

Parameters
socket: the socket to write to
bytesWritten: number of bytes written by the method
Returns
stOK & suDone if the whole body has been processed; stOK & suRetry if more data needs to be written; stError on failure.

Reimplemented from XrdCl::MsgHandler.

Definition at line 987 of file XrdClXRootDMsgHandler.cc.

989 {
990 //--------------------------------------------------------------------------
991 // First check if it is a PgWrite
992 //--------------------------------------------------------------------------
993 if( !pChunkList->empty() && !pCrc32cDigests.empty() )
994 {
995 //------------------------------------------------------------------------
996 // PgWrite will have just one chunk
997 //------------------------------------------------------------------------
998 ChunkInfo chunk = pChunkList->front();
999 //------------------------------------------------------------------------
1000 // Calculate the size of the first and last page (in case the chunk is not
1001 // 4KB aligned)
1002 //------------------------------------------------------------------------
1003 int fLen = 0, lLen = 0;
1004 size_t nbpgs = XrdOucPgrwUtils::csNum( chunk.offset, chunk.length, fLen, lLen );
1005
1006 //------------------------------------------------------------------------
1007 // Set the crc32c buffer if not ready yet
1008 //------------------------------------------------------------------------
1009 if( pPgWrtCksumBuff.GetCursor() == 0 )
1010 {
1011 uint32_t digest = htonl( pCrc32cDigests[pPgWrtCurrentPageNb] );
1012 memcpy( pPgWrtCksumBuff.GetBuffer(), &digest, sizeof( uint32_t ) );
1013 }
1014
1015 uint32_t btsLeft = chunk.length - pAsyncOffset;
1016 uint32_t pglen = ( pPgWrtCurrentPageNb == 0 ? fLen : XrdSys::PageSize ) - pPgWrtCurrentPageOffset;
1017 if( pglen > btsLeft ) pglen = btsLeft;
1018 char* pgbuf = static_cast<char*>( chunk.buffer ) + pAsyncOffset;
1019
1020 while( btsLeft > 0 )
1021 {
1022 // first write the crc32c digest
1023 while( pPgWrtCksumBuff.GetCursor() < sizeof( uint32_t ) )
1024 {
1025 uint32_t dgstlen = sizeof( uint32_t ) - pPgWrtCksumBuff.GetCursor();
1026 char* dgstbuf = pPgWrtCksumBuff.GetBufferAtCursor();
1027 int btswrt = 0;
1028 Status st = socket->Send( dgstbuf, dgstlen, btswrt );
1029 if( !st.IsOK() ) return st;
1030 bytesWritten += btswrt;
1031 pPgWrtCksumBuff.AdvanceCursor( btswrt );
1032 if( st.code == suRetry ) return st;
1033 }
1034 // then write the raw data (one page)
1035 int btswrt = 0;
1036 Status st = socket->Send( pgbuf, pglen, btswrt );
1037 if( !st.IsOK() ) return st;
1038 pgbuf += btswrt;
1039 pglen -= btswrt;
1040 btsLeft -= btswrt;
1041 bytesWritten += btswrt;
1042 pAsyncOffset += btswrt; // update the offset to the raw data
1043 if( st.code == suRetry ) return st;
1044 // if we managed to write all the data ...
1045 if( pglen == 0 )
1046 {
1047 // move to the next page
1048 ++pPgWrtCurrentPageNb;
1049 if( pPgWrtCurrentPageNb < nbpgs )
1050 {
1051 // set the digest buffer
1052 pPgWrtCksumBuff.SetCursor( 0 );
1053 uint32_t digest = htonl( pCrc32cDigests[pPgWrtCurrentPageNb] );
1054 memcpy( pPgWrtCksumBuff.GetBuffer(), &digest, sizeof( uint32_t ) );
1055 }
1056 // set the page length
1057 pglen = XrdSys::PageSize;
1058 if( pglen > btsLeft ) pglen = btsLeft;
1059 // reset offset in the current page
1060 pPgWrtCurrentPageOffset = 0;
1061 }
1062 else
1063 // otherwise just adjust the offset in the current page
1064 pPgWrtCurrentPageOffset += btswrt;
1065
1066 }
1067 }
1068 else if( !pChunkList->empty() )
1069 {
1070 size_t size = pChunkList->size();
1071 for( size_t i = pAsyncChunkIndex ; i < size; ++i )
1072 {
1073 char *buffer = (char*)(*pChunkList)[i].buffer;
1074 uint32_t size = (*pChunkList)[i].length;
1075 size_t leftToBeWritten = size - pAsyncOffset;
1076
1077 while( leftToBeWritten )
1078 {
1079 int btswrt = 0;
1080 Status st = socket->Send( buffer + pAsyncOffset, leftToBeWritten, btswrt );
1081 bytesWritten += btswrt;
1082 if( !st.IsOK() || st.code == suRetry ) return st;
1083 pAsyncOffset += btswrt;
1084 leftToBeWritten -= btswrt;
1085 }
1086 //----------------------------------------------------------------------
1087 // Remember that we have moved to the next chunk, also clear the offset
1088 // within the buffer as we are going to move to a new one
1089 //----------------------------------------------------------------------
1090 ++pAsyncChunkIndex;
1091 pAsyncOffset = 0;
1092 }
1093 }
1094 else
1095 {
1096 Log *log = DefaultEnv::GetLog();
1097
1098 //------------------------------------------------------------------------
1099 // If the socket is encrypted we cannot use a kernel buffer, we have to
1100 // convert to user space buffer
1101 //------------------------------------------------------------------------
1102 if( socket->IsEncrypted() )
1103 {
1104 log->Debug( XRootDMsg, "[%s] Channel is encrypted: cannot use kernel buffer.",
1105 pUrl.GetHostId().c_str() );
1106
1107 char *ubuff = 0;
1108 ssize_t ret = XrdSys::Move( *pKBuff, ubuff );
1109 if( ret < 0 ) return Status( stError, errInternal );
1110 pChunkList->push_back( ChunkInfo( 0, ret, ubuff ) );
1111 return WriteMessageBody( socket, bytesWritten );
1112 }
1113
1114 //------------------------------------------------------------------------
1115 // Send the data
1116 //------------------------------------------------------------------------
1117 while( !pKBuff->Empty() )
1118 {
1119 int btswrt = 0;
1120 Status st = socket->Send( *pKBuff, btswrt );
1121 bytesWritten += btswrt;
1122 if( !st.IsOK() || st.code == suRetry ) return st;
1123 }
1124
1125 log->Debug( XRootDMsg, "[%s] Request %s payload (kernel buffer) transferred to socket.",
1126 pUrl.GetHostId().c_str(), pRequest->GetObfuscatedDescription().c_str() );
1127 }
1128
1129 return Status();
1130 }
void AdvanceCursor(uint32_t delta)
Advance the cursor.
char * GetBufferAtCursor()
Get the buffer pointer at the append cursor.
void SetCursor(uint32_t cursor)
Set the cursor.
uint32_t GetCursor() const
Get append cursor.
XRootDStatus WriteMessageBody(Socket *socket, uint32_t &bytesWritten) override
const uint16_t suRetry
const uint16_t errInternal
Internal error.
static const int PageSize
ssize_t Move(KernelBuffer &kbuff, char *&ubuff)

References XrdCl::Buffer::AdvanceCursor(), XrdCl::ChunkInfo::buffer, XrdCl::Status::code, XrdOucPgrwUtils::csNum(), XrdCl::Log::Debug(), XrdSys::KernelBuffer::Empty(), XrdCl::errInternal, XrdCl::Buffer::GetBuffer(), XrdCl::Buffer::GetBufferAtCursor(), XrdCl::Buffer::GetCursor(), XrdCl::URL::GetHostId(), XrdCl::DefaultEnv::GetLog(), XrdCl::Message::GetObfuscatedDescription(), XrdCl::Socket::IsEncrypted(), XrdCl::Status::IsOK(), XrdCl::ChunkInfo::length, XrdSys::Move(), XrdCl::ChunkInfo::offset, XrdSys::PageSize, XrdCl::Socket::Send(), XrdCl::Buffer::SetCursor(), XrdCl::stError, XrdCl::suRetry, WriteMessageBody(), and XrdCl::XRootDMsg.

Referenced by WriteMessageBody().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

Friends And Related Symbol Documentation

◆ HandleRspJob

friend class HandleRspJob
friend

Definition at line 121 of file XrdClXRootDMsgHandler.hh.


The documentation for this class was generated from the following files: