XRootD
Loading...
Searching...
No Matches
XrdCmsClientMan.cc
Go to the documentation of this file.
1/******************************************************************************/
2/* */
3/* X r d C m s C l i e n t M a n . c c */
4/* */
5/* (c) 2007 by the Board of Trustees of the Leland Stanford, Jr., University */
6/* All Rights Reserved */
7/* Produced by Andrew Hanushevsky for Stanford University under contract */
8/* DE-AC02-76-SFO0515 with the Department of Energy */
9/* */
10/* This file is part of the XRootD software suite. */
11/* */
12/* XRootD is free software: you can redistribute it and/or modify it under */
13/* the terms of the GNU Lesser General Public License as published by the */
14/* Free Software Foundation, either version 3 of the License, or (at your */
15/* option) any later version. */
16/* */
17/* XRootD is distributed in the hope that it will be useful, but WITHOUT */
18/* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or */
19/* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public */
20/* License for more details. */
21/* */
22/* You should have received a copy of the GNU Lesser General Public License */
23/* along with XRootD in a file called COPYING.LESSER (LGPL license) and file */
24/* COPYING (GPL license). If not, see <http://www.gnu.org/licenses/>. */
25/* */
26/* The copyright holder's institutional names and contributor's names may not */
27/* be used to endorse or promote products derived from this software without */
28/* specific prior written permission of the institution or contributor. */
29/******************************************************************************/
30
31#include <ctime>
32
35#include "XrdCms/XrdCmsLogin.hh"
36#include "XrdCms/XrdCmsTrace.hh"
37
39
40#include "XrdSys/XrdSysError.hh"
41#include "XrdSys/XrdSysTimer.hh"
42
43#include "Xrd/XrdInet.hh"
44#include "Xrd/XrdLink.hh"
45
46using namespace XrdCms;
47
48/******************************************************************************/
49/* G l o b a l s */
50/******************************************************************************/
51
// Buffer pool shared by every manager object; the constructor and relayResp()
// draw their network buffers from it.
XrdOucBuffPool XrdCmsClientMan::BuffPool(XrdOucEI::Max_Error_Len, 65536, 1, 16);

// Network object through which Hookup() connects to the manager. It starts out
// null, so it has to be set before any manager thread is started.
XrdInet *XrdCmsClientMan::Network = 0;

// NOTE(review): the listing skips a line here; presumably the definition of the
// static doDebug flag used by Hookup() -- confirm against the repository.

// Not referenced by the code visible in this file; starts out null.
const char *XrdCmsClientMan::ConfigFN = 0;

