XRootD
XrdHttpTpcPool.cc
Go to the documentation of this file.
1 #include "XrdHttpTpcPool.hh"
2 
3 #include <fcntl.h>
4 
5 #include <XrdOuc/XrdOucEnv.hh>
6 #include <XrdSys/XrdSysError.hh>
7 #include <XrdSys/XrdSysFD.hh>
8 #include <algorithm>
9 #include <sstream>
10 #include <string>
11 #include <thread>
12 
13 #include "XrdHttpTpcTPC.hh"
14 
15 using namespace TPC;
16 
// Static member definitions for TPCRequestManager (pool map, init flag, lock).
decltype(TPCRequestManager::m_pool_map) TPCRequestManager::m_pool_map;
decltype(TPCRequestManager::m_init_once) TPCRequestManager::m_init_once;
decltype(TPCRequestManager::m_mutex) TPCRequestManager::m_mutex;
// Workers exit after being idle this long; adjustable via SetWorkerIdleTimeout().
decltype(TPCRequestManager::m_idle_timeout) TPCRequestManager::m_idle_timeout = std::chrono::minutes(1);
unsigned TPCRequestManager::m_max_pending_ops = 20; // default max_pending_transfers_per_queue
unsigned TPCRequestManager::m_max_workers = 50; // default max_active_transfers_per_queue
23 
// Construct a worker bound to a queue. The packet-marking (pmark) handle is
// looked up in the server environment; GetPtr may return nullptr, in which
// case the pmark manager runs disabled.
TPCRequestManager::TPCQueue::TPCWorker::TPCWorker(const std::string &label, int scitag, TPCQueue &queue)
    : m_label(label),
      m_queue(queue),
      m_pmark_handle((XrdNetPMark *)queue.m_parent.m_xrdEnv.GetPtr("XrdNetPMark*")),
      m_pmark_manager(m_pmark_handle, scitag, TPC::TpcType::Pull) {}
