XRootD
Loading...
Searching...
No Matches
XrdHttpTpcPool.cc
Go to the documentation of this file.
1#include "XrdHttpTpcPool.hh"
2
3#include <fcntl.h>
4
5#include <XrdOuc/XrdOucEnv.hh>
7#include <XrdSys/XrdSysFD.hh>
8#include <algorithm>
9#include <sstream>
10#include <string>
11#include <thread>
12
13#include "XrdHttpTpcTPC.hh"
14
15using namespace TPC;
16
// Out-of-line definitions for TPCRequestManager's static data members.
decltype(TPCRequestManager::m_pool_map) TPCRequestManager::m_pool_map;
decltype(TPCRequestManager::m_init_once) TPCRequestManager::m_init_once;
decltype(TPCRequestManager::m_mutex) TPCRequestManager::m_mutex;
// How long an idle worker waits for new work before exiting (see TPCWorker::Run).
decltype(TPCRequestManager::m_idle_timeout) TPCRequestManager::m_idle_timeout = std::chrono::minutes(1);
unsigned TPCRequestManager::m_max_pending_ops = 20; // default max_pending_transfers_per_queue
unsigned TPCRequestManager::m_max_workers = 50; // default max_active_transfers_per_queue
23
// Construct a worker bound to its owning queue.
// The packet-marking handle is fetched from the parent's XrdOucEnv (it may be
// null when packet marking is not configured); the pmark manager is created
// for a pull-mode transfer carrying the request's scitag.
TPCRequestManager::TPCQueue::TPCWorker::TPCWorker(const std::string &label, int scitag, TPCQueue &queue)
    : m_label(label),
      m_queue(queue),
      m_pmark_handle((XrdNetPMark *)queue.m_parent.m_xrdEnv.GetPtr("XrdNetPMark*")),
      m_pmark_manager(m_pmark_handle, scitag, TPC::TpcType::Pull) {}
