// Out-of-class definitions of TPCRequestManager's static data members.
// m_pool_map, m_init_once and m_mutex are default-initialized; elsewhere in
// this file m_pool_map is searched by label/identifier while m_mutex is held.
17 decltype(TPCRequestManager::m_pool_map) TPCRequestManager::m_pool_map;
18 decltype(TPCRequestManager::m_init_once) TPCRequestManager::m_init_once;
19 decltype(TPCRequestManager::m_mutex) TPCRequestManager::m_mutex;
// Duration handed to TPCQueue::ConsumeUntil() from TPCWorker::Run(); defaults to one minute.
20 decltype(TPCRequestManager::m_idle_timeout) TPCRequestManager::m_idle_timeout = std::chrono::minutes(1);
// TPCQueue::Produce() logs "Queue is full; rejecting request" when the pending-operation count equals this value.
21 unsigned TPCRequestManager::m_max_pending_ops = 20;
// TPCQueue::Produce() only creates an additional worker while the worker count is below this value.
22 unsigned TPCRequestManager::m_max_workers = 50;
24 TPCRequestManager::TPCQueue::TPCWorker::TPCWorker(
const std::string &label,
int scitag, TPCQueue &queue)
27 m_pmark_handle((
XrdNetPMark *)queue.m_parent.m_xrdEnv.GetPtr(
"XrdNetPMark*")),
// Static trampoline passed to std::thread in TPCQueue::Produce(): it forwards
// to the Run() member of the worker instance it is given.
30 void TPCRequestManager::TPCQueue::TPCWorker::RunStatic(TPCWorker *myself) { myself->Run(); }
36 curl_easy_setopt(curl, CURLOPT_CLOSESOCKETFUNCTION, closesocket_callback);
37 curl_easy_setopt(curl, CURLOPT_CLOSESOCKETDATA,
this);
38 curl_easy_setopt(curl, CURLOPT_OPENSOCKETFUNCTION, opensocket_callback);
39 curl_easy_setopt(curl, CURLOPT_OPENSOCKETDATA,
this);
40 curl_easy_setopt(curl, CURLOPT_SOCKOPTFUNCTION, sockopt_callback);
41 curl_easy_setopt(curl, CURLOPT_SOCKOPTDATA,
this);
43 mres = curl_multi_add_handle(multi_handle, curl);
46 ss <<
"Failed to add transfer to libcurl multi-handle: HTTP library "
48 << curl_multi_strerror(mres);
49 m_queue.m_parent.m_log.Log(
LogMask::Error,
"TPCWorker", ss.str().c_str());
55 CURLcode res =
static_cast<CURLcode
>(-1);
56 int running_handles = 1;
57 const int update_interval{1};
58 time_t now = time(NULL);
59 time_t last_update = now - update_interval;
61 auto fail_and_exit = [&](
int code,
const std::string &msg) ->
bool {
62 curl_multi_remove_handle(multi_handle, curl);
69 mres = curl_multi_perform(multi_handle, &running_handles);
70 if (mres != CURLM_OK) {
71 return fail_and_exit(500,
"Internal curl multi-handle error: " + std::string(curl_multi_strerror(mres)));
75 if (now - last_update >= update_interval) {
83 msg = curl_multi_info_read(multi_handle, &msgq);
84 if (msg && (msg->msg == CURLMSG_DONE)) {
85 res = msg->data.result;
90 mres = curl_multi_wait(multi_handle, NULL, 0, 1000,
nullptr);
91 if (mres != CURLM_OK) {
92 return fail_and_exit(500,
"Error during curl_multi_wait: " + std::string(curl_multi_strerror(mres)));
96 return fail_and_exit(499,
"Transfer cancelled");
99 }
while (running_handles);
103 if (res ==
static_cast<CURLcode
>(-1)) {
104 return fail_and_exit(500,
"Internal state error in libcurl - no transfer results returned");
107 curl_multi_remove_handle(multi_handle, curl);
108 request.
SetDone(res,
"Transfer complete");
113 void TPCRequestManager::TPCQueue::TPCWorker::Run() {
114 m_queue.m_parent.m_log.Log(
LogMask::Info,
"TPCWorker",
"Worker for", m_queue.m_identifier.c_str(),
"starting");
117 CURLM *multi_handle = curl_multi_init();
121 " a libcurl multi-handle; fatal error for worker");
127 auto request = m_queue.TryConsume();
129 request = m_queue.ConsumeUntil(m_idle_timeout,
this);
131 m_queue.m_parent.m_log.Log(
LogMask::Info,
"TPCWorker",
"Worker for", m_queue.m_identifier.c_str(),
"exiting");
135 if (!RunCurl(multi_handle, *request)) {
137 "Worker's multi-handle"
138 " caused an internal error. Worker immediately exiting");
142 curl_multi_cleanup(multi_handle);
159 int TPCRequestManager::TPCQueue::TPCWorker::sockopt_callback(
void *clientp, curl_socket_t curlfd, curlsocktype purpose) {
160 TPCWorker *tpcWorker = (TPCWorker *)clientp;
162 if (purpose == CURLSOCKTYPE_IPCXN && tpcWorker && tpcWorker->m_pmark_manager.isEnabled()) {
166 return CURL_SOCKOPT_ALREADY_CONNECTED;
168 return CURL_SOCKOPT_OK;
179 int TPCRequestManager::TPCQueue::TPCWorker::opensocket_callback(
void *clientp, curlsocktype purpose,
struct curl_sockaddr *address) {
181 int fd = XrdSysFD_Socket(address->family, address->socktype, address->protocol);
184 return CURL_SOCKET_BAD;
186 TPCWorker *tpcWorker = (TPCWorker *)clientp;
188 if (purpose == CURLSOCKTYPE_IPCXN && clientp) {
192 std::stringstream connectErrMsg;
194 if (!tpcWorker->m_pmark_manager.connect(fd, &(address->addr), address->addrlen, CONNECT_TIMEOUT, connectErrMsg)) {
195 tpcWorker->m_queue.m_parent.m_log.Emsg(
"TPCWorker:",
"Unable to connect socket:", connectErrMsg.str().c_str());
196 return CURL_SOCKET_BAD;
199 tpcWorker->m_pmark_manager.startTransfer();
200 tpcWorker->m_pmark_manager.beginPMarks();
214 int TPCRequestManager::TPCQueue::TPCWorker::closesocket_callback(
void *clientp, curl_socket_t fd) {
215 TPCWorker *tpcWorker = (TPCWorker *)clientp;
217 tpcWorker->m_pmark_manager.endPmark(fd);
221 void TPCRequestManager::TPCQueue::Done(TPCWorker *worker) {
222 std::unique_lock<std::mutex> lock(m_mutex);
223 auto it = std::remove_if(m_workers.begin(), m_workers.end(), [&](std::unique_ptr<TPCWorker> &other) { return other.get() == worker; });
224 m_workers.erase(it, m_workers.end());
226 if (m_workers.empty()) {
229 m_parent.Done(m_identifier);
233 void TPCRequestManager::Done(
const std::string &ident) {
234 m_log.
Log(
LogMask::Info,
"TPCRequestManager",
"Worker pool", ident.c_str(),
"is idle and all workers have exited.");
235 std::unique_lock<std::shared_mutex> lock(m_mutex);
237 auto iter = m_pool_map.find(ident);
238 if (iter != m_pool_map.end()) {
239 m_pool_map.erase(iter);
258 bool TPCRequestManager::TPCQueue::Produce(TPCRequest &handler) {
259 std::unique_lock<std::mutex> lk(m_mutex);
260 if (m_ops.size() == m_max_pending_ops) {
261 m_parent.m_log.Log(
LogMask::Warning,
"TPCQueue",
"Queue is full; rejecting request");
265 m_ops.push_back(&handler);
266 for (
auto &worker : m_workers) {
267 if (worker->IsIdle()) {
268 worker->m_cv.notify_one();
273 if (m_workers.size() < m_max_workers) {
274 auto worker = std::make_unique<TPCRequestManager::TPCQueue::TPCWorker>(handler.GetLabel(), handler.GetScitag(), *
this);
275 std::thread t(TPCRequestManager::TPCQueue::TPCWorker::RunStatic, worker.get());
277 m_workers.push_back(std::move(worker));
285 std::unique_lock<std::mutex> lk(m_mutex);
286 if (m_ops.size() == 0) {
290 auto result = m_ops.front();
302 std::unique_lock<std::mutex> lk(m_mutex);
303 worker->SetIdle(
true);
304 worker->m_cv.wait_for(lk, dur, [&] {
return m_ops.size() > 0; });
305 worker->SetIdle(
false);
306 if (m_ops.size() == 0) {
310 auto result = m_ops.front();
329 std::stringstream ss;
344 #if LIBCURL_VERSION_NUM >= 0x071500
346 const char *curl_ip =
nullptr;
347 CURLcode rc = curl_easy_getinfo(m_curl, CURLINFO_PRIMARY_IP, &curl_ip);
348 if (rc != CURLE_OK || !curl_ip) {
353 rc = curl_easy_getinfo(m_curl, CURLINFO_PRIMARY_PORT, &curl_port);
354 if (rc != CURLE_OK || curl_port == 0) {
360 std::stringstream ss;
361 if (strchr(curl_ip,
':') ==
nullptr) {
362 ss <<
"tcp:" << curl_ip <<
":" << curl_port;
364 ss <<
"tcp:[" << curl_ip <<
"]:" << curl_port;
368 std::unique_lock<std::mutex> lock(m_conn_mutex);
369 m_conn_list = ss.str();
378 std::unique_lock<std::mutex> lock(m_conn_mutex);
383 std::unique_lock<std::mutex> lock(m_mutex);
390 std::unique_lock<std::mutex> lock(m_mutex);
391 m_cv.wait_for(lock, dur, [&] {
return m_status >= 0; });
404 std::shared_ptr<TPCQueue> queue;
// NOTE(review): the mutex is acquired in shared mode here, but the guard on the
// next line is a std::lock_guard built with std::adopt_lock, whose destructor
// calls unlock() rather than unlock_shared(). Releasing a shared hold through
// unlock() is outside the std::shared_mutex contract;
// std::shared_lock<std::shared_mutex> looks like the intended guard -- confirm
// and replace.
410 m_mutex.lock_shared();
411 std::lock_guard<std::shared_mutex> guard{m_mutex, std::adopt_lock};
412 auto iter = m_pool_map.find(handler.
GetLabel());
413 if (iter != m_pool_map.end()) {
414 if (!iter->second->IsDone()) {
415 queue = iter->second;
423 auto created_queue =
false;
424 std::string queue_name =
"";
426 std::lock_guard<std::shared_mutex> guard(m_mutex);
427 auto iter = m_pool_map.find(handler.
GetLabel());
428 if (iter == m_pool_map.end()) {
429 queue = std::make_shared<TPCQueue>(handler.
GetLabel(), *
this);
430 m_pool_map.insert(iter, {handler.
GetLabel(), queue});
431 created_queue =
true;
434 queue = iter->second;
438 m_log.
Log(
LogMask::Info,
"RequestManager",
"Created new TPC request queue for", queue_name.c_str());
442 return queue->Produce(handler);
static XrdSysError eDest(0,"crypto_")
std::string GetRemoteConnDesc()
void SetDone(int status, const std::string &msg)
static std::string GenerateIdentifier(const std::string &label, const char *vorg, const int scitag)
std::string GetLabel() const
void UpdateRemoteConnDesc()
int WaitFor(std::chrono::steady_clock::duration)
TPCRequestManager(XrdOucEnv &xrdEnv, XrdSysError &eDest)
bool Produce(TPCRequest &handler)
void SetWorkerIdleTimeout(std::chrono::steady_clock::duration dur)
void Log(int mask, const char *esfx, const char *text1, const char *text2=0, const char *text3=0)