XRootD
Loading...
Searching...
No Matches
XrdHttpTpcTPC.cc
Go to the documentation of this file.
4#include "XrdOuc/XrdOucEnv.hh"
8#include "XrdSys/XrdSysFD.hh"
9#include "XrdVersion.hh"
10
15
16#include <curl/curl.h>
17
18#include <dlfcn.h>
19#include <fcntl.h>
20
21#include <algorithm>
22#include <memory>
23#include <sstream>
24#include <stdexcept>
25#include <thread>
26
27#include "XrdHttpTpcState.hh"
28#include "XrdHttpTpcStream.hh"
29#include "XrdHttpTpcTPC.hh"
30#include <fstream>
31
32using namespace TPC;
33
// Shared TPC monitoring object. It stays null until the TPCHandler constructor
// finds a "Tpc.gStream*" entry in its environment and creates the monitor.
34XrdXrootdTpcMon* TPCHandler::TPCLogRecord::tpcMonitor = 0;
35
// Counter whose value is handed to m_sfs->newFile() for each new transfer file.
36uint64_t TPCHandler::m_monid{0};
// Number of seconds RunCurlWithUpdates waits between two performance markers.
37int TPCHandler::m_marker_period = 5;
// Stream block size chosen by ProcessPullReq when more than one stream is used (16 MiB).
38size_t TPCHandler::m_block_size = 16*1024*1024;
// Stream block size chosen by ProcessPullReq for a single stream (1 MiB).
39size_t TPCHandler::m_small_block_size = 1*1024*1024;
// Passed to AtomicBeg/AtomicEnd around the m_monid increment in ProcessPushReq.
40XrdSysMutex TPCHandler::m_monid_mutex;
41
43
44/******************************************************************************/
45/* T P C H a n d l e r : : T P C L o g R e c o r d D e s t r u c t o r */
46/******************************************************************************/
47
// Destructor: when a TPC monitor is installed, copies the fields of this log
// record into a monitoring record (monInfo) and hands it to tpcMonitor->Report().
48TPCHandler::TPCLogRecord::~TPCLogRecord()
49{
50// Record monitoring data if monitoring is enabled
51//
52 if (tpcMonitor)
// NOTE(review): listing line 53 is missing from this view. It presumably opens
// the block closed at listing line 76 and declares monInfo, which is used below
// without a visible declaration -- confirm against the original file.
54
55 monInfo.clID = clID.c_str();
56 monInfo.begT = begT;
// The end time is taken now, i.e. when the record is destroyed.
57 gettimeofday(&monInfo.endT, 0);
58
// For a pull the local resource is the destination; otherwise it is the source.
59 if (mTpcType == TpcType::Pull)
60 {monInfo.dstURL = local.c_str();
61 monInfo.srcURL = remote.c_str();
62 } else {
63 monInfo.dstURL = remote.c_str();
64 monInfo.srcURL = local.c_str();
// NOTE(review): listing line 65 is also absent here; it cannot be determined
// from this view whether a statement was lost -- confirm.
66 }
67
// End code: 0 when status is zero; otherwise tpc_status when it is positive,
// and 1 as the fallback.
68 if (!status) monInfo.endRC = 0;
69 else if (tpc_status > 0) monInfo.endRC = tpc_status;
70 else monInfo.endRC = 1;
71 monInfo.strm = static_cast<unsigned char>(streams);
// A negative byte count is reported as 0.
72 monInfo.fSize = (bytes_transferred < 0 ? 0 : bytes_transferred);
73 if (!isIPv6) monInfo.opts |= XrdXrootdTpcMon::TpcInfo::isIPv4;
74
75 tpcMonitor->Report(monInfo);
76 }
77}
78
79/******************************************************************************/
80/* C u r l D e l e t e r : : o p e r a t o r ( ) */
81/******************************************************************************/
82
// NOTE(review): the signature line (listing line 83) is missing from this view.
// The banner comment names this function CurlDeleter::operator(), and the body
// expects a handle called `curl` -- confirm against the original file.
// Cleans up the curl easy handle when one was supplied; a null handle is ignored.
84{
85 if (curl) curl_easy_cleanup(curl);
86}
87
88
89/******************************************************************************/
90/* p r e p a r e U R L */
91/******************************************************************************/
92
93// See XrdHttpTpcUtils::prepareOpenURL() documentation
94std::string TPCHandler::prepareURL(XrdHttpExtReq &req) {
95 return XrdHttpTpcUtils::prepareOpenURL(req.resource, req.headers,hdr2cgimap);
96}
97
98/******************************************************************************/
99/* e n c o d e _ x r o o t d _ o p a q u e _ t o _ u r i */
100/******************************************************************************/
101
102// When processing a redirection from the filesystem layer, it is permitted to return
103// some xrootd opaque data. The quoting rules for xrootd opaque data are significantly
104// more permissive than a URI (basically, only '&' and '=' are disallowed while some
105// URI parsers may dislike characters like '"'). This function takes an opaque string
106// (e.g., foo=1&bar=2&baz=") and makes it safe for all URI parsers.
107std::string encode_xrootd_opaque_to_uri(CURL *curl, const std::string &opaque)
108{
109 std::stringstream parser(opaque);
110 std::string sequence;
111 std::stringstream output;
112 bool first = true;
113 while (getline(parser, sequence, '&')) {
114 if (sequence.empty()) {continue;}
115 size_t equal_pos = sequence.find('=');
116 char *val = NULL;
117 if (equal_pos != std::string::npos)
118 val = curl_easy_escape(curl, sequence.c_str() + equal_pos + 1, sequence.size() - equal_pos - 1);
119 // Do not emit parameter if value exists and escaping failed.
120 if (!val && equal_pos != std::string::npos) {continue;}
121
122 if (!first) output << "&";
123 first = false;
124 output << sequence.substr(0, equal_pos);
125 if (val) {
126 output << "=" << val;
127 curl_free(val);
128 }
129 }
130 return output.str();
131}
132
133/******************************************************************************/
134/* T P C H a n d l e r : : C o n f i g u r e C u r l C A */
135/******************************************************************************/
136
137void
138TPCHandler::ConfigureCurlCA(CURL *curl)
139{
140 auto ca_filename = m_ca_file ? m_ca_file->CAFilename() : "";
141 auto crl_filename = m_ca_file ? m_ca_file->CRLFilename() : "";
142 if (!ca_filename.empty() && !crl_filename.empty()) {
143 curl_easy_setopt(curl, CURLOPT_CAINFO, ca_filename.c_str());
144 //Check that the CRL file contains at least one entry before setting this option to curl
145 //Indeed, an empty CRL file will make curl unhappy and therefore will fail
146 //all HTTP TPC transfers (https://github.com/xrootd/xrootd/issues/1543)
147 std::ifstream in(crl_filename, std::ifstream::ate | std::ifstream::binary);
148 if(in.tellg() > 0 && m_ca_file->atLeastOneValidCRLFound()){
149 curl_easy_setopt(curl, CURLOPT_CRLFILE, crl_filename.c_str());
150 } else {
151 std::ostringstream oss;
152 oss << "No valid CRL file has been found in the file " << crl_filename << ". Disabling CRL checking.";
153 m_log.Log(Warning,"TpcHandler",oss.str().c_str());
154 }
155 }
156 else if (!m_cadir.empty()) {
157 curl_easy_setopt(curl, CURLOPT_CAPATH, m_cadir.c_str());
158 }
159 if (!m_cafile.empty()) {
160 curl_easy_setopt(curl, CURLOPT_CAINFO, m_cafile.c_str());
161 }
162}
163
164
165bool TPCHandler::MatchesPath(const char *verb, const char *path) {
166 return !strcmp(verb, "COPY") || !strcmp(verb, "OPTIONS");
167}
168
169/******************************************************************************/
170/* P r e p a r e U R L */
171/******************************************************************************/
172
// Rewrites a URL that starts with "davs://" so that it starts with "https://"
// instead; any other input is returned unchanged.
static std::string PrepareURL(const std::string &input) {
    const bool is_davs = (input.compare(0, 7, "davs://") == 0);
    if (!is_davs) {
        return input;
    }
    return "https://" + input.substr(7);
}
179
180/******************************************************************************/
181/* T P C H a n d l e r : : P r o c e s s R e q */
182/******************************************************************************/
183
// NOTE(review): the signature line (listing line 184) is missing from this view.
// The banner comment names this function TPCHandler::ProcessReq and the body
// operates on a request object called `req` -- confirm against the original file.
//
// Entry point for a request routed to this handler:
//  - OPTIONS is answered by ProcessOptionsReq();
//  - a "credential" header with any value other than "none" is rejected with 400;
//  - a "source" header starts a pull (the URL goes through PrepareURL first);
//  - otherwise a "destination" header starts a push;
//  - with neither header present the request is rejected with 400.
// Header names are looked up case-insensitively.
185 if (req.verb == "OPTIONS") {
186 return ProcessOptionsReq(req);
187 }
188 auto header = XrdOucTUtils::caseInsensitiveFind(req.headers,"credential");
189 if (header != req.headers.end()) {
190 if (header->second != "none") {
191 m_log.Emsg("ProcessReq", "COPY requested an unsupported credential type: ", header->second.c_str());
// NOTE(review): the client-facing text below misspells "requested" as "requestd".
192 return req.SendSimpleResp(400, NULL, NULL, "COPY requestd an unsupported Credential type", 0);
193 }
194 }
// "source" is checked before "destination", so a request carrying both is a pull.
195 header = XrdOucTUtils::caseInsensitiveFind(req.headers,"source");
196 if (header != req.headers.end()) {
197 std::string src = PrepareURL(header->second);
198 return ProcessPullReq(src, req);
199 }
200 header = XrdOucTUtils::caseInsensitiveFind(req.headers,"destination");
201 if (header != req.headers.end()) {
// Unlike the pull case, the destination URL is passed on without PrepareURL().
202 return ProcessPushReq(header->second, req);
203 }
204 m_log.Emsg("ProcessReq", "COPY verb requested but no source or destination specified.");
205 return req.SendSimpleResp(400, NULL, NULL, "No Source or Destination specified", 0);
206}
207
208/******************************************************************************/
209/* T P C H a n d l e r D e s t r u c t o r */
210/******************************************************************************/
211
// NOTE(review): the signature line (listing line 212) is missing from this view;
// the banner comment identifies this as the TPCHandler destructor -- confirm.
// Only clears the m_sfs pointer; nothing is deleted here.
213 m_sfs = NULL;
214}
215
216/******************************************************************************/
217/* T P C H a n d l e r C o n s t r u c t o r */
218/******************************************************************************/
219
220TPCHandler::TPCHandler(XrdSysError *log, const char *config, XrdOucEnv *myEnv) :
221 m_desthttps(false),
222 m_fixed_route(false),
223 m_timeout(60),
224 m_first_timeout(120),
225 m_log(log->logger(), "TPC_"),
226 m_sfs(NULL),
227 m_request_manager(*myEnv, *log)
228{
229 if (!Configure(config, myEnv)) {
230 throw std::runtime_error("Failed to configure the HTTP third-party-copy handler.");
231 }
232
233// Extract out the TPC monitoring object (we share it with xrootd).
234//
235 XrdXrootdGStream *gs = (XrdXrootdGStream*)myEnv->GetPtr("Tpc.gStream*");
236 if (gs)
237 TPCLogRecord::tpcMonitor = new XrdXrootdTpcMon("http",log->logger(),*gs);
238}
239
240/******************************************************************************/
241/* T P C H a n d l e r : : P r o c e s s O p t i o n s R e q */
242/******************************************************************************/
243
247int TPCHandler::ProcessOptionsReq(XrdHttpExtReq &req) {
248 return req.SendSimpleResp(200, NULL, (char *) "DAV: 1\r\nDAV: <http://apache.org/dav/propset/fs/1>\r\nAllow: HEAD,GET,PUT,PROPFIND,DELETE,OPTIONS,COPY", NULL, 0);
249}
250
251/******************************************************************************/
252/* T P C H a n d l e r : : G e t A u t h z */
253/******************************************************************************/
254
255std::string TPCHandler::GetAuthz(XrdHttpExtReq &req) {
256 std::string authz;
257 auto authz_header = XrdOucTUtils::caseInsensitiveFind(req.headers,"authorization");
258 if (authz_header != req.headers.end()) {
259 std::stringstream ss;
260 ss << "authz=" << encode_str(authz_header->second);
261 authz += ss.str();
262 }
263 return authz;
264}
265
266/******************************************************************************/
267/* T P C H a n d l e r : : R e d i r e c t T r a n s f e r */
268/******************************************************************************/
269
270int TPCHandler::RedirectTransfer(CURL *curl, const std::string &redirect_resource,
271 XrdHttpExtReq &req, XrdOucErrInfo &error, TPCLogRecord &rec)
272{
273 int port;
274 const char *ptr = error.getErrText(port);
275 if ((ptr == NULL) || (*ptr == '\0') || (port == 0)) {
276 rec.status = 500;
277 std::stringstream ss;
278 ss << "Internal error: redirect without hostname";
279 logTransferEvent(LogMask::Error, rec, "REDIRECT_INTERNAL_ERROR", ss.str());
280 return req.SendSimpleResp(rec.status, NULL, NULL, generateClientErr(ss, rec).c_str(), 0);
281 }
282
283 // Construct redirection URL taking into consideration any opaque info
284 std::string rdr_info = ptr;
285 std::string host, opaque;
286 size_t pos = rdr_info.find('?');
287 host = rdr_info.substr(0, pos);
288
289 if (pos != std::string::npos) {
290 opaque = rdr_info.substr(pos + 1);
291 }
292
293 std::stringstream ss;
294 ss << "Location: http" << (m_desthttps ? "s" : "") << "://" << host << ":" << port << "/" << redirect_resource;
295
296 if (!opaque.empty()) {
297 ss << "?" << encode_xrootd_opaque_to_uri(curl, opaque);
298 }
299
300 rec.status = 307;
301 logTransferEvent(LogMask::Info, rec, "REDIRECT", ss.str());
302 return req.SendSimpleResp(rec.status, NULL, const_cast<char *>(ss.str().c_str()),
303 NULL, 0);
304}
305
306/******************************************************************************/
307/* T P C H a n d l e r : : O p e n W a i t S t a l l */
308/******************************************************************************/
309
310int TPCHandler::OpenWaitStall(XrdSfsFile &fh, const std::string &resource,
311 int mode, int openMode, const XrdSecEntity &sec,
312 const std::string &authz)
313{
314 int open_result;
315 while (1) {
316 int orig_ucap = fh.error.getUCap();
317 fh.error.setUCap(orig_ucap | XrdOucEI::uIPv64);
318 std::string opaque;
319 size_t pos = resource.find('?');
320 // Extract the path and opaque info from the resource
321 std::string path = resource.substr(0, pos);
322
323 if (pos != std::string::npos) {
324 opaque = resource.substr(pos + 1);
325 }
326
327 // Append the authz information if there are some
328 if(!authz.empty()) {
329 opaque += (opaque.empty() ? "" : "&");
330 opaque += authz;
331 }
332 open_result = fh.open(path.c_str(), mode, openMode, &sec, opaque.c_str());
333
334 if ((open_result == SFS_STALL) || (open_result == SFS_STARTED)) {
335 int secs_to_stall = fh.error.getErrInfo();
336 if (open_result == SFS_STARTED) {secs_to_stall = secs_to_stall/2 + 5;}
337 std::this_thread::sleep_for (std::chrono::seconds(secs_to_stall));
338 }
339 break;
340 }
341 return open_result;
342}
343
344/******************************************************************************/
345/* T P C H a n d l e r : : D e t e r m i n e X f e r S i z e */
346/******************************************************************************/
347
348
349
353int TPCHandler::DetermineXferSize(CURL *curl, XrdHttpExtReq &req, State &state,
354 bool &success, TPCLogRecord &rec, bool shouldReturnErrorToClient) {
355 success = false;
356 curl_easy_setopt(curl, CURLOPT_NOBODY, 1);
357 // Set a custom timeout of 60 seconds (= CONNECT_TIMEOUT for convenience) for the HEAD request
358 curl_easy_setopt(curl, CURLOPT_TIMEOUT, CONNECT_TIMEOUT);
359 CURLcode res;
360 res = curl_easy_perform(curl);
361 //Immediately set the CURLOPT_NOBODY flag to 0 as we anyway
362 //don't want the next curl call to do be a HEAD request
363 curl_easy_setopt(curl, CURLOPT_NOBODY, 0);
364 // Reset the CURLOPT_TIMEOUT to no timeout (default)
365 curl_easy_setopt(curl, CURLOPT_TIMEOUT, 0L);
366 if (res == CURLE_HTTP_RETURNED_ERROR) {
367 std::stringstream ss;
368 ss << "Remote server failed request while fetching remote size";
369 std::stringstream ss2;
370 ss2 << ss.str() << ": " << curl_easy_strerror(res);
371 rec.status = 500;
372 logTransferEvent(LogMask::Error, rec, "SIZE_FAIL", ss2.str());
373 return shouldReturnErrorToClient ? req.SendSimpleResp(rec.status, NULL, NULL, generateClientErr(ss, rec, res).c_str(), 0) : -1;
374 } else if (state.GetStatusCode() >= 400) {
375 std::stringstream ss;
376 ss << "Remote side " << req.clienthost << " failed with status code " << state.GetStatusCode() << " while fetching remote size";
377 rec.status = 500;
378 logTransferEvent(LogMask::Error, rec, "SIZE_FAIL", ss.str());
379 return shouldReturnErrorToClient ? req.SendSimpleResp(rec.status, NULL, NULL, generateClientErr(ss, rec).c_str(), 0) : -1;
380 } else if (res) {
381 std::stringstream ss;
382 ss << "Internal transfer failure while fetching remote size";
383 std::stringstream ss2;
384 ss2 << ss.str() << " - HTTP library failed: " << curl_easy_strerror(res);
385 rec.status = 500;
386 logTransferEvent(LogMask::Error, rec, "SIZE_FAIL", ss2.str());
387 return shouldReturnErrorToClient ? req.SendSimpleResp(rec.status, NULL, NULL, generateClientErr(ss, rec, res).c_str(), 0) : -1;
388 }
389 std::stringstream ss;
390 ss << "Successfully determined remote size for pull request: "
391 << state.GetContentLength();
392 logTransferEvent(LogMask::Debug, rec, "SIZE_SUCCESS", ss.str());
393 success = true;
394 return 0;
395}
396
397int TPCHandler::GetContentLengthTPCPull(CURL *curl, XrdHttpExtReq &req, uint64_t &contentLength, bool & success, TPCLogRecord &rec) {
398 State state(curl,req.tpcForwardCreds);
399 //Don't forget to copy the headers of the client's request before doing the HEAD call. Otherwise, if there is a need for authentication,
400 //it will fail
401 state.SetupHeaders(req);
402 int result;
403 //In case we cannot get the content length, we return the error to the client
404 if ((result = DetermineXferSize(curl, req, state, success, rec)) || !success) {
405 return result;
406 }
407 contentLength = state.GetContentLength();
408 return result;
409}
410
411/******************************************************************************/
412/* T P C H a n d l e r : : S e n d P e r f M a r k e r */
413/******************************************************************************/
414
415int TPCHandler::SendPerfMarker(XrdHttpExtReq &req, TPCLogRecord &rec, TPC::State &state, std::string desc) {
416 std::stringstream ss;
417 const std::string crlf = "\n";
418 ss << "Perf Marker" << crlf;
419 ss << "Timestamp: " << time(NULL) << crlf;
420 ss << "Stripe Index: 0" << crlf;
421 ss << "Stripe Bytes Transferred: " << state.BytesTransferred() << crlf;
422 ss << "Total Stripe Count: 1" << crlf;
423 if (!desc.empty()) ss << "RemoteConnections: " << desc << crlf;
424 ss << "End" << crlf;
425 rec.bytes_transferred = state.BytesTransferred();
426 logTransferEvent(LogMask::Debug, rec, "PERF_MARKER");
427 return req.ChunkResp(ss.str().c_str(), 0);
428}
429
430/******************************************************************************/
431/* T P C H a n d l e r : : S e n d P e r f M a r k e r */
432/******************************************************************************/
433
434int TPCHandler::SendPerfMarker(XrdHttpExtReq &req, TPCLogRecord &rec, TPC::State &state) {
435 std::stringstream ss;
436 const std::string crlf = "\n";
437 ss << "Perf Marker" << crlf;
438 ss << "Timestamp: " << time(NULL) << crlf;
439 ss << "Stripe Index: 0" << crlf;
440 ss << "Stripe Bytes Transferred: " << state.BytesTransferred() << crlf;
441 ss << "Total Stripe Count: 1" << crlf;
442 // Include the TCP connection associated with this transfer; used by
443 // the TPC client for monitoring purposes.
444 std::string desc = state.GetConnectionDescription();
445 if (!desc.empty())
446 ss << "RemoteConnections: " << desc << crlf;
447 ss << "End" << crlf;
448 rec.bytes_transferred = state.BytesTransferred();
449 logTransferEvent(LogMask::Debug, rec, "PERF_MARKER");
450
451 return req.ChunkResp(ss.str().c_str(), 0);
452}
453
454/******************************************************************************/
455/* T P C H a n d l e r : : S e n d P e r f M a r k e r */
456/******************************************************************************/
457
458int TPCHandler::SendPerfMarker(XrdHttpExtReq &req, TPCLogRecord &rec, std::vector<State*> &state,
459 off_t bytes_transferred)
460{
461 // The 'performance marker' format is largely derived from how GridFTP works
462 // (e.g., the concept of `Stripe` is not quite so relevant here). See:
463 // https://twiki.cern.ch/twiki/bin/view/LCG/HttpTpcTechnical
464 // Example marker:
465 // Perf Marker\n
466 // Timestamp: 1537788010\n
467 // Stripe Index: 0\n
468 // Stripe Bytes Transferred: 238745\n
469 // Total Stripe Count: 1\n
470 // RemoteConnections: tcp:129.93.3.4:1234,tcp:[2600:900:6:1301:268a:7ff:fef6:a590]:2345\n
471 // End\n
472 //
473 std::stringstream ss;
474 const std::string crlf = "\n";
475 ss << "Perf Marker" << crlf;
476 ss << "Timestamp: " << time(NULL) << crlf;
477 ss << "Stripe Index: 0" << crlf;
478 ss << "Stripe Bytes Transferred: " << bytes_transferred << crlf;
479 ss << "Total Stripe Count: 1" << crlf;
480 // Build a list of TCP connections associated with this transfer; used by
481 // the TPC client for monitoring purposes.
482 bool first = true;
483 std::stringstream ss2;
484 for (std::vector<State*>::const_iterator iter = state.begin();
485 iter != state.end(); iter++)
486 {
487 std::string desc = (*iter)->GetConnectionDescription();
488 if (!desc.empty()) {
489 ss2 << (first ? "" : ",") << desc;
490 first = false;
491 }
492 }
493 if (!first)
494 ss << "RemoteConnections: " << ss2.str() << crlf;
495 ss << "End" << crlf;
496 rec.bytes_transferred = bytes_transferred;
497 logTransferEvent(LogMask::Debug, rec, "PERF_MARKER");
498
499 return req.ChunkResp(ss.str().c_str(), 0);
500}
501
502/******************************************************************************/
503/* T P C H a n d l e r : : R u n C u r l W i t h U p d a t e s */
504/******************************************************************************/
505
506int TPCHandler::RunCurlWithUpdates(CURL *curl, XrdHttpExtReq &req, State &state, TPCLogRecord &rec) {
507
508 std::string request_label = TPCRequestManager::TPCRequest::GenerateIdentifier("tpc", req.GetSecEntity().vorg, req.mSciTag);
509 TPCRequestManager::TPCRequest request(request_label, req.mSciTag, curl);
510
511 if (!m_request_manager.Produce(request)) {
512 int retval = req.StartChunkedResp(429, "Too Many Requests",
513 "Unable to accept HTTP-TPC requests "
514 "because server is too busy. Try again later");
515 if (retval) {
516 logTransferEvent(LogMask::Error, rec, "RESPONSE_FAIL", "Failed to send the initial response to the TPC client");
517 return retval;
518 }
519 return -1;
520 }
521
522 // curl_multi_perform is independently called in the worker thread
523 // we can however initiate a cancel here
524 int retval = req.StartChunkedResp(201, "Created", "Content-Type: text/plain");
525 if (retval) {
526 request.Cancel();
527 logTransferEvent(LogMask::Error, rec, "RESPONSE_FAIL",
528 "Failed to send the initial response to the TPC client");
529 } else {
530 logTransferEvent(LogMask::Debug, rec, "RESPONSE_START",
531 "Initial transfer response sent to the TPC client");
532 }
533
534 // Track how long it's been since the last time we recorded more bytes being transferred.
535 off_t last_advance_bytes = 0;
536 time_t last_advance_time = time(NULL);
537 time_t transfer_start = last_advance_time;
538 CURLcode res = static_cast<CURLcode>(-1);
539
540 // The transfer will start after this point, notify the packet marking
541 // manager
542
543 while ((res = (CURLcode)request.WaitFor(std::chrono::seconds(m_marker_period))) < 0) {
544 auto now = time(NULL);
545 std::string conn_desc = request.GetRemoteConnDesc();
546 off_t bytes_xfer = state.BytesTransferred();
547 if (bytes_xfer > last_advance_bytes) {
548 last_advance_bytes = bytes_xfer;
549 last_advance_time = now;
550 }
551 if (SendPerfMarker(req, rec, state, conn_desc)) {
552 request.Cancel();
553 logTransferEvent(LogMask::Error, rec, "PERFMARKER_FAIL", "Failed to send a perf marker to the TPC client");
554 }
555 int timeout = (transfer_start == last_advance_time) ? m_first_timeout : m_timeout;
556 if (now > last_advance_time + timeout) {
557 const char *log_prefix = rec.log_prefix.c_str();
558 bool tpc_pull = strncmp("Pull", log_prefix, 4) == 0;
559 request.Cancel();
560 state.SetErrorCode(10);
561 std::stringstream ss;
562 ss << "Transfer failed because no bytes have been "
563 << (tpc_pull ? "received from the source (pull mode) in " : "transmitted to the destination (push mode) in ") << timeout
564 << " seconds.";
565 state.SetErrorMessage(ss.str());
566 }
567 }
568
569 state.Flush();
570
571 rec.bytes_transferred = state.BytesTransferred();
572 rec.tpc_status = state.GetStatusCode();
573
574 // Explicitly finalize the stream (which will close the underlying file
575 // handle) before the response is sent. In some cases, subsequent HTTP
576 // requests can occur before the filesystem is done closing the handle -
577 // and those requests may occur against partial data.
578 state.Finalize();
579
580 // Generate the final response back to the client.
581 std::stringstream ss;
582 bool success = false;
583 if (state.GetStatusCode() >= 400) {
584 std::string err = state.GetErrorMessage();
585 std::stringstream ss2;
586 ss2 << "Remote side failed with status code " << state.GetStatusCode();
587 if (!err.empty()) {
588 std::replace(err.begin(), err.end(), '\n', ' ');
589 ss2 << "; error message: \"" << err << "\"";
590 }
591 logTransferEvent(LogMask::Error, rec, "TRANSFER_FAIL", ss2.str());
592 ss << generateClientErr(ss2, rec);
593 } else if (state.GetErrorCode()) {
594 std::string err = state.GetErrorMessage();
595 if (err.empty()) {err = "(no error message provided)";}
596 else {std::replace(err.begin(), err.end(), '\n', ' ');}
597 std::stringstream ss2;
598 ss2 << "Error when interacting with local filesystem: " << err;
599 logTransferEvent(LogMask::Error, rec, "TRANSFER_FAIL", ss2.str());
600 ss << generateClientErr(ss2, rec);
601 } else if (res != CURLE_OK) {
602 std::stringstream ss2;
603 ss2 << "Internal transfer failure";
604 std::stringstream ss3;
605 ss3 << ss2.str() << ": " << curl_easy_strerror(res);
606 logTransferEvent(LogMask::Error, rec, "TRANSFER_FAIL", ss3.str());
607 ss << generateClientErr(ss2, rec, res);
608 } else {
609 ss << "success: Created";
610 success = true;
611 }
612
613 if ((retval = req.ChunkResp(ss.str().c_str(), 0))) {
614 logTransferEvent(LogMask::Error, rec, "TRANSFER_ERROR",
615 "Failed to send last update to remote client");
616 return retval;
617 } else if (success) {
618 logTransferEvent(LogMask::Info, rec, "TRANSFER_SUCCESS");
619 rec.status = 0;
620 }
621 return req.ChunkResp(NULL, 0);
622}
623
624/******************************************************************************/
625/* T P C H a n d l e r : : P r o c e s s P u s h R e q */
626/******************************************************************************/
627
628int TPCHandler::ProcessPushReq(const std::string & resource, XrdHttpExtReq &req) {
629 TPCLogRecord rec(req, TpcType::Push);
630 rec.log_prefix = "PushRequest";
631 rec.local = req.resource;
632 rec.remote = resource;
633 rec.m_log = &m_log;
634 char *name = req.GetSecEntity().name;
635 req.GetClientID(rec.clID);
636 if (name) rec.name = name;
637 logTransferEvent(LogMask::Info, rec, "PUSH_START", "Starting a push request");
638
639 ManagedCurlHandle curlPtr(curl_easy_init());
640 auto curl = curlPtr.get();
641 if (!curl) {
642 std::stringstream ss;
643 ss << "Failed to initialize internal transfer resources";
644 rec.status = 500;
645 logTransferEvent(LogMask::Error, rec, "PUSH_FAIL", ss.str());
646 return req.SendSimpleResp(rec.status, NULL, NULL, generateClientErr(ss, rec).c_str(), 0);
647 }
648 curl_easy_setopt(curl, CURLOPT_NOSIGNAL, 1);
649 curl_easy_setopt(curl, CURLOPT_HTTP_VERSION, (long) CURL_HTTP_VERSION_1_1);
650 curl_easy_setopt(curl, CURLOPT_CONNECTTIMEOUT, CONNECT_TIMEOUT);
651 auto query_header = XrdOucTUtils::caseInsensitiveFind(req.headers,"xrd-http-fullresource");
652 std::string redirect_resource = req.resource;
653 if (query_header != req.headers.end()) {
654 redirect_resource = query_header->second;
655 }
656
657 AtomicBeg(m_monid_mutex);
658 uint64_t file_monid = AtomicInc(m_monid);
659 AtomicEnd(m_monid_mutex);
660 std::unique_ptr<XrdSfsFile> fh(m_sfs->newFile(name, file_monid));
661 if (!fh.get()) {
662 rec.status = 500;
663 std::stringstream ss;
664 ss << "Failed to initialize internal transfer file handle";
665 logTransferEvent(LogMask::Error, rec, "OPEN_FAIL",
666 ss.str());
667 return req.SendSimpleResp(rec.status, NULL, NULL, generateClientErr(ss, rec).c_str(), 0);
668 }
669 std::string full_url = prepareURL(req);
670
671 std::string authz = GetAuthz(req);
672
673 int open_results = OpenWaitStall(*fh, full_url, SFS_O_RDONLY, 0644,
674 req.GetSecEntity(), authz);
675 if (SFS_REDIRECT == open_results) {
676 int result = RedirectTransfer(curl, redirect_resource, req, fh->error, rec);
677 return result;
678 } else if (SFS_OK != open_results) {
679 int code;
680 std::stringstream ss;
681 const char *msg = fh->error.getErrText(code);
682 if (msg == NULL) ss << "Failed to open local resource";
683 else ss << msg;
684 rec.status = mapErrNoToHttp(code);
685 logTransferEvent(LogMask::Error, rec, "OPEN_FAIL", msg);
686 int resp_result = req.SendSimpleResp(rec.status, NULL, NULL, generateClientErr(ss, rec).c_str(), 0);
687 fh->close();
688 return resp_result;
689 }
690 ConfigureCurlCA(curl);
691 curl_easy_setopt(curl, CURLOPT_URL, resource.c_str());
692
693 Stream stream(std::move(fh), 0, 0, m_log);
694 State state(0, stream, curl, true, req.tpcForwardCreds);
695 state.SetupHeaders(req);
696
697 return RunCurlWithUpdates(curl, req, state, rec);
698}
699
700/******************************************************************************/
701/* T P C H a n d l e r : : P r o c e s s P u l l R e q */
702/******************************************************************************/
703
// Handles a COPY request in pull mode: the remote URL `resource` is downloaded
// into the local resource named by the request.
//
// resource - remote source URL (ProcessReq passes the Source header after PrepareURL()).
// req      - client request.
//
// Steps: create the log record and the curl handle, optionally bind curl to a
// fixed local address, create the local file handle, read the "overwrite" and
// "x-number-of-streams" headers, query the size of the remote file, open the
// local file for writing (honouring redirects from the filesystem), and run
// the transfer with one or several streams.
// Every early failure sets rec.status and answers the client directly.
704int TPCHandler::ProcessPullReq(const std::string &resource, XrdHttpExtReq &req) {
705 TPCLogRecord rec(req,TpcType::Pull);
706 rec.log_prefix = "PullRequest";
707 rec.local = req.resource;
708 rec.remote = resource;
709 rec.m_log = &m_log;
710 char *name = req.GetSecEntity().name;
711 req.GetClientID(rec.clID);
712 if (name) rec.name = name;
713 logTransferEvent(LogMask::Info, rec, "PULL_START", "Starting a pull request");
714
715 ManagedCurlHandle curlPtr(curl_easy_init());
716 auto curl = curlPtr.get();
717 if (!curl) {
718 std::stringstream ss;
719 ss << "Failed to initialize internal transfer resources";
720 rec.status = 500;
721 logTransferEvent(LogMask::Error, rec, "PULL_FAIL", ss.str());
722 return req.SendSimpleResp(rec.status, NULL, NULL, generateClientErr(ss, rec).c_str(), 0);
723 }
724 // ddavila 2023-01-05:
725 // The following change was required by the Rucio/SENSE project where
726 // multiple IP addresses, each from a different subnet, are assigned to a
727 // single server and routed differently by SENSE.
728 // The above requires the server to utilize the same IP, that was used to
729 // start the TPC, for the resolution of the given TPC instead of
730 // using any of the IPs available.
731 if (m_fixed_route){
732 XrdNetAddr *nP;
733 int numIP = 0;
734 char buff[1024];
735 char * ip;
736
737 // Get the hostname used to contact the server from the http header
738 auto host_header = XrdOucTUtils::caseInsensitiveFind(req.headers,"host");
739 std::string host_used;
740 if (host_header != req.headers.end()) {
741 host_used = host_header->second;
742 }
743
744 // Get the IP addresses associated with the above hostname
// NOTE(review): neither the result of GetAddrs() nor numIP is checked before
// nP[0] is used (host_used may even be empty), and nP is not released anywhere
// in this function -- confirm who owns the returned array.
745 XrdNetUtils::GetAddrs(host_used.c_str(), &nP, numIP, XrdNetUtils::prefAuto, 0);
746 int ip_size = nP[0].Format(buff, 1024, XrdNetAddrInfo::fmtAddr,XrdNetAddrInfo::noPort);
// NOTE(review): `ip` is allocated with malloc() and no matching free() is
// visible in this function.
747 ip = (char *)malloc(ip_size-1);
748
749 // Substring to get only the address, remove brackets and garbage
// Copies ip_size-2 characters starting at buff[1], i.e. drops the first and
// the last character of the formatted address.
750 memcpy(ip, buff+1, ip_size-2);
751 ip[ip_size-2]='\0';
752 logTransferEvent(LogMask::Info, rec, "LOCAL IP", ip);
753
754 curl_easy_setopt(curl, CURLOPT_INTERFACE, ip);
755 }
756 curl_easy_setopt(curl, CURLOPT_NOSIGNAL, 1);
757 curl_easy_setopt(curl, CURLOPT_HTTP_VERSION, (long) CURL_HTTP_VERSION_1_1);
// The log record is registered as curl's socket-option callback data; the
// callback itself is not set in this function.
758 curl_easy_setopt(curl, CURLOPT_SOCKOPTDATA , &rec);
759 curl_easy_setopt(curl, CURLOPT_CONNECTTIMEOUT, CONNECT_TIMEOUT);
// NOTE(review): m_monid is incremented directly here, whereas ProcessPushReq
// uses AtomicInc() between AtomicBeg/AtomicEnd(m_monid_mutex).
760 std::unique_ptr<XrdSfsFile> fh(m_sfs->newFile(name, m_monid++));
761 if (!fh.get()) {
762 std::stringstream ss;
763 ss << "Failed to initialize internal transfer file handle";
764 rec.status = 500;
765 logTransferEvent(LogMask::Error, rec, "PULL_FAIL", ss.str());
766 return req.SendSimpleResp(rec.status, NULL, NULL, generateClientErr(ss, rec).c_str(), 0);
767 }
// A redirect must point at the full resource when the request carries one.
768 auto query_header = XrdOucTUtils::caseInsensitiveFind(req.headers,"xrd-http-fullresource");
769 std::string redirect_resource = req.resource;
770 if (query_header != req.headers.end()) {
771 redirect_resource = query_header->second;
772 }
// NOTE(review): listing line 773 is missing from this view; it presumably
// declares `mode`, which is assigned below and passed to OpenWaitStall()
// without a visible declaration -- confirm its type and initial value.
// A missing "overwrite" header, or the value "T", selects SFS_O_TRUNC unless
// usingEC is set.
774 auto overwrite_header = XrdOucTUtils::caseInsensitiveFind(req.headers,"overwrite");
775 if ((overwrite_header == req.headers.end()) || (overwrite_header->second == "T")) {
776 if (! usingEC) mode = SFS_O_TRUNC;
777 }
// Number of streams: 1 by default. A header value of 0 also means 1; a value
// that cannot be parsed, is negative, or exceeds 100 is rejected with 400.
778 int streams = 1;
779 {
780 auto streams_header = XrdOucTUtils::caseInsensitiveFind(req.headers,"x-number-of-streams");
781 if (streams_header != req.headers.end()) {
782 int stream_req = -1;
783 try {
784 stream_req = std::stol(streams_header->second);
785 } catch (...) { // Handled below
786 }
787 if (stream_req < 0 || stream_req > 100) {
788 std::stringstream ss;
789 ss << "Invalid request for number of streams";
790 rec.status = 400;
791 logTransferEvent(LogMask::Info, rec, "INVALID_REQUEST", ss.str());
792 return req.SendSimpleResp(rec.status, NULL, NULL, generateClientErr(ss, rec).c_str(), 0);
793 }
794 streams = stream_req == 0 ? 1 : stream_req;
795 }
796 }
797 rec.streams = streams;
798 std::string full_url = prepareURL(req);
799 std::string authz = GetAuthz(req);
800 curl_easy_setopt(curl, CURLOPT_URL, resource.c_str());
801 ConfigureCurlCA(curl);
802 uint64_t sourceFileContentLength = 0;
803 {
804 //Get the content-length of the source file and pass it to the OSS layer
805 //during the open
806 bool success;
807 GetContentLengthTPCPull(curl, req, sourceFileContentLength, success, rec);
808 if(success) {
809 //The size of the source file is known: add it to the opaque information
810 //of the local file to open
// NOTE(review): the size is appended with '&', which assumes full_url already
// contains a '?' query part -- confirm against prepareOpenURL().
811 full_url += "&oss.asize=" + std::to_string(sourceFileContentLength);
812 } else {
813 // In the case the GetContentLength is not successful, an error will be returned to the client
814 // just exit here so we don't open the file!
// NOTE(review): the return value of GetContentLengthTPCPull() is discarded and
// 0 is returned here.
815 return 0;
816 }
817 }
818 int open_result = OpenWaitStall(*fh, full_url, mode|SFS_O_WRONLY,
819 0644 | SFS_O_MKPTH,
820 req.GetSecEntity(), authz);
821 if (SFS_REDIRECT == open_result) {
822 int result = RedirectTransfer(curl, redirect_resource, req, fh->error, rec);
823 return result;
824 } else if (SFS_OK != open_result) {
825 int code;
826 std::stringstream ss;
827 const char *msg = fh->error.getErrText(code);
828 if ((msg == NULL) || (*msg == '\0')) ss << "Failed to open local resource";
829 else ss << msg;
830 rec.status = mapErrNoToHttp(code);
831 logTransferEvent(LogMask::Error, rec, "OPEN_FAIL", ss.str());
832 int resp_result = req.SendSimpleResp(rec.status, NULL, NULL,
833 generateClientErr(ss, rec).c_str(), 0);
834 fh->close();
835 return resp_result;
836 }
// The stream takes ownership of the opened file handle. Multi-stream transfers
// use m_block_size, single-stream transfers use m_small_block_size.
837 Stream stream(std::move(fh), streams * m_pipelining_multiplier, streams > 1 ? m_block_size : m_small_block_size, m_log);
838 State state(0, stream, curl, false, req.tpcForwardCreds);
839 state.SetupHeaders(req);
840 state.SetContentLength(sourceFileContentLength);
841
842 if (streams > 1) {
843 return RunCurlWithStreams(req, state, streams, rec);
844 } else {
845 return RunCurlWithUpdates(curl, req, state, rec);
846 }
847}
848
849/******************************************************************************/
850/* T P C H a n d l e r : : l o g T r a n s f e r E v e n t */
851/******************************************************************************/
852
853void TPCHandler::logTransferEvent(LogMask mask, const TPCLogRecord &rec,
854 const std::string &event, const std::string &message)
855{
856 if (!(m_log.getMsgMask() & mask)) {return;}
857
858 std::stringstream ss;
859 ss << "event=" << event << ", local=" << rec.local << ", remote=" << rec.remote;
860 if (rec.name.empty())
861 ss << ", user=(anonymous)";
862 else
863 ss << ", user=" << rec.name;
864 if (rec.streams != 1)
865 ss << ", streams=" << rec.streams;
866 if (rec.bytes_transferred >= 0)
867 ss << ", bytes_transferred=" << rec.bytes_transferred;
868 if (rec.status >= 0)
869 ss << ", status=" << rec.status;
870 if (rec.tpc_status >= 0)
871 ss << ", tpc_status=" << rec.tpc_status;
872 if (!message.empty())
873 ss << "; " << message;
874 m_log.Log(mask, rec.log_prefix.c_str(), ss.str().c_str());
875}
876
877std::string TPCHandler::generateClientErr(std::stringstream &err_ss, const TPCLogRecord &rec, CURLcode cCode) {
878 std::stringstream ssret;
879 ssret << "failure: " << err_ss.str() << ", local=" << rec.local <<", remote=" << rec.remote;
880 if(cCode != CURLcode::CURLE_OK) {
881 ssret << ", HTTP library failure=" << curl_easy_strerror(cCode);
882 }
883 return ssret.str();
884}
885/******************************************************************************/
886/* X r d H t t p G e t E x t H a n d l e r */
887/******************************************************************************/
888
889extern "C" {
890
891XrdHttpExtHandler *XrdHttpGetExtHandler(XrdSysError *log, const char * config, const char * /*parms*/, XrdOucEnv *myEnv) {
892 if (curl_global_init(CURL_GLOBAL_DEFAULT)) {
893 log->Emsg("TPCInitialize", "libcurl failed to initialize");
894 return NULL;
895 }
896
897 TPCHandler *retval{NULL};
898 if (!config) {
899 log->Emsg("TPCInitialize", "TPC handler requires a config filename in order to load");
900 return NULL;
901 }
902 try {
903 log->Emsg("TPCInitialize", "Will load configuration for the TPC handler from", config);
904 retval = new TPCHandler(log, config, myEnv);
905 } catch (std::runtime_error &re) {
906 log->Emsg("TPCInitialize", "Encountered a runtime failure when loading ", re.what());
907 //printf("Provided env vars: %p, XrdInet*: %p\n", myEnv, myEnv->GetPtr("XrdInet*"));
908 }
909 return retval;
910}
911
912}
XrdHttpExtHandler * XrdHttpGetExtHandler(XrdHttpExtHandlerArgs)
void CURL
static std::string PrepareURL(const std::string &input)
XrdVERSIONINFO(XrdHttpGetExtHandler, HttpTPC)
std::string encode_xrootd_opaque_to_uri(CURL *curl, const std::string &opaque)
int mapErrNoToHttp(int errNo)
Utility functions for XrdHTTP.
std::string encode_str(const std::string &str)
void getline(uchar *buff, int blen)
#define SFS_REDIRECT
#define SFS_O_MKPTH
#define SFS_STALL
#define SFS_O_RDONLY
#define SFS_STARTED
#define SFS_O_WRONLY
#define SFS_O_CREAT
int XrdSfsFileOpenMode
#define SFS_OK
#define SFS_O_TRUNC
#define AtomicInc(x)
#define AtomicBeg(Mtx)
#define AtomicEnd(Mtx)
if(Avsz)
int GetStatusCode() const
off_t BytesTransferred() const
void SetErrorMessage(const std::string &error_msg)
int GetErrorCode() const
std::string GetErrorMessage() const
std::string GetConnectionDescription()
void SetupHeaders(XrdHttpExtReq &req)
void SetContentLength(const off_t content_length)
off_t GetContentLength() const
void SetErrorCode(int error_code)
TPCHandler(XrdSysError *log, const char *config, XrdOucEnv *myEnv)
virtual int ProcessReq(XrdHttpExtReq &req)
virtual ~TPCHandler()
virtual bool MatchesPath(const char *verb, const char *path)
Tells if the incoming path is recognized as one of the paths that have to be processed.
static std::string GenerateIdentifier(const std::string &label, const char *vorg, const int scitag)
std::string clienthost
int ChunkResp(const char *body, long long bodylen)
Send a (potentially partial) body in a chunked response; invoking with NULL body.
void GetClientID(std::string &clid)
std::map< std::string, std::string > & headers
std::string resource
int StartChunkedResp(int code, const char *desc, const char *header_to_add)
Starts a chunked response; body of request is sent over multiple parts using the SendChunkResp.
const XrdSecEntity & GetSecEntity() const
int SendSimpleResp(int code, const char *desc, const char *header_to_add, const char *body, long long bodylen)
Sends a basic response. If the length is < 0 then it is calculated internally.
static std::string prepareOpenURL(const std::string &reqResource, std::map< std::string, std::string > &reqHeaders, const std::map< std::string, std::string > &hdr2cgimap)
static const int noPort
Do not add port number.
int Format(char *bAddr, int bLen, fmtUse fmtType=fmtAuto, int fmtOpts=0)
@ fmtAddr
Address using suitable ipv4 or ipv6 format.
static const char * GetAddrs(const char *hSpec, XrdNetAddr *aListP[], int &aListN, AddrOpts opts=allIPMap, int pNum=PortInSpec)
void * GetPtr(const char *varname)
Definition XrdOucEnv.cc:281
const char * getErrText()
void setUCap(int ucval)
Set user capabilities.
static std::map< std::string, T >::const_iterator caseInsensitiveFind(const std::map< std::string, T > &m, const std::string &lowerCaseSearchKey)
char * vorg
Entity's virtual organization(s)
char * name
Entity's name.
XrdOucErrInfo & error
virtual int open(const char *fileName, XrdSfsFileOpenMode openMode, mode_t createMode, const XrdSecEntity *client=0, const char *opaque=0)=0
virtual int close()=0
int Emsg(const char *esfx, int ecode, const char *text1, const char *text2=0)
XrdSysLogger * logger(XrdSysLogger *lp=0)
std::unique_ptr< CURL, CurlDeleter > ManagedCurlHandle
void operator()(CURL *curl)
static const int uIPv64
ucap: Supports only IPv4 info