29
30void TPCRequestManager::TPCQueue::TPCWorker::RunStatic(TPCWorker *myself) { myself->Run(); }
31
// Drive a single TPC request to completion on this worker's multi-handle.
// Returns true only when the transfer ran to completion (its CURLcode status,
// success or not, is reported to the waiting client via SetDone). Returns
// false on every failure/cancellation path, which makes Run() retire this
// worker and its multi-handle.
bool TPCRequestManager::TPCQueue::TPCWorker::RunCurl(CURLM *multi_handle, TPCRequestManager::TPCRequest &request) {
    CURLMcode mres;
    auto curl = request.GetHandle();

    // Route socket open/close/setup through this worker so packet marking can
    // connect, tag, and untag each socket (see the three callbacks below).
    curl_easy_setopt(curl, CURLOPT_CLOSESOCKETFUNCTION, closesocket_callback);
    curl_easy_setopt(curl, CURLOPT_CLOSESOCKETDATA, this);
    curl_easy_setopt(curl, CURLOPT_OPENSOCKETFUNCTION, opensocket_callback);
    curl_easy_setopt(curl, CURLOPT_OPENSOCKETDATA, this);
    curl_easy_setopt(curl, CURLOPT_SOCKOPTFUNCTION, sockopt_callback);
    curl_easy_setopt(curl, CURLOPT_SOCKOPTDATA, this);

    mres = curl_multi_add_handle(multi_handle, curl);
    if (mres) {
        std::stringstream ss;
        ss << "Failed to add transfer to libcurl multi-handle: HTTP library "
              "failure="
           << curl_multi_strerror(mres);
        m_queue.m_parent.m_log.Log(LogMask::Error, "TPCWorker", ss.str().c_str());
        request.SetDone(500, ss.str());
        return false;
    }
    request.SetActive();

    // Sentinel meaning "no CURLMSG_DONE result has been seen yet".
    CURLcode res = static_cast<CURLcode>(-1);
    int running_handles = 1;
    const int update_interval{1};
    time_t now = time(NULL);
    time_t last_update = now - update_interval;  // Inorder to always fetch on first pass

    // Common failure path: detach the easy handle from the multi-handle, log,
    // and report the status to the waiting client. Returns false so callers
    // can `return fail_and_exit(...)` directly.
    auto fail_and_exit = [&](int code, const std::string &msg) -> bool {
        curl_multi_remove_handle(multi_handle, curl);
        m_queue.m_parent.m_log.Log(code >= 500 ? LogMask::Error : LogMask::Info, "TPCWorker", msg.c_str());
        request.SetDone(code, msg);
        return false;
    };

    do {
        mres = curl_multi_perform(multi_handle, &running_handles);
        if (mres != CURLM_OK) {
            return fail_and_exit(500, "Internal curl multi-handle error: " + std::string(curl_multi_strerror(mres)));
        }

        // Refresh the remote-connection descriptor at most once per
        // update_interval seconds.
        now = time(NULL);
        if (now - last_update >= update_interval) {
            request.UpdateRemoteConnDesc();
            last_update = now;
        }

        // Drain completion messages; record the transfer's result code when
        // its CURLMSG_DONE arrives.
        CURLMsg *msg;
        do {
            int msgq = 0;
            msg = curl_multi_info_read(multi_handle, &msgq);
            if (msg && (msg->msg == CURLMSG_DONE)) {
                res = msg->data.result;
                break;
            }
        } while (msg);

        // Block (up to 1s) for socket activity before the next perform pass.
        mres = curl_multi_wait(multi_handle, NULL, 0, 1000, nullptr);
        if (mres != CURLM_OK) {
            return fail_and_exit(500, "Error during curl_multi_wait: " + std::string(curl_multi_strerror(mres)));
        }

        // Cancellation requested via TPCRequest::Cancel(); 499 = client
        // closed request.
        if (!request.IsActive()) {
            return fail_and_exit(499, "Transfer cancelled");
        }

    } while (running_handles);

    request.UpdateRemoteConnDesc();

    // Loop ended without ever seeing CURLMSG_DONE — libcurl state is
    // inconsistent.
    if (res == static_cast<CURLcode>(-1)) {
        return fail_and_exit(500, "Internal state error in libcurl - no transfer results returned");
    }

    curl_multi_remove_handle(multi_handle, curl);
    // The CURLcode itself is reported as the status (0 == CURLE_OK == success).
    request.SetDone(res, "Transfer complete");

    return true;
}
112
113void TPCRequestManager::TPCQueue::TPCWorker::Run() {
114 m_queue.m_parent.m_log.Log(LogMask::Info, "TPCWorker", "Worker for", m_queue.m_identifier.c_str(), "starting");
115
116 // Create the multi-handle and add in the current transfer to it.
117 CURLM *multi_handle = curl_multi_init();
118 if (!multi_handle) {
119 m_queue.m_parent.m_log.Log(LogMask::Error, "TPCWorker",
120 "Unable to create"
121 " a libcurl multi-handle; fatal error for worker");
122 m_queue.Done(this);
123 return;
124 }
125
126 while (true) {
127 auto request = m_queue.TryConsume();
128 if (!request) {
129 request = m_queue.ConsumeUntil(m_idle_timeout, this);
130 if (!request) {
131 m_queue.m_parent.m_log.Log(LogMask::Info, "TPCWorker", "Worker for", m_queue.m_identifier.c_str(), "exiting");
132 break;
133 }
134 }
135 if (!RunCurl(multi_handle, *request)) {
136 m_queue.m_parent.m_log.Log(LogMask::Error, "TPCWorker",
137 "Worker's multi-handle"
138 " caused an internal error. Worker immediately exiting");
139 break;
140 }
141 }
142 curl_multi_cleanup(multi_handle);
143 m_queue.Done(this);
144}
145
146/******************************************************************************/
147/* s o c k o p t _ s e t c l o e x e c _ c a l l b a c k */
148/******************************************************************************/
149
158
159int TPCRequestManager::TPCQueue::TPCWorker::sockopt_callback(void *clientp, curl_socket_t curlfd, curlsocktype purpose) {
160 TPCWorker *tpcWorker = (TPCWorker *)clientp;
161
162 if (purpose == CURLSOCKTYPE_IPCXN && tpcWorker && tpcWorker->m_pmark_manager.isEnabled()) {
163 // We will not reach this callback if the corresponding socket could not
164 // have been connected the socket is already connected only if the
165 // packet marking is enabled
166 return CURL_SOCKOPT_ALREADY_CONNECTED;
167 }
168 return CURL_SOCKOPT_OK;
169}
170
171/******************************************************************************/
172/* o p e n s o c k e t _ c a l l b a c k */
173/******************************************************************************/
178
179int TPCRequestManager::TPCQueue::TPCWorker::opensocket_callback(void *clientp, curlsocktype purpose, struct curl_sockaddr *address) {
180 // Return a socket file descriptor (note the clo_exec flag will be set).
181 int fd = XrdSysFD_Socket(address->family, address->socktype, address->protocol);
182 // See what kind of address will be used to connect
183 if (fd < 0) {
184 return CURL_SOCKET_BAD;
185 }
186 TPCWorker *tpcWorker = (TPCWorker *)clientp;
187
188 if (purpose == CURLSOCKTYPE_IPCXN && clientp) {
189 XrdNetAddr thePeer(&(address->addr));
190 // rec->isIPv6 = (thePeer.isIPType(XrdNetAddrInfo::IPv6)
191 // && !thePeer.isMapped());
192 std::stringstream connectErrMsg;
193
194 if (!tpcWorker->m_pmark_manager.connect(fd, &(address->addr), address->addrlen, CONNECT_TIMEOUT, connectErrMsg)) {
195 tpcWorker->m_queue.m_parent.m_log.Emsg("TPCWorker:", "Unable to connect socket:", connectErrMsg.str().c_str());
196 return CURL_SOCKET_BAD;
197 }
198
199 tpcWorker->m_pmark_manager.startTransfer();
200 tpcWorker->m_pmark_manager.beginPMarks();
201 }
202 return fd;
203}
204
205/******************************************************************************/
206/* c l o s e s o c k e t _ c a l l b a c k */
207/******************************************************************************/
213
214int TPCRequestManager::TPCQueue::TPCWorker::closesocket_callback(void *clientp, curl_socket_t fd) {
215 TPCWorker *tpcWorker = (TPCWorker *)clientp;
216
217 tpcWorker->m_pmark_manager.endPmark(fd);
218 return close(fd);
219}
220
221void TPCRequestManager::TPCQueue::Done(TPCWorker *worker) {
222 std::unique_lock<std::mutex> lock(m_mutex);
223 auto it = std::remove_if(m_workers.begin(), m_workers.end(), [&](std::unique_ptr<TPCWorker> &other) { return other.get() == worker; });
224 m_workers.erase(it, m_workers.end());
225
226 if (m_workers.empty()) {
227 m_done = true;
228 lock.unlock();
229 m_parent.Done(m_identifier);
230 }
231}
232
233void TPCRequestManager::Done(const std::string &ident) {
234 m_log.Log(LogMask::Info, "TPCRequestManager", "Worker pool", ident.c_str(), "is idle and all workers have exited.");
235 std::unique_lock<std::shared_mutex> lock(m_mutex);
236
237 auto iter = m_pool_map.find(ident);
238 if (iter != m_pool_map.end()) {
239 m_pool_map.erase(iter);
240 }
241}
242
243// Produce a request for processing. If the queue is full, the request will
244// be rejected and false will be returned.
245//
246// Implementation notes:
247// - If a worker is idle, it will be woken up to process the request.
248// - If no workers are idle, a new worker will be created to process the
249// request.
250// - If the maximum number of workers is reached, the request will be queued
251// until a worker is available.
252// - If the maximum number of pending operations is reached, the request will
253// be rejected.
254// - If there are multiple idle workers, the oldest worker will be woken. This
255// causes the newest workers to be idle for as long as possible and
256// potentially exit due to lack of work. This is done to reduce the number of
257// "mostly idle" workers in the thread pool.
258bool TPCRequestManager::TPCQueue::Produce(TPCRequest &handler) {
259 std::unique_lock<std::mutex> lk(m_mutex);
260 if (m_ops.size() == m_max_pending_ops) {
261 m_parent.m_log.Log(LogMask::Warning, "TPCQueue", "Queue is full; rejecting request");
262 return false;
263 }
264
265 m_ops.push_back(&handler);
266 for (auto &worker : m_workers) {
267 if (worker->IsIdle()) {
268 worker->m_cv.notify_one();
269 return true;
270 }
271 }
272
273 if (m_workers.size() < m_max_workers) {
274 auto worker = std::make_unique<TPCRequestManager::TPCQueue::TPCWorker>(handler.GetLabel(), handler.GetScitag(), *this);
275 std::thread t(TPCRequestManager::TPCQueue::TPCWorker::RunStatic, worker.get());
276 t.detach();
277 m_workers.push_back(std::move(worker));
278 }
279 lk.unlock();
280
281 return true;
282}
283
284TPCRequestManager::TPCRequest *TPCRequestManager::TPCQueue::TryConsume() {
285 std::unique_lock<std::mutex> lk(m_mutex);
286 if (m_ops.size() == 0) {
287 return nullptr;
288 }
289
290 auto result = m_ops.front();
291 m_ops.pop_front();
292
293 return result;
294}
295
296// Wait for a request to be available for processing, or until the duration
297// has elapsed.
298//
299// Returns the request that is available, or nullptr if the duration has
300// elapsed.
301TPCRequestManager::TPCRequest *TPCRequestManager::TPCQueue::ConsumeUntil(std::chrono::steady_clock::duration dur, TPCWorker *worker) {
302 std::unique_lock<std::mutex> lk(m_mutex);
303 worker->SetIdle(true);
304 worker->m_cv.wait_for(lk, dur, [&] { return m_ops.size() > 0; });
305 worker->SetIdle(false);
306 if (m_ops.size() == 0) {
307 return nullptr;
308 }
309
310 auto result = m_ops.front();
311 m_ops.pop_front();
312
313 return result;
314}
315
// Mark the request as running; the worker loop polls this via IsActive().
void TPCRequestManager::TPCRequest::SetActive() { m_active.store(true, std::memory_order_relaxed); }