29 
30 void TPCRequestManager::TPCQueue::TPCWorker::RunStatic(TPCWorker *myself) { myself->Run(); }
31 
// Drive one TPC request to completion on the worker's libcurl multi-handle.
//
// Returns true when the transfer ran to its natural end (whatever the curl
// result code was); returns false on internal multi-handle errors or on
// cancellation. In every exit path the request has been marked done via
// SetDone(), so the producer blocked in WaitFor() is always released.
bool TPCRequestManager::TPCQueue::TPCWorker::RunCurl(CURLM *multi_handle, TPCRequestManager::TPCRequest &request) {
    CURLMcode mres;
    auto curl = request.GetHandle();

    // Route socket open/close/option events through this worker so packet
    // marking can connect and track the transfer's sockets itself.
    curl_easy_setopt(curl, CURLOPT_CLOSESOCKETFUNCTION, closesocket_callback);
    curl_easy_setopt(curl, CURLOPT_CLOSESOCKETDATA, this);
    curl_easy_setopt(curl, CURLOPT_OPENSOCKETFUNCTION, opensocket_callback);
    curl_easy_setopt(curl, CURLOPT_OPENSOCKETDATA, this);
    curl_easy_setopt(curl, CURLOPT_SOCKOPTFUNCTION, sockopt_callback);
    curl_easy_setopt(curl, CURLOPT_SOCKOPTDATA, this);

    mres = curl_multi_add_handle(multi_handle, curl);
    if (mres) {
        std::stringstream ss;
        ss << "Failed to add transfer to libcurl multi-handle: HTTP library "
              "failure="
           << curl_multi_strerror(mres);
        m_queue.m_parent.m_log.Log(LogMask::Error, "TPCWorker", ss.str().c_str());
        request.SetDone(500, ss.str());
        return false;
    }
    request.SetActive();

    // -1 is a sentinel meaning "no transfer result seen yet"; it is not a
    // valid CURLcode, so we can detect the no-result case after the loop.
    CURLcode res = static_cast<CURLcode>(-1);
    int running_handles = 1;
    const int update_interval{1};
    time_t now = time(NULL);
    time_t last_update = now - update_interval; // In order to always fetch on first pass

    // Common failure path: detach the easy handle, log, and mark the request
    // done so the waiter wakes up.
    auto fail_and_exit = [&](int code, const std::string &msg) -> bool {
        curl_multi_remove_handle(multi_handle, curl);
        m_queue.m_parent.m_log.Log(code >= 500 ? LogMask::Error : LogMask::Info, "TPCWorker", msg.c_str());
        request.SetDone(code, msg);
        return false;
    };

    do {
        mres = curl_multi_perform(multi_handle, &running_handles);
        if (mres != CURLM_OK) {
            return fail_and_exit(500, "Internal curl multi-handle error: " + std::string(curl_multi_strerror(mres)));
        }

        // Refresh the remote-connection descriptor at most once per second.
        now = time(NULL);
        if (now - last_update >= update_interval) {
            request.UpdateRemoteConnDesc();
            last_update = now;
        }

        // Drain the multi-handle's message queue; remember the result of the
        // (single) transfer once libcurl reports it finished.
        CURLMsg *msg;
        do {
            int msgq = 0;
            msg = curl_multi_info_read(multi_handle, &msgq);
            if (msg && (msg->msg == CURLMSG_DONE)) {
                res = msg->data.result;
                break;
            }
        } while (msg);

        // Wait up to 1s for socket activity; also bounds the cancellation
        // polling latency below.
        mres = curl_multi_wait(multi_handle, NULL, 0, 1000, nullptr);
        if (mres != CURLM_OK) {
            return fail_and_exit(500, "Error during curl_multi_wait: " + std::string(curl_multi_strerror(mres)));
        }

        // The producer can flip the request inactive via Cancel().
        if (!request.IsActive()) {
            return fail_and_exit(499, "Transfer cancelled");
        }

    } while (running_handles);

    request.UpdateRemoteConnDesc();

    // Loop ended but libcurl never reported CURLMSG_DONE: internal error.
    if (res == static_cast<CURLcode>(-1)) {
        return fail_and_exit(500, "Internal state error in libcurl - no transfer results returned");
    }

    curl_multi_remove_handle(multi_handle, curl);
    request.SetDone(res, "Transfer complete");

    return true;
}
112 
113 void TPCRequestManager::TPCQueue::TPCWorker::Run() {
114  m_queue.m_parent.m_log.Log(LogMask::Info, "TPCWorker", "Worker for", m_queue.m_identifier.c_str(), "starting");
115 
116  // Create the multi-handle and add in the current transfer to it.
117  CURLM *multi_handle = curl_multi_init();
118  if (!multi_handle) {
119  m_queue.m_parent.m_log.Log(LogMask::Error, "TPCWorker",
120  "Unable to create"
121  " a libcurl multi-handle; fatal error for worker");
122  m_queue.Done(this);
123  return;
124  }
125 
126  while (true) {
127  auto request = m_queue.TryConsume();
128  if (!request) {
129  request = m_queue.ConsumeUntil(m_idle_timeout, this);
130  if (!request) {
131  m_queue.m_parent.m_log.Log(LogMask::Info, "TPCWorker", "Worker for", m_queue.m_identifier.c_str(), "exiting");
132  break;
133  }
134  }
135  if (!RunCurl(multi_handle, *request)) {
136  m_queue.m_parent.m_log.Log(LogMask::Error, "TPCWorker",
137  "Worker's multi-handle"
138  " caused an internal error. Worker immediately exiting");
139  break;
140  }
141  }
142  curl_multi_cleanup(multi_handle);
143  m_queue.Done(this);
144 }
145 
146 /******************************************************************************/
147 /* s o c k o p t _ s e t c l o e x e c _ c a l l b a c k */
148 /******************************************************************************/
149 
159 int TPCRequestManager::TPCQueue::TPCWorker::sockopt_callback(void *clientp, curl_socket_t curlfd, curlsocktype purpose) {
160  TPCWorker *tpcWorker = (TPCWorker *)clientp;
161 
162  if (purpose == CURLSOCKTYPE_IPCXN && tpcWorker && tpcWorker->m_pmark_manager.isEnabled()) {
163  // We will not reach this callback if the corresponding socket could not
164  // have been connected the socket is already connected only if the
165  // packet marking is enabled
166  return CURL_SOCKOPT_ALREADY_CONNECTED;
167  }
168  return CURL_SOCKOPT_OK;
169 }
170 
171 /******************************************************************************/
172 /* o p e n s o c k e t _ c a l l b a c k */
173 /******************************************************************************/
179 int TPCRequestManager::TPCQueue::TPCWorker::opensocket_callback(void *clientp, curlsocktype purpose, struct curl_sockaddr *address) {
180  // Return a socket file descriptor (note the clo_exec flag will be set).
181  int fd = XrdSysFD_Socket(address->family, address->socktype, address->protocol);
182  // See what kind of address will be used to connect
183  if (fd < 0) {
184  return CURL_SOCKET_BAD;
185  }
186  TPCWorker *tpcWorker = (TPCWorker *)clientp;
187 
188  if (purpose == CURLSOCKTYPE_IPCXN && clientp) {
189  XrdNetAddr thePeer(&(address->addr));
190  // rec->isIPv6 = (thePeer.isIPType(XrdNetAddrInfo::IPv6)
191  // && !thePeer.isMapped());
192  std::stringstream connectErrMsg;
193 
194  if (!tpcWorker->m_pmark_manager.connect(fd, &(address->addr), address->addrlen, CONNECT_TIMEOUT, connectErrMsg)) {
195  tpcWorker->m_queue.m_parent.m_log.Emsg("TPCWorker:", "Unable to connect socket:", connectErrMsg.str().c_str());
196  return CURL_SOCKET_BAD;
197  }
198 
199  tpcWorker->m_pmark_manager.startTransfer();
200  tpcWorker->m_pmark_manager.beginPMarks();
201  }
202  return fd;
203 }
204 
/******************************************************************************/
/*                 c l o s e s o c k e t _ c a l l b a c k                    */
/******************************************************************************/

