XRootD
Loading...
Searching...
No Matches
XrdClXRootDMsgHandler.cc
Go to the documentation of this file.
1//------------------------------------------------------------------------------
2// Copyright (c) 2011-2014 by European Organization for Nuclear Research (CERN)
3// Author: Lukasz Janyst <ljanyst@cern.ch>
4//------------------------------------------------------------------------------
5// This file is part of the XRootD software suite.
6//
7// XRootD is free software: you can redistribute it and/or modify
8// it under the terms of the GNU Lesser General Public License as published by
9// the Free Software Foundation, either version 3 of the License, or
10// (at your option) any later version.
11//
12// XRootD is distributed in the hope that it will be useful,
13// but WITHOUT ANY WARRANTY; without even the implied warranty of
14// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15// GNU General Public License for more details.
16//
17// You should have received a copy of the GNU Lesser General Public License
18// along with XRootD. If not, see <http://www.gnu.org/licenses/>.
19//
20// In applying this licence, CERN does not waive the privileges and immunities
21// granted to it by virtue of its status as an Intergovernmental Organization
22// or submit itself to any jurisdiction.
23//------------------------------------------------------------------------------
24
26#include "XrdCl/XrdClLog.hh"
30#include "XrdCl/XrdClMessage.hh"
31#include "XrdCl/XrdClURL.hh"
32#include "XrdCl/XrdClUtils.hh"
39#include "XrdCl/XrdClSocket.hh"
40#include "XrdCl/XrdClTls.hh"
41
42#include "XrdOuc/XrdOucCRC.hh"
43
44#include "XrdSys/XrdSysPlatform.hh" // same as above
47#include <memory>
48#include <sstream>
49#include <numeric>
50
51namespace
52{
53 //----------------------------------------------------------------------------
54 // We need an extra task what will run the handler in the future, because
55 // tasks get deleted and we need the handler
56 //----------------------------------------------------------------------------
57 class WaitTask: public XrdCl::Task
58 {
59 public:
60 WaitTask( XrdCl::XRootDMsgHandler *handler ): pHandler( handler )
61 {
62 std::ostringstream o;
63 o << "WaitTask for: 0x" << handler->GetRequest();
64 SetName( o.str() );
65 }
66
67 virtual time_t Run( time_t now )
68 {
69 pHandler->WaitDone( now );
70 return 0;
71 }
72 private:
74 };
75}
76
77namespace XrdCl
78{
79 //----------------------------------------------------------------------------
80 // Delegate the response handling to the thread-pool
81 //----------------------------------------------------------------------------
83 {
84 public:
85 HandleRspJob( XrdCl::XRootDMsgHandler *handler ): pHandler( handler )
86 {
87
88 }
89
90 virtual ~HandleRspJob()
91 {
92
93 }
94
95 virtual void Run( void *arg )
96 {
97 pHandler->HandleResponse();
98 delete this;
99 }
100 private:
101 XrdCl::XRootDMsgHandler *pHandler;
102 };
103
104 //----------------------------------------------------------------------------
105 // Examine an incoming message, and decide on the action to be taken
106 //----------------------------------------------------------------------------
107 uint16_t XRootDMsgHandler::Examine( std::shared_ptr<Message> &msg )
108 {
109 //--------------------------------------------------------------------------
110 // if the MsgHandler is already being used to process another request
111 // (kXR_oksofar) we need to wait
112 //--------------------------------------------------------------------------
113 if( pOksofarAsAnswer )
114 {
115 XrdSysCondVarHelper lck( pCV );
116 while( pResponse ) pCV.Wait();
117 }
118 else
119 {
120 if( pResponse )
121 {
122 Log *log = DefaultEnv::GetLog();
123 log->Warning( ExDbgMsg, "[%s] MsgHandler is examining a response although "
124 "it already owns a response: 0x%x (message: %s ).",
125 pUrl.GetHostId().c_str(), this,
126 pRequest->GetDescription().c_str() );
127 }
128 }
129
130 if( msg->GetSize() < 8 )
131 return Ignore;
132
133 ServerResponse *rsp = (ServerResponse *)msg->GetBuffer();
134 ClientRequest *req = (ClientRequest *)pRequest->GetBuffer();
135 uint16_t status = 0;
136 uint32_t dlen = 0;
137
138 //--------------------------------------------------------------------------
139 // We only care about async responses, but those are extracted now
140 // in the SocketHandler.
141 //--------------------------------------------------------------------------
142 if( rsp->hdr.status == kXR_attn )
143 {
144 return Ignore;
145 }
146 //--------------------------------------------------------------------------
147 // We got a sync message - check if it belongs to us
148 //--------------------------------------------------------------------------
149 else
150 {
151 if( rsp->hdr.streamid[0] != req->header.streamid[0] ||
152 rsp->hdr.streamid[1] != req->header.streamid[1] )
153 return Ignore;
154
155 status = rsp->hdr.status;
156 dlen = rsp->hdr.dlen;
157 }
158
159 //--------------------------------------------------------------------------
160 // We take the ownership of the message and decide what we will do
161 // with the handler itself, the options are:
162 // 1) we want to either read in raw mode (the Raw flag) or have the message
163 // body reconstructed for us by the TransportHandler by the time
164 // Process() is called (default, no extra flag)
165 // 2) we either got a full response in which case we don't want to be
166 // notified about anything anymore (RemoveHandler) or we got a partial
167 // answer and we need to wait for more (default, no extra flag)
168 //--------------------------------------------------------------------------
169 pResponse = msg;
170 pBodyReader->SetDataLength( dlen );
171
172 Log *log = DefaultEnv::GetLog();
173 switch( status )
174 {
175 //------------------------------------------------------------------------
176 // Handle the cached cases
177 //------------------------------------------------------------------------
178 case kXR_error:
179 case kXR_redirect:
180 case kXR_wait:
181 return RemoveHandler;
182
183 case kXR_waitresp:
184 {
185 log->Dump( XRootDMsg, "[%s] Got kXR_waitresp response to "
186 "message %s", pUrl.GetHostId().c_str(),
187 pRequest->GetDescription().c_str() );
188
189 pResponse.reset();
190 return Ignore; // This must be handled synchronously!
191 }
192
193 //------------------------------------------------------------------------
194 // Handle the potential raw cases
195 //------------------------------------------------------------------------
196 case kXR_ok:
197 {
198 //----------------------------------------------------------------------
199 // For kXR_read we read in raw mode
200 //----------------------------------------------------------------------
201 uint16_t reqId = ntohs( req->header.requestid );
202 if( reqId == kXR_read )
203 {
204 return Raw | RemoveHandler;
205 }
206
207 //----------------------------------------------------------------------
208 // kXR_readv is the same as kXR_read
209 //----------------------------------------------------------------------
210 if( reqId == kXR_readv )
211 {
212 return Raw | RemoveHandler;
213 }
214
215 //----------------------------------------------------------------------
216 // For everything else we just take what we got
217 //----------------------------------------------------------------------
218 return RemoveHandler;
219 }
220
221 //------------------------------------------------------------------------
222 // kXR_oksofars are special, they are not full responses, so we reset
223 // the response pointer to 0 and add the message to the partial list
224 //------------------------------------------------------------------------
225 case kXR_oksofar:
226 {
227 log->Dump( XRootDMsg, "[%s] Got a kXR_oksofar response to request "
228 "%s", pUrl.GetHostId().c_str(),
229 pRequest->GetDescription().c_str() );
230
231 if( !pOksofarAsAnswer )
232 {
233 pPartialResps.emplace_back( std::move( pResponse ) );
234 }
235
236 //----------------------------------------------------------------------
237 // For kXR_read we either read in raw mode if the message has not
238 // been fully reconstructed already, if it has, we adjust
239 // the buffer offset to prepare for the next one
240 //----------------------------------------------------------------------
241 uint16_t reqId = ntohs( req->header.requestid );
242 if( reqId == kXR_read )
243 {
244 pTimeoutFence.store( true, std::memory_order_relaxed );
245 return Raw | ( pOksofarAsAnswer ? None : NoProcess );
246 }
247
248 //----------------------------------------------------------------------
249 // kXR_readv is similar to read, except that the payload is different
250 //----------------------------------------------------------------------
251 if( reqId == kXR_readv )
252 {
253 pTimeoutFence.store( true, std::memory_order_relaxed );
254 return Raw | ( pOksofarAsAnswer ? None : NoProcess );
255 }
256
257 return ( pOksofarAsAnswer ? None : NoProcess );
258 }
259
260 case kXR_status:
261 {
262 log->Dump( XRootDMsg, "[%s] Got a kXR_status response to request "
263 "%s", pUrl.GetHostId().c_str(),
264 pRequest->GetDescription().c_str() );
265
266 uint16_t reqId = ntohs( req->header.requestid );
267 if( reqId == kXR_pgwrite )
268 {
269 //--------------------------------------------------------------------
270 // In case of pgwrite by definition this wont be a partial response
271 // so we can already remove the handler from the in-queue
272 //--------------------------------------------------------------------
273 return RemoveHandler;
274 }
275
276 //----------------------------------------------------------------------
277 // Otherwise (pgread), first of all we need to read the body of the
278 // kXR_status response, we can handle the raw data (if any) only after
279 // we have the whole kXR_status body
280 //----------------------------------------------------------------------
281 pTimeoutFence.store( true, std::memory_order_relaxed );
282 return None;
283 }
284
285 //------------------------------------------------------------------------
286 // Default
287 //------------------------------------------------------------------------
288 default:
289 return RemoveHandler;
290 }
291 return RemoveHandler;
292 }
293
294 //----------------------------------------------------------------------------
295 // Reexamine the incoming message, and decide on the action to be taken
296 //----------------------------------------------------------------------------
298 {
299 if( !pResponse )
300 return 0;
301
302 Log *log = DefaultEnv::GetLog();
303 ServerResponse *rsp = (ServerResponse *)pResponse->GetBuffer();
304
305 //--------------------------------------------------------------------------
306 // Additional action is only required for kXR_status
307 //--------------------------------------------------------------------------
308 if( rsp->hdr.status != kXR_status ) return 0;
309
310 //--------------------------------------------------------------------------
311 // Ignore malformed status response
312 //--------------------------------------------------------------------------
313 if( pResponse->GetSize() < sizeof( ServerResponseStatus ) )
314 {
315 log->Error( XRootDMsg, "[%s] kXR_status: invalid message size.", pUrl.GetHostId().c_str() );
316 return Corrupted;
317 }
318
319 ClientRequest *req = (ClientRequest *)pRequest->GetBuffer();
320 uint16_t reqId = ntohs( req->header.requestid );
321 //--------------------------------------------------------------------------
322 // Unmarshal the status body
323 //--------------------------------------------------------------------------
324 XRootDStatus st = XRootDTransport::UnMarshalStatusBody( *pResponse, reqId );
325
326 if( !st.IsOK() && st.code == errDataError )
327 {
328 log->Error( XRootDMsg, "[%s] %s", pUrl.GetHostId().c_str(),
329 st.GetErrorMessage().c_str() );
330 return Corrupted;
331 }
332
333 if( !st.IsOK() )
334 {
335 log->Error( XRootDMsg, "[%s] Failed to unmarshall status body.",
336 pUrl.GetHostId().c_str() );
337 pStatus = st;
338 HandleRspOrQueue();
339 return Ignore;
340 }
341
342 //--------------------------------------------------------------------------
343 // Common handling for partial results
344 //--------------------------------------------------------------------------
345 ServerResponseV2 *rspst = (ServerResponseV2*)pResponse->GetBuffer();
347 {
348 pPartialResps.push_back( std::move( pResponse ) );
349 }
350
351 //--------------------------------------------------------------------------
352 // Decide the actions that we need to take
353 //--------------------------------------------------------------------------
354 uint16_t action = 0;
355 if( reqId == kXR_pgread )
356 {
357 //----------------------------------------------------------------------
358 // The message contains only Status header and body but no raw data
359 //----------------------------------------------------------------------
360 if( !pPageReader )
361 pPageReader.reset( new AsyncPageReader( *pChunkList, pCrc32cDigests ) );
362 pPageReader->SetRsp( rspst );
363
364 action |= Raw;
365
367 action |= NoProcess;
368 else
369 action |= RemoveHandler;
370 }
371 else if( reqId == kXR_pgwrite )
372 {
373 // if data corruption has been detected on the server side we will
374 // send some additional data pointing to the pages that need to be
375 // retransmitted
376 if( size_t( sizeof( ServerResponseHeader ) + rspst->status.hdr.dlen + rspst->status.bdy.dlen ) >
377 pResponse->GetCursor() )
378 action |= More;
379 }
380
381 return action;
382 }
383
384 //----------------------------------------------------------------------------
385 // Get handler sid
386 //----------------------------------------------------------------------------
388 {
389 ClientRequest* req = (ClientRequest*) pRequest->GetBuffer();
390 return ((uint16_t)req->header.streamid[1] << 8) | (uint16_t)req->header.streamid[0];
391 }
392
393 //----------------------------------------------------------------------------
395 //----------------------------------------------------------------------------
397 {
398 Log *log = DefaultEnv::GetLog();
399
400 ServerResponse *rsp = (ServerResponse *)pResponse->GetBuffer();
401
402 ClientRequest *req = (ClientRequest *)pRequest->GetBuffer();
403
404 //--------------------------------------------------------------------------
405 // If it is a local file, it can be only a metalink redirector
406 //--------------------------------------------------------------------------
407 if( pUrl.IsLocalFile() && pUrl.IsMetalink() )
408 pHosts->back().protocol = kXR_PROTOCOLVERSION;
409
410 //--------------------------------------------------------------------------
411 // We got an answer, check who we were talking to
412 //--------------------------------------------------------------------------
413 else
414 {
415 AnyObject qryResult;
416 int *qryResponse = 0;
417 pPostMaster->QueryTransport( pUrl, XRootDQuery::ServerFlags, qryResult );
418 qryResult.Get( qryResponse );
419 pHosts->back().flags = *qryResponse; delete qryResponse; qryResponse = 0;
420 pPostMaster->QueryTransport( pUrl, XRootDQuery::ProtocolVersion, qryResult );
421 qryResult.Get( qryResponse );
422 pHosts->back().protocol = *qryResponse; delete qryResponse;
423 }
424
425 //--------------------------------------------------------------------------
426 // Process the message
427 //--------------------------------------------------------------------------
428 Status st = XRootDTransport::UnMarshallBody( pResponse.get(), req->header.requestid );
429 if( !st.IsOK() )
430 {
431 pStatus = Status( stFatal, errInvalidMessage );
432 HandleResponse();
433 return;
434 }
435
436 //--------------------------------------------------------------------------
437 // we have an response for the message so it's not in fly anymore
438 //--------------------------------------------------------------------------
439 pMsgInFly = false;
440
441 //--------------------------------------------------------------------------
442 // Reset the aggregated wait (used to omit wait response in case of Metalink
443 // redirector)
444 //--------------------------------------------------------------------------
445 if( rsp->hdr.status != kXR_wait )
446 pAggregatedWaitTime = 0;
447
448 switch( rsp->hdr.status )
449 {
450 //------------------------------------------------------------------------
451 // kXR_ok - we're done here
452 //------------------------------------------------------------------------
453 case kXR_ok:
454 {
455 log->Dump( XRootDMsg, "[%s] Got a kXR_ok response to request %s",
456 pUrl.GetHostId().c_str(),
457 pRequest->GetDescription().c_str() );
458 pStatus = Status();
459 HandleResponse();
460 return;
461 }
462
463 case kXR_status:
464 {
465 log->Dump( XRootDMsg, "[%s] Got a kXR_status response to request %s",
466 pUrl.GetHostId().c_str(),
467 pRequest->GetDescription().c_str() );
468 pStatus = Status();
469 HandleResponse();
470 return;
471 }
472
473 //------------------------------------------------------------------------
474 // kXR_ok - we're serving partial result to the user
475 //------------------------------------------------------------------------
476 case kXR_oksofar:
477 {
478 log->Dump( XRootDMsg, "[%s] Got a kXR_oksofar response to request %s",
479 pUrl.GetHostId().c_str(),
480 pRequest->GetDescription().c_str() );
481 pStatus = Status( stOK, suContinue );
482 HandleResponse();
483 return;
484 }
485
486 //------------------------------------------------------------------------
487 // kXR_error - we've got a problem
488 //------------------------------------------------------------------------
489 case kXR_error:
490 {
491 char *errmsg = new char[rsp->hdr.dlen-3]; errmsg[rsp->hdr.dlen-4] = 0;
492 memcpy( errmsg, rsp->body.error.errmsg, rsp->hdr.dlen-4 );
493 log->Dump( XRootDMsg, "[%s] Got a kXR_error response to request %s "
494 "[%d] %s", pUrl.GetHostId().c_str(),
495 pRequest->GetDescription().c_str(), rsp->body.error.errnum,
496 errmsg );
497 delete [] errmsg;
498
499 HandleError( Status(stError, errErrorResponse, rsp->body.error.errnum) );
500 return;
501 }
502
503 //------------------------------------------------------------------------
504 // kXR_redirect - they tell us to go elsewhere
505 //------------------------------------------------------------------------
506 case kXR_redirect:
507 {
508 if( rsp->hdr.dlen <= 4 )
509 {
510 log->Error( XRootDMsg, "[%s] Got invalid redirect response.",
511 pUrl.GetHostId().c_str() );
512 pStatus = Status( stError, errInvalidResponse );
513 HandleResponse();
514 return;
515 }
516
517 char *urlInfoBuff = new char[rsp->hdr.dlen-3];
518 urlInfoBuff[rsp->hdr.dlen-4] = 0;
519 memcpy( urlInfoBuff, rsp->body.redirect.host, rsp->hdr.dlen-4 );
520 std::string urlInfo = urlInfoBuff;
521 delete [] urlInfoBuff;
522 log->Dump( XRootDMsg, "[%s] Got kXR_redirect response to "
523 "message %s: %s, port %d", pUrl.GetHostId().c_str(),
524 pRequest->GetDescription().c_str(), urlInfo.c_str(),
525 rsp->body.redirect.port );
526
527 //----------------------------------------------------------------------
528 // Check if we can proceed
529 //----------------------------------------------------------------------
530 if( !pRedirectCounter )
531 {
532 log->Warning( XRootDMsg, "[%s] Redirect limit has been reached for "
533 "message %s, the last known error is: %s",
534 pUrl.GetHostId().c_str(),
535 pRequest->GetDescription().c_str(),
536 pLastError.ToString().c_str() );
537
538
539 pStatus = Status( stFatal, errRedirectLimit );
540 HandleResponse();
541 return;
542 }
543 --pRedirectCounter;
544
545 //----------------------------------------------------------------------
546 // Keep the info about this server if we still need to find a load
547 // balancer
548 //----------------------------------------------------------------------
549 uint32_t flags = pHosts->back().flags;
550 if( !pHasLoadBalancer )
551 {
552 if( flags & kXR_isManager )
553 {
554 //------------------------------------------------------------------
555 // If the current server is a meta manager then it supersedes
556 // any existing load balancer, otherwise we assign a load-balancer
557 // only if it has not been already assigned
558 //------------------------------------------------------------------
559 if( ( flags & kXR_attrMeta ) || !pLoadBalancer.url.IsValid() )
560 {
561 pLoadBalancer = pHosts->back();
562 log->Dump( XRootDMsg, "[%s] Current server has been assigned "
563 "as a load-balancer for message %s",
564 pUrl.GetHostId().c_str(),
565 pRequest->GetDescription().c_str() );
566 HostList::iterator it;
567 for( it = pHosts->begin(); it != pHosts->end(); ++it )
568 it->loadBalancer = false;
569 pHosts->back().loadBalancer = true;
570 }
571 }
572 }
573
574 //----------------------------------------------------------------------
575 // If the redirect comes from a data server safe the URL because
576 // in case of a failure we will use it as the effective data server URL
577 // for the tried CGI opaque info
578 //----------------------------------------------------------------------
579 if( flags & kXR_isServer )
580 pEffectiveDataServerUrl = new URL( pHosts->back().url );
581
582 //----------------------------------------------------------------------
583 // Build the URL and check it's validity
584 //----------------------------------------------------------------------
585 std::vector<std::string> urlComponents;
586 std::string newCgi;
587 Utils::splitString( urlComponents, urlInfo, "?" );
588
589 std::ostringstream o;
590
591 o << urlComponents[0];
592 if( rsp->body.redirect.port > 0 )
593 o << ":" << rsp->body.redirect.port << "/";
594 else if( rsp->body.redirect.port < 0 )
595 {
596 //--------------------------------------------------------------------
597 // check if the manager wants to enforce write recovery at himself
598 // (beware we are dealing here with negative flags)
599 //--------------------------------------------------------------------
600 if( ~uint32_t( rsp->body.redirect.port ) & kXR_recoverWrts )
601 pHosts->back().flags |= kXR_recoverWrts;
602
603 //--------------------------------------------------------------------
604 // check if the manager wants to collapse the communication channel
605 // (the redirect host is to replace the current host)
606 //--------------------------------------------------------------------
607 if( ~uint32_t( rsp->body.redirect.port ) & kXR_collapseRedir )
608 {
609 std::string url( rsp->body.redirect.host, rsp->hdr.dlen-4 );
610 pPostMaster->CollapseRedirect( pUrl, url );
611 }
612
613 if( ~uint32_t( rsp->body.redirect.port ) & kXR_ecRedir )
614 {
615 std::string url( rsp->body.redirect.host, rsp->hdr.dlen-4 );
616 if( Utils::CheckEC( pRequest, url ) )
617 pRedirectAsAnswer = true;
618 }
619 }
620
621 URL newUrl = URL( o.str() );
622 if( !newUrl.IsValid() )
623 {
625 log->Error( XRootDMsg, "[%s] Got invalid redirection URL: %s",
626 pUrl.GetHostId().c_str(), urlInfo.c_str() );
627 HandleResponse();
628 return;
629 }
630
631 if( pUrl.GetUserName() != "" && newUrl.GetUserName() == "" )
632 newUrl.SetUserName( pUrl.GetUserName() );
633
634 if( pUrl.GetPassword() != "" && newUrl.GetPassword() == "" )
635 newUrl.SetPassword( pUrl.GetPassword() );
636
637 //----------------------------------------------------------------------
638 // Forward any "xrd.*" params from the original client request also to
639 // the new redirection url
640 // Also, we need to preserve any "xrdcl.*' as they are important for
641 // our internal workflows.
642 //----------------------------------------------------------------------
643 std::ostringstream ossXrd;
644 const URL::ParamsMap &urlParams = pUrl.GetParams();
645
646 for(URL::ParamsMap::const_iterator it = urlParams.begin();
647 it != urlParams.end(); ++it )
648 {
649 if( it->first.compare( 0, 4, "xrd." ) &&
650 it->first.compare( 0, 6, "xrdcl." ) )
651 continue;
652
653 ossXrd << it->first << '=' << it->second << '&';
654 }
655
656 std::string xrdCgi = ossXrd.str();
657 pRedirectUrl = newUrl.GetURL();
658
659 URL cgiURL;
660 if( urlComponents.size() > 1 )
661 {
662 pRedirectUrl += "?";
663 pRedirectUrl += urlComponents[1];
664 std::ostringstream o;
665 o << "fake://fake:111//fake?";
666 o << urlComponents[1];
667
668 if( urlComponents.size() == 3 )
669 o << '?' << urlComponents[2];
670
671 if (!xrdCgi.empty())
672 {
673 o << '&' << xrdCgi;
674 pRedirectUrl += '&';
675 pRedirectUrl += xrdCgi;
676 }
677
678 cgiURL = URL( o.str() );
679 }
680 else {
681 if (!xrdCgi.empty())
682 {
683 std::ostringstream o;
684 o << "fake://fake:111//fake?";
685 o << xrdCgi;
686 cgiURL = URL( o.str() );
687 pRedirectUrl += '?';
688 pRedirectUrl += xrdCgi;
689 }
690 }
691
692 //----------------------------------------------------------------------
693 // Check if we need to return the URL as a response
694 //----------------------------------------------------------------------
695 if( newUrl.GetProtocol() != "root" && newUrl.GetProtocol() != "xroot" &&
696 newUrl.GetProtocol() != "roots" && newUrl.GetProtocol() != "xroots" &&
697 !newUrl.IsLocalFile() )
698 pRedirectAsAnswer = true;
699
700 if( pRedirectAsAnswer )
701 {
702 pStatus = Status( stError, errRedirect );
703 HandleResponse();
704 return;
705 }
706
707 //----------------------------------------------------------------------
708 // Rewrite the message in a way required to send it to another server
709 //----------------------------------------------------------------------
710 newUrl.SetParams( cgiURL.GetParams() );
711 Status st = RewriteRequestRedirect( newUrl );
712 if( !st.IsOK() )
713 {
714 pStatus = st;
715 HandleResponse();
716 return;
717 }
718
719 //----------------------------------------------------------------------
720 // Make sure we don't change the protocol by accident (root vs roots)
721 //----------------------------------------------------------------------
722 if( ( pUrl.GetProtocol() == "roots" || pUrl.GetProtocol() == "xroots" ) &&
723 ( newUrl.GetProtocol() == "root" || newUrl.GetProtocol() == "xroot" ) )
724 newUrl.SetProtocol( "roots" );
725
726 //----------------------------------------------------------------------
727 // Send the request to the new location
728 //----------------------------------------------------------------------
729 HandleError( RetryAtServer( newUrl, RedirectEntry::EntryRedirect ) );
730 return;
731 }
732
733 //------------------------------------------------------------------------
734 // kXR_wait - we wait, and re-issue the request later
735 //------------------------------------------------------------------------
736 case kXR_wait:
737 {
738 uint32_t waitSeconds = 0;
739
740 if( rsp->hdr.dlen >= 4 )
741 {
742 char *infoMsg = new char[rsp->hdr.dlen-3];
743 infoMsg[rsp->hdr.dlen-4] = 0;
744 memcpy( infoMsg, rsp->body.wait.infomsg, rsp->hdr.dlen-4 );
745 log->Dump( XRootDMsg, "[%s] Got kXR_wait response of %d seconds to "
746 "message %s: %s", pUrl.GetHostId().c_str(),
747 rsp->body.wait.seconds, pRequest->GetDescription().c_str(),
748 infoMsg );
749 delete [] infoMsg;
750 waitSeconds = rsp->body.wait.seconds;
751 }
752 else
753 {
754 log->Dump( XRootDMsg, "[%s] Got kXR_wait response of 0 seconds to "
755 "message %s", pUrl.GetHostId().c_str(),
756 pRequest->GetDescription().c_str() );
757 }
758
759 pAggregatedWaitTime += waitSeconds;
760
761 // We need a special case if the data node comes from metalink
762 // redirector. In this case it might make more sense to try the
763 // next entry in the Metalink than wait.
764 if( OmitWait( *pRequest, pLoadBalancer.url ) )
765 {
766 int maxWait = DefaultMaxMetalinkWait;
767 DefaultEnv::GetEnv()->GetInt( "MaxMetalinkWait", maxWait );
768 if( pAggregatedWaitTime > maxWait )
769 {
770 UpdateTriedCGI();
771 HandleError( RetryAtServer( pLoadBalancer.url, RedirectEntry::EntryRedirectOnWait ) );
772 return;
773 }
774 }
775
776 //----------------------------------------------------------------------
777 // Some messages require rewriting before they can be sent again
778 // after wait
779 //----------------------------------------------------------------------
780 Status st = RewriteRequestWait();
781 if( !st.IsOK() )
782 {
783 pStatus = st;
784 HandleResponse();
785 return;
786 }
787
788 //----------------------------------------------------------------------
789 // Register a task to resend the message in some seconds, if we still
790 // have time to do that, and report a timeout otherwise
791 //----------------------------------------------------------------------
792 time_t resendTime = ::time(0)+waitSeconds;
793
794 if( resendTime < pExpiration )
795 {
796 log->Debug( ExDbgMsg, "[%s] Scheduling WaitTask for MsgHandler: 0x%x (message: %s ).",
797 pUrl.GetHostId().c_str(), this,
798 pRequest->GetDescription().c_str() );
799
800 TaskManager *taskMgr = pPostMaster->GetTaskManager();
801 taskMgr->RegisterTask( new WaitTask( this ), resendTime );
802 }
803 else
804 {
805 log->Debug( XRootDMsg, "[%s] Wait time is too long, timing out %s",
806 pUrl.GetHostId().c_str(),
807 pRequest->GetDescription().c_str() );
808 HandleError( Status( stError, errOperationExpired) );
809 }
810 return;
811 }
812
813 //------------------------------------------------------------------------
814 // kXR_waitresp - the response will be returned in some seconds as an
815 // unsolicited message. Currently all messages of this type are handled
816 // one step before in the XrdClStream::OnIncoming as they need to be
817 // processed synchronously.
818 //------------------------------------------------------------------------
819 case kXR_waitresp:
820 {
821 if( rsp->hdr.dlen < 4 )
822 {
823 log->Error( XRootDMsg, "[%s] Got invalid waitresp response.",
824 pUrl.GetHostId().c_str() );
825 pStatus = Status( stError, errInvalidResponse );
826 HandleResponse();
827 return;
828 }
829
830 log->Dump( XRootDMsg, "[%s] Got kXR_waitresp response of %d seconds to "
831 "message %s", pUrl.GetHostId().c_str(),
832 rsp->body.waitresp.seconds,
833 pRequest->GetDescription().c_str() );
834 return;
835 }
836
837 //------------------------------------------------------------------------
838 // Default - unrecognized/unsupported response, declare an error
839 //------------------------------------------------------------------------
840 default:
841 {
842 log->Dump( XRootDMsg, "[%s] Got unrecognized response %d to "
843 "message %s", pUrl.GetHostId().c_str(),
844 rsp->hdr.status, pRequest->GetDescription().c_str() );
845 pStatus = Status( stError, errInvalidResponse );
846 HandleResponse();
847 return;
848 }
849 }
850
851 return;
852 }
853
854 //----------------------------------------------------------------------------
855 // Handle an event other that a message arrival - may be timeout
856 //----------------------------------------------------------------------------
858 XRootDStatus status )
859 {
860 Log *log = DefaultEnv::GetLog();
861 log->Dump( XRootDMsg, "[%s] Stream event reported for msg %s",
862 pUrl.GetHostId().c_str(), pRequest->GetDescription().c_str() );
863
864 if( event == Ready )
865 return 0;
866
867 if( pTimeoutFence.load( std::memory_order_relaxed ) )
868 return 0;
869
870 HandleError( status );
871 return RemoveHandler;
872 }
873
874 //----------------------------------------------------------------------------
875 // Read message body directly from a socket
876 //----------------------------------------------------------------------------
878 Socket *socket,
879 uint32_t &bytesRead )
880 {
881 ClientRequest *req = (ClientRequest *)pRequest->GetBuffer();
882 uint16_t reqId = ntohs( req->header.requestid );
883
884 if( reqId == kXR_pgread )
885 return pPageReader->Read( *socket, bytesRead );
886
887 return pBodyReader->Read( *socket, bytesRead );
888 }
889
890 //----------------------------------------------------------------------------
891 // We're here when we requested sending something over the wire
892 // and there has been a status update on this action
893 //----------------------------------------------------------------------------
895 XRootDStatus status )
896 {
897 Log *log = DefaultEnv::GetLog();
898
899 //--------------------------------------------------------------------------
900 // We were successful, so we now need to listen for a response
901 //--------------------------------------------------------------------------
902 if( status.IsOK() )
903 {
904 log->Dump( XRootDMsg, "[%s] Message %s has been successfully sent.",
905 pUrl.GetHostId().c_str(), message->GetDescription().c_str() );
906
907 log->Debug( ExDbgMsg, "[%s] Moving MsgHandler: 0x%x (message: %s ) from out-queue to in-queue.",
908 pUrl.GetHostId().c_str(), this,
909 pRequest->GetDescription().c_str() );
910
911 pMsgInFly = true;
912 return;
913 }
914
915 //--------------------------------------------------------------------------
916 // We have failed, recover if possible
917 //--------------------------------------------------------------------------
918 log->Error( XRootDMsg, "[%s] Impossible to send message %s. Trying to "
919 "recover.", pUrl.GetHostId().c_str(),
920 message->GetDescription().c_str() );
921 HandleError( status );
922 }
923
924 //----------------------------------------------------------------------------
925 // Are we a raw writer or not?
926 //----------------------------------------------------------------------------
928 {
929 ClientRequest *req = (ClientRequest *)pRequest->GetBuffer();
930 uint16_t reqId = ntohs( req->header.requestid );
931 if( reqId == kXR_write || reqId == kXR_writev || reqId == kXR_pgwrite )
932 return true;
933 // checkpoint + execute
934 if( reqId == kXR_chkpoint && req->chkpoint.opcode == kXR_ckpXeq )
935 {
936 ClientRequest *xeq = (ClientRequest*)pRequest->GetBuffer( sizeof( ClientRequest ) );
937 reqId = ntohs( xeq->header.requestid );
938 return reqId != kXR_truncate; // only checkpointed truncate does not have raw data
939 }
940
941 return false;
942 }
943
944 //----------------------------------------------------------------------------
945 // Write the message body
946 //----------------------------------------------------------------------------
948 uint32_t &bytesWritten )
949 {
950 //--------------------------------------------------------------------------
951 // First check if it is a PgWrite
952 //--------------------------------------------------------------------------
953 if( !pChunkList->empty() && !pCrc32cDigests.empty() )
954 {
955 //------------------------------------------------------------------------
956 // PgWrite will have just one chunk
957 //------------------------------------------------------------------------
958 ChunkInfo chunk = pChunkList->front();
959 //------------------------------------------------------------------------
960 // Calculate the size of the first and last page (in case the chunk is not
961 // 4KB aligned)
962 //------------------------------------------------------------------------
963 int fLen = 0, lLen = 0;
964 size_t nbpgs = XrdOucPgrwUtils::csNum( chunk.offset, chunk.length, fLen, lLen );
965
966 //------------------------------------------------------------------------
967 // Set the crc32c buffer if not ready yet
968 //------------------------------------------------------------------------
969 if( pPgWrtCksumBuff.GetCursor() == 0 )
970 {
971 uint32_t digest = htonl( pCrc32cDigests[pPgWrtCurrentPageNb] );
972 memcpy( pPgWrtCksumBuff.GetBuffer(), &digest, sizeof( uint32_t ) );
973 }
974
975 uint32_t btsLeft = chunk.length - pAsyncOffset;
976 uint32_t pglen = ( pPgWrtCurrentPageNb == 0 ? fLen : XrdSys::PageSize ) - pPgWrtCurrentPageOffset;
977 if( pglen > btsLeft ) pglen = btsLeft;
978 char* pgbuf = static_cast<char*>( chunk.buffer ) + pAsyncOffset;
979
980 while( btsLeft > 0 )
981 {
982 // first write the crc32c digest
983 while( pPgWrtCksumBuff.GetCursor() < sizeof( uint32_t ) )
984 {
985 uint32_t dgstlen = sizeof( uint32_t ) - pPgWrtCksumBuff.GetCursor();
986 char* dgstbuf = pPgWrtCksumBuff.GetBufferAtCursor();
987 int btswrt = 0;
988 Status st = socket->Send( dgstbuf, dgstlen, btswrt );
989 if( !st.IsOK() ) return st;
990 bytesWritten += btswrt;
991 pPgWrtCksumBuff.AdvanceCursor( btswrt );
992 if( st.code == suRetry ) return st;
993 }
994 // then write the raw data (one page)
995 int btswrt = 0;
996 Status st = socket->Send( pgbuf, pglen, btswrt );
997 if( !st.IsOK() ) return st;
998 pgbuf += btswrt;
999 pglen -= btswrt;
1000 btsLeft -= btswrt;
1001 bytesWritten += btswrt;
1002 pAsyncOffset += btswrt; // update the offset to the raw data
1003 if( st.code == suRetry ) return st;
1004 // if we managed to write all the data ...
1005 if( pglen == 0 )
1006 {
1007 // move to the next page
1008 ++pPgWrtCurrentPageNb;
1009 if( pPgWrtCurrentPageNb < nbpgs )
1010 {
1011 // set the digest buffer
1012 pPgWrtCksumBuff.SetCursor( 0 );
1013 uint32_t digest = htonl( pCrc32cDigests[pPgWrtCurrentPageNb] );
1014 memcpy( pPgWrtCksumBuff.GetBuffer(), &digest, sizeof( uint32_t ) );
1015 }
1016 // set the page length
1017 pglen = XrdSys::PageSize;
1018 if( pglen > btsLeft ) pglen = btsLeft;
1019 // reset offset in the current page
1020 pPgWrtCurrentPageOffset = 0;
1021 }
1022 else
1023 // otherwise just adjust the offset in the current page
1024 pPgWrtCurrentPageOffset += btswrt;
1025
1026 }
1027 }
1028 else if( !pChunkList->empty() )
1029 {
1030 size_t size = pChunkList->size();
1031 for( size_t i = pAsyncChunkIndex ; i < size; ++i )
1032 {
1033 char *buffer = (char*)(*pChunkList)[i].buffer;
1034 uint32_t size = (*pChunkList)[i].length;
1035 size_t leftToBeWritten = size - pAsyncOffset;
1036
1037 while( leftToBeWritten )
1038 {
1039 int btswrt = 0;
1040 Status st = socket->Send( buffer + pAsyncOffset, leftToBeWritten, btswrt );
1041 bytesWritten += btswrt;
1042 if( !st.IsOK() || st.code == suRetry ) return st;
1043 pAsyncOffset += btswrt;
1044 leftToBeWritten -= btswrt;
1045 }
1046 //----------------------------------------------------------------------
1047 // Remember that we have moved to the next chunk, also clear the offset
1048 // within the buffer as we are going to move to a new one
1049 //----------------------------------------------------------------------
1050 ++pAsyncChunkIndex;
1051 pAsyncOffset = 0;
1052 }
1053 }
1054 else
1055 {
1056 Log *log = DefaultEnv::GetLog();
1057
1058 //------------------------------------------------------------------------
1059 // If the socket is encrypted we cannot use a kernel buffer, we have to
1060 // convert to user space buffer
1061 //------------------------------------------------------------------------
1062 if( socket->IsEncrypted() )
1063 {
1064 log->Debug( XRootDMsg, "[%s] Channel is encrypted: cannot use kernel buffer.",
1065 pUrl.GetHostId().c_str() );
1066
1067 char *ubuff = 0;
1068 ssize_t ret = XrdSys::Move( *pKBuff, ubuff );
1069 if( ret < 0 ) return Status( stError, errInternal );
1070 pChunkList->push_back( ChunkInfo( 0, ret, ubuff ) );
1071 return WriteMessageBody( socket, bytesWritten );
1072 }
1073
1074 //------------------------------------------------------------------------
1075 // Send the data
1076 //------------------------------------------------------------------------
1077 while( !pKBuff->Empty() )
1078 {
1079 int btswrt = 0;
1080 Status st = socket->Send( *pKBuff, btswrt );
1081 bytesWritten += btswrt;
1082 if( !st.IsOK() || st.code == suRetry ) return st;
1083 }
1084
1085 log->Debug( XRootDMsg, "[%s] Request %s payload (kernel buffer) transferred to socket.",
1086 pUrl.GetHostId().c_str(), pRequest->GetDescription().c_str() );
1087 }
1088
1089 return Status();
1090 }
1091
1092 //----------------------------------------------------------------------------
1093 // We're here when we got a time event. We needed to re-issue the request
1094 // in some time in the future, and that moment has arrived
1095 //----------------------------------------------------------------------------
1097 {
1098 HandleError( RetryAtServer( pUrl, RedirectEntry::EntryWait ) );
1099 }
1100
1101 //----------------------------------------------------------------------------
1102 // Bookkeeping after partial response has been received.
1103 //----------------------------------------------------------------------------
1105 {
1106 pTimeoutFence.store( false, std::memory_order_relaxed ); // Take down the timeout fence
1107 }
1108
1109 //----------------------------------------------------------------------------
1110 // Unpack the message and call the response handler
1111 //----------------------------------------------------------------------------
1112 void XRootDMsgHandler::HandleResponse()
1113 {
1114 //--------------------------------------------------------------------------
1115 // Process the response and notify the listener
1116 //--------------------------------------------------------------------------
1118 XRootDStatus *status = ProcessStatus();
1119 AnyObject *response = 0;
1120
1121 Log *log = DefaultEnv::GetLog();
1122 log->Debug( ExDbgMsg, "[%s] Calling MsgHandler: 0x%x (message: %s ) "
1123 "with status: %s.",
1124 pUrl.GetHostId().c_str(), this,
1125 pRequest->GetDescription().c_str(),
1126 status->ToString().c_str() );
1127
1128 if( status->IsOK() )
1129 {
1130 Status st = ParseResponse( response );
1131 if( !st.IsOK() )
1132 {
1133 delete status;
1134 delete response;
1135 status = new XRootDStatus( st );
1136 response = 0;
1137 }
1138 }
1139
1140 //--------------------------------------------------------------------------
1141 // Close the redirect entry if necessary
1142 //--------------------------------------------------------------------------
1143 if( pRdirEntry )
1144 {
1145 pRdirEntry->status = *status;
1146 pRedirectTraceBack.push_back( std::move( pRdirEntry ) );
1147 }
1148
1149 //--------------------------------------------------------------------------
1150 // Is it a final response?
1151 //--------------------------------------------------------------------------
1152 bool finalrsp = !( pStatus.IsOK() && pStatus.code == suContinue );
1153
1154 //--------------------------------------------------------------------------
1155 // Release the stream id
1156 //--------------------------------------------------------------------------
1157 if( pSidMgr && finalrsp )
1158 {
1159 ClientRequest *req = (ClientRequest *)pRequest->GetBuffer();
1160 if( status->IsOK() || !pMsgInFly ||
1161 !( status->code == errOperationExpired || status->code == errOperationInterrupted ) )
1162 pSidMgr->ReleaseSID( req->header.streamid );
1163 }
1164
1165 HostList *hosts = pHosts.release();
1166 if( !finalrsp )
1167 pHosts.reset( new HostList( *hosts ) );
1168
1169 pResponseHandler->HandleResponseWithHosts( status, response, hosts );
1170
1171 //--------------------------------------------------------------------------
1172 // if it is the final response there is nothing more to do ...
1173 //--------------------------------------------------------------------------
1174 if( finalrsp )
1175 delete this;
1176 //--------------------------------------------------------------------------
1177 // on the other hand if it is not the final response, we have to keep the
1178 // MsgHandler and delete the current response
1179 //--------------------------------------------------------------------------
1180 else
1181 {
1182 XrdSysCondVarHelper lck( pCV );
1183 pResponse.reset();
1184 pTimeoutFence.store( false, std::memory_order_relaxed );
1185 pCV.Broadcast();
1186 }
1187 }
1188
1189
1190 //----------------------------------------------------------------------------
1191 // Extract the status information from the stuff that we got
1192 //----------------------------------------------------------------------------
1193 XRootDStatus *XRootDMsgHandler::ProcessStatus()
1194 {
1195 XRootDStatus *st = new XRootDStatus( pStatus );
1196 ServerResponse *rsp = 0;
1197 if( pResponse )
1198 rsp = (ServerResponse *)pResponse->GetBuffer();
1199
1200 if( !pStatus.IsOK() && rsp )
1201 {
1202 if( pStatus.code == errErrorResponse )
1203 {
1204 st->errNo = rsp->body.error.errnum;
1205 // omit the last character as the string returned from the server
1206 // (acording to protocol specs) should be null-terminated
1207 std::string errmsg( rsp->body.error.errmsg, rsp->hdr.dlen-5 );
1208 if( st->errNo == kXR_noReplicas && !pLastError.IsOK() )
1209 errmsg += " Last seen error: " + pLastError.ToString();
1210 st->SetErrorMessage( errmsg );
1211 }
1212 else if( pStatus.code == errRedirect )
1213 st->SetErrorMessage( pRedirectUrl );
1214 }
1215 return st;
1216 }
1217
1218 //------------------------------------------------------------------------
1219 // Parse the response and put it in an object that could be passed to
1220 // the user
1221 //------------------------------------------------------------------------
1222 Status XRootDMsgHandler::ParseResponse( AnyObject *&response )
1223 {
1224 if( !pResponse )
1225 return Status();
1226
1227 ServerResponse *rsp = (ServerResponse *)pResponse->GetBuffer();
1228 ClientRequest *req = (ClientRequest *)pRequest->GetBuffer();
1229 Log *log = DefaultEnv::GetLog();
1230
1231 //--------------------------------------------------------------------------
1232 // Handle redirect as an answer
1233 //--------------------------------------------------------------------------
1234 if( rsp->hdr.status == kXR_redirect )
1235 {
1236 log->Error( XRootDMsg, "Internal Error: unable to process redirect" );
1237 return 0;
1238 }
1239
1240 Buffer buff;
1241 uint32_t length = 0;
1242 char *buffer = 0;
1243
1244 //--------------------------------------------------------------------------
1245 // We don't have any partial answers so pass what we have
1246 //--------------------------------------------------------------------------
1247 if( pPartialResps.empty() )
1248 {
1249 buffer = rsp->body.buffer.data;
1250 length = rsp->hdr.dlen;
1251 }
1252 //--------------------------------------------------------------------------
1253 // Partial answers, we need to glue them together before parsing
1254 //--------------------------------------------------------------------------
1255 else if( req->header.requestid != kXR_read &&
1256 req->header.requestid != kXR_readv )
1257 {
1258 for( uint32_t i = 0; i < pPartialResps.size(); ++i )
1259 {
1260 ServerResponse *part = (ServerResponse*)pPartialResps[i]->GetBuffer();
1261 length += part->hdr.dlen;
1262 }
1263 length += rsp->hdr.dlen;
1264
1265 buff.Allocate( length );
1266 uint32_t offset = 0;
1267 for( uint32_t i = 0; i < pPartialResps.size(); ++i )
1268 {
1269 ServerResponse *part = (ServerResponse*)pPartialResps[i]->GetBuffer();
1270 buff.Append( part->body.buffer.data, part->hdr.dlen, offset );
1271 offset += part->hdr.dlen;
1272 }
1273 buff.Append( rsp->body.buffer.data, rsp->hdr.dlen, offset );
1274 buffer = buff.GetBuffer();
1275 }
1276
1277 //--------------------------------------------------------------------------
1278 // Right, but what was the question?
1279 //--------------------------------------------------------------------------
1280 switch( req->header.requestid )
1281 {
1282 //------------------------------------------------------------------------
1283 // kXR_mv, kXR_truncate, kXR_rm, kXR_mkdir, kXR_rmdir, kXR_chmod,
1284 // kXR_ping, kXR_close, kXR_write, kXR_sync
1285 //------------------------------------------------------------------------
1286 case kXR_mv:
1287 case kXR_truncate:
1288 case kXR_rm:
1289 case kXR_mkdir:
1290 case kXR_rmdir:
1291 case kXR_chmod:
1292 case kXR_ping:
1293 case kXR_close:
1294 case kXR_write:
1295 case kXR_writev:
1296 case kXR_sync:
1297 case kXR_chkpoint:
1298 return Status();
1299
1300 //------------------------------------------------------------------------
1301 // kXR_locate
1302 //------------------------------------------------------------------------
1303 case kXR_locate:
1304 {
1305 AnyObject *obj = new AnyObject();
1306
1307 char *nullBuffer = new char[length+1];
1308 nullBuffer[length] = 0;
1309 memcpy( nullBuffer, buffer, length );
1310
1311 log->Dump( XRootDMsg, "[%s] Parsing the response to %s as "
1312 "LocateInfo: %s", pUrl.GetHostId().c_str(),
1313 pRequest->GetDescription().c_str(), nullBuffer );
1314 LocationInfo *data = new LocationInfo();
1315
1316 if( data->ParseServerResponse( nullBuffer ) == false )
1317 {
1318 delete obj;
1319 delete data;
1320 delete [] nullBuffer;
1321 return Status( stError, errInvalidResponse );
1322 }
1323 delete [] nullBuffer;
1324
1325 obj->Set( data );
1326 response = obj;
1327 return Status();
1328 }
1329
1330 //------------------------------------------------------------------------
1331 // kXR_stat
1332 //------------------------------------------------------------------------
1333 case kXR_stat:
1334 {
1335 AnyObject *obj = new AnyObject();
1336
1337 //----------------------------------------------------------------------
1338 // Virtual File System stat (kXR_vfs)
1339 //----------------------------------------------------------------------
1340 if( req->stat.options & kXR_vfs )
1341 {
1342 StatInfoVFS *data = new StatInfoVFS();
1343
1344 char *nullBuffer = new char[length+1];
1345 nullBuffer[length] = 0;
1346 memcpy( nullBuffer, buffer, length );
1347
1348 log->Dump( XRootDMsg, "[%s] Parsing the response to %s as "
1349 "StatInfoVFS: %s", pUrl.GetHostId().c_str(),
1350 pRequest->GetDescription().c_str(), nullBuffer );
1351
1352 if( data->ParseServerResponse( nullBuffer ) == false )
1353 {
1354 delete obj;
1355 delete data;
1356 delete [] nullBuffer;
1357 return Status( stError, errInvalidResponse );
1358 }
1359 delete [] nullBuffer;
1360
1361 obj->Set( data );
1362 }
1363 //----------------------------------------------------------------------
1364 // Normal stat
1365 //----------------------------------------------------------------------
1366 else
1367 {
1368 StatInfo *data = new StatInfo();
1369
1370 char *nullBuffer = new char[length+1];
1371 nullBuffer[length] = 0;
1372 memcpy( nullBuffer, buffer, length );
1373
1374 log->Dump( XRootDMsg, "[%s] Parsing the response to %s as StatInfo: "
1375 "%s", pUrl.GetHostId().c_str(),
1376 pRequest->GetDescription().c_str(), nullBuffer );
1377
1378 if( data->ParseServerResponse( nullBuffer ) == false )
1379 {
1380 delete obj;
1381 delete data;
1382 delete [] nullBuffer;
1383 return Status( stError, errInvalidResponse );
1384 }
1385 delete [] nullBuffer;
1386 obj->Set( data );
1387 }
1388
1389 response = obj;
1390 return Status();
1391 }
1392
1393 //------------------------------------------------------------------------
1394 // kXR_protocol
1395 //------------------------------------------------------------------------
1396 case kXR_protocol:
1397 {
1398 log->Dump( XRootDMsg, "[%s] Parsing the response to %s as ProtocolInfo",
1399 pUrl.GetHostId().c_str(),
1400 pRequest->GetDescription().c_str() );
1401
1402 if( rsp->hdr.dlen < 8 )
1403 {
1404 log->Error( XRootDMsg, "[%s] Got invalid redirect response.",
1405 pUrl.GetHostId().c_str() );
1406 return Status( stError, errInvalidResponse );
1407 }
1408
1409 AnyObject *obj = new AnyObject();
1410 ProtocolInfo *data = new ProtocolInfo( rsp->body.protocol.pval,
1411 rsp->body.protocol.flags );
1412 obj->Set( data );
1413 response = obj;
1414 return Status();
1415 }
1416
1417 //------------------------------------------------------------------------
1418 // kXR_dirlist
1419 //------------------------------------------------------------------------
1420 case kXR_dirlist:
1421 {
1422 AnyObject *obj = new AnyObject();
1423 log->Dump( XRootDMsg, "[%s] Parsing the response to %s as "
1424 "DirectoryList", pUrl.GetHostId().c_str(),
1425 pRequest->GetDescription().c_str() );
1426
1427 char *path = new char[req->dirlist.dlen+1];
1428 path[req->dirlist.dlen] = 0;
1429 memcpy( path, pRequest->GetBuffer(24), req->dirlist.dlen );
1430
1431 DirectoryList *data = new DirectoryList();
1432 data->SetParentName( path );
1433 delete [] path;
1434
1435 char *nullBuffer = new char[length+1];
1436 nullBuffer[length] = 0;
1437 memcpy( nullBuffer, buffer, length );
1438
1439 bool invalidrsp = false;
1440
1441 if( !pDirListStarted )
1442 {
1443 pDirListWithStat = DirectoryList::HasStatInfo( nullBuffer );
1444 pDirListStarted = true;
1445
1446 invalidrsp = !data->ParseServerResponse( pUrl.GetHostId(), nullBuffer );
1447 }
1448 else
1449 invalidrsp = !data->ParseServerResponse( pUrl.GetHostId(), nullBuffer, pDirListWithStat );
1450
1451 if( invalidrsp )
1452 {
1453 delete data;
1454 delete obj;
1455 delete [] nullBuffer;
1456 return Status( stError, errInvalidResponse );
1457 }
1458
1459 delete [] nullBuffer;
1460 obj->Set( data );
1461 response = obj;
1462 return Status();
1463 }
1464
1465 //------------------------------------------------------------------------
1466 // kXR_open - if we got the statistics, otherwise return 0
1467 //------------------------------------------------------------------------
1468 case kXR_open:
1469 {
1470 log->Dump( XRootDMsg, "[%s] Parsing the response to %s as OpenInfo",
1471 pUrl.GetHostId().c_str(),
1472 pRequest->GetDescription().c_str() );
1473
1474 if( rsp->hdr.dlen < 4 )
1475 {
1476 log->Error( XRootDMsg, "[%s] Got invalid open response.",
1477 pUrl.GetHostId().c_str() );
1478 return Status( stError, errInvalidResponse );
1479 }
1480
1481 AnyObject *obj = new AnyObject();
1482 StatInfo *statInfo = 0;
1483
1484 //----------------------------------------------------------------------
1485 // Handle StatInfo if requested
1486 //----------------------------------------------------------------------
1487 if( req->open.options & kXR_retstat )
1488 {
1489 log->Dump( XRootDMsg, "[%s] Parsing StatInfo in response to %s",
1490 pUrl.GetHostId().c_str(),
1491 pRequest->GetDescription().c_str() );
1492
1493 if( rsp->hdr.dlen >= 12 )
1494 {
1495 char *nullBuffer = new char[rsp->hdr.dlen-11];
1496 nullBuffer[rsp->hdr.dlen-12] = 0;
1497 memcpy( nullBuffer, buffer+12, rsp->hdr.dlen-12 );
1498
1499 statInfo = new StatInfo();
1500 if( statInfo->ParseServerResponse( nullBuffer ) == false )
1501 {
1502 delete statInfo;
1503 statInfo = 0;
1504 }
1505 delete [] nullBuffer;
1506 }
1507
1508 if( rsp->hdr.dlen < 12 || !statInfo )
1509 {
1510 log->Error( XRootDMsg, "[%s] Unable to parse StatInfo in response "
1511 "to %s", pUrl.GetHostId().c_str(),
1512 pRequest->GetDescription().c_str() );
1513 delete obj;
1514 return Status( stError, errInvalidResponse );
1515 }
1516 }
1517
1518 OpenInfo *data = new OpenInfo( (uint8_t*)buffer,
1519 pResponse->GetSessionId(),
1520 statInfo );
1521 obj->Set( data );
1522 response = obj;
1523 return Status();
1524 }
1525
1526 //------------------------------------------------------------------------
1527 // kXR_read
1528 //------------------------------------------------------------------------
1529 case kXR_read:
1530 {
1531 log->Dump( XRootDMsg, "[%s] Parsing the response to %s as ChunkInfo",
1532 pUrl.GetHostId().c_str(),
1533 pRequest->GetDescription().c_str() );
1534
1535 for( uint32_t i = 0; i < pPartialResps.size(); ++i )
1536 {
1537 //--------------------------------------------------------------------
1538 // we are expecting to have only the header in the message, the raw
1539 // data have been readout into the user buffer
1540 //--------------------------------------------------------------------
1541 if( pPartialResps[i]->GetSize() > 8 )
1542 return Status( stOK, errInternal );
1543 }
1544 //----------------------------------------------------------------------
1545 // we are expecting to have only the header in the message, the raw
1546 // data have been readout into the user buffer
1547 //----------------------------------------------------------------------
1548 if( pResponse->GetSize() > 8 )
1549 return Status( stOK, errInternal );
1550 //----------------------------------------------------------------------
1551 // Get the response for the end user
1552 //----------------------------------------------------------------------
1553 return pBodyReader->GetResponse( response );
1554 }
1555
1556 //------------------------------------------------------------------------
1557 // kXR_pgread
1558 //------------------------------------------------------------------------
1559 case kXR_pgread:
1560 {
1561 log->Dump( XRootDMsg, "[%s] Parsing the response to %s as PageInfo",
1562 pUrl.GetHostId().c_str(),
1563 pRequest->GetDescription().c_str() );
1564
1565 //----------------------------------------------------------------------
1566 // Glue in the cached responses if necessary
1567 //----------------------------------------------------------------------
1568 ChunkInfo chunk = pChunkList->front();
1569 bool sizeMismatch = false;
1570 uint32_t currentOffset = 0;
1571 char *cursor = (char*)chunk.buffer;
1572 for( uint32_t i = 0; i < pPartialResps.size(); ++i )
1573 {
1574 ServerResponseV2 *part = (ServerResponseV2*)pPartialResps[i]->GetBuffer();
1575
1576 //--------------------------------------------------------------------
1577 // the actual size of the raw data without the crc32c checksums
1578 //--------------------------------------------------------------------
1579 size_t datalen = part->status.bdy.dlen - NbPgPerRsp( part->info.pgread.offset,
1580 part->status.bdy.dlen ) * CksumSize;
1581
1582 if( currentOffset + datalen > chunk.length )
1583 {
1584 sizeMismatch = true;
1585 break;
1586 }
1587
1588 currentOffset += datalen;
1589 cursor += datalen;
1590 }
1591
1592 ServerResponseV2 *rspst = (ServerResponseV2*)pResponse->GetBuffer();
1593 size_t datalen = rspst->status.bdy.dlen - NbPgPerRsp( rspst->info.pgread.offset,
1594 rspst->status.bdy.dlen ) * CksumSize;
1595 if( currentOffset + datalen <= chunk.length )
1596 currentOffset += datalen;
1597 else
1598 sizeMismatch = true;
1599
1600 //----------------------------------------------------------------------
1601 // Overflow
1602 //----------------------------------------------------------------------
1603 if( pChunkStatus.front().sizeError || sizeMismatch )
1604 {
1605 log->Error( XRootDMsg, "[%s] Handling response to %s: user supplied "
1606 "buffer is too small for the received data.",
1607 pUrl.GetHostId().c_str(),
1608 pRequest->GetDescription().c_str() );
1609 return Status( stError, errInvalidResponse );
1610 }
1611
1612 AnyObject *obj = new AnyObject();
1613 PageInfo *pgInfo = new PageInfo( chunk.offset, currentOffset, chunk.buffer,
1614 std::move( pCrc32cDigests) );
1615
1616 obj->Set( pgInfo );
1617 response = obj;
1618 return Status();
1619 }
1620
1621 //------------------------------------------------------------------------
1622 // kXR_pgwrite
1623 //------------------------------------------------------------------------
1624 case kXR_pgwrite:
1625 {
1626 std::vector<std::tuple<uint64_t, uint32_t>> retries;
1627
1628 ServerResponseV2 *rsp = (ServerResponseV2*)pResponse->GetBuffer();
1629 if( rsp->status.bdy.dlen > 0 )
1630 {
1631 ServerResponseBody_pgWrCSE *cse = (ServerResponseBody_pgWrCSE*)pResponse->GetBuffer( sizeof( ServerResponseV2 ) );
1632 size_t pgcnt = ( rsp->status.bdy.dlen - 8 ) / sizeof( kXR_int64 );
1633 retries.reserve( pgcnt );
1634 kXR_int64 *pgoffs = (kXR_int64*)pResponse->GetBuffer( sizeof( ServerResponseV2 ) +
1635 sizeof( ServerResponseBody_pgWrCSE ) );
1636
1637 for( size_t i = 0; i < pgcnt; ++i )
1638 {
1639 uint32_t len = XrdSys::PageSize;
1640 if( i == 0 ) len = cse->dlFirst;
1641 else if( i == pgcnt - 1 ) len = cse->dlLast;
1642 retries.push_back( std::make_tuple( pgoffs[i], len ) );
1643 }
1644 }
1645
1646 RetryInfo *info = new RetryInfo( std::move( retries ) );
1647 AnyObject *obj = new AnyObject();
1648 obj->Set( info );
1649 response = obj;
1650
1651 return Status();
1652 }
1653
1654
1655 //------------------------------------------------------------------------
1656 // kXR_readv - we need to pass the length of the buffer to the user code
1657 //------------------------------------------------------------------------
1658 case kXR_readv:
1659 {
1660 log->Dump( XRootDMsg, "[%s] Parsing the response to 0x%x as "
1661 "VectorReadInfo", pUrl.GetHostId().c_str(),
1662 pRequest->GetDescription().c_str() );
1663
1664 for( uint32_t i = 0; i < pPartialResps.size(); ++i )
1665 {
1666 //--------------------------------------------------------------------
1667 // we are expecting to have only the header in the message, the raw
1668 // data have been readout into the user buffer
1669 //--------------------------------------------------------------------
1670 if( pPartialResps[i]->GetSize() > 8 )
1671 return Status( stOK, errInternal );
1672 }
1673 //----------------------------------------------------------------------
1674 // we are expecting to have only the header in the message, the raw
1675 // data have been readout into the user buffer
1676 //----------------------------------------------------------------------
1677 if( pResponse->GetSize() > 8 )
1678 return Status( stOK, errInternal );
1679 //----------------------------------------------------------------------
1680 // Get the response for the end user
1681 //----------------------------------------------------------------------
1682 return pBodyReader->GetResponse( response );
1683 }
1684
1685 //------------------------------------------------------------------------
1686 // kXR_fattr
1687 //------------------------------------------------------------------------
1688 case kXR_fattr:
1689 {
1690 int len = rsp->hdr.dlen;
1691 char* data = rsp->body.buffer.data;
1692
1693 return ParseXAttrResponse( data, len, response );
1694 }
1695
1696 //------------------------------------------------------------------------
1697 // kXR_query
1698 //------------------------------------------------------------------------
1699 case kXR_query:
1700 case kXR_set:
1701 case kXR_prepare:
1702 default:
1703 {
1704 AnyObject *obj = new AnyObject();
1705 log->Dump( XRootDMsg, "[%s] Parsing the response to %s as BinaryData",
1706 pUrl.GetHostId().c_str(),
1707 pRequest->GetDescription().c_str() );
1708
1709 BinaryDataInfo *data = new BinaryDataInfo();
1710 data->Allocate( length );
1711 data->Append( buffer, length );
1712 obj->Set( data );
1713 response = obj;
1714 return Status();
1715 }
1716 };
1717 return Status( stError, errInvalidMessage );
1718 }
1719
1720 //------------------------------------------------------------------------
1721 // Parse the response to kXR_fattr request and put it in an object that
1722 // could be passed to the user
1723 //------------------------------------------------------------------------
1724 Status XRootDMsgHandler::ParseXAttrResponse( char *data, size_t len,
1725 AnyObject *&response )
1726 {
1727 ClientRequest *req = (ClientRequest *)pRequest->GetBuffer();
1728// Log *log = DefaultEnv::GetLog(); //TODO
1729
1730 switch( req->fattr.subcode )
1731 {
1732 case kXR_fattrDel:
1733 case kXR_fattrSet:
1734 {
1735 Status status;
1736
1737 kXR_char nerrs = 0;
1738 if( !( status = ReadFromBuffer( data, len, nerrs ) ).IsOK() )
1739 return status;
1740
1741 kXR_char nattr = 0;
1742 if( !( status = ReadFromBuffer( data, len, nattr ) ).IsOK() )
1743 return status;
1744
1745 std::vector<XAttrStatus> resp;
1746 // read the namevec
1747 for( kXR_char i = 0; i < nattr; ++i )
1748 {
1749 kXR_unt16 rc = 0;
1750 if( !( status = ReadFromBuffer( data, len, rc ) ).IsOK() )
1751 return status;
1752 rc = ntohs( rc );
1753
1754 // count errors
1755 if( rc ) --nerrs;
1756
1757 std::string name;
1758 if( !( status = ReadFromBuffer( data, len, name ) ).IsOK() )
1759 return status;
1760
1761 XRootDStatus st = rc ? XRootDStatus( stError, errErrorResponse, rc ) :
1762 XRootDStatus();
1763 resp.push_back( XAttrStatus( name, st ) );
1764 }
1765
1766 // check if we read all the data and if the error count is OK
1767 if( len != 0 || nerrs != 0 ) return Status( stError, errDataError );
1768
1769 // set up the response object
1770 response = new AnyObject();
1771 response->Set( new std::vector<XAttrStatus>( std::move( resp ) ) );
1772
1773 return Status();
1774 }
1775
1776 case kXR_fattrGet:
1777 {
1778 Status status;
1779
1780 kXR_char nerrs = 0;
1781 if( !( status = ReadFromBuffer( data, len, nerrs ) ).IsOK() )
1782 return status;
1783
1784 kXR_char nattr = 0;
1785 if( !( status = ReadFromBuffer( data, len, nattr ) ).IsOK() )
1786 return status;
1787
1788 std::vector<XAttr> resp;
1789 resp.reserve( nattr );
1790
1791 // read the name vec
1792 for( kXR_char i = 0; i < nattr; ++i )
1793 {
1794 kXR_unt16 rc = 0;
1795 if( !( status = ReadFromBuffer( data, len, rc ) ).IsOK() )
1796 return status;
1797 rc = ntohs( rc );
1798
1799 // count errors
1800 if( rc ) --nerrs;
1801
1802 std::string name;
1803 if( !( status = ReadFromBuffer( data, len, name ) ).IsOK() )
1804 return status;
1805
1806 XRootDStatus st = rc ? XRootDStatus( stError, errErrorResponse, rc ) :
1807 XRootDStatus();
1808 resp.push_back( XAttr( name, st ) );
1809 }
1810
1811 // read the value vec
1812 for( kXR_char i = 0; i < nattr; ++i )
1813 {
1814 kXR_int32 vlen = 0;
1815 if( !( status = ReadFromBuffer( data, len, vlen ) ).IsOK() )
1816 return status;
1817 vlen = ntohl( vlen );
1818
1819 std::string value;
1820 if( !( status = ReadFromBuffer( data, len, vlen, value ) ).IsOK() )
1821 return status;
1822
1823 resp[i].value.swap( value );
1824 }
1825
1826 // check if we read all the data and if the error count is OK
1827 if( len != 0 || nerrs != 0 ) return Status( stError, errDataError );
1828
1829 // set up the response object
1830 response = new AnyObject();
1831 response->Set( new std::vector<XAttr>( std::move( resp ) ) );
1832
1833 return Status();
1834 }
1835
1836 case kXR_fattrList:
1837 {
1838 Status status;
1839 std::vector<XAttr> resp;
1840
1841 while( len > 0 )
1842 {
1843 std::string name;
1844 if( !( status = ReadFromBuffer( data, len, name ) ).IsOK() )
1845 return status;
1846
1847 kXR_int32 vlen = 0;
1848 if( !( status = ReadFromBuffer( data, len, vlen ) ).IsOK() )
1849 return status;
1850 vlen = ntohl( vlen );
1851
1852 std::string value;
1853 if( !( status = ReadFromBuffer( data, len, vlen, value ) ).IsOK() )
1854 return status;
1855
1856 resp.push_back( XAttr( name, value ) );
1857 }
1858
1859 // set up the response object
1860 response = new AnyObject();
1861 response->Set( new std::vector<XAttr>( std::move( resp ) ) );
1862
1863 return Status();
1864 }
1865
1866 default:
1867 return Status( stError, errDataError );
1868 }
1869 }
1870
1871 //----------------------------------------------------------------------------
1872 // Perform the changes to the original request needed by the redirect
1873 // procedure - allocate new streamid, append redirection data and such
1874 //----------------------------------------------------------------------------
1875 Status XRootDMsgHandler::RewriteRequestRedirect( const URL &newUrl )
1876 {
1877 Log *log = DefaultEnv::GetLog();
1878
1879 Status st;
1880 // Append any "xrd.*" parameters present in newCgi so that any authentication
1881 // requirements are properly enforced
1882 const URL::ParamsMap &newCgi = newUrl.GetParams();
1883 std::string xrdCgi = "";
1884 std::ostringstream ossXrd;
1885 for(URL::ParamsMap::const_iterator it = newCgi.begin(); it != newCgi.end(); ++it )
1886 {
1887 if( it->first.compare( 0, 4, "xrd." ) )
1888 continue;
1889 ossXrd << it->first << '=' << it->second << '&';
1890 }
1891
1892 xrdCgi = ossXrd.str();
1893 // Redirection URL containing also any original xrd.* opaque parameters
1894 XrdCl::URL authUrl;
1895
1896 if (xrdCgi.empty())
1897 {
1898 authUrl = newUrl;
1899 }
1900 else
1901 {
1902 std::string surl = newUrl.GetURL();
1903 (surl.find('?') == std::string::npos) ? (surl += '?') :
1904 ((*surl.rbegin() != '&') ? (surl += '&') : (surl += ""));
1905 surl += xrdCgi;
1906
1907 if (!authUrl.FromString(surl))
1908 {
1909 log->Error( XRootDMsg, "[%s] Failed to build redirection URL from data:"
1910 "%s", surl.c_str());
1911 return Status(stError, errInvalidRedirectURL);
1912 }
1913 }
1914
1915 //--------------------------------------------------------------------------
1916 // Rewrite particular requests
1917 //--------------------------------------------------------------------------
1919 MessageUtils::RewriteCGIAndPath( pRequest, newCgi, true, newUrl.GetPath() );
1921 return Status();
1922 }
1923
1924 //----------------------------------------------------------------------------
1925 // Some requests need to be rewritten also after getting kXR_wait
1926 //----------------------------------------------------------------------------
1927 Status XRootDMsgHandler::RewriteRequestWait()
1928 {
1929 ClientRequest *req = (ClientRequest *)pRequest->GetBuffer();
1930
1932
1933 //------------------------------------------------------------------------
1934 // For kXR_locate and kXR_open request the kXR_refresh bit needs to be
1935 // turned off after wait
1936 //------------------------------------------------------------------------
1937 switch( req->header.requestid )
1938 {
1939 case kXR_locate:
1940 {
1941 uint16_t refresh = kXR_refresh;
1942 req->locate.options &= (~refresh);
1943 break;
1944 }
1945
1946 case kXR_open:
1947 {
1948 uint16_t refresh = kXR_refresh;
1949 req->locate.options &= (~refresh);
1950 break;
1951 }
1952 }
1953
1956 return Status();
1957 }
1958
1959 //----------------------------------------------------------------------------
1960 // Recover error
1961 //----------------------------------------------------------------------------
1962 void XRootDMsgHandler::HandleError( XRootDStatus status )
1963 {
1964 //--------------------------------------------------------------------------
1965 // If there was no error then do nothing
1966 //--------------------------------------------------------------------------
1967 if( status.IsOK() )
1968 return;
1969
1970 if( pSidMgr && pMsgInFly && (
1971 status.code == errOperationExpired ||
1972 status.code == errOperationInterrupted ) )
1973 {
1974 ClientRequest *req = (ClientRequest *)pRequest->GetBuffer();
1975 pSidMgr->TimeOutSID( req->header.streamid );
1976 }
1977
1978 bool noreplicas = ( status.code == errErrorResponse &&
1979 status.errNo == kXR_noReplicas );
1980
1981 if( !noreplicas ) pLastError = status;
1982
1983 Log *log = DefaultEnv::GetLog();
1984 log->Debug( XRootDMsg, "[%s] Handling error while processing %s: %s.",
1985 pUrl.GetHostId().c_str(), pRequest->GetDescription().c_str(),
1986 status.ToString().c_str() );
1987
1988 //--------------------------------------------------------------------------
1989 // Check if it is a fatal TLS error that has been marked as potentially
1990 // recoverable, if yes check if we can downgrade from fatal to error.
1991 //--------------------------------------------------------------------------
1992 if( status.IsFatal() && status.code == errTlsError && status.errNo == EAGAIN )
1993 {
1994 if( pSslErrCnt < MaxSslErrRetry )
1995 {
1996 status.status &= ~stFatal; // switch off fatal&error bits
1997 status.status |= stError; // switch on error bit
1998 }
1999 ++pSslErrCnt; // count number of consecutive SSL errors
2000 }
2001 else
2002 pSslErrCnt = 0;
2003
2004 //--------------------------------------------------------------------------
2005 // We have got an error message, we can recover it at the load balancer if:
2006 // 1) we haven't got it from the load balancer
2007 // 2) we have a load balancer assigned
2008 // 3) the error is either one of: kXR_FSError, kXR_IOError, kXR_ServerError,
2009 // kXR_NotFound
2010 // 4) in the case of kXR_NotFound a kXR_refresh flags needs to be set
2011 //--------------------------------------------------------------------------
2012 if( status.code == errErrorResponse )
2013 {
2014 if( RetriableErrorResponse( status ) )
2015 {
2016 UpdateTriedCGI(status.errNo);
2017 if( status.errNo == kXR_NotFound || status.errNo == kXR_Overloaded )
2018 SwitchOnRefreshFlag();
2019 HandleError( RetryAtServer( pLoadBalancer.url, RedirectEntry::EntryRetry ) );
2020 return;
2021 }
2022 else
2023 {
2024 pStatus = status;
2025 HandleRspOrQueue();
2026 return;
2027 }
2028 }
2029
2030 //--------------------------------------------------------------------------
2031 // Nothing can be done if:
2032 // 1) a user timeout has occurred
2033 // 2) has a non-zero session id
2034 // 3) if another error occurred and the validity of the message expired
2035 //--------------------------------------------------------------------------
2036 if( status.code == errOperationExpired || pRequest->GetSessionId() ||
2037 status.code == errOperationInterrupted || time(0) >= pExpiration )
2038 {
2039 log->Error( XRootDMsg, "[%s] Unable to get the response to request %s",
2040 pUrl.GetHostId().c_str(),
2041 pRequest->GetDescription().c_str() );
2042 pStatus = status;
2043 HandleRspOrQueue();
2044 return;
2045 }
2046
2047 //--------------------------------------------------------------------------
2048 // At this point we're left with connection errors, we recover them
2049 // at a load balancer if we have one and if not on the current server
2050 // until we get a response, an unrecoverable error or a timeout
2051 //--------------------------------------------------------------------------
2052 if( pLoadBalancer.url.IsValid() &&
2053 pLoadBalancer.url.GetLocation() != pUrl.GetLocation() )
2054 {
2055 UpdateTriedCGI( kXR_ServerError );
2056 HandleError( RetryAtServer( pLoadBalancer.url, RedirectEntry::EntryRetry ) );
2057 return;
2058 }
2059 else
2060 {
2061 if( !status.IsFatal() && IsRetriable() )
2062 {
2063 log->Info( XRootDMsg, "[%s] Retrying request: %s.",
2064 pUrl.GetHostId().c_str(),
2065 pRequest->GetDescription().c_str() );
2066
2067 UpdateTriedCGI( kXR_ServerError );
2068 HandleError( RetryAtServer( pUrl, RedirectEntry::EntryRetry ) );
2069 return;
2070 }
2071 pStatus = status;
2072 HandleRspOrQueue();
2073 return;
2074 }
2075 }
2076
2077 //----------------------------------------------------------------------------
2078 // Retry the message at another server
2079 //----------------------------------------------------------------------------
2080 Status XRootDMsgHandler::RetryAtServer( const URL &url, RedirectEntry::Type entryType )
2081 {
2082 pResponse.reset();
2083 Log *log = DefaultEnv::GetLog();
2084
2085 //--------------------------------------------------------------------------
2086 // Set up a redirect entry
2087 //--------------------------------------------------------------------------
2088 if( pRdirEntry ) pRedirectTraceBack.push_back( std::move( pRdirEntry ) );
2089 pRdirEntry.reset( new RedirectEntry( pUrl.GetLocation(), url.GetLocation(), entryType ) );
2090
2091 if( pUrl.GetLocation() != url.GetLocation() )
2092 {
2093 pHosts->push_back( url );
2094
2095 //------------------------------------------------------------------------
2096 // Assign a new stream id to the message
2097 //------------------------------------------------------------------------
2098
2099 // first release the old stream id
2100 // (though it could be a redirect from a local
2101 // metalink file, in this case there's no SID)
2102 ClientRequestHdr *req = (ClientRequestHdr*)pRequest->GetBuffer();
2103 if( pSidMgr )
2104 {
2105 pSidMgr->ReleaseSID( req->streamid );
2106 pSidMgr.reset();
2107 }
2108
2109 // then get the new SIDManager
2110 // (again this could be a redirect to a local
2111 // file and in this case there is no SID)
2112 if( !url.IsLocalFile() )
2113 {
2114 pSidMgr = SIDMgrPool::Instance().GetSIDMgr( url );
2115 Status st = pSidMgr->AllocateSID( req->streamid );
2116 if( !st.IsOK() )
2117 {
2118 log->Error( XRootDMsg, "[%s] Impossible to send message %s.",
2119 pUrl.GetHostId().c_str(),
2120 pRequest->GetDescription().c_str() );
2121 return st;
2122 }
2123 }
2124
2125 pUrl = url;
2126 }
2127
2128 if( pUrl.IsMetalink() && pFollowMetalink )
2129 {
2130 log->Debug( ExDbgMsg, "[%s] Metaling redirection for MsgHandler: 0x%x (message: %s ).",
2131 pUrl.GetHostId().c_str(), this,
2132 pRequest->GetDescription().c_str() );
2133
2134 return pPostMaster->Redirect( pUrl, pRequest, this );
2135 }
2136 else if( pUrl.IsLocalFile() )
2137 {
2138 HandleLocalRedirect( &pUrl );
2139 return Status();
2140 }
2141 else
2142 {
2143 log->Debug( ExDbgMsg, "[%s] Retry at server MsgHandler: 0x%x (message: %s ).",
2144 pUrl.GetHostId().c_str(), this,
2145 pRequest->GetDescription().c_str() );
2146 return pPostMaster->Send( pUrl, pRequest, this, true, pExpiration );
2147 }
2148 }
2149
2150 //----------------------------------------------------------------------------
2151 // Update the "tried=" part of the CGI of the current message
2152 //----------------------------------------------------------------------------
2153 void XRootDMsgHandler::UpdateTriedCGI(uint32_t errNo)
2154 {
2155 URL::ParamsMap cgi;
2156 std::string tried;
2157
2158 //--------------------------------------------------------------------------
2159 // In case a data server responded with a kXR_redirect and we fail at the
2160 // node where we were redirected to, the original data server should be
2161 // included in the tried CGI opaque info (instead of the current one).
2162 //--------------------------------------------------------------------------
2163 if( pEffectiveDataServerUrl )
2164 {
2165 tried = pEffectiveDataServerUrl->GetHostName();
2166 delete pEffectiveDataServerUrl;
2167 pEffectiveDataServerUrl = 0;
2168 }
2169 //--------------------------------------------------------------------------
2170 // Otherwise use the current URL.
2171 //--------------------------------------------------------------------------
2172 else
2173 tried = pUrl.GetHostName();
2174
2175 // Report the reason for the failure to the next location
2176 //
2177 if (errNo)
2178 { if (errNo == kXR_NotFound) cgi["triedrc"] = "enoent";
2179 else if (errNo == kXR_IOError) cgi["triedrc"] = "ioerr";
2180 else if (errNo == kXR_FSError) cgi["triedrc"] = "fserr";
2181 else if (errNo == kXR_ServerError) cgi["triedrc"] = "srverr";
2182 }
2183
2184 //--------------------------------------------------------------------------
2185 // If our current load balancer is a metamanager and we failed either
2186 // at a diskserver or at an unidentified node we also exclude the last
2187 // known manager
2188 //--------------------------------------------------------------------------
2189 if( pLoadBalancer.url.IsValid() && (pLoadBalancer.flags & kXR_attrMeta) )
2190 {
2191 HostList::reverse_iterator it;
2192 for( it = pHosts->rbegin()+1; it != pHosts->rend(); ++it )
2193 {
2194 if( it->loadBalancer )
2195 break;
2196
2197 tried += "," + it->url.GetHostName();
2198
2199 if( it->flags & kXR_isManager )
2200 break;
2201 }
2202 }
2203
2204 cgi["tried"] = tried;
2206 MessageUtils::RewriteCGIAndPath( pRequest, cgi, false, "" );
2208 }
2209
2210 //----------------------------------------------------------------------------
2211 // Switch on the refresh flag for some requests
2212 //----------------------------------------------------------------------------
2213 void XRootDMsgHandler::SwitchOnRefreshFlag()
2214 {
2216 ClientRequest *req = (ClientRequest *)pRequest->GetBuffer();
2217 switch( req->header.requestid )
2218 {
2219 case kXR_locate:
2220 {
2221 req->locate.options |= kXR_refresh;
2222 break;
2223 }
2224
2225 case kXR_open:
2226 {
2227 req->locate.options |= kXR_refresh;
2228 break;
2229 }
2230 }
2233 }
2234
2235 //------------------------------------------------------------------------
2236 // If the current thread is a worker thread from our thread-pool
2237 // handle the response, otherwise submit a new task to the thread-pool
2238 //------------------------------------------------------------------------
2239 void XRootDMsgHandler::HandleRspOrQueue()
2240 {
2241 JobManager *jobMgr = pPostMaster->GetJobManager();
2242 if( jobMgr->IsWorker() )
2243 HandleResponse();
2244 else
2245 {
2246 Log *log = DefaultEnv::GetLog();
2247 log->Debug( ExDbgMsg, "[%s] Passing to the thread-pool MsgHandler: 0x%x (message: %s ).",
2248 pUrl.GetHostId().c_str(), this,
2249 pRequest->GetDescription().c_str() );
2250 jobMgr->QueueJob( new HandleRspJob( this ), 0 );
2251 }
2252 }
2253
2254 //------------------------------------------------------------------------
2255 // Notify the FileStateHandler to retry Open() with new URL
2256 //------------------------------------------------------------------------
2257 void XRootDMsgHandler::HandleLocalRedirect( URL *url )
2258 {
2259 Log *log = DefaultEnv::GetLog();
2260 log->Debug( ExDbgMsg, "[%s] Handling local redirect - MsgHandler: 0x%x (message: %s ).",
2261 pUrl.GetHostId().c_str(), this,
2262 pRequest->GetDescription().c_str() );
2263
2264 if( !pLFileHandler )
2265 {
2266 HandleError( XRootDStatus( stFatal, errNotSupported ) );
2267 return;
2268 }
2269
2270 AnyObject *resp = 0;
2271 pLFileHandler->SetHostList( *pHosts );
2272 XRootDStatus st = pLFileHandler->Open( url, pRequest, resp );
2273 if( !st.IsOK() )
2274 {
2275 HandleError( st );
2276 return;
2277 }
2278
2279 pResponseHandler->HandleResponseWithHosts( new XRootDStatus(),
2280 resp,
2281 pHosts.release() );
2282 delete this;
2283
2284 return;
2285 }
2286
2287 //------------------------------------------------------------------------
2288 // Check if it is OK to retry this request
2289 //------------------------------------------------------------------------
2290 bool XRootDMsgHandler::IsRetriable()
2291 {
2292 std::string value;
2293 DefaultEnv::GetEnv()->GetString( "OpenRecovery", value );
2294 if( value == "true" ) return true;
2295
2296 // check if it is a mutable open (open + truncate or open + create)
2297 ClientRequest *req = reinterpret_cast<ClientRequest*>( pRequest->GetBuffer() );
2298 if( req->header.requestid == htons( kXR_open ) )
2299 {
2300 bool _mutable = ( req->open.options & htons( kXR_delete ) ) ||
2301 ( req->open.options & htons( kXR_new ) );
2302
2303 if( _mutable )
2304 {
2305 Log *log = DefaultEnv::GetLog();
2306 log->Debug( XRootDMsg,
2307 "[%s] Not allowed to retry open request (OpenRecovery disabled): %s.",
2308 pUrl.GetHostId().c_str(),
2309 pRequest->GetDescription().c_str() );
2310 // disallow retry if it is a mutable open
2311 return false;
2312 }
2313 }
2314
2315 return true;
2316 }
2317
2318 //------------------------------------------------------------------------
2319 // Check if for given request and Metalink redirector it is OK to omit
2320 // the kXR_wait and proceed straight to the next entry in the Metalink file
2321 //------------------------------------------------------------------------
2322 bool XRootDMsgHandler::OmitWait( Message &request, const URL &url )
2323 {
2324 // we can omit kXR_wait only if we have a Metalink redirector
2325 if( !url.IsMetalink() )
2326 return false;
2327
2328 // we can omit kXR_wait only for requests that can be redirected
2329 // (kXR_read is the only stateful request that can be redirected)
2330 ClientRequest *req = reinterpret_cast<ClientRequest*>( request.GetBuffer() );
2331 if( pStateful && req->header.requestid != kXR_read )
2332 return false;
2333
2334 // we can only omit kXR_wait if the Metalink redirect has more
2335 // replicas
2336 RedirectorRegistry &registry = RedirectorRegistry::Instance();
2337 VirtualRedirector *redirector = registry.Get( url );
2338
2339 // we need more than one server as the current one is not reflected
2340 // in tried CGI
2341 if( redirector->Count( request ) > 1 )
2342 return true;
2343
2344 return false;
2345 }
2346
2347 //------------------------------------------------------------------------
2348 // Checks if the given error returned by server is retriable.
2349 //------------------------------------------------------------------------
2350 bool XRootDMsgHandler::RetriableErrorResponse( const Status &status )
2351 {
2352 // we can only retry error response if we have a valid load-balancer and
2353 // it is not our current URL
2354 if( !( pLoadBalancer.url.IsValid() &&
2355 pUrl.GetLocation() != pLoadBalancer.url.GetLocation() ) )
2356 return false;
2357
2358 // following errors are retriable at any load-balancer
2359 if( status.errNo == kXR_FSError || status.errNo == kXR_IOError ||
2360 status.errNo == kXR_ServerError || status.errNo == kXR_NotFound ||
2361 status.errNo == kXR_Overloaded || status.errNo == kXR_NoMemory )
2362 return true;
2363
2364 // check if the load-balancer is a meta-manager, if yes there are
2365 // more errors that can be recovered
2366 if( !( pLoadBalancer.flags & kXR_attrMeta ) ) return false;
2367
2368 // those errors are retriable for meta-managers
2369 if( status.errNo == kXR_Unsupported || status.errNo == kXR_FileLocked )
2370 return true;
2371
2372 // in case of not-authorized error there is an imposed upper limit
2373 // on how many times we can retry this error
2374 if( status.errNo == kXR_NotAuthorized )
2375 {
2377 DefaultEnv::GetEnv()->GetInt( "NotAuthorizedRetryLimit", limit );
2378 bool ret = pNotAuthorizedCounter < limit;
2379 ++pNotAuthorizedCounter;
2380 if( !ret )
2381 {
2382 Log *log = DefaultEnv::GetLog();
2383 log->Error( XRootDMsg,
2384 "[%s] Reached limit of NotAuthorized retries!",
2385 pUrl.GetHostId().c_str() );
2386 }
2387 return ret;
2388 }
2389
2390 // check if the load-balancer is a virtual (metalink) redirector,
2391 // if yes there are even more errors that can be recovered
2392 if( !( pLoadBalancer.flags & kXR_attrVirtRdr ) ) return false;
2393
2394 // those errors are retriable for virtual (metalink) redirectors
2395 if( status.errNo == kXR_noserver || status.errNo == kXR_ArgTooLong )
2396 return true;
2397
2398 // otherwise it is a non-retriable error
2399 return false;
2400 }
2401
2402 //------------------------------------------------------------------------
2403 // Dump the redirect-trace-back into the log file
2404 //------------------------------------------------------------------------
2405 void XRootDMsgHandler::DumpRedirectTraceBack()
2406 {
2407 if( pRedirectTraceBack.empty() ) return;
2408
2409 std::stringstream sstrm;
2410
2411 sstrm << "Redirect trace-back:\n";
2412
2413 int counter = 0;
2414
2415 auto itr = pRedirectTraceBack.begin();
2416 sstrm << '\t' << counter << ". " << (*itr)->ToString() << '\n';
2417
2418 auto prev = itr;
2419 ++itr;
2420 ++counter;
2421
2422 for( ; itr != pRedirectTraceBack.end(); ++itr, ++prev, ++counter )
2423 sstrm << '\t' << counter << ". "
2424 << (*itr)->ToString( (*prev)->status.IsOK() ) << '\n';
2425
2426 int authlimit = DefaultNotAuthorizedRetryLimit;
2427 DefaultEnv::GetEnv()->GetInt( "NotAuthorizedRetryLimit", authlimit );
2428
2429 bool warn = !pStatus.IsOK() &&
2430 ( pStatus.code == errNotFound ||
2431 pStatus.code == errRedirectLimit ||
2432 ( pStatus.code == errAuthFailed && pNotAuthorizedCounter >= authlimit ) );
2433
2434 Log *log = DefaultEnv::GetLog();
2435 if( warn )
2436 log->Warning( XRootDMsg, sstrm.str().c_str() );
2437 else
2438 log->Debug( XRootDMsg, sstrm.str().c_str() );
2439 }
2440
2441 // Read data from buffer
2442 //------------------------------------------------------------------------
2443 template<typename T>
2444 Status XRootDMsgHandler::ReadFromBuffer( char *&buffer, size_t &buflen, T& result )
2445 {
2446 if( sizeof( T ) > buflen ) return Status( stError, errDataError );
2447
2448 memcpy(&result, buffer, sizeof(T));
2449
2450 buffer += sizeof( T );
2451 buflen -= sizeof( T );
2452
2453 return Status();
2454 }
2455
2456 //------------------------------------------------------------------------
2457 // Read a string from buffer
2458 //------------------------------------------------------------------------
2459 Status XRootDMsgHandler::ReadFromBuffer( char *&buffer, size_t &buflen, std::string &result )
2460 {
2461 Status status;
2462 char c = 0;
2463
2464 while( true )
2465 {
2466 if( !( status = ReadFromBuffer( buffer, buflen, c ) ).IsOK() )
2467 return status;
2468
2469 if( c == 0 ) break;
2470 result += c;
2471 }
2472
2473 return status;
2474 }
2475
2476 //------------------------------------------------------------------------
2477 // Read a string from buffer
2478 //------------------------------------------------------------------------
2479 Status XRootDMsgHandler::ReadFromBuffer( char *&buffer, size_t &buflen,
2480 size_t size, std::string &result )
2481 {
2482 Status status;
2483
2484 if( size > buflen ) return Status( stError, errDataError );
2485
2486 result.append( buffer, size );
2487 buffer += size;
2488 buflen -= size;
2489
2490 return status;
2491 }
2492
2493}
@ kXR_NotAuthorized
Definition XProtocol.hh:998
@ kXR_NotFound
Definition XProtocol.hh:999
@ kXR_FileLocked
Definition XProtocol.hh:991
@ kXR_noReplicas
@ kXR_Unsupported
@ kXR_ServerError
@ kXR_Overloaded
@ kXR_ArgTooLong
Definition XProtocol.hh:990
@ kXR_noserver
@ kXR_IOError
Definition XProtocol.hh:995
@ kXR_FSError
Definition XProtocol.hh:993
@ kXR_NoMemory
Definition XProtocol.hh:996
#define kXR_isManager
@ kXR_fattrDel
Definition XProtocol.hh:270
@ kXR_fattrSet
Definition XProtocol.hh:273
@ kXR_fattrList
Definition XProtocol.hh:272
@ kXR_fattrGet
Definition XProtocol.hh:271
struct ClientFattrRequest fattr
Definition XProtocol.hh:852
#define kXR_collapseRedir
ServerResponseStatus status
#define kXR_attrMeta
kXR_char streamid[2]
Definition XProtocol.hh:156
kXR_char streamid[2]
Definition XProtocol.hh:912
kXR_unt16 options
Definition XProtocol.hh:481
struct ClientDirlistRequest dirlist
Definition XProtocol.hh:850
static const int kXR_ckpXeq
Definition XProtocol.hh:216
union ServerResponse::@103 body
@ kXR_delete
Definition XProtocol.hh:453
@ kXR_refresh
Definition XProtocol.hh:459
@ kXR_new
Definition XProtocol.hh:455
@ kXR_retstat
Definition XProtocol.hh:463
struct ClientOpenRequest open
Definition XProtocol.hh:858
@ kXR_waitresp
Definition XProtocol.hh:904
@ kXR_redirect
Definition XProtocol.hh:902
@ kXR_oksofar
Definition XProtocol.hh:898
@ kXR_status
Definition XProtocol.hh:905
@ kXR_ok
Definition XProtocol.hh:897
@ kXR_attn
Definition XProtocol.hh:899
@ kXR_wait
Definition XProtocol.hh:903
@ kXR_error
Definition XProtocol.hh:901
struct ServerResponseBody_Status bdy
struct ClientRequestHdr header
Definition XProtocol.hh:844
#define kXR_recoverWrts
kXR_unt16 requestid
Definition XProtocol.hh:157
@ kXR_read
Definition XProtocol.hh:125
@ kXR_open
Definition XProtocol.hh:122
@ kXR_writev
Definition XProtocol.hh:143
@ kXR_readv
Definition XProtocol.hh:137
@ kXR_mkdir
Definition XProtocol.hh:120
@ kXR_sync
Definition XProtocol.hh:128
@ kXR_chmod
Definition XProtocol.hh:114
@ kXR_dirlist
Definition XProtocol.hh:116
@ kXR_fattr
Definition XProtocol.hh:132
@ kXR_rm
Definition XProtocol.hh:126
@ kXR_query
Definition XProtocol.hh:113
@ kXR_write
Definition XProtocol.hh:131
@ kXR_set
Definition XProtocol.hh:130
@ kXR_rmdir
Definition XProtocol.hh:127
@ kXR_truncate
Definition XProtocol.hh:140
@ kXR_protocol
Definition XProtocol.hh:118
@ kXR_mv
Definition XProtocol.hh:121
@ kXR_ping
Definition XProtocol.hh:123
@ kXR_stat
Definition XProtocol.hh:129
@ kXR_pgread
Definition XProtocol.hh:142
@ kXR_chkpoint
Definition XProtocol.hh:124
@ kXR_locate
Definition XProtocol.hh:139
@ kXR_close
Definition XProtocol.hh:115
@ kXR_pgwrite
Definition XProtocol.hh:138
@ kXR_prepare
Definition XProtocol.hh:133
union ServerResponseV2::@105 info
#define kXR_isServer
#define kXR_attrVirtRdr
struct ClientChkPointRequest chkpoint
Definition XProtocol.hh:847
struct ServerResponseHeader hdr
#define kXR_PROTOCOLVERSION
Definition XProtocol.hh:70
@ kXR_vfs
Definition XProtocol.hh:761
struct ClientStatRequest stat
Definition XProtocol.hh:871
#define kXR_ecRedir
struct ClientLocateRequest locate
Definition XProtocol.hh:854
ServerResponseHeader hdr
long long kXR_int64
Definition XPtypes.hh:98
int kXR_int32
Definition XPtypes.hh:89
unsigned short kXR_unt16
Definition XPtypes.hh:67
unsigned char kXR_char
Definition XPtypes.hh:65
void Get(Type &object)
Retrieve the object being held.
Object for reading out data from the PgRead response.
void AdvanceCursor(uint32_t delta)
Advance the cursor.
char * GetBufferAtCursor()
Get the buffer pointer at the append cursor.
const char * GetBuffer(uint32_t offset=0) const
Get the message buffer.
void SetCursor(uint32_t cursor)
Set the cursor.
uint32_t GetCursor() const
Get append cursor.
static Log * GetLog()
Get default log.
static Env * GetEnv()
Get default client environment.
static bool HasStatInfo(const char *data)
Returns true if data contain stat info.
bool GetString(const std::string &key, std::string &value)
Definition XrdClEnv.cc:31
bool GetInt(const std::string &key, int &value)
Definition XrdClEnv.cc:89
virtual void Run(void *arg)
The job logic.
HandleRspJob(XrdCl::XRootDMsgHandler *handler)
Interface for a job to be run by the job manager.
void SetHostList(const HostList &hostList)
XRootDStatus Open(const std::string &url, uint16_t flags, uint16_t mode, ResponseHandler *handler, uint16_t timeout=0)
Handle diagnostics.
Definition XrdClLog.hh:101
void Error(uint64_t topic, const char *format,...)
Report an error.
Definition XrdClLog.cc:231
void Warning(uint64_t topic, const char *format,...)
Report a warning.
Definition XrdClLog.cc:248
void Dump(uint64_t topic, const char *format,...)
Print a dump message.
Definition XrdClLog.cc:299
void Debug(uint64_t topic, const char *format,...)
Print a debug message.
Definition XrdClLog.cc:282
static void RewriteCGIAndPath(Message *msg, const URL::ParamsMap &newCgi, bool replace, const std::string &newPath)
Append cgi to the one already present in the message.
The message representation used throughout the system.
const std::string & GetDescription() const
Get the description of the message.
uint64_t GetSessionId() const
Get the session ID the message is meant for.
@ More
there are more (non-raw) data to be read
@ Ignore
Ignore the message.
StreamEvent
Events that may have occurred to the stream.
@ Ready
The stream has become connected.
void CollapseRedirect(const URL &oldurl, const URL &newURL)
Collapse channel URL - replace the URL of the channel.
XRootDStatus Send(const URL &url, Message *msg, MsgHandler *handler, bool stateful, time_t expires)
TaskManager * GetTaskManager()
Get the task manager object user by the post master.
Status Redirect(const URL &url, Message *msg, MsgHandler *handler)
Status QueryTransport(const URL &url, uint16_t query, AnyObject &result)
JobManager * GetJobManager()
Get the job manager object user by the post master.
static RedirectorRegistry & Instance()
Returns reference to the single instance.
virtual void HandleResponseWithHosts(XRootDStatus *status, AnyObject *response, HostList *hostList)
static SIDMgrPool & Instance()
std::shared_ptr< SIDManager > GetSIDMgr(const URL &url)
A network socket.
virtual XRootDStatus Send(const char *buffer, size_t size, int &bytesWritten)
void RegisterTask(Task *task, time_t time, bool own=true)
Interface for a task to be run by the TaskManager.
virtual time_t Run(time_t now)=0
void SetName(const std::string &name)
Set name of the task.
URL representation.
Definition XrdClURL.hh:31
std::string GetHostId() const
Get the host part of the URL (user:password@host:port)
Definition XrdClURL.hh:94
bool IsMetalink() const
Is it a URL to a metalink.
Definition XrdClURL.cc:451
const std::string & GetPassword() const
Get the password.
Definition XrdClURL.hh:148
std::map< std::string, std::string > ParamsMap
Definition XrdClURL.hh:33
bool FromString(const std::string &url)
Parse a string and fill the URL fields.
Definition XrdClURL.cc:58
void SetPassword(const std::string &password)
Set the password.
Definition XrdClURL.hh:156
void SetParams(const std::string &params)
Set params.
Definition XrdClURL.cc:388
const std::string & GetUserName() const
Get the username.
Definition XrdClURL.hh:130
std::string GetURL() const
Get the URL.
Definition XrdClURL.hh:86
std::string GetLocation() const
Get location (protocol://host:port/path)
Definition XrdClURL.cc:330
const std::string & GetHostName() const
Get the name of the target host.
Definition XrdClURL.hh:165
bool IsLocalFile() const
Definition XrdClURL.cc:460
void SetProtocol(const std::string &protocol)
Set protocol.
Definition XrdClURL.hh:121
const ParamsMap & GetParams() const
Get the URL params.
Definition XrdClURL.hh:239
const std::string & GetProtocol() const
Get the protocol.
Definition XrdClURL.hh:113
bool IsValid() const
Is the url valid.
Definition XrdClURL.cc:438
void SetUserName(const std::string &userName)
Set the username.
Definition XrdClURL.hh:138
static void splitString(Container &result, const std::string &input, const std::string &delimiter)
Split a string.
Definition XrdClUtils.hh:56
static bool CheckEC(const Message *req, const URL &url)
Check if this client can support given EC redirect.
Handle/Process/Forward XRootD messages.
virtual uint16_t Examine(std::shared_ptr< Message > &msg)
XRootDStatus WriteMessageBody(Socket *socket, uint32_t &bytesWritten)
const Message * GetRequest() const
Get the request pointer.
virtual uint8_t OnStreamEvent(StreamEvent event, XRootDStatus status)
virtual XRootDStatus ReadMessageBody(Message *msg, Socket *socket, uint32_t &bytesRead)
virtual void OnStatusReady(const Message *message, XRootDStatus status)
The requested action has been performed and the status is available.
virtual bool IsRaw() const
Are we a raw writer or not?
virtual void Process()
Process the message if it was "taken" by the examine action.
virtual uint16_t InspectStatusRsp()
virtual uint16_t GetSid() const
const std::string & GetErrorMessage() const
Get error message.
static void SetDescription(Message *msg)
Get the description of a message.
static XRootDStatus UnMarshallBody(Message *msg, uint16_t reqType)
Unmarshall the body of the incoming message.
static XRootDStatus UnMarshallRequest(Message *msg)
static XRootDStatus UnMarshalStatusBody(Message &msg, uint16_t reqType)
Unmarshall the body of the status response.
static XRootDStatus MarshallRequest(Message *msg)
Marshal the outgoing message.
static int csNum(off_t offs, int count)
Compute the required size of a checksum vector based on offset & length.
const uint16_t suRetry
const uint16_t errRedirectLimit
const int DefaultMaxMetalinkWait
const uint16_t errErrorResponse
const uint16_t errTlsError
const uint16_t errOperationExpired
const uint16_t stFatal
Fatal error, it's still an error.
const uint16_t stError
An error occurred that could potentially be retried.
const uint16_t errNotFound
const uint64_t XRootDMsg
std::vector< HostInfo > HostList
const uint16_t errDataError
data is corrupted
const uint16_t errInternal
Internal error.
const uint16_t stOK
Everything went OK.
const uint64_t ExDbgMsg
const uint16_t errInvalidResponse
const uint16_t errInvalidRedirectURL
const uint16_t errNotSupported
Buffer BinaryDataInfo
Binary buffer.
const uint16_t errOperationInterrupted
const uint16_t suContinue
const int DefaultNotAuthorizedRetryLimit
const uint16_t errRedirect
const uint16_t errAuthFailed
const uint16_t errInvalidMessage
none object for initializing empty Optional
XrdSysError Log
Definition XrdConfig.cc:111
@ kXR_PartialResult
static const int PageSize
ssize_t Move(KernelBuffer &kbuff, char *&ubuff)
Describe a data chunk for vector read.
void * buffer
length of the chunk
uint32_t length
offset in the file
URL url
URL of the host.
uint32_t flags
Host type.
Procedure execution status.
uint16_t code
Error type, or additional hints on what to do.
bool IsOK() const
We're fine.
std::string ToString() const
Create a string representation.
static const uint16_t ServerFlags
returns server flags
static const uint16_t ProtocolVersion
returns the protocol version