// Request cancellation; the worker observes it and aborts with status 499.
void TPCRequestManager::TPCRequest::Cancel() { m_active.store(false, std::memory_order_relaxed); }

// Scitag carried by this request (-1 appears to mean "unset"; see
// GenerateIdentifier).
int TPCRequestManager::TPCRequest::GetScitag() const { return m_scitag; }

// True while the request has not been cancelled.
bool TPCRequestManager::TPCRequest::IsActive() const { return m_active.load(std::memory_order_relaxed); }

// Label used to route this request to its per-label queue.
std::string TPCRequestManager::TPCRequest::GetLabel() const { return m_label; }
327
328std::string TPCRequestManager::TPCRequest::GenerateIdentifier(const std::string& label, const char *vorg, const int scitag) {
329 std::stringstream ss;
330 ss << label; // always present
331
332 if (vorg && *vorg) {
333 ss << "_" << vorg;
334 }
335
336 if (scitag != -1) {
337 ss << "_" << scitag;
338 }
339 return ss.str();
340}
341
// Logic from State::GetConnectionDescription
// NOTE(review): the opening signature line of this definition (presumably
// void TPCRequestManager::TPCRequest::UpdateRemoteConnDesc()) was lost in
// extraction; only the body is visible here — confirm against the header.
#if LIBCURL_VERSION_NUM >= 0x071500
    // Retrieve IP address and port from the curl handle
    const char *curl_ip = nullptr;
    CURLcode rc = curl_easy_getinfo(m_curl, CURLINFO_PRIMARY_IP, &curl_ip);
    if (rc != CURLE_OK || !curl_ip) {
        return;  // Failed to get IP, cannot update connection descriptor
    }

    long curl_port = 0;
    rc = curl_easy_getinfo(m_curl, CURLINFO_PRIMARY_PORT, &curl_port);
    if (rc != CURLE_OK || curl_port == 0) {
        return;  // Failed to get port, cannot update connection descriptor
    }

    // Format the connection string according to HTTP-TPC spec
    // IPv6 addresses must be enclosed in square brackets
    std::stringstream ss;
    if (strchr(curl_ip, ':') == nullptr) {
        ss << "tcp:" << curl_ip << ":" << curl_port;
    } else {
        ss << "tcp:[" << curl_ip << "]:" << curl_port;
    }

    // Publish under the connection mutex; readers go through GetConnDesc().
    {
        std::unique_lock<std::mutex> lock(m_conn_mutex);
        m_conn_list = ss.str();
    }