// Serializes updates to the class-wide doDebug flag (see Hookup()).
XrdSysMutex XrdCmsClientMan::manMutex;
61
62/******************************************************************************/
63/* C o n s t r u c t o r */
64/******************************************************************************/
65
67 int cw, int nr, int rw, int rd)
68 : syncResp(0)
69{
70 static XrdSysMutex initMutex;
71 static int Instance = 0;
72 char *dot;
73
74 Host = strdup(host);
75 if ((dot = index(Host, '.')))
76 {*dot = '\0'; HPfx = strdup(Host); *dot = '.';}
77 else HPfx = strdup(Host);
78 Port = port;
79 Link = 0;
80 Active = 0;
81 Silent = 0;
82 Suspend = 1;
83 RecvCnt = 0;
84 nrMax = nr;
85 NetBuff = BuffPool.Alloc(XrdOucEI::Max_Error_Len);
86 repWMax = rw;
87 repWait = 0;
88 minDelay= rd;
89 maxDelay= rd*3;
90 chkCount= chkVal;
91 lastUpdt= lastTOut = time(0);
92 Next = 0;
93 manInst = 1;
94
95// Compute dally value
96//
97 dally = cw / 2 - 1;
98 if (dally < 3) dally = 3;
99 else if (dally > 10) dally = 10;
100
101// Provide a unique mask number for this manager
102//
103 initMutex.Lock();
104 manMask = 1<<Instance++;
105 initMutex.UnLock();
106}
107
108/******************************************************************************/
109/* D e s t r u c t o r */
110/******************************************************************************/
111
113{
114 if (Link) Link->Close();
115 if (Host) free(Host);
116 if (HPfx) free(HPfx);
117 if (NetBuff) NetBuff->Recycle();
118}
119
120/******************************************************************************/
121/* d e l a y R e s p */
122/******************************************************************************/
123
125{
126 XrdCmsResp *rp;
127 int msgid;
128
129// Obtain the message ID
130//
131 if (!(msgid = Resp.getErrInfo()))
132 {Say.Emsg("Manager", Host, "supplied invalid waitr msgid");
133 Resp.setErrInfo(EILSEQ, "redirector protocol error");
134 syncResp.Post();
135 return SFS_ERROR;
136 }
137
138// Allocate a delayed response object
139//
140 if (!(rp = XrdCmsResp::Alloc(&Resp, msgid)))
141 {Say.Emsg("Manager",ENOMEM,"allocate resp object for",Resp.getErrUser());
142 Resp.setErrInfo(0, "0");
143 syncResp.Post();
144 return SFS_STALL;
145 }
146
147// Add this object to our delayed response queue. If the manager bounced then
148// purge all of the pending repsonses to avoid sending wrong ones.
149//
150 if (msgid < maxMsgID) RespQ.Purge();
151 maxMsgID = msgid;
152 RespQ.Add(rp);
153
154// Tell client to wait for response. The semaphore post allows the manager
155// to get the next message from the cmsd. This prevents us from getting the
156// delayed response before the response object is added to the queue.
157//
158 Resp.setErrInfo(0, "");
159 syncResp.Post();
160 return SFS_STARTED;
161}
162
163/******************************************************************************/
164/* S e n d */
165/******************************************************************************/
166
167int XrdCmsClientMan::Send(unsigned int &iMan, char *msg, int mlen)
168{
169 int allok = 0;
170
171// Determine message length
172//
173 if (!mlen) mlen = strlen(msg);
174
175// Send the request
176//
177 myData.Lock();
178 iMan = manInst;
179 if (Active)
180 {if (Link)
181 {if (!(allok = Link->Send(msg, mlen) > 0))
182 {Active = 0;
183 Link->Close(1);
184 manInst++;
185 } else SendCnt++;
186 }
187 }
188 myData.UnLock();
189
190// All done
191//
192 return allok;
193}
194
195/******************************************************************************/
196
197int XrdCmsClientMan::Send(unsigned int &iMan, const struct iovec *iov, int iovcnt, int iotot)
198{
199 int allok = 0;
200
201// Send the request
202//
203 myData.Lock();
204 iMan = manInst;
205 if (Active)
206 {if (Link)
207 {if (!(allok = Link->Send(iov, iovcnt, iotot) > 0))
208 {Active = 0;
209 Link->Close(1);
210 manInst++;
211 } else SendCnt++;
212 }
213 }
214 myData.UnLock();
215
216// All done
217//
218 return allok;
219}
220
221/******************************************************************************/
222/* S t a r t */
223/******************************************************************************/
224
226{
227
228// First step is to connect to the manager
229//
230 do {Hookup();
231 // Now simply start receiving messages on the stream. When we get a
232 // respwait reply then we must be assured that the object representing
233 // the request is added to the queue before the actual reply arrives.
234 // We do this by waiting on syncResp which is posted once the request
235 // object is fully processed. The actual response associated with the
236 // respwait is synchronized during the callback phase since the client
237 // must receive the respwait before the subsequent response.
238 //
239 while(Receive())
240 if (Response.modifier & CmsResponse::kYR_async) relayResp();
241 else if (Response.rrCode == kYR_status) setStatus();
242 else if (XrdCmsClientMsg::Reply(HPfx, Response, NetBuff))
243 {if (Response.rrCode == kYR_waitresp) syncResp.Wait();}
244
245 // Tear down the connection
246 //
247 myData.Lock();
248 if (Link) {Link->Close(); Link = 0;}
249 Active = 0; Suspend = 1;
250 myData.UnLock();
251
252 // Indicate the problem
253 //
254 Say.Emsg("ClientMan", "Disconnected from", Host);
255 XrdSysTimer::Snooze(dally);
256 } while(1);
257
258// We should never get here
259//
260 return (void *)0;
261}
262
263/******************************************************************************/
264/* w h a t s U p */
265/******************************************************************************/
266
267int XrdCmsClientMan::whatsUp(const char *user, const char *path,
268 unsigned int iMan)
269{
270 EPNAME("whatsUp");
271 unsigned int xMan;
272 int theDelay, inQ;
273 bool lClose = false;
274
275// The cmsd did not respond. Increase silent count and see if restart is needed
276// Otherwise, increase the wait interval just in case things are just slow.
277//
278 myData.Lock();
279 if (Active)
280 {if (Active == RecvCnt)
281 {if ((time(0)-lastTOut) >= repWait)
282 {Silent++;
283 if (Silent > nrMax)
284 {Active = 0; Silent = 0; Suspend = 1;
285 if (Link && iMan == manInst)
286 {Link->Close(1);
287 manInst++; lClose = true;
288 }
289 } else if (Silent & 0x02 && repWait < repWMax) repWait++;
290 }
291 } else {Active = RecvCnt; Silent = 0; lastTOut = time(0);}
292 }
293
294// Calclulate how long to delay the client. This will be based on the number
295// of outstanding requests bounded by the config delay value.
296//
297 inQ = XrdCmsClientMsg::inQ();
298 theDelay = inQ * qTime;
299 xMan = manInst;
300 myData.UnLock();
301 theDelay = theDelay/1000 + (theDelay % 1000 ? 1 : 0);
302 if (theDelay < minDelay) theDelay = minDelay;
303 if (theDelay > maxDelay) theDelay = maxDelay;
304
305// Do Some tracing here
306//
307 TRACE(Redirect, user <<" no resp from inst " <<iMan <<" of "<<HPfx
308 <<" in " <<repWait
309 <<" inst " <<xMan <<(lClose ? " closed" : " steady")
310 <<"; inQ " <<inQ <<" delay " <<theDelay <<" path=" <<path);
311 return theDelay;
312}
313
314/******************************************************************************/
315/* P r i v a t e M e t h o d s */
316/******************************************************************************/
317/******************************************************************************/
318/* H o o k u p */
319/******************************************************************************/
320
321int XrdCmsClientMan::Hookup()
322{
323 EPNAME("Hookup");
324 CmsLoginData Data;
325 XrdLink *lp;
326 char buff[256], hnBuff[264*2+1];
327 kXR_char *envData = 0;
328 int rc, oldWait, tries = 12, opts = 0;
329
330// Turn off our debugging and version flags
331//
332 manMutex.Lock();
333 doDebug &= ~manMask;
334 manMutex.UnLock();
335
336// Report our hostname (there are better ways of doing this)
337//
338 const char *hn = getenv("XRDHOST");
339 const char *override_hn = getenv("OVERRIDEXRDHOST");
340 if (hn && override_hn)
341 {snprintf(hnBuff, sizeof(hnBuff), "myHN=%s&ovHN=%s", hn, override_hn);
342 envData = (kXR_char *)hnBuff;
343 }
344 else if (hn)
345 {snprintf(hnBuff, sizeof(hnBuff), "myHN=%s", hn);
346 envData = (kXR_char *)hnBuff;
347 }
348
349// Keep trying to connect to the manager. Note that we bind the link to this
350// thread to make sure we get notified should another thread close the socket.
351//
352 do {while(!(lp = Network->Connect(Host, Port, opts)))
353 {XrdSysTimer::Snooze(dally);
354 if (tries--) opts = XRDNET_NOEMSG;
355 else {opts = 0; tries = 12;}
356 continue;
357 }
358// lp->Bind(XrdSysThread::ID());
359 memset(&Data, 0, sizeof(Data));
360 Data.envCGI = envData;
362 Data.HoldTime = static_cast<int>(getpid());
363 if (!(rc = XrdCmsLogin::Login(lp, Data))) break;
364 lp->Close();
365 XrdSysTimer::Snooze(dally);
366 } while(1);
367
368// Establish global state
369//
370 manMutex.Lock();
371 doDebug |= (Data.Mode & CmsLoginData::kYR_debug ? manMask : 0);
372 manMutex.UnLock();
373
374// All went well, finally
375//
376 myData.Lock();
377 Link = lp;
378 Active = 1;
379 Silent = 0;
380 RecvCnt = 1;
381 SendCnt = 1;
382 Suspend = (Data.Mode & CmsLoginData::kYR_suspend);
383
384// Calculate how long we will wait for replies before delaying the client.
385// This is computed dynamically based on the expected response window.
386//
387 if ((oldWait = (repWait*20/100)) < 2) oldWait = 2;
388 if (Data.HoldTime > repWMax*1000) repWait = repWMax;
389 else if (Data.HoldTime <= 0) repWait = repWMax;
390 else {repWait = Data.HoldTime*3;
391 repWait = (repWait/1000) + (repWait % 1000 ? 1 : 0);
392 if (repWait > repWMax) repWait = repWMax;
393 else if (repWait < oldWait) repWait = oldWait;
394 }
395 qTime = (Data.HoldTime < 100 ? 100 : Data.HoldTime);
396 lastTOut = time(0);
397 myData.UnLock();
398
399// Tell the world
400//
401 sprintf(buff, "v %d", Data.Version);
402 Say.Emsg("ClientMan", (Suspend ? "Connected to suspended" : "Connected to"),
403 Host, buff);
404 DEBUG(Host <<" qt=" <<qTime <<"ms rw=" <<repWait);
405 return 1;
406}
407
408/******************************************************************************/
409/* R e c e i v e */
410/******************************************************************************/
411
412int XrdCmsClientMan::Receive()
413{
414// This method is always run out of the object's main thread. Other threads
415// may call methods that initiate a link reset via a deferred close. We will
416// notice that here because the file descriptor will be closed. This will
417// cause us to return an error and precipitate a connection teardown.
418//
419 EPNAME("Receive")
420 if (Link->RecvAll((char *)&Response, sizeof(Response)) > 0)
421 {int dlen = static_cast<int>(ntohs(Response.datalen));
422 RecvCnt++;
423 DEBUG(Link->Name() <<' ' <<dlen <<" bytes on " <<Response.streamid);
424 if (!dlen) return 1;
425 if ((dlen > NetBuff->BuffSize())
426 && (Response.rrCode != kYR_data || !NetBuff->Resize(dlen)))
427 Say.Emsg("ClientMan", "Excessive msg length from", Host);
428 else {NetBuff->SetLen(dlen);
429 return Link->RecvAll(NetBuff->Buffer(), dlen);
430 }
431 }
432 return 0;
433}
434
435/******************************************************************************/
436/* r e l a y R e s p */
437/******************************************************************************/
438
439void XrdCmsClientMan::relayResp()
440{
441 EPNAME("relayResp");
442 XrdCmsResp *rp;
443
444// Remove the response object from our queue.
445//
446 if (!(rp = RespQ.Rem(Response.streamid)))
447 {DEBUG(Host <<" replied to non-existent request; id=" <<Response.streamid);
448 return;
449 }
450
451// Queue the request for reply (this transfers the network buffer)
452//
453 rp->Reply(HPfx, Response, NetBuff);
454
455// Obtain a new network buffer
456//
457 NetBuff = BuffPool.Alloc(XrdOucEI::Max_Error_Len);
458}
459
460/******************************************************************************/
461/* Private: c h k S t a t u s */
462/******************************************************************************/
463
464int XrdCmsClientMan::chkStatus()
465{
466 static CmsUpdateRequest Updt = {{0, kYR_update, 0, 0}};
467 XrdSysMutexHelper mdMon(myData);
468 time_t nowTime;
469
470// Count down the query count and ask again every 30 seconds
471//
472 if (!chkCount--)
473 {chkCount = chkVal;
474 nowTime = time(0);
475 if ((nowTime - lastUpdt) >= 30)
476 {lastUpdt = nowTime;
477 if (Active) Link->Send((char *)&Updt, sizeof(Updt));
478 }
479 }
480 return Suspend;
481}
482
483/******************************************************************************/
484/* s e t S t a t u s */
485/******************************************************************************/
486
487void XrdCmsClientMan::setStatus()
488{
489 EPNAME("setStatus");
490 const char *State = 0, *Event = "?";
491
492
493 myData.Lock();
494 if (Response.modifier & CmsStatusRequest::kYR_Suspend)
495 {Event = "suspend";
496 if (!Suspend) {Suspend = 1; State = "suspended";}
497 }
498 else if (Response.modifier & CmsStatusRequest::kYR_Resume)
499 {Event = "resume";
500 if (Suspend) {Suspend = 0; State = "resumed";}
501 }
502 myData.UnLock();
503
504 DEBUG(Host <<" sent " <<Event <<" event");
505 if (State) Say.Emsg("setStatus", "Manager", Host, State);
506}
unsigned char kXR_char
Definition XPtypes.hh:65
#define DEBUG(x)
#define EPNAME(x)
#define Updt(x)
#define XRDNET_NOEMSG
Definition XrdNetOpts.hh:71
struct myOpts opts
#define SFS_ERROR
#define SFS_STALL
#define SFS_STARTED
if(ec< 0) ec
#define TRACE(act, x)
Definition XrdTrace.hh:63
int Send(unsigned int &iMan, char *msg, int mlen=0)
int delayResp(XrdOucErrInfo &Resp)
int whatsUp(const char *user, const char *path, unsigned int iMan)
static char doDebug
XrdCmsClientMan(char *host, int port, int cw, int nr, int rw, int rd)
static int Reply(const char *Man, XrdCms::CmsRRHdr &hdr, XrdOucBuffer *buff)
static int Login(XrdLink *Link, XrdCms::CmsLoginData &Data, int timeout=-1)
void Reply(const char *Man, XrdCms::CmsRRHdr &rrhdr, XrdOucBuffer *netbuff)
static XrdCmsResp * Alloc(XrdOucErrInfo *erp, int msgid)
Definition XrdCmsResp.cc:64
const char * getErrUser()
int setErrInfo(int code, const char *emsg)
int Emsg(const char *esfx, int ecode, const char *text1, const char *text2=0)
static void Snooze(int seconds)
@ kYR_waitresp
Definition YProtocol.hh:145
XrdSysError Say
@ kYR_update
Definition YProtocol.hh:115
@ kYR_status
Definition YProtocol.hh:112
static const size_t Max_Error_Len