XRootD
XrdHttpTpcPool.hh
Go to the documentation of this file.
1 #ifndef __XRDHTTPTPCPOOL_HH__
2 #define __XRDHTTPTPCPOOL_HH__
3 
4 #include <curl/curl.h>
5 
6 #include <atomic>
7 #include <condition_variable>
8 #include <deque>
9 #include <memory>
10 #include <mutex>
11 #include <shared_mutex>
12 #include <sstream>
13 #include <string>
14 #include <thread>
15 #include <unordered_map>
16 #include <vector>
17 
19 
20 // Forward dec'ls
21 class XrdOucEnv;
22 class XrdSysError;
23 
24 // A pool manager for TPC requests
25 //
26 // The manager maintains a set of worker pools, one for each distinct identifier
27 // (typically, one per organization; this prevents the mixing of transfers from
28 // different organizations on the same TCP socket). Each TPC transfer submitted
29 // must have an identifier; the transfer is then queued for the appropriate pool
30 // and subsequently executed by one of the worker threads.
31 //
32 // Transfers are packed to as few workers as possible in an attempt to reduce
33 // the number of TCP connections; however, if the transfer is not picked up
34 // quickly, a new worker will be spawned. Idle workers will auto-shutdown; if
35 // not used, the pool will have no running threads.
36 namespace TPC {
37 
38 class TPCRequestManager final {
39  public:
40  class TPCRequest {
41  public:
42  TPCRequest(const std::string &label, const int scitag, CURL *handle) : m_label(label), m_scitag(scitag), m_curl(handle) {}
43 
44  int WaitFor(std::chrono::steady_clock::duration);
45  CURL *GetHandle() const;
46  std::string GetLabel() const;
47  int GetScitag() const;
48  std::string GetRemoteConnDesc();
49  void SetActive();
50  void SetDone(int status, const std::string &msg);
51  bool IsActive() const;
52  void Cancel();
53  void UpdateRemoteConnDesc();
54  static std::string GenerateIdentifier(const std::string& label, const char *vorg, const int scitag);
55 
56  private:
57  std::atomic<bool> m_active{false};
58  int m_status{-1};
59  std::string m_conn_list;
60  std::mutex m_conn_mutex;
61  std::atomic<off_t> m_progress_offset{0};
62  // Label assigned to the request. Determines which queue it will be placed into.
63  // A queue with matching identifier is created if it does not already exists.
64  std::string m_label;
65  int m_scitag;
66  CURL *m_curl;
67  std::condition_variable m_cv;
68  std::mutex m_mutex;
69  std::string m_message;
70  };
71 
73 
74  bool Produce(TPCRequest &handler);
75 
76  void SetWorkerIdleTimeout(std::chrono::steady_clock::duration dur);
77  void SetMaxWorkers(unsigned max_workers) { m_max_workers = max_workers; }
78  void SetMaxIdleRequests(unsigned max_pending_ops) { m_max_pending_ops = max_pending_ops; }
79 
80  private:
81  class TPCQueue {
82  class TPCWorker;
83 
84  public:
85  TPCQueue(const std::string &identifier, TPCRequestManager &parent) : m_identifier(identifier), m_parent(parent) {}
86 
87  bool Produce(TPCRequest &handler);
88  TPCRequest *TryConsume();
89  TPCRequest *ConsumeUntil(std::chrono::steady_clock::duration dur, TPCWorker *worker);
90  void Done(TPCWorker *);
91  bool IsDone() const { return m_done; }
92 
93  private:
94  class TPCWorker final {
95  public:
96  TPCWorker(const std::string &label, int scitag, TPCQueue &queue);
97  TPCWorker(const TPCWorker &) = delete;
98 
99  void Run();
100  static void RunStatic(TPCWorker *myself);
101 
102  bool IsIdle() const { return m_idle; }
103  void SetIdle(bool idle) { m_idle = idle; }
104  std::condition_variable m_cv;
105 
106  static int closesocket_callback(void *clientp, curl_socket_t fd);
107  static int opensocket_callback(void *clientp, curlsocktype purpose, struct curl_sockaddr *address);
108  static int sockopt_callback(void *clientp, curl_socket_t curlfd, curlsocktype purpose);
109  std::string getLabel() const { return m_label; }
110 
111  private:
112  bool RunCurl(CURLM *multi_handle, TPCRequest &request);
113 
114  bool m_idle{false};
115  // Label for this worker. Always set to the m_identifier of the queue it serves.
116  const std::string m_label;
117  TPCQueue &m_queue;
118  XrdNetPMark *m_pmark_handle;
119  XrdHttpTpc::PMarkManager m_pmark_manager;
120  };
121 
122  static const long CONNECT_TIMEOUT = 60;
123  bool m_done{false};
124  // Unique identifier for this queue, in the format: "tpc_<vorg>_<scitag>".
125  const std::string m_identifier;
126  std::vector<std::unique_ptr<TPCWorker>> m_workers;
127  std::deque<TPCRequest *> m_ops;
128  std::mutex m_mutex;
129  TPCRequestManager &m_parent;
130  };
131 
132  void Done(const std::string &ident);
133 
134  static std::shared_mutex m_mutex;
135  XrdSysError &m_log; // Log object for the request manager
136  static std::chrono::steady_clock::duration m_idle_timeout;
137  static std::unordered_map<std::string, std::shared_ptr<TPCQueue>> m_pool_map;
138  static unsigned m_max_pending_ops;
139  static unsigned m_max_workers;
140  static std::once_flag m_init_once;
141  XrdOucEnv &m_xrdEnv;
142 };
143 
144 } // namespace TPC
145 
146 #endif // __XRDHTTPTPCPOOL_HH__
static void parent()
static XrdSysError eDest(0,"crypto_")
void CURL
TPCRequest(const std::string &label, const int scitag, CURL *handle)
void SetDone(int status, const std::string &msg)
static std::string GenerateIdentifier(const std::string &label, const char *vorg, const int scitag)
int WaitFor(std::chrono::steady_clock::duration)
TPCRequestManager(XrdOucEnv &xrdEnv, XrdSysError &eDest)
bool Produce(TPCRequest &handler)
void SetWorkerIdleTimeout(std::chrono::steady_clock::duration dur)
void SetMaxWorkers(unsigned max_workers)
void SetMaxIdleRequests(unsigned max_pending_ops)