9#include "XrdVersion.hh"
36uint64_t TPCHandler::m_monid{0};
37int TPCHandler::m_marker_period = 5;
38size_t TPCHandler::m_block_size = 16*1024*1024;
39size_t TPCHandler::m_small_block_size = 1*1024*1024;
48TPCHandler::TPCLogRecord::~TPCLogRecord()
55 monInfo.
clID = clID.c_str();
57 gettimeofday(&monInfo.
endT, 0);
60 {monInfo.
dstURL = local.c_str();
61 monInfo.
srcURL = remote.c_str();
63 monInfo.
dstURL = remote.c_str();
64 monInfo.
srcURL = local.c_str();
68 if (!status) monInfo.
endRC = 0;
69 else if (tpc_status > 0) monInfo.
endRC = tpc_status;
70 else monInfo.
endRC = 1;
71 monInfo.
strm =
static_cast<unsigned char>(streams);
72 monInfo.
fSize = (bytes_transferred < 0 ? 0 : bytes_transferred);
75 tpcMonitor->Report(monInfo);
85 if (curl) curl_easy_cleanup(curl);
109 std::stringstream parser(opaque);
110 std::string sequence;
111 std::stringstream output;
113 while (
getline(parser, sequence,
'&')) {
114 if (sequence.empty()) {
continue;}
115 size_t equal_pos = sequence.find(
'=');
117 if (equal_pos != std::string::npos)
118 val = curl_easy_escape(curl, sequence.c_str() + equal_pos + 1, sequence.size() - equal_pos - 1);
120 if (!val && equal_pos != std::string::npos) {
continue;}
122 if (!first) output <<
"&";
124 output << sequence.substr(0, equal_pos);
126 output <<
"=" << val;
138TPCHandler::ConfigureCurlCA(
CURL *curl)
140 auto ca_filename = m_ca_file ? m_ca_file->CAFilename() :
"";
141 auto crl_filename = m_ca_file ? m_ca_file->CRLFilename() :
"";
142 if (!ca_filename.empty() && !crl_filename.empty()) {
143 curl_easy_setopt(curl, CURLOPT_CAINFO, ca_filename.c_str());
147 std::ifstream in(crl_filename, std::ifstream::ate | std::ifstream::binary);
148 if(in.tellg() > 0 && m_ca_file->atLeastOneValidCRLFound()){
149 curl_easy_setopt(curl, CURLOPT_CRLFILE, crl_filename.c_str());
151 std::ostringstream oss;
152 oss <<
"No valid CRL file has been found in the file " << crl_filename <<
". Disabling CRL checking.";
153 m_log.Log(
Warning,
"TpcHandler",oss.str().c_str());
156 else if (!m_cadir.empty()) {
157 curl_easy_setopt(curl, CURLOPT_CAPATH, m_cadir.c_str());
159 if (!m_cafile.empty()) {
160 curl_easy_setopt(curl, CURLOPT_CAINFO, m_cafile.c_str());
166 return !strcmp(verb,
"COPY") || !strcmp(verb,
"OPTIONS");
174 if (!strncmp(input.c_str(),
"davs://", 7)) {
175 return "https://" + input.substr(7);
185 if (req.
verb ==
"OPTIONS") {
186 return ProcessOptionsReq(req);
189 if (header != req.
headers.end()) {
190 if (header->second !=
"none") {
191 m_log.Emsg(
"ProcessReq",
"COPY requested an unsupported credential type: ", header->second.c_str());
192 return req.
SendSimpleResp(400, NULL, NULL,
"COPY requestd an unsupported Credential type", 0);
196 if (header != req.
headers.end()) {
198 return ProcessPullReq(src, req);
201 if (header != req.
headers.end()) {
202 return ProcessPushReq(header->second, req);
204 m_log.Emsg(
"ProcessReq",
"COPY verb requested but no source or destination specified.");
205 return req.
SendSimpleResp(400, NULL, NULL,
"No Source or Destination specified", 0);
222 m_fixed_route(false),
224 m_first_timeout(120),
225 m_log(log->logger(),
"TPC_"),
227 m_request_manager(*myEnv, *log)
229 if (!Configure(config, myEnv)) {
230 throw std::runtime_error(
"Failed to configure the HTTP third-party-copy handler.");
248 return req.
SendSimpleResp(200, NULL, (
char *)
"DAV: 1\r\nDAV: <http://apache.org/dav/propset/fs/1>\r\nAllow: HEAD,GET,PUT,PROPFIND,DELETE,OPTIONS,COPY", NULL, 0);
258 if (authz_header != req.
headers.end()) {
259 std::stringstream ss;
260 ss <<
"authz=" <<
encode_str(authz_header->second);
270int TPCHandler::RedirectTransfer(
CURL *curl,
const std::string &redirect_resource,
271 XrdHttpExtReq &req, XrdOucErrInfo &error, TPCLogRecord &rec)
275 if ((ptr == NULL) || (*ptr ==
'\0') || (port == 0)) {
277 std::stringstream ss;
278 ss <<
"Internal error: redirect without hostname";
279 logTransferEvent(
LogMask::Error, rec,
"REDIRECT_INTERNAL_ERROR", ss.str());
280 return req.
SendSimpleResp(rec.status, NULL, NULL, generateClientErr(ss, rec).c_str(), 0);
284 std::string rdr_info = ptr;
285 std::string host, opaque;
286 size_t pos = rdr_info.find(
'?');
287 host = rdr_info.substr(0, pos);
289 if (pos != std::string::npos) {
290 opaque = rdr_info.substr(pos + 1);
293 std::stringstream ss;
294 ss <<
"Location: http" << (m_desthttps ?
"s" :
"") <<
"://" << host <<
":" << port <<
"/" << redirect_resource;
296 if (!opaque.empty()) {
302 return req.
SendSimpleResp(rec.status, NULL,
const_cast<char *
>(ss.str().c_str()),
310int TPCHandler::OpenWaitStall(XrdSfsFile &fh,
const std::string &resource,
311 int mode,
int openMode,
const XrdSecEntity &sec,
312 const std::string &authz)
319 size_t pos = resource.find(
'?');
321 std::string path = resource.substr(0, pos);
323 if (pos != std::string::npos) {
324 opaque = resource.substr(pos + 1);
329 opaque += (opaque.empty() ?
"" :
"&");
332 open_result = fh.
open(path.c_str(), mode, openMode, &sec, opaque.c_str());
336 if (open_result ==
SFS_STARTED) {secs_to_stall = secs_to_stall/2 + 5;}
337 std::this_thread::sleep_for (std::chrono::seconds(secs_to_stall));
353int TPCHandler::DetermineXferSize(
CURL *curl, XrdHttpExtReq &req,
State &state,
354 bool &success, TPCLogRecord &rec,
bool shouldReturnErrorToClient) {
356 curl_easy_setopt(curl, CURLOPT_NOBODY, 1);
358 curl_easy_setopt(curl, CURLOPT_TIMEOUT, CONNECT_TIMEOUT);
360 res = curl_easy_perform(curl);
363 curl_easy_setopt(curl, CURLOPT_NOBODY, 0);
365 curl_easy_setopt(curl, CURLOPT_TIMEOUT, 0L);
366 if (res == CURLE_HTTP_RETURNED_ERROR) {
367 std::stringstream ss;
368 ss <<
"Remote server failed request while fetching remote size";
369 std::stringstream ss2;
370 ss2 << ss.str() <<
": " << curl_easy_strerror(res);
373 return shouldReturnErrorToClient ? req.
SendSimpleResp(rec.status, NULL, NULL, generateClientErr(ss, rec, res).c_str(), 0) : -1;
375 std::stringstream ss;
376 ss <<
"Remote side " << req.
clienthost <<
" failed with status code " << state.
GetStatusCode() <<
" while fetching remote size";
379 return shouldReturnErrorToClient ? req.
SendSimpleResp(rec.status, NULL, NULL, generateClientErr(ss, rec).c_str(), 0) : -1;
381 std::stringstream ss;
382 ss <<
"Internal transfer failure while fetching remote size";
383 std::stringstream ss2;
384 ss2 << ss.str() <<
" - HTTP library failed: " << curl_easy_strerror(res);
387 return shouldReturnErrorToClient ? req.
SendSimpleResp(rec.status, NULL, NULL, generateClientErr(ss, rec, res).c_str(), 0) : -1;
389 std::stringstream ss;
390 ss <<
"Successfully determined remote size for pull request: "
397int TPCHandler::GetContentLengthTPCPull(
CURL *curl, XrdHttpExtReq &req, uint64_t &contentLength,
bool & success, TPCLogRecord &rec) {
404 if ((result = DetermineXferSize(curl, req, state, success, rec)) || !success) {
415int TPCHandler::SendPerfMarker(XrdHttpExtReq &req, TPCLogRecord &rec, TPC::State &state, std::string desc) {
416 std::stringstream ss;
417 const std::string crlf =
"\n";
418 ss <<
"Perf Marker" << crlf;
419 ss <<
"Timestamp: " << time(NULL) << crlf;
420 ss <<
"Stripe Index: 0" << crlf;
422 ss <<
"Total Stripe Count: 1" << crlf;
423 if (!desc.empty()) ss <<
"RemoteConnections: " << desc << crlf;
427 return req.
ChunkResp(ss.str().c_str(), 0);
434int TPCHandler::SendPerfMarker(XrdHttpExtReq &req, TPCLogRecord &rec, TPC::State &state) {
435 std::stringstream ss;
436 const std::string crlf =
"\n";
437 ss <<
"Perf Marker" << crlf;
438 ss <<
"Timestamp: " << time(NULL) << crlf;
439 ss <<
"Stripe Index: 0" << crlf;
441 ss <<
"Total Stripe Count: 1" << crlf;
446 ss <<
"RemoteConnections: " << desc << crlf;
451 return req.
ChunkResp(ss.str().c_str(), 0);
458int TPCHandler::SendPerfMarker(XrdHttpExtReq &req, TPCLogRecord &rec, std::vector<State*> &state,
459 off_t bytes_transferred)
473 std::stringstream ss;
474 const std::string crlf =
"\n";
475 ss <<
"Perf Marker" << crlf;
476 ss <<
"Timestamp: " << time(NULL) << crlf;
477 ss <<
"Stripe Index: 0" << crlf;
478 ss <<
"Stripe Bytes Transferred: " << bytes_transferred << crlf;
479 ss <<
"Total Stripe Count: 1" << crlf;
483 std::stringstream ss2;
484 for (std::vector<State*>::const_iterator iter = state.begin();
485 iter != state.end(); iter++)
487 std::string desc = (*iter)->GetConnectionDescription();
489 ss2 << (first ?
"" :
",") << desc;
494 ss <<
"RemoteConnections: " << ss2.str() << crlf;
496 rec.bytes_transferred = bytes_transferred;
499 return req.
ChunkResp(ss.str().c_str(), 0);
506int TPCHandler::RunCurlWithUpdates(
CURL *curl, XrdHttpExtReq &req,
State &state, TPCLogRecord &rec) {
509 TPCRequestManager::TPCRequest request(request_label, req.
mSciTag, curl);
511 if (!m_request_manager.Produce(request)) {
513 "Unable to accept HTTP-TPC requests "
514 "because server is too busy. Try again later");
516 logTransferEvent(
LogMask::Error, rec,
"RESPONSE_FAIL",
"Failed to send the initial response to the TPC client");
524 int retval = req.
StartChunkedResp(201,
"Created",
"Content-Type: text/plain");
528 "Failed to send the initial response to the TPC client");
531 "Initial transfer response sent to the TPC client");
535 off_t last_advance_bytes = 0;
536 time_t last_advance_time = time(NULL);
537 time_t transfer_start = last_advance_time;
538 CURLcode res =
static_cast<CURLcode
>(-1);
543 while ((res = (CURLcode)request.WaitFor(std::chrono::seconds(m_marker_period))) < 0) {
544 auto now = time(NULL);
545 std::string conn_desc = request.GetRemoteConnDesc();
547 if (bytes_xfer > last_advance_bytes) {
548 last_advance_bytes = bytes_xfer;
549 last_advance_time = now;
551 if (SendPerfMarker(req, rec, state, conn_desc)) {
553 logTransferEvent(
LogMask::Error, rec,
"PERFMARKER_FAIL",
"Failed to send a perf marker to the TPC client");
555 int timeout = (transfer_start == last_advance_time) ? m_first_timeout : m_timeout;
556 if (now > last_advance_time + timeout) {
557 const char *log_prefix = rec.log_prefix.c_str();
558 bool tpc_pull = strncmp(
"Pull", log_prefix, 4) == 0;
561 std::stringstream ss;
562 ss <<
"Transfer failed because no bytes have been "
563 << (tpc_pull ?
"received from the source (pull mode) in " :
"transmitted to the destination (push mode) in ") << timeout
581 std::stringstream ss;
582 bool success =
false;
585 std::stringstream ss2;
586 ss2 <<
"Remote side failed with status code " << state.
GetStatusCode();
588 std::replace(err.begin(), err.end(),
'\n',
' ');
589 ss2 <<
"; error message: \"" << err <<
"\"";
591 logTransferEvent(
LogMask::Error, rec,
"TRANSFER_FAIL", ss2.str());
592 ss << generateClientErr(ss2, rec);
595 if (err.empty()) {err =
"(no error message provided)";}
596 else {std::replace(err.begin(), err.end(),
'\n',
' ');}
597 std::stringstream ss2;
598 ss2 <<
"Error when interacting with local filesystem: " << err;
599 logTransferEvent(
LogMask::Error, rec,
"TRANSFER_FAIL", ss2.str());
600 ss << generateClientErr(ss2, rec);
601 }
else if (res != CURLE_OK) {
602 std::stringstream ss2;
603 ss2 <<
"Internal transfer failure";
604 std::stringstream ss3;
605 ss3 << ss2.str() <<
": " << curl_easy_strerror(res);
606 logTransferEvent(
LogMask::Error, rec,
"TRANSFER_FAIL", ss3.str());
607 ss << generateClientErr(ss2, rec, res);
609 ss <<
"success: Created";
613 if ((retval = req.
ChunkResp(ss.str().c_str(), 0))) {
615 "Failed to send last update to remote client");
617 }
else if (success) {
628int TPCHandler::ProcessPushReq(
const std::string & resource, XrdHttpExtReq &req) {
630 rec.log_prefix =
"PushRequest";
632 rec.remote = resource;
636 if (name) rec.name = name;
637 logTransferEvent(
LogMask::Info, rec,
"PUSH_START",
"Starting a push request");
640 auto curl = curlPtr.get();
642 std::stringstream ss;
643 ss <<
"Failed to initialize internal transfer resources";
646 return req.
SendSimpleResp(rec.status, NULL, NULL, generateClientErr(ss, rec).c_str(), 0);
648 curl_easy_setopt(curl, CURLOPT_NOSIGNAL, 1);
649 curl_easy_setopt(curl, CURLOPT_HTTP_VERSION, (
long) CURL_HTTP_VERSION_1_1);
650 curl_easy_setopt(curl, CURLOPT_CONNECTTIMEOUT, CONNECT_TIMEOUT);
652 std::string redirect_resource = req.
resource;
653 if (query_header != req.
headers.end()) {
654 redirect_resource = query_header->second;
658 uint64_t file_monid =
AtomicInc(m_monid);
660 std::unique_ptr<XrdSfsFile> fh(m_sfs->newFile(name, file_monid));
663 std::stringstream ss;
664 ss <<
"Failed to initialize internal transfer file handle";
667 return req.
SendSimpleResp(rec.status, NULL, NULL, generateClientErr(ss, rec).c_str(), 0);
669 std::string full_url = prepareURL(req);
671 std::string authz = GetAuthz(req);
673 int open_results = OpenWaitStall(*fh, full_url,
SFS_O_RDONLY, 0644,
676 int result = RedirectTransfer(curl, redirect_resource, req, fh->
error, rec);
678 }
else if (
SFS_OK != open_results) {
680 std::stringstream ss;
682 if (msg == NULL) ss <<
"Failed to open local resource";
686 int resp_result = req.
SendSimpleResp(rec.status, NULL, NULL, generateClientErr(ss, rec).c_str(), 0);
690 ConfigureCurlCA(curl);
691 curl_easy_setopt(curl, CURLOPT_URL, resource.c_str());
693 Stream stream(std::move(fh), 0, 0, m_log);
697 return RunCurlWithUpdates(curl, req, state, rec);
704int TPCHandler::ProcessPullReq(
const std::string &resource, XrdHttpExtReq &req) {
706 rec.log_prefix =
"PullRequest";
708 rec.remote = resource;
712 if (name) rec.name = name;
713 logTransferEvent(
LogMask::Info, rec,
"PULL_START",
"Starting a pull request");
716 auto curl = curlPtr.get();
718 std::stringstream ss;
719 ss <<
"Failed to initialize internal transfer resources";
722 return req.
SendSimpleResp(rec.status, NULL, NULL, generateClientErr(ss, rec).c_str(), 0);
739 std::string host_used;
740 if (host_header != req.
headers.end()) {
741 host_used = host_header->second;
747 ip = (
char *)malloc(ip_size-1);
750 memcpy(ip, buff+1, ip_size-2);
754 curl_easy_setopt(curl, CURLOPT_INTERFACE, ip);
756 curl_easy_setopt(curl, CURLOPT_NOSIGNAL, 1);
757 curl_easy_setopt(curl, CURLOPT_HTTP_VERSION, (
long) CURL_HTTP_VERSION_1_1);
758 curl_easy_setopt(curl, CURLOPT_SOCKOPTDATA , &rec);
759 curl_easy_setopt(curl, CURLOPT_CONNECTTIMEOUT, CONNECT_TIMEOUT);
760 std::unique_ptr<XrdSfsFile> fh(m_sfs->newFile(name, m_monid++));
762 std::stringstream ss;
763 ss <<
"Failed to initialize internal transfer file handle";
766 return req.
SendSimpleResp(rec.status, NULL, NULL, generateClientErr(ss, rec).c_str(), 0);
769 std::string redirect_resource = req.
resource;
770 if (query_header != req.
headers.end()) {
771 redirect_resource = query_header->second;
775 if ((overwrite_header == req.
headers.end()) || (overwrite_header->second ==
"T")) {
781 if (streams_header != req.
headers.end()) {
784 stream_req = std::stol(streams_header->second);
787 if (stream_req < 0 || stream_req > 100) {
788 std::stringstream ss;
789 ss <<
"Invalid request for number of streams";
791 logTransferEvent(
LogMask::Info, rec,
"INVALID_REQUEST", ss.str());
792 return req.
SendSimpleResp(rec.status, NULL, NULL, generateClientErr(ss, rec).c_str(), 0);
794 streams = stream_req == 0 ? 1 : stream_req;
797 rec.streams = streams;
798 std::string full_url = prepareURL(req);
799 std::string authz = GetAuthz(req);
800 curl_easy_setopt(curl, CURLOPT_URL, resource.c_str());
801 ConfigureCurlCA(curl);
802 uint64_t sourceFileContentLength = 0;
807 GetContentLengthTPCPull(curl, req, sourceFileContentLength, success, rec);
811 full_url +=
"&oss.asize=" + std::to_string(sourceFileContentLength);
818 int open_result = OpenWaitStall(*fh, full_url, mode|
SFS_O_WRONLY,
822 int result = RedirectTransfer(curl, redirect_resource, req, fh->
error, rec);
824 }
else if (
SFS_OK != open_result) {
826 std::stringstream ss;
828 if ((msg == NULL) || (*msg ==
'\0')) ss <<
"Failed to open local resource";
833 generateClientErr(ss, rec).c_str(), 0);
837 Stream stream(std::move(fh), streams * m_pipelining_multiplier, streams > 1 ? m_block_size : m_small_block_size, m_log);
843 return RunCurlWithStreams(req, state, streams, rec);
845 return RunCurlWithUpdates(curl, req, state, rec);
853void TPCHandler::logTransferEvent(
LogMask mask,
const TPCLogRecord &rec,
854 const std::string &event,
const std::string &message)
856 if (!(m_log.getMsgMask() & mask)) {
return;}
858 std::stringstream ss;
859 ss <<
"event=" <<
event <<
", local=" << rec.local <<
", remote=" << rec.remote;
860 if (rec.name.empty())
861 ss <<
", user=(anonymous)";
863 ss <<
", user=" << rec.name;
864 if (rec.streams != 1)
865 ss <<
", streams=" << rec.streams;
866 if (rec.bytes_transferred >= 0)
867 ss <<
", bytes_transferred=" << rec.bytes_transferred;
869 ss <<
", status=" << rec.status;
870 if (rec.tpc_status >= 0)
871 ss <<
", tpc_status=" << rec.tpc_status;
872 if (!message.empty())
873 ss <<
"; " << message;
874 m_log.Log(mask, rec.log_prefix.c_str(), ss.str().c_str());
877std::string TPCHandler::generateClientErr(std::stringstream &err_ss,
const TPCLogRecord &rec, CURLcode cCode) {
878 std::stringstream ssret;
879 ssret <<
"failure: " << err_ss.str() <<
", local=" << rec.local <<
", remote=" << rec.remote;
880 if(cCode != CURLcode::CURLE_OK) {
881 ssret <<
", HTTP library failure=" << curl_easy_strerror(cCode);
892 if (curl_global_init(CURL_GLOBAL_DEFAULT)) {
893 log->
Emsg(
"TPCInitialize",
"libcurl failed to initialize");
899 log->
Emsg(
"TPCInitialize",
"TPC handler requires a config filename in order to load");
903 log->
Emsg(
"TPCInitialize",
"Will load configuration for the TPC handler from", config);
905 }
catch (std::runtime_error &re) {
906 log->
Emsg(
"TPCInitialize",
"Encountered a runtime failure when loading ", re.what());
XrdHttpExtHandler * XrdHttpGetExtHandler(XrdHttpExtHandlerArgs)
static std::string PrepareURL(const std::string &input)
XrdVERSIONINFO(XrdHttpGetExtHandler, HttpTPC)
std::string encode_xrootd_opaque_to_uri(CURL *curl, const std::string &opaque)
int mapErrNoToHttp(int errNo)
Utility functions for XrdHTTP.
std::string encode_str(const std::string &str)
void getline(uchar *buff, int blen)
int GetStatusCode() const
off_t BytesTransferred() const
void SetErrorMessage(const std::string &error_msg)
std::string GetErrorMessage() const
std::string GetConnectionDescription()
void SetupHeaders(XrdHttpExtReq &req)
void SetContentLength(const off_t content_length)
off_t GetContentLength() const
void SetErrorCode(int error_code)
TPCHandler(XrdSysError *log, const char *config, XrdOucEnv *myEnv)
virtual int ProcessReq(XrdHttpExtReq &req)
virtual bool MatchesPath(const char *verb, const char *path)
Tells if the incoming path is recognized as one of the paths that have to be processed.
static std::string GenerateIdentifier(const std::string &label, const char *vorg, const int scitag)
int ChunkResp(const char *body, long long bodylen)
Send a (potentially partial) body in a chunked response; invoking with NULL body.
void GetClientID(std::string &clid)
std::map< std::string, std::string > & headers
int StartChunkedResp(int code, const char *desc, const char *header_to_add)
Starts a chunked response; body of request is sent over multiple parts using the SendChunkResp.
const XrdSecEntity & GetSecEntity() const
int SendSimpleResp(int code, const char *desc, const char *header_to_add, const char *body, long long bodylen)
Sends a basic response. If the length is < 0 then it is calculated internally.
static std::string prepareOpenURL(const std::string &reqResource, std::map< std::string, std::string > &reqHeaders, const std::map< std::string, std::string > &hdr2cgimap)
static const int noPort
Do not add port number.
int Format(char *bAddr, int bLen, fmtUse fmtType=fmtAuto, int fmtOpts=0)
@ fmtAddr
Address using suitable ipv4 or ipv6 format.
static const char * GetAddrs(const char *hSpec, XrdNetAddr *aListP[], int &aListN, AddrOpts opts=allIPMap, int pNum=PortInSpec)
void * GetPtr(const char *varname)
const char * getErrText()
void setUCap(int ucval)
Set user capabilties.
static std::map< std::string, T >::const_iterator caseInsensitiveFind(const std::map< std::string, T > &m, const std::string &lowerCaseSearchKey)
char * vorg
Entity's virtual organization(s)
char * name
Entity's name.
virtual int open(const char *fileName, XrdSfsFileOpenMode openMode, mode_t createMode, const XrdSecEntity *client=0, const char *opaque=0)=0
int Emsg(const char *esfx, int ecode, const char *text1, const char *text2=0)
XrdSysLogger * logger(XrdSysLogger *lp=0)
std::unique_ptr< CURL, CurlDeleter > ManagedCurlHandle
void operator()(CURL *curl)
static const int uIPv64
ucap: Supports only IPv4 info