// libcurl close-socket hook: retires the packet-marking state associated with
// this descriptor before closing it. clientp is the owning TPCWorker (set via
// CURLOPT_CLOSESOCKETDATA; assumed non-null since we always set it).
int TPCRequestManager::TPCQueue::TPCWorker::closesocket_callback(void *clientp, curl_socket_t fd) {
    TPCWorker *tpcWorker = (TPCWorker *)clientp;

    tpcWorker->m_pmark_manager.endPmark(fd);
    return close(fd);
}
220 
221 void TPCRequestManager::TPCQueue::Done(TPCWorker *worker) {
222  std::unique_lock<std::mutex> lock(m_mutex);
223  auto it = std::remove_if(m_workers.begin(), m_workers.end(), [&](std::unique_ptr<TPCWorker> &other) { return other.get() == worker; });
224  m_workers.erase(it, m_workers.end());
225 
226  if (m_workers.empty()) {
227  m_done = true;
228  lock.unlock();
229  m_parent.Done(m_identifier);
230  }
231 }
232 
233 void TPCRequestManager::Done(const std::string &ident) {
234  m_log.Log(LogMask::Info, "TPCRequestManager", "Worker pool", ident.c_str(), "is idle and all workers have exited.");
235  std::unique_lock<std::shared_mutex> lock(m_mutex);
236 
237  auto iter = m_pool_map.find(ident);
238  if (iter != m_pool_map.end()) {
239  m_pool_map.erase(iter);
240  }
241 }
242 
243 // Produce a request for processing. If the queue is full, the request will
244 // be rejected and false will be returned.
245 //
246 // Implementation notes:
247 // - If a worker is idle, it will be woken up to process the request.
248 // - If no workers are idle, a new worker will be created to process the
249 // request.
250 // - If the maximum number of workers is reached, the request will be queued
251 // until a worker is available.
252 // - If the maximum number of pending operations is reached, the request will
253 // be rejected.
254 // - If there are multiple idle workers, the oldest worker will be woken. This
255 // causes the newest workers to be idle for as long as possible and
256 // potentially exit due to lack of work. This is done to reduce the number of
257 // "mostly idle" workers in the thread pool.
258 bool TPCRequestManager::TPCQueue::Produce(TPCRequest &handler) {
259  std::unique_lock<std::mutex> lk(m_mutex);
260  if (m_ops.size() == m_max_pending_ops) {
261  m_parent.m_log.Log(LogMask::Warning, "TPCQueue", "Queue is full; rejecting request");
262  return false;
263  }
264 
265  m_ops.push_back(&handler);
266  for (auto &worker : m_workers) {
267  if (worker->IsIdle()) {
268  worker->m_cv.notify_one();
269  return true;
270  }
271  }
272 
273  if (m_workers.size() < m_max_workers) {
274  auto worker = std::make_unique<TPCRequestManager::TPCQueue::TPCWorker>(handler.GetLabel(), handler.GetScitag(), *this);
275  std::thread t(TPCRequestManager::TPCQueue::TPCWorker::RunStatic, worker.get());
276  t.detach();
277  m_workers.push_back(std::move(worker));
278  }
279  lk.unlock();
280 
281  return true;
282 }
283 
284 TPCRequestManager::TPCRequest *TPCRequestManager::TPCQueue::TryConsume() {
285  std::unique_lock<std::mutex> lk(m_mutex);
286  if (m_ops.size() == 0) {
287  return nullptr;
288  }
289 
290  auto result = m_ops.front();
291  m_ops.pop_front();
292 
293  return result;
294 }
295 
296 // Wait for a request to be available for processing, or until the duration
297 // has elapsed.
298 //
299 // Returns the request that is available, or nullptr if the duration has
300 // elapsed.
301 TPCRequestManager::TPCRequest *TPCRequestManager::TPCQueue::ConsumeUntil(std::chrono::steady_clock::duration dur, TPCWorker *worker) {
302  std::unique_lock<std::mutex> lk(m_mutex);
303  worker->SetIdle(true);
304  worker->m_cv.wait_for(lk, dur, [&] { return m_ops.size() > 0; });
305  worker->SetIdle(false);
306  if (m_ops.size() == 0) {
307  return nullptr;
308  }
309 
310  auto result = m_ops.front();
311  m_ops.pop_front();
312 
313  return result;
314 }
315 
// Mark the request as actively transferring (set once the easy handle is on
// the worker's multi-handle).
void TPCRequestManager::TPCRequest::SetActive() { m_active.store(true, std::memory_order_relaxed); }

