XRootD
Loading...
Searching...
No Matches
XrdHttpTpcPool.hh
Go to the documentation of this file.
1#ifndef __XRDHTTPTPCPOOL_HH__
2#define __XRDHTTPTPCPOOL_HH__
3
4#include <curl/curl.h>
5
6#include <atomic>
7#include <condition_variable>
8#include <deque>
9#include <memory>
10#include <mutex>
11#include <shared_mutex>
12#include <sstream>
13#include <string>
14#include <thread>
15#include <unordered_map>
16#include <vector>
17
19
20// Forward dec'ls
21class XrdOucEnv;
22class XrdSysError;
23
24// A pool manager for TPC requests
25//
26// The manager maintains a set of worker pools, one for each distinct identifier
27// (typically, one per organization; this prevents the mixing of transfers from
28// different organizations on the same TCP socket). Each TPC transfer submitted
29// must have an identifier; the transfer is then queued for the appropriate pool
30// and subsequently executed by one of the worker threads.
31//
32// Transfers are packed to as few workers as possible in an attempt to reduce
33// the number of TCP connections; however, if the transfer is not picked up
34// quickly, a new worker will be spawned. Idle workers will auto-shutdown; if
35// not used, the pool will have no running threads.
36namespace TPC {
37
38class TPCRequestManager final {
39 public:
40 class TPCRequest {
41 public:
42 TPCRequest(const std::string &label, const int scitag, CURL *handle) : m_label(label), m_scitag(scitag), m_curl(handle) {}
43
44 int WaitFor(std::chrono::steady_clock::duration);
45 CURL *GetHandle() const;
46 std::string GetLabel() const;
47 int GetScitag() const;
48 std::string GetRemoteConnDesc();
49 void SetActive();
50 void SetDone(int status, const std::string &msg);
51 bool IsActive() const;
52 void Cancel();
54 static std::string GenerateIdentifier(const std::string& label, const char *vorg, const int scitag);
55
56 private:
57 std::atomic<bool> m_active{false};
58 int m_status{-1};
59 std::string m_conn_list;
60 std::mutex m_conn_mutex;
61 std::atomic<off_t> m_progress_offset{0};
62 // Label assigned to the request. Determines which queue it will be placed into.
63 // A queue with matching identifier is created if it does not already exists.
64 std::string m_label;
65 int m_scitag;
66 CURL *m_curl;
67 std::condition_variable m_cv;
68 std::mutex m_mutex;
69 std::string m_message;
70 };
71
72 TPCRequestManager(XrdOucEnv &xrdEnv, XrdSysError &eDest);
73
74 bool Produce(TPCRequest &handler);
75
76 void SetWorkerIdleTimeout(std::chrono::steady_clock::duration dur);
77 void SetMaxWorkers(unsigned max_workers) { m_max_workers = max_workers; }
78 void SetMaxIdleRequests(unsigned max_pending_ops) { m_max_pending_ops = max_pending_ops; }
79
80 private:
81 class TPCQueue {
82 class TPCWorker;
83
84 public:
85 TPCQueue(const std::string &identifier, TPCRequestManager &parent) : m_identifier(identifier), m_parent(parent) {}
86
87 bool Produce(TPCRequest &handler);
88 TPCRequest *TryConsume();
89 TPCRequest *ConsumeUntil(std::chrono::steady_clock::duration dur, TPCWorker *worker);
90 void Done(TPCWorker *);
91 bool IsDone() const { return m_done; }
92
93 private:
94 class TPCWorker final {
95 public:
96 TPCWorker(const std::string &label, int scitag, TPCQueue &queue);
97 TPCWorker(const TPCWorker &) = delete;
98
99 void Run();
100 static void RunStatic(TPCWorker *myself);
101
102 bool IsIdle() const { return m_idle; }
103 void SetIdle(bool idle) { m_idle = idle; }
104 std::condition_variable m_cv;
105
106 static int closesocket_callback(void *clientp, curl_socket_t fd);
107 static int opensocket_callback(void *clientp, curlsocktype purpose, struct curl_sockaddr *address);
108 static int sockopt_callback(void *clientp, curl_socket_t curlfd, curlsocktype purpose);
109 std::string getLabel() const { return m_label; }
110
111 private:
112 bool RunCurl(CURLM *multi_handle, TPCRequest &request);
113
114 bool m_idle{false};
115 // Label for this worker. Always set to the m_identifier of the queue it serves.
116 const std::string m_label;
117 TPCQueue &m_queue;
118 XrdNetPMark *m_pmark_handle;
119 XrdHttpTpc::PMarkManager m_pmark_manager;
120 };
121
122 static const long CONNECT_TIMEOUT = 60;
123 bool m_done{false};
124 // Unique identifier for this queue, in the format: "tpc_<vorg>_<scitag>".
125 const std::string m_identifier;
126 std::vector<std::unique_ptr<TPCWorker>> m_workers;
127 std::deque<TPCRequest *> m_ops;
128 std::mutex m_mutex;
129 TPCRequestManager &m_parent;
130 };
131
132 void Done(const std::string &ident);
133
134 static std::shared_mutex m_mutex;
135 XrdSysError &m_log; // Log object for the request manager
136 static std::chrono::steady_clock::duration m_idle_timeout;
137 static std::unordered_map<std::string, std::shared_ptr<TPCQueue>> m_pool_map;
138 static unsigned m_max_pending_ops;
139 static unsigned m_max_workers;
140 static std::once_flag m_init_once;
141 XrdOucEnv &m_xrdEnv;
142};
143
144} // namespace TPC
145
146#endif // __XRDHTTPTPCPOOL_HH__
static void parent()
static XrdSysError eDest(0,"crypto_")
void CURL
TPCRequest(const std::string &label, const int scitag, CURL *handle)
void SetDone(int status, const std::string &msg)
static std::string GenerateIdentifier(const std::string &label, const char *vorg, const int scitag)
int WaitFor(std::chrono::steady_clock::duration)
TPCRequestManager(XrdOucEnv &xrdEnv, XrdSysError &eDest)
bool Produce(TPCRequest &handler)
void SetWorkerIdleTimeout(std::chrono::steady_clock::duration dur)
void SetMaxWorkers(unsigned max_workers)
void SetMaxIdleRequests(unsigned max_pending_ops)