XRootD
XrdHttpTpcMultistream.cc
Go to the documentation of this file.
1 
5 #include "XrdHttpTpcTPC.hh"
6 #include "XrdHttpTpcState.hh"
7 
8 #include "XrdSys/XrdSysError.hh"
9 
10 #include <curl/curl.h>
11 
12 #include <algorithm>
13 #include <sstream>
14 #include <stdexcept>
15 
16 
17 using namespace TPC;
18 
/**
 * Error raised when the libcurl machinery needed for a multi-stream transfer
 * cannot be created (e.g. curl_multi_init() failing).
 *
 * It derives from std::runtime_error so generic handlers still catch it, but
 * RunCurlWithStreams catches it first to answer with a plain (non-chunked)
 * error response.
 */
class CurlHandlerSetupError : public std::runtime_error
{
public:
    /// @param msg  Human-readable description, returned by what().
    CurlHandlerSetupError(const std::string &msg)
        : std::runtime_error{msg}
    {
    }

    ~CurlHandlerSetupError() noexcept override = default;
};
27 
28 namespace {
29 class MultiCurlHandler {
30 public:
31  MultiCurlHandler(std::vector<State*> &states, XrdSysError &log) :
32  m_handle(curl_multi_init()),
33  m_states(states),
34  m_log(log),
35  m_bytes_transferred(0),
36  m_error_code(0),
37  m_status_code(0)
38  {
39  if (m_handle == NULL) {
40  throw CurlHandlerSetupError("Failed to initialize a libcurl multi-handle");
41  }
42  m_avail_handles.reserve(states.size());
43  m_active_handles.reserve(states.size());
44  for (std::vector<State*>::const_iterator state_iter = states.begin();
45  state_iter != states.end();
46  state_iter++) {
47  m_avail_handles.push_back((*state_iter)->GetHandle());
48  }
49  }
50 
51  ~MultiCurlHandler()
52  {
53  if (!m_handle) {return;}
54  for (std::vector<CURL *>::const_iterator it = m_active_handles.begin();
55  it != m_active_handles.end();
56  it++) {
57  curl_multi_remove_handle(m_handle, *it);
58  }
59  curl_multi_cleanup(m_handle);
60  }
61 
62  MultiCurlHandler(const MultiCurlHandler &) = delete;
63 
64  CURLM *Get() const {return m_handle;}
65 
66  void FinishCurlXfer(CURL *curl) {
67  CURLMcode mres = curl_multi_remove_handle(m_handle, curl);
68  if (mres) {
69  std::stringstream ss;
70  ss << "Failed to remove transfer from set: "
71  << curl_multi_strerror(mres);
72  throw std::runtime_error(ss.str());
73  }
74  for (std::vector<State*>::iterator state_iter = m_states.begin();
75  state_iter != m_states.end();
76  state_iter++) {
77  if (curl == (*state_iter)->GetHandle()) {
78  m_bytes_transferred += (*state_iter)->BytesTransferred();
79  int error_code = (*state_iter)->GetErrorCode();
80  if (error_code && !m_error_code) {
81  m_error_code = error_code;
82  m_error_message = (*state_iter)->GetErrorMessage();
83  }
84  int status_code = (*state_iter)->GetStatusCode();
85  if (status_code >= 400 && !m_status_code) {
86  m_status_code = status_code;
87  m_error_message = (*state_iter)->GetErrorMessage();
88  }
89  (*state_iter)->ResetAfterRequest();
90  break;
91  }
92  }
93  for (std::vector<CURL *>::iterator iter = m_active_handles.begin();
94  iter != m_active_handles.end();
95  ++iter)
96  {
97  if (*iter == curl) {
98  m_active_handles.erase(iter);
99  break;
100  }
101  }
102  m_avail_handles.push_back(curl);
103  }
104 
105  off_t StartTransfers(off_t current_offset, off_t content_length, size_t block_size,
106  int &running_handles) {
107  bool started_new_xfer = false;
108  do {
109  size_t xfer_size = std::min(content_length - current_offset, static_cast<off_t>(block_size));
110  if (xfer_size == 0) {return current_offset;}
111  if (!(started_new_xfer = StartTransfer(current_offset, xfer_size))) {
112  // In this case, we need to start new transfers but weren't able to.
113  if (running_handles == 0) {
114  if (!CanStartTransfer(true)) {
115  m_log.Emsg("StartTransfers", "Unable to start transfers.");
116  }
117  }
118  break;
119  } else {
120  running_handles += 1;
121  }
122  current_offset += xfer_size;
123  } while (true);
124  return current_offset;
125  }
126 
127  int Flush() {
128  int last_error = 0;
129  for (std::vector<State*>::iterator state_it = m_states.begin();
130  state_it != m_states.end();
131  state_it++)
132  {
133  int error = (*state_it)->Flush();
134  if (error) {last_error = error;}
135  }
136  return last_error;
137  }
138 
139  off_t BytesTransferred() const {
140  return m_bytes_transferred;
141  }
142 
143  int GetStatusCode() const {
144  return m_status_code;
145  }
146 
147  int GetErrorCode() const {
148  return m_error_code;
149  }
150 
151  void SetErrorCode(int error_code) {
152  m_error_code = error_code;
153  }
154 
155  std::string GetErrorMessage() const {
156  return m_error_message;
157  }
158 
159  void SetErrorMessage(const std::string &error_msg) {
160  m_error_message = error_msg;
161  }
162 
163 private:
164 
165  bool StartTransfer(off_t offset, size_t size) {
166  if (!CanStartTransfer(false)) {return false;}
167  for (std::vector<CURL*>::const_iterator handle_it = m_avail_handles.begin();
168  handle_it != m_avail_handles.end();
169  handle_it++) {
170  for (std::vector<State*>::iterator state_it = m_states.begin();
171  state_it != m_states.end();
172  state_it++) {
173  if ((*state_it)->GetHandle() == *handle_it) { // This state object represents an idle handle.
174  (*state_it)->SetTransferParameters(offset, size);
175  ActivateHandle(**state_it);
176  return true;
177  }
178  }
179  }
180  return false;
181  }
182 
183  void ActivateHandle(State &state) {
184  CURL *curl = state.GetHandle();
185  m_active_handles.push_back(curl);
186  CURLMcode mres;
187  mres = curl_multi_add_handle(m_handle, curl);
188  if (mres) {
189  std::stringstream ss;
190  ss << "Failed to add transfer to libcurl multi-handle"
191  << curl_multi_strerror(mres);
192  throw std::runtime_error(ss.str());
193  }
194  for (auto iter = m_avail_handles.begin();
195  iter != m_avail_handles.end();
196  ++iter)
197  {
198  if (*iter == curl) {
199  m_avail_handles.erase(iter);
200  break;
201  }
202  }
203  }
204 
205  bool CanStartTransfer(bool log_reason) const {
206  size_t idle_handles = m_avail_handles.size();
207  size_t transfer_in_progress = 0;
208  for (std::vector<State*>::const_iterator state_iter = m_states.begin();
209  state_iter != m_states.end();
210  state_iter++) {
211  for (std::vector<CURL*>::const_iterator handle_iter = m_active_handles.begin();
212  handle_iter != m_active_handles.end();
213  handle_iter++) {
214  if (*handle_iter == (*state_iter)->GetHandle()) {
215  transfer_in_progress += (*state_iter)->BodyTransferInProgress();
216  break;
217  }
218  }
219  }
220  if (!idle_handles) {
221  if (log_reason) {
222  m_log.Emsg("CanStartTransfer", "Unable to start transfers as no idle CURL handles are available.");
223  }
224  return false;
225  }
226  ssize_t available_buffers = m_states[0]->AvailableBuffers();
227  // To be conservative, set aside buffers for any transfers that have been activated
228  // but don't have their first responses back yet.
229  available_buffers -= (m_active_handles.size() - transfer_in_progress);
230  if (log_reason && (available_buffers == 0)) {
231  std::stringstream ss;
232  ss << "Unable to start transfers as no buffers are available. Available buffers: " <<
233  m_states[0]->AvailableBuffers() << ", Active curl handles: " << m_active_handles.size()
234  << ", Transfers in progress: " << transfer_in_progress;
235  m_log.Emsg("CanStartTransfer", ss.str().c_str());
236  if (m_states[0]->AvailableBuffers() == 0) {
237  m_states[0]->DumpBuffers();
238  }
239  }
240  return available_buffers > 0;
241  }
242 
243  CURLM *m_handle;
244  std::vector<CURL *> m_avail_handles;
245  std::vector<CURL *> m_active_handles;
246  std::vector<State*> &m_states;
247  XrdSysError &m_log;
248  off_t m_bytes_transferred;
249  int m_error_code;
250  int m_status_code;
251  std::string m_error_message;
252 };
253 }
254 
255 
256 int TPCHandler::RunCurlWithStreamsImpl(XrdHttpExtReq &req, State &state,
257  size_t streams, std::vector<State*> &handles,
258  std::vector<ManagedCurlHandle> &curl_handles, TPCLogRecord &rec)
259 {
260  bool success;
261  // The content-length was set thanks to the call to GetContentLengthTPCPull() before calling this function
262  off_t content_size = state.GetContentLength();
263  off_t current_offset = 0;
264 
265  size_t concurrency = streams * m_pipelining_multiplier;
266 
267  handles.reserve(concurrency);
268  handles.push_back(new State());
269  handles[0]->Move(state);
270  for (size_t idx = 1; idx < concurrency; idx++) {
271  handles.push_back(handles[0]->Duplicate());
272  curl_handles.emplace_back(handles.back()->GetHandle());
273  }
274 
275  // Notify the packet marking manager that the transfer will start after this point
276 
277  // Create the multi-handle and add in the current transfer to it.
278  MultiCurlHandler mch(handles, m_log);
279  CURLM *multi_handle = mch.Get();
280 
281  curl_multi_setopt(multi_handle, CURLMOPT_PIPELINING, 1);
282  curl_multi_setopt(multi_handle, CURLMOPT_MAX_HOST_CONNECTIONS, streams);
283 
284  // Start response to client prior to the first call to curl_multi_perform
285  int retval = req.StartChunkedResp(201, "Created", "Content-Type: text/plain");
286  if (retval) {
287  logTransferEvent(LogMask::Error, rec, "RESPONSE_FAIL",
288  "Failed to send the initial response to the TPC client");
289  return retval;
290  } else {
291  logTransferEvent(LogMask::Debug, rec, "RESPONSE_START",
292  "Initial transfer response sent to the TPC client");
293  }
294 
295  // Start assigning transfers
296  int running_handles = 0;
297  current_offset = mch.StartTransfers(current_offset, content_size, m_block_size, running_handles);
298 
299  // Transfer loop: use curl to actually run the transfer, but periodically
300  // interrupt things to send back performance updates to the client.
301  time_t last_marker = 0;
302  // Track the time since the transfer last made progress
303  off_t last_advance_bytes = 0;
304  time_t last_advance_time = time(NULL);
305  time_t transfer_start = last_advance_time;
306  CURLcode res = static_cast<CURLcode>(-1);
307  CURLMcode mres = CURLM_OK;
308  do {
309  time_t now = time(NULL);
310  time_t next_marker = last_marker + m_marker_period;
311  if (now >= next_marker) {
312  if (current_offset > last_advance_bytes) {
313  last_advance_bytes = current_offset;
314  last_advance_time = now;
315  }
316  if (SendPerfMarker(req, rec, handles, current_offset)) {
317  logTransferEvent(LogMask::Error, rec, "PERFMARKER_FAIL",
318  "Failed to send a perf marker to the TPC client");
319  return -1;
320  }
321  int timeout = (transfer_start == last_advance_time) ? m_first_timeout : m_timeout;
322  if (now > last_advance_time + timeout) {
323  const char *log_prefix = rec.log_prefix.c_str();
324  bool tpc_pull = strncmp("Pull", log_prefix, 4) == 0;
325 
326  mch.SetErrorCode(10);
327  std::stringstream ss;
328  ss << "Transfer failed because no bytes have been "
329  << (tpc_pull ? "received from the source (pull mode) in "
330  : "transmitted to the destination (push mode) in ") << timeout << " seconds.";
331  mch.SetErrorMessage(ss.str());
332  break;
333  }
334  last_marker = now;
335  }
336 
337  mres = curl_multi_perform(multi_handle, &running_handles);
338  if (mres == CURLM_CALL_MULTI_PERFORM) {
339  // curl_multi_perform should be called again immediately. On newer
340  // versions of curl, this is no longer used.
341  continue;
342  } else if (mres != CURLM_OK) {
343  break;
344  }
345 
346 
347  // Harvest any messages, looking for CURLMSG_DONE.
348  CURLMsg *msg;
349  do {
350  int msgq = 0;
351  msg = curl_multi_info_read(multi_handle, &msgq);
352  if (msg && (msg->msg == CURLMSG_DONE)) {
353  CURL *easy_handle = msg->easy_handle;
354  res = msg->data.result;
355  mch.FinishCurlXfer(easy_handle);
356  // If any requests fail, cut off the entire transfer.
357  if (res != CURLE_OK) {
358  break;
359  }
360  }
361  } while (msg);
362  if (res != static_cast<CURLcode>(-1) && res != CURLE_OK) {
363  std::stringstream ss;
364  ss << "Breaking loop due to failed curl transfer: " << curl_easy_strerror(res);
365  logTransferEvent(LogMask::Debug, rec, "MULTISTREAM_CURL_FAILURE",
366  ss.str());
367  break;
368  }
369 
370  if (running_handles < static_cast<int>(concurrency)) {
371  // Issue new transfers if there is still pending work to do.
372  // Otherwise, continue running until there are no handles left.
373  if (current_offset != content_size) {
374  current_offset = mch.StartTransfers(current_offset, content_size,
375  m_block_size, running_handles);
376  if (!running_handles) {
377  std::stringstream ss;
378  ss << "No handles are able to run. Streams=" << streams << ", concurrency="
379  << concurrency;
380 
381  logTransferEvent(LogMask::Debug, rec, "MULTISTREAM_IDLE", ss.str());
382  }
383  } else if (running_handles == 0) {
384  logTransferEvent(LogMask::Debug, rec, "MULTISTREAM_IDLE",
385  "Unable to start new transfers; breaking loop.");
386  break;
387  }
388  }
389 
390  int64_t max_sleep_time = next_marker - time(NULL);
391  if (max_sleep_time <= 0) {
392  continue;
393  }
394  int fd_count;
395  mres = curl_multi_wait(multi_handle, NULL, 0, max_sleep_time*1000,
396  &fd_count);
397  if (mres != CURLM_OK) {
398  break;
399  }
400  } while (running_handles);
401 
402  if (mres != CURLM_OK) {
403  std::stringstream ss;
404  ss << "Internal libcurl multi-handle error: "
405  << curl_multi_strerror(mres);
406  logTransferEvent(LogMask::Error, rec, "MULTISTREAM_ERROR", ss.str());
407  throw std::runtime_error(ss.str());
408  }
409 
410  // Harvest any messages, looking for CURLMSG_DONE.
411  CURLMsg *msg;
412  do {
413  int msgq = 0;
414  msg = curl_multi_info_read(multi_handle, &msgq);
415  if (msg && (msg->msg == CURLMSG_DONE)) {
416  CURL *easy_handle = msg->easy_handle;
417  mch.FinishCurlXfer(easy_handle);
418  if (res == CURLE_OK || res == static_cast<CURLcode>(-1))
419  res = msg->data.result; // Transfer result will be examined below.
420  }
421  } while (msg);
422 
423  if (!state.GetErrorCode() && res == static_cast<CURLcode>(-1)) { // No transfers returned?!?
424  logTransferEvent(LogMask::Error, rec, "MULTISTREAM_ERROR",
425  "Internal state error in libcurl");
426  throw std::runtime_error("Internal state error in libcurl");
427  }
428 
429  mch.Flush();
430 
431  rec.bytes_transferred = mch.BytesTransferred();
432  rec.tpc_status = mch.GetStatusCode();
433 
434  // Generate the final response back to the client.
435  std::stringstream ss;
436  success = false;
437  if (mch.GetStatusCode() >= 400) {
438  std::string err = mch.GetErrorMessage();
439  std::stringstream ss2;
440  ss2 << "Remote side failed with status code " << mch.GetStatusCode();
441  if (!err.empty()) {
442  std::replace(err.begin(), err.end(), '\n', ' ');
443  ss2 << "; error message: \"" << err << "\"";
444  }
445  logTransferEvent(LogMask::Error, rec, "MULTISTREAM_FAIL", ss.str());
446  ss << generateClientErr(ss2, rec);
447  } else if (mch.GetErrorCode()) {
448  std::string err = mch.GetErrorMessage();
449  if (err.empty()) {err = "(no error message provided)";}
450  else {std::replace(err.begin(), err.end(), '\n', ' ');}
451  std::stringstream ss2;
452  ss2 << "Error when interacting with local filesystem: " << err;
453  logTransferEvent(LogMask::Error, rec, "MULTISTREAM_FAIL", ss2.str());
454  ss << generateClientErr(ss2, rec);
455  } else if (res != CURLE_OK) {
456  std::stringstream ss2;
457  ss2 << "Request failed when processing";
458  std::stringstream ss3;
459  ss3 << ss2.str() << ":" << curl_easy_strerror(res);
460  logTransferEvent(LogMask::Error, rec, "MULTISTREAM_FAIL", ss3.str());
461  ss << generateClientErr(ss2, rec, res);
462  } else if (current_offset != content_size) {
463  std::stringstream ss2;
464  ss2 << "Internal logic error led to early abort; current offset is " <<
465  current_offset << " while full size is " << content_size;
466  logTransferEvent(LogMask::Error, rec, "MULTISTREAM_FAIL", ss2.str());
467  ss << generateClientErr(ss2, rec);
468  } else {
469  if (!handles[0]->Finalize()) {
470  std::stringstream ss2;
471  ss2 << "Failed to finalize and close file handle.";
472  ss << generateClientErr(ss2, rec);
473  logTransferEvent(LogMask::Error, rec, "MULTISTREAM_ERROR",
474  ss2.str());
475  } else {
476  ss << "success: Created";
477  success = true;
478  }
479  }
480 
481  if ((retval = req.ChunkResp(ss.str().c_str(), 0))) {
482  logTransferEvent(LogMask::Error, rec, "TRANSFER_ERROR",
483  "Failed to send last update to remote client");
484  return retval;
485  } else if (success) {
486  logTransferEvent(LogMask::Info, rec, "TRANSFER_SUCCESS");
487  rec.status = 0;
488  }
489  return req.ChunkResp(NULL, 0);
490 }
491 
492 
493 int TPCHandler::RunCurlWithStreams(XrdHttpExtReq &req, State &state,
494  size_t streams, TPCLogRecord &rec)
495 {
496  std::vector<ManagedCurlHandle> curl_handles;
497  std::vector<State*> handles;
498  std::stringstream err_ss;
499  try {
500  int retval = RunCurlWithStreamsImpl(req, state, streams, handles, curl_handles, rec);
501  for (std::vector<State*>::iterator state_iter = handles.begin();
502  state_iter != handles.end();
503  state_iter++) {
504  delete *state_iter;
505  }
506  return retval;
507  } catch (CurlHandlerSetupError &e) {
508  for (std::vector<State*>::iterator state_iter = handles.begin();
509  state_iter != handles.end();
510  state_iter++) {
511  delete *state_iter;
512  }
513 
514  rec.status = 500;
515  logTransferEvent(LogMask::Error, rec, "MULTISTREAM_ERROR", e.what());
516  std::stringstream ss;
517  ss << e.what();
518  err_ss << generateClientErr(ss, rec);
519  return req.SendSimpleResp(rec.status, NULL, NULL, e.what(), 0);
520  } catch (std::runtime_error &e) {
521  for (std::vector<State*>::iterator state_iter = handles.begin();
522  state_iter != handles.end();
523  state_iter++) {
524  delete *state_iter;
525  }
526 
527  logTransferEvent(LogMask::Error, rec, "MULTISTREAM_ERROR", e.what());
528  std::stringstream ss;
529  ss << e.what();
530  err_ss << generateClientErr(ss, rec);
531  int retval;
532  if ((retval = req.ChunkResp(err_ss.str().c_str(), 0))) {
533  return retval;
534  }
535  return req.ChunkResp(NULL, 0);
536  }
537 }
void CURL
#define Duplicate(x, y)
bool Debug
@ Error
CurlHandlerSetupError(const std::string &msg)
virtual ~CurlHandlerSetupError() noexcept
CURL * GetHandle() const
int GetErrorCode() const
off_t GetContentLength() const
int ChunkResp(const char *body, long long bodylen)
Send a (potentially partial) body in a chunked response; invoking with a NULL body ends the response.
int StartChunkedResp(int code, const char *desc, const char *header_to_add)
Starts a chunked response; the response body is then sent in multiple parts using ChunkResp.
int SendSimpleResp(int code, const char *desc, const char *header_to_add, const char *body, long long bodylen)
Sends a basic response. If the length is < 0 then it is calculated internally.