#else
    // For older libcurl versions, do nothing
    return;
#endif
}
376
// NOTE(review): the signature line of this definition was lost in extraction
// (presumably std::string TPCRequestManager::TPCRequest::GetConnDesc());
// the visible body returns a copy of the latest connection descriptor under
// the connection mutex.
    std::unique_lock<std::mutex> lock(m_conn_mutex);
    return m_conn_list;
}
381
382void TPCRequestManager::TPCRequest::SetDone(int status, const std::string &msg) {
383 std::unique_lock<std::mutex> lock(m_mutex);
384 m_status = status;
385 m_message = msg;
386 m_cv.notify_one();
387}
388
389int TPCRequestManager::TPCRequest::WaitFor(std::chrono::steady_clock::duration dur) {
390 std::unique_lock<std::mutex> lock(m_mutex);
391 m_cv.wait_for(lock, dur, [&] { return m_status >= 0; });
392
393 return m_status;
394}
395
397
// Configure how long an idle worker waits for new work before exiting
// (static: applies to all queues; consumed by TPCWorker::Run via ConsumeUntil).
void TPCRequestManager::SetWorkerIdleTimeout(std::chrono::steady_clock::duration dur) { m_idle_timeout = dur; }
399
// Send a request to a worker for processing. If the worker is not available,
// the request will be queued until a worker is available. If the queue is
// full, the request will be rejected and false will be returned.
// NOTE(review): the signature line of this definition (presumably
// bool TPCRequestManager::Produce(TPCRequest &handler)) was lost in
// extraction; only the body is visible here.
    std::shared_ptr<TPCQueue> queue;
    // Get the queue from our per-label map. To avoid a race condition,
    // if the queue we get has already been shut down, we release the lock
    // and try again (with the expectation that the queue will eventually
    // get the lock and remove itself from the map).
    while (true) {
        m_mutex.lock_shared();
        // NOTE(review): adopting a *shared*-locked std::shared_mutex into a
        // std::lock_guard makes its destructor call unlock(), not
        // unlock_shared(), which is undefined behavior. This likely should be
        // std::shared_lock<std::shared_mutex> guard{m_mutex, std::adopt_lock};
        // — confirm and fix.
        std::lock_guard<std::shared_mutex> guard{m_mutex, std::adopt_lock};
        auto iter = m_pool_map.find(handler.GetLabel());
        if (iter != m_pool_map.end()) {
            if (!iter->second->IsDone()) {
                queue = iter->second;
                break;
            }
            // NOTE(review): when the queue is done we loop again without
            // yielding — a busy-wait until the queue removes itself from the
            // map. Presumably short-lived; verify.
        } else {
            break;
        }
    }
    if (!queue) {
        auto created_queue = false;
        std::string queue_name = "";
        {
            // Exclusive lock: we may insert a new queue into the map.
            std::lock_guard<std::shared_mutex> guard(m_mutex);
            auto iter = m_pool_map.find(handler.GetLabel());
            if (iter == m_pool_map.end()) {
                queue = std::make_shared<TPCQueue>(handler.GetLabel(), *this);
                m_pool_map.insert(iter, {handler.GetLabel(), queue});
                created_queue = true;
                queue_name = handler.GetLabel();
            } else {
                queue = iter->second;
            }
        }
        // Log outside the lock to keep the critical section short.
        if (created_queue) {
            m_log.Log(LogMask::Info, "RequestManager", "Created new TPC request queue for", queue_name.c_str());
        }
    }

    return queue->Produce(handler);
}
static XrdSysError eDest(0,"crypto_")
void CURL
#define close(a)
Definition XrdPosix.hh:48
void SetDone(int status, const std::string &msg)
static std::string GenerateIdentifier(const std::string &label, const char *vorg, const int scitag)
int WaitFor(std::chrono::steady_clock::duration)
TPCRequestManager(XrdOucEnv &xrdEnv, XrdSysError &eDest)
bool Produce(TPCRequest &handler)
void SetWorkerIdleTimeout(std::chrono::steady_clock::duration dur)