// Request cancellation; the worker polls IsActive() each multi_wait cycle and
// aborts the transfer with status 499 when it sees the flag cleared.
void TPCRequestManager::TPCRequest::Cancel() { m_active.store(false, std::memory_order_relaxed); }

// Science-tag (scitag) accessor, used for packet marking.
int TPCRequestManager::TPCRequest::GetScitag() const { return m_scitag; }

// True while the transfer has not been cancelled.
bool TPCRequestManager::TPCRequest::IsActive() const { return m_active.load(std::memory_order_relaxed); }

// Label used to select the per-label worker queue.
std::string TPCRequestManager::TPCRequest::GetLabel() const { return m_label; }
327 
328 std::string TPCRequestManager::TPCRequest::GenerateIdentifier(const std::string& label, const char *vorg, const int scitag) {
329  std::stringstream ss;
330  ss << label; // always present
331 
332  if (vorg && *vorg) {
333  ss << "_" << vorg;
334  }
335 
336  if (scitag != -1) {
337  ss << "_" << scitag;
338  }
339  return ss.str();
340 }
341 
342 // Logic from State::GetConnectionDescription
344 #if LIBCURL_VERSION_NUM >= 0x071500
345  // Retrieve IP address and port from the curl handle
346  const char *curl_ip = nullptr;
347  CURLcode rc = curl_easy_getinfo(m_curl, CURLINFO_PRIMARY_IP, &curl_ip);
348  if (rc != CURLE_OK || !curl_ip) {
349  return; // Failed to get IP, cannot update connection descriptor
350  }
351 
352  long curl_port = 0;
353  rc = curl_easy_getinfo(m_curl, CURLINFO_PRIMARY_PORT, &curl_port);
354  if (rc != CURLE_OK || curl_port == 0) {
355  return; // Failed to get port, cannot update connection descriptor
356  }
357 
358  // Format the connection string according to HTTP-TPC spec
359  // IPv6 addresses must be enclosed in square brackets
360  std::stringstream ss;
361  if (strchr(curl_ip, ':') == nullptr) {
362  ss << "tcp:" << curl_ip << ":" << curl_port;
363  } else {
364  ss << "tcp:[" << curl_ip << "]:" << curl_port;
365  }
366 
367  {
368  std::unique_lock<std::mutex> lock(m_conn_mutex);
369  m_conn_list = ss.str();
370  }
371 #else
372  // For older libcurl versions, do nothing
373  return;
374 #endif
375 }
376 
378  std::unique_lock<std::mutex> lock(m_conn_mutex);
379  return m_conn_list;
380 }
381 
382 void TPCRequestManager::TPCRequest::SetDone(int status, const std::string &msg) {
383  std::unique_lock<std::mutex> lock(m_mutex);
384  m_status = status;
385  m_message = msg;
386  m_cv.notify_one();
387 }
388 
// Block until the request completes (SetDone stores a status >= 0) or the
// duration elapses. Returns the current m_status either way; on timeout this
// is presumably still the initial negative value — callers appear to treat a
// negative return as "not done yet" (TODO confirm against callers).
int TPCRequestManager::TPCRequest::WaitFor(std::chrono::steady_clock::duration dur) {
    std::unique_lock<std::mutex> lock(m_mutex);
    m_cv.wait_for(lock, dur, [&] { return m_status >= 0; });

    return m_status;
}
395 
// The manager borrows the server-wide environment (for the pmark handle
// lookup) and the shared error logger; both must outlive the manager.
TPCRequestManager::TPCRequestManager(XrdOucEnv &xrdEnv, XrdSysError &eDest) : m_log(eDest), m_xrdEnv(xrdEnv) {}

// Set the process-wide idle timeout after which pool workers exit (static;
// affects all queues).
void TPCRequestManager::SetWorkerIdleTimeout(std::chrono::steady_clock::duration dur) { m_idle_timeout = dur; }
399 
400 // Send a request to a worker for processing. If the worker is not available,
401 // the request will be queued until a worker is available. If the queue is
402 // full, the request will be rejected and false will be returned.
404  std::shared_ptr<TPCQueue> queue;
405  // Get the queue from our per-label map. To avoid a race condition,
406  // if the queue we get has already been shut down, we release the lock
407  // and try again (with the expectation that the queue will eventually
408  // get the lock and remove itself from the map).
409  while (true) {
410  m_mutex.lock_shared();
411  std::lock_guard<std::shared_mutex> guard{m_mutex, std::adopt_lock};
412  auto iter = m_pool_map.find(handler.GetLabel());
413  if (iter != m_pool_map.end()) {
414  if (!iter->second->IsDone()) {
415  queue = iter->second;
416  break;
417  }
418  } else {
419  break;
420  }
421  }
422  if (!queue) {
423  auto created_queue = false;
424  std::string queue_name = "";
425  {
426  std::lock_guard<std::shared_mutex> guard(m_mutex);
427  auto iter = m_pool_map.find(handler.GetLabel());
428  if (iter == m_pool_map.end()) {
429  queue = std::make_shared<TPCQueue>(handler.GetLabel(), *this);
430  m_pool_map.insert(iter, {handler.GetLabel(), queue});
431  created_queue = true;
432  queue_name = handler.GetLabel();
433  } else {
434  queue = iter->second;
435  }
436  }
437  if (created_queue) {
438  m_log.Log(LogMask::Info, "RequestManager", "Created new TPC request queue for", queue_name.c_str());
439  }
440  }
441 
442  return queue->Produce(handler);
443 }
static XrdSysError eDest(0,"crypto_")
void CURL
#define close(a)
Definition: XrdPosix.hh:48
@ Error
void SetDone(int status, const std::string &msg)
static std::string GenerateIdentifier(const std::string &label, const char *vorg, const int scitag)
int WaitFor(std::chrono::steady_clock::duration)
TPCRequestManager(XrdOucEnv &xrdEnv, XrdSysError &eDest)
bool Produce(TPCRequest &handler)
void SetWorkerIdleTimeout(std::chrono::steady_clock::duration dur)
void Log(int mask, const char *esfx, const char *text1, const char *text2=0, const char *text3=0)
Definition: XrdSysError.hh:133
@ Warning