XRootD
Loading...
Searching...
No Matches
XrdCmsProtocol.cc
Go to the documentation of this file.
1/******************************************************************************/
2/* */
3/* X r d C m s P r o t o c o l . c c */
4/* */
5/* (c) 2007 by the Board of Trustees of the Leland Stanford, Jr., University */
6/* All Rights Reserved */
7/* Produced by Andrew Hanushevsky for Stanford University under contract */
8/* DE-AC02-76-SFO0515 with the Department of Energy */
9/* */
10/* This file is part of the XRootD software suite. */
11/* */
12/* XRootD is free software: you can redistribute it and/or modify it under */
13/* the terms of the GNU Lesser General Public License as published by the */
14/* Free Software Foundation, either version 3 of the License, or (at your */
15/* option) any later version. */
16/* */
17/* XRootD is distributed in the hope that it will be useful, but WITHOUT */
18/* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or */
19/* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public */
20/* License for more details. */
21/* */
22/* You should have received a copy of the GNU Lesser General Public License */
23/* along with XRootD in a file called COPYING.LESSER (LGPL license) and file */
24/* COPYING (GPL license). If not, see <http://www.gnu.org/licenses/>. */
25/* */
26/* The copyright holder's institutional names and contributor's names may not */
27/* be used to endorse or promote products derived from this software without */
28/* specific prior written permission of the institution or contributor. */
29/******************************************************************************/
30
31#include <unistd.h>
32#include <cctype>
33#include <cerrno>
34#include <signal.h>
35#include <cstdlib>
36#include <cstring>
37#include <strings.h>
38#include <cstdio>
39#include <netinet/in.h>
40#include <sys/param.h>
41
43
44#include "XrdVersion.hh"
45
46#include "Xrd/XrdInet.hh"
47#include "Xrd/XrdLink.hh"
48
50#include "XrdCms/XrdCmsCache.hh"
53#include "XrdCms/XrdCmsJob.hh"
54#include "XrdCms/XrdCmsLogin.hh"
57#include "XrdCms/XrdCmsMeter.hh"
59#include "XrdCms/XrdCmsRole.hh"
62#include "XrdCms/XrdCmsState.hh"
63#include "XrdCms/XrdCmsTrace.hh"
64
65#include "XrdOuc/XrdOucCRC.hh"
66#include "XrdOuc/XrdOucEnv.hh"
67#include "XrdOuc/XrdOucPup.hh"
69
70#include "XrdSys/XrdSysError.hh"
72#include "XrdSys/XrdSysTimer.hh"
73
74using namespace XrdCms;
75
76/******************************************************************************/
77/* G l o b a l O b j e c t s */
78/******************************************************************************/
79
// ProtMutex guards ProtStack; Recycle() and Alloc() hold it while pushing or
// popping a protocol object.
80 XrdSysMutex XrdCmsProtocol::ProtMutex;
// Head of the list of recycled protocol objects, chained through ProtLink.
81 XrdCmsProtocol *XrdCmsProtocol::ProtStack = 0;
82
// Wait value handed to the link's Peek() call in Match().
83 int XrdCmsProtocol::readWait = 1000;
84
// Parser used by Dispatch() to decode the arguments of an incoming request.
85 XrdCmsParser XrdCmsProtocol::ProtArgs;
86
// Declaration only: theEnv is an XrdOucEnv object whose definition is not in
// this part of the file.
87namespace XrdCms
88{
89extern XrdOucEnv theEnv;
90};
91
92/******************************************************************************/
93/* P r o t o c o l L o a d e r */
94/* X r d g e t P r o t o c o l */
95/******************************************************************************/
96
97// This protocol can live in a shared library. It can also be statically linked
98// to provide a default protocol (which, for cms protocol we do). The interface
99// below is used by Xrd to obtain a copy of the protocol object that can be
100// used to decide whether or not a link is talking our particular protocol.
101// Phase 1 initialization occurred on the call to XrdgetProtocolPort(). At this
102// point a network interface is defined and we can complete initialization.
103//
105
106extern "C"
107{
108XrdProtocol *XrdgetProtocol(const char *pname, char *parms,
110{
111// If we failed in Phase 1 initialization, immediately fail Phase 2.
112//
113 if (Config.doWait < 0) return (XrdProtocol *)0;
114
115// Initialize the network interface and get the actual port number assigned
116//
117 Config.PortTCP = pi->NetTCP->Port();
118 Config.NetTCP = pi->NetTCP;
119
120// If we have a connection allow list, add it to the network object. Note that
121// we clear the address because the object is lost in the add process.
122//
123 if (Config.Police) {pi->NetTCP->Secure(Config.Police); Config.Police = 0;}
124
125// Complete initialization and upon success return a protocol object
126//
127 if (Config.Configure2()) return (XrdProtocol *)0;
128
129// Return a new instance of this object
130//
131 return (XrdProtocol *)new XrdCmsProtocol();
132}
133}
134
135/******************************************************************************/
136/* P r o t o c o l P o r t D e t e r m i n a t i o n */
137/* X r d g e t P r o t o c o l P o r t */
138/******************************************************************************/
139
140// Because the dcm port numbers are determined dynamically based on the role the
141// dcm plays, we need to process the configuration file and return the right
142// port number if it differs from the one provided by the protocol driver. Only
143// one port instance of the cmsd protocol is allowed.
144//
146
147extern "C"
148{
149int XrdgetProtocolPort(const char *pname, char *parms,
151{
152 static int thePort = -1;
153 char *cfn = pi->ConfigFN, buff[128];
154
155// Check if we have been here before
156//
157 if (thePort >= 0)
158 {if (pi->Port && pi->Port != thePort)
159 {sprintf(buff, "%d disallowed; only using port %d",pi->Port,thePort);
160 Say.Emsg("Config", "Alternate port", buff);
161 }
162 return thePort;
163 }
164
165// Call the level 0 configurator
166//
167 if (Config.Configure0(pi))
168 {Config.doWait = -1; return 0;}
169
170// The only parameter we accept is the name of an alternate config file
171//
172 if (parms)
173 {while(*parms == ' ') parms++;
174 if (*parms)
175 {char *pp = parms;
176 while(*parms != ' ' && *parms) parms++;
177 cfn = pp;
178 }
179 }
180
181// Put up the banner
182//
183 Say.Say("Copr. 2003-2020 Stanford University/SLAC cmsd.");
184
185// Indicate failure if static init fails
186//
187 if (cfn) cfn = strdup(cfn);
188 if (Config.Configure1(pi->argc, pi->argv, cfn))
189 {Config.doWait = -1; return 0;}
190
191// Return the port number to be used
192//
193 thePort = Config.PortTCP;
194 return thePort;
195}
196}
197
198/******************************************************************************/
199/* E x e c u t e */
200/******************************************************************************/
201
203{
204 EPNAME("Execute");
205 static kXR_unt32 theDelay = htonl(Config.SUPDelay);
207 const char *etxt;
208
209// Check if we can continue
210//
211 if (CmsState.Suspended && Arg.Routing & XrdCmsRouting::Delayable)
212 {Reply_Delay(Arg, theDelay); return 0;}
213
214// Validate request code and execute the request. If successful, forward the
215// request to subscribers of this node if the request is forwardable.
216//
217 if (!(Method = Router.getMethod(Arg.Request.rrCode)))
218 Say.Emsg("Protocol", "invalid request code from", myNode->Ident);
219 else if ((etxt = (myNode->*Method)(Arg)))
220 if (*etxt == '!')
221 {DEBUGR(etxt+1 <<" delayed " <<Arg.waitVal <<" seconds");
222 return -EINPROGRESS;
223 } else if (*etxt == '.') return -ECONNABORTED;
224 else Reply_Error(Arg, kYR_EINVAL, etxt);
225 else if (Arg.Routing & XrdCmsRouting::Forward && Cluster.NodeCnt
226 && !(Arg.Request.modifier & kYR_dnf)) Reissue(Arg);
227 return 0;
228}
229
230/******************************************************************************/
231/* M a t c h */
232/******************************************************************************/
233
235{
236CmsRRHdr Hdr;
237int dlen;
238
239// Peek at the first few bytes of data (should be all zeroes)
240//
241 if ((dlen = lp->Peek((char *)&Hdr,sizeof(Hdr),readWait)) != sizeof(Hdr))
242 {if (dlen <= 0) lp->setEtext("login not received");
243 return (XrdProtocol *)0;
244 }
245
246// Verify that this is our protocol and whether a version1 client is here
247//
248 if (Hdr.streamid || Hdr.rrCode != kYR_login)
249 {if (!strncmp((char *)&Hdr, "login ", 6))
250 lp->setEtext("protocol version 1 unsupported");
251 return (XrdProtocol *)0;
252 }
253
254// Return the protocol object
255//
257}
258
259/******************************************************************************/
260/* P a n d e r */
261/******************************************************************************/
262
263// Pander() handles all outgoing connections to a manager/supervisor
264
265void XrdCmsProtocol::Pander(const char *manager, int mport)
266{
267 EPNAME("Pander");
268
269 CmsLoginData Data, loginData;
270 time_t ddmsg = time(0);
271 unsigned int Mode, Role = 0;
272 int myShare = Config.P_gshr << CmsLoginData::kYR_shift;
273 int myTimeZ = Config.TimeZone<< CmsLoginData::kYR_shifttz;
274 int Lvl=0, Netopts=0, waits=6, tries=6, fails=0, xport=mport;
275 int rc, fsUtil, KickedOut, blRedir, myNID = Manager->ManTree->Register();
276 int chk4Suspend = XrdCmsState::All_Suspend, TimeOut = Config.AskPing*1000;
277 char manbuff[264];
278 const char *Reason = 0, *manp = manager;
279 const int manblen = sizeof(manbuff);
280 bool terminate;
281
282// Do some debugging
283//
284 DEBUG(myRole <<" services to " <<manager <<':' <<mport);
285
286// Prefill the login data
287//
288 memset(&loginData, 0, sizeof(loginData));
289 loginData.SID = (kXR_char *)Config.mySID;
290 loginData.Paths = (kXR_char *)Config.myPaths;
291 loginData.sPort = Config.PortTCP;
292 loginData.fsNum = Meter.numFS();
293 loginData.tSpace= Meter.TotalSpace(loginData.mSpace);
294
295 loginData.Version = kYR_Version; // These to keep compiler happy
296 loginData.HoldTime= static_cast<int>(getpid());
297 loginData.Mode = 0;
298 loginData.Size = 0;
299 loginData.ifList = (kXR_char *)Config.ifList;
300 loginData.envCGI = (kXR_char *)Config.envCGI;
301
302// Establish request routing based on who we are
303//
304 if (Config.SanList) {Routing= &supVOps;
306 }
307 else if (Config.asManager()) Routing= (Config.asServer() ? &supVOps:&manVOps);
308 else Routing= (Config.asPeer() ? &supVOps:&srvVOps);
309
310// Compute the Manager's status (this never changes for managers/supervisors)
311//
313 else {if (Config.asManager()) Role |= CmsLoginData::kYR_manager;
315 }
317
318// If we are a simple server, permanently add the nostage option if we are
319// not able to stage any files.
320//
321 if (Role == CmsLoginData::kYR_server)
323 else chk4Suspend = XrdCmsState::FES_Suspend;
324
325// Keep connecting to our manager. If suspended, wait for a resumption first
326//
327 do {if (Config.doWait && chk4Suspend)
328 while(CmsState.Suspended & chk4Suspend)
329 {if (!waits--)
330 {Say.Emsg("Pander", "Suspend state still active."); waits=6;}
332 }
333
334 if (!(rc = Manager->ManTree->Trying(myNID, Lvl)) && Lvl)
335 {DEBUG("restarting at root node " <<manager <<':' <<mport);
336 manp = manager; xport = mport; Lvl = 0;
337 } else if (rc < 0) break;
338
339 DEBUG("trying to connect to lvl " <<Lvl <<' ' <<manp <<':' <<xport);
340
341 if (!(Link = Config.NetTCP->Connect(manp, xport, Netopts)))
342 {if (!Netopts && XrdNetAddr::DynDNS() && (time(0) - ddmsg) >= 90)
343 {Say.Emsg("Pander", "Is hostname", manp, "spelled correctly "
344 "or just not running?");
345 ddmsg = time(0);
346 }
347 if (tries--) Netopts = XRDNET_NOEMSG;
348 else {tries = 6; Netopts = 0;}
349 if ((Lvl = Manager->myMans->Next(xport,manbuff,manblen)))
350 {XrdSysTimer::Snooze(3); manp = manbuff;}
351 else {if (manp != manager) fails++;
352 XrdSysTimer::Snooze(6); manp = manager; xport = mport;
353 }
354 continue;
355 }
356 Netopts = 0; tries = waits = 6;
357
358 // Verify that this node has the real DNS name if it's IPv6
359 //
360 if (!(Link->AddrInfo()->isRegistered())
361 && Link->AddrInfo()->isIPType(XrdNetAddrInfo::IPv6))
362 {char *oldName = strdup(Link->Host());
363 Say.Emsg("Protocol", oldName, "is missing an IPv6 ptr record; "
364 "attempting local registration as", manp);
365 if (!(Link->Register(manp)))
366 {Say.Emsg("Protocol", oldName,
367 "registration failed; address mismatch.");
368 } else {
369 Say.Emsg("Protocol", oldName,
370 "is now locally registered as", manp);
371 }
372 free(oldName);
373 }
374
375 // Obtain a new node object for this connection
376 //
377 if (!(myNode = Manager->Add(Link, Lvl+1, terminate)))
378 {Link->Close();
379 if (terminate) break;
380 Say.Emsg("Pander", "Unable to obtain node object.");
381 XrdSysTimer::Snooze(15); continue;
382 }
383
384 // Compute current login mode
385 //
386 Mode = Role
388 | (CmsState.NoStaging ? int(CmsLoginData::kYR_nostage) : 0);
389 if (fails >= 6 && manp == manager)
390 {fails = 0; Mode |= CmsLoginData::kYR_trying;}
391
392 // Login this node with the correct state
393 //
394 loginData.fSpace= Meter.FreeSpace(fsUtil);
395 loginData.fsUtil= static_cast<kXR_unt16>(fsUtil);
396 KickedOut = 0;
397 if (!(loginData.dPort = CmsState.Port())) loginData.dPort = 1094;
398 Data = loginData; Data.Mode = Mode | myShare | myTimeZ;
399 if (!(rc = XrdCmsLogin::Login(Link, Data, TimeOut)))
400 {if (!Manager->ManTree->Connect(myNID, myNode)) KickedOut = 1;
401 else {XrdOucEnv cgiEnv((const char *)Data.envCGI);
402 const char *sname = cgiEnv.Get("site");
403 Say.Emsg("Protocol", "Logged into", sname, Link->Name());
404 if (Data.SID)
405 Manager->Verify(Link, (const char *)Data.SID, sname);
406 Reason = Dispatch(isUp, TimeOut, 2);
407 rc = 0;
408 loginData.fSpace= Meter.FreeSpace(fsUtil);
409 loginData.fsUtil= static_cast<kXR_unt16>(fsUtil);
410 }
411 }
412 // Release any storage left over from the login
413 //
414 if (Data.SID) {free(Data.SID); Data.SID = 0;}
415 if (Data.envCGI) {free(Data.envCGI); Data.envCGI = 0;}
416
417 // Remove manager from the config
418 //
419 Manager->Remove(myNode, (rc == kYR_redirect ? "redirected"
420 : (Reason ? Reason : "lost connection")));
421 Manager->ManTree->Disc(myNID);
422 Link->Close();
423
424 // The Sync() will wait until all the threads we started complete. Then
425 // ask the manager to delete the node as it must synchronize with other
426 // threads relative to the manager object before being destroyed.
427 //
428 Sync(); Manager->Delete(myNode); myNode = 0; Reason = 0;
429
430 // Check if we should process the redirection
431 //
432 if (rc == kYR_redirect)
433 {if (!(blRedir = Data.Mode & CmsLoginData::kYR_blredir))
434 Manager->myMans->Add(Link->NetAddr(), (char *)Data.Paths,
435 Config.PortTCP, Lvl+1);
436 else Manager->Rerun((char *)Data.Paths);
437 free(Data.Paths);
438 if (blRedir) break;
439 }
440
441 // Cycle on to the next manager if we have one or snooze and try over
442 //
443 if (!KickedOut && (Lvl = Manager->myMans->Next(xport,manbuff,manblen)))
444 {manp = manbuff; continue;}
445 XrdSysTimer::Snooze((rc < 0 ? 60 : 9)); Lvl = 0;
446 if (manp != manager) fails++;
447 manp = manager; xport = mport;
448 } while(1);
449
450// This must have been a permanent redirect. Tell the manager we are done.
451//
452 Manager->Finished(manager, mport);
453
454// Recycle the protocol object
455//
456 Recycle(0, 0, 0);
457}
458
459/******************************************************************************/
460/* P r o c e s s */
461/******************************************************************************/
462
463// Process is called only when we get a new connection. We only return when
464// the connection drops. At that point we immediately mark the node as offline
465// to prohibit its selection in the future (it may have already been selected).
466// Unfortunately, we need the global selection lock to do that.
467//
469{
470 const char *Reason;
471 Bearing myWay;
472 int tOut;
473
474// Now admit the login
475//
476 Link = lp;
477 if ((Routing=Admit()))
478 {loggedIn = 1;
479 if (RSlot) {myWay = isLateral; tOut = -1;}
480 else {myWay = isDown; tOut = Config.AskPing*1000;}
481 myNode->UnLock();
482 if ((Reason = Dispatch(myWay, tOut, 2))) lp->setEtext(Reason);
483 Cluster.SLock(true); myNode->isOffline = 1; Cluster.SLock(false);
484 }
485
486// Serialize all activity on the link before we proceed. This makes sure that
487// there are no outstanding tasks initiated by this node. We don't need a node
488// lock for this because we are no longer reading requests so no new tasks can
489// be started. Since the node is marked bound, any attempt to reconnect will be
490// rejected until we finish removing this node. We get the node lock afterwards.
491//
492 lp->Serialize();
493 if (!myNode) return -1;
494 Sync();
495 myNode->Lock();
496
497// Immediately terminate redirectors (they have an Rslot). The redirector node
498// can be directly deleted as all references were serialized through the
499// RTable and once we remove our node there can be no references left.
500//
501 if (RSlot)
502 {RTable.Del(myNode); RSlot = 0;
503 myNode->UnLock(); delete myNode; myNode = 0;
504 return -1;
505 }
506
507// We have a node that may or may not be in the cluster at this point, or may
508// need to remain in the cluster as a shadow member. In any case, the node
509// object lock will be released by Remove().
510//
511 if (myNode)
512 {myNode->isConn = 0;
513 if (myNode->isBound) Cluster.Remove(0, myNode, !loggedIn);
514 else if (myNode->isGone) Cluster.Remove(myNode);
515 else myNode->UnLock();
516 }
517
518// All done indicate the connection is dead
519//
520 return -1;
521}
522
523/******************************************************************************/
524/* R e c y c l e */
525/******************************************************************************/
526
527void XrdCmsProtocol::Recycle(XrdLink *lp, int consec, const char *reason)
528{
529 bool isLoggedIn = loggedIn != 0;
530
531 ProtMutex.Lock();
532 ProtLink = ProtStack;
533 ProtStack = this;
534 ProtMutex.UnLock();
535
536 if (!lp) return;
537
538 if (isLoggedIn)
539 if (reason) Say.Emsg("Protocol", lp->ID, "logged out;", reason);
540 else Say.Emsg("Protocol", lp->ID, "logged out.");
541 else
542 if (reason) Say.Emsg("Protocol", lp->ID, "login failed;", reason);
543}
544
545/******************************************************************************/
546/* S t a t s */
547/******************************************************************************/
548
549int XrdCmsProtocol::Stats(char *buff, int blen, int do_sync)
550{
551
552// All the statistics are handled by the cluster
553//
554
555// If we are a manager then we have different information
556//
557 return (Config.asManager() ? Cluster.Statt(buff, blen)
558 : Cluster.Stats(buff, blen));
559}
560
561/******************************************************************************/
562/* P r i v a t e M e t h o d s */
563/******************************************************************************/
564/******************************************************************************/
565/* A d m i t */
566/******************************************************************************/
567XrdCmsRouting *XrdCmsProtocol::Admit()
568{
569 EPNAME("Admit");
570 char *envP = 0, envBuff[256], myBuff[4096];
571 XrdCmsLogin Source(myBuff, sizeof(myBuff));
572 CmsLoginData Data;
574 const char *Reason;
575 SMask_t newmask, servset(0);
576 int addedp = 0, Status = 0, isPeer = 0, isProxy = 0;
577 int isMan, isServ, isSubm, wasSuspended = 0, Share = 100, tZone = 0;
578
579// Construct environment data
580//
581 if (Config.mySite)
582 {snprintf(envBuff, sizeof(envBuff), "site=%s", Config.mySite);
583 envP = envBuff;
584 }
585
586// Establish outgoing mode
587//
588 Data.Mode = 0;
591 wasSuspended = 1;
592 }
593 Data.HoldTime = Config.LUPHold;
594
595// Do the login and get the data
596//
597 if (!Source.Admit(Link, Data, Config.mySID, envP)) return 0;
598
599// Construct environment for incoming node
600//
601 XrdOucEnv cgiEnv((const char *)Data.envCGI);
602 const char *altName = cgiEnv.Get("ovHN");
603 if (altName) {
604 std::string oldName(Link->Host());
605 char buff[512];
606 snprintf(buff, sizeof(buff), "%s -> %s", oldName.c_str(), altName);
607 Say.Emsg("Protocol", "Attempting to use stated mapping", buff);
608 if (!(Link->Register(altName))) {
609 Say.Emsg("Protocol", buff, "stated mapping failed; address mismatch.");
610 } else {
611 Say.Emsg("Protocol", oldName.c_str(), "is now locally registered as", altName);
612 }
613 }
614
615// Handle Redirectors here (minimal stuff to do)
616//
618 {Link->setID("redirector", Data.HoldTime);
619 return Admit_Redirector(wasSuspended);
620 }
621
622// Disallow subscriptions if we are configured as a solo manager
623//
624 if (Config.asSolo())
625 return Login_Failed("configuration disallows subscribers");
626
627// Setup for role tests
628//
629 isMan = Data.Mode & CmsLoginData::kYR_manager;
630 isServ = Data.Mode & CmsLoginData::kYR_server;
631 isSubm = Data.Mode & CmsLoginData::kYR_subman;
632
633// Determine the role of this incoming login.
634//
635 if (isMan)
636 {Status = (isServ ? CMS_isSuper|CMS_isMan : CMS_isMan);
637 if ((isPeer = Data.Mode & CmsLoginData::kYR_peer))
638 {Status |= CMS_isPeer; roleID = XrdCmsRole::PeerManager;}
639 else if (Data.Mode & CmsLoginData::kYR_proxy)
641 else if (Config.asMetaMan() || isSubm)
642 roleID = XrdCmsRole::Manager;
643 else {roleID = XrdCmsRole::Supervisor;
644 Status|= CMS_isSuper;
645 }
646 }
647 else if ((isServ = Data.Mode & CmsLoginData::kYR_server))
648 {if ((isProxy= Data.Mode & CmsLoginData::kYR_proxy))
649 {Status = CMS_isProxy; roleID = XrdCmsRole::ProxyServer;}
650 else roleID = XrdCmsRole::Server;
651 }
652 else if ((isPeer = Data.Mode & CmsLoginData::kYR_peer))
653 {Status |= CMS_isPeer; roleID = XrdCmsRole::Peer;}
654 else return Login_Failed("invalid login role");
655
656// Set the link identification
657//
658 myRole = XrdCmsRole::Name(roleID);
659 Link->setID(myRole, Data.HoldTime);
660
661// Make sure that our role is compatible with the incoming role
662//
663 Reason = 0;
664 if (Config.asProxy()) {if (!isProxy || isPeer)
665 Reason = "configuration only allows proxies";
666 }
667 else if (isProxy) Reason = "configuration disallows proxies";
668 else if (Config.asServer() && isPeer)
669 Reason = "configuration disallows peers";
670 if (Reason) return Login_Failed(Reason);
671
672// The server may specify nostage and suspend
673//
674 if (Data.Mode & CmsLoginData::kYR_nostage) Status |= CMS_noStage;
675 if (Data.Mode & CmsLoginData::kYR_suspend) Status |= CMS_Suspend;
676
677// The server may specify that it has been trying for a long time
678//
680 Say.Emsg("Protocol",Link->Name(),"has not yet found a cluster slot!");
681
682// Add the node. The resulting node object will be locked and the caller will
683// unlock it prior to dispatching.
684//
685 if (!(myNode = Cluster.Add(Link, Data.dPort, Status, Data.sPort,
686 (const char *)Data.SID, (const char *)Data.ifList)))
687 return (XrdCmsRouting *)0;
688 myNode->RoleID = static_cast<char>(roleID);
689 myNode->setVersion(Data.Version);
690
691// Calculate the share as the reference minimum if we are a meta-manager
692//
693 if (Config.asMetaMan())
695 if (Share <= 0 || Share > 100) Share = Config.P_gsdf;
696 if (Share > 0) myNode->setShare(Share);
697 }
698
699// Set the node's timezone
700//
702 tZone = myNode->setTZone(tZone);
703
704// Record the status of the server's filesystem
705//
706 DEBUG(Link->Name() <<" TSpace=" <<Data.tSpace <<"GB NumFS=" <<Data.fsNum
707 <<" FSpace=" <<Data.fSpace <<"MB MinFR=" <<Data.mSpace
708 <<" MB Util=" <<Data.fsUtil <<" Share=" <<Share
709 <<" TZone=" <<tZone);
710 myNode->DiskTotal = Data.tSpace;
711 myNode->DiskMinF = Data.mSpace;
712 myNode->DiskFree = Data.fSpace;
713 myNode->DiskNums = Data.fsNum;
714 myNode->DiskUtil = Data.fsUtil;
716
717// Check for any configuration changes and then process all of the paths.
718//
719 if (Data.Paths && *Data.Paths)
720 {XrdOucTokenizer thePaths((char *)Data.Paths);
721 char *tp, *pp;
722 ConfigCheck(Data.Paths);
723 while((tp = thePaths.GetLine()))
724 {DEBUG(Link->Name() <<" adding path: " <<tp);
725 if (!(tp = thePaths.GetToken())
726 || !(pp = thePaths.GetToken())) break;
727 if (!(newmask = AddPath(myNode, tp, pp)))
728 return Login_Failed("invalid exported path");
729 servset |= newmask;
730 addedp= 1;
731 }
732 }
733
734// Check if we have any special paths. If none, then add the default path.
735//
736 if (!addedp)
737 {XrdCmsPInfo pinfo;
738 ConfigCheck(0);
739 pinfo.rovec = myNode->Mask();
740 if (myNode->isPeer) pinfo.ssvec = myNode->Mask();
741 servset = Cache.Paths.Insert("/", &pinfo);
742 Say.Emsg("Protocol", myNode->Ident, "defaulted r /");
743 }
744
745// Set the reference counts for intersecting nodes to be the same.
746// Additionally, indicate cache refresh will be needed because we have a new
747// node that may have files that we already reported on. Note that setting
748// isBad may be subject to a concurrency race, but that is still OK here.
749//
750 Cluster.ResetRef(servset);
751 if (Config.asManager()) {Manager->Reset(); myNode->SyncSpace();}
752 myNode->isBad &= ~XrdCmsNode::isDisabled;
753
754// At this point we can switch to nonblocking sendq for this node
755//
756 if (Config.nbSQ && (Config.nbSQ > 1 || !myNode->inDomain()))
757 isNBSQ = Link->setNB();
758
759// Document the login
760//
761 const char *sname = cgiEnv.Get("site");
762 const char *lfmt = (myNode->isMan > 1 ? "Standby%s%s" : "Primary%s%s");
763 snprintf(envBuff,sizeof(envBuff),lfmt,(sname ? " ":""),(sname ? sname : ""));
764 Say.Emsg("Protocol", envBuff, myNode->Ident,
765 (myNode->isBad & XrdCmsNode::isSuspend ? "logged in suspended."
766 : "logged in."));
767 if (Data.SID)
768 Say.Emsg("Protocol", myNode->Ident, "system ID:", (const char *)Data.SID);
769 myNode->ShowIF();
770
771// All done
772//
773 return &rspVOps;
774}
775
776/******************************************************************************/
777/* A d m i t _ R e d i r e c t o r */
778/******************************************************************************/
779
780XrdCmsRouting *XrdCmsProtocol::Admit_Redirector(int wasSuspended)
781{
782 EPNAME("Admit_Redirector");
783 static CmsStatusRequest newState
785
786// Indicate what role I have
787//
788 myRole = "redirector";
789
790// Director logins have no additional parameters. We return with the node object
791// locked to be consistent with the way server/supervisor nodes are returned.
792//
793 myNode = new XrdCmsNode(Link); myNode->Lock();
794 if (!(RSlot = RTable.Add(myNode)))
795 {Say.Emsg("Protocol",myNode->Ident,"login failed; too many redirectors.");
796 myNode->UnLock();
797 delete myNode;
798 myNode = 0;
799 return 0;
800 } else myNode->setSlot(RSlot);
801
802// If we told the redirector we were suspended then we must check if that is no
803// longer true and generate a resume event as the redirector may have missed it
804//
805 if (wasSuspended && !CmsState.Suspended)
806 myNode->Send((char *)&newState, sizeof(newState));
807
808// Login succeeded
809//
810 Say.Emsg("Protocol", myNode->Ident, "logged in.");
811 DEBUG(myNode->Ident <<" assigned slot " <<RSlot);
812 return &rdrVOps;
813}
814
815/******************************************************************************/
816/* A d d P a t h */
817/******************************************************************************/
818
819SMask_t XrdCmsProtocol::AddPath(XrdCmsNode *nP,
820 const char *pType, const char *Path)
821{
822 XrdCmsPInfo pinfo;
823
824// Process: addpath {r | w | rw}[s] path
825//
826 while(*pType)
827 { if ('r' == *pType || (Config.forceRO && 'w' == *pType))
828 pinfo.rovec = nP->Mask();
829 else if ('w' == *pType) pinfo.rovec = pinfo.rwvec = nP->Mask();
830 else if ('s' == *pType) pinfo.rovec = pinfo.ssvec = nP->Mask();
831 else return 0;
832 pType++;
833 }
834
835// Set node options
836//
837 nP->isRW = (pinfo.rwvec ? XrdCmsNode::allowsRW : 0)
838 | (pinfo.ssvec ? XrdCmsNode::allowsSS : 0);
839
840// Add the path to the known path list
841//
842 return Cache.Paths.Insert(Path, &pinfo);
843}
844
845/******************************************************************************/
846/* A l l o c */
847/******************************************************************************/
848
850 const char *theMan,
851 int thePort)
852{
853 XrdCmsProtocol *xp;
854
855// Grab a protocol object and, if none, return a new one
856//
857 ProtMutex.Lock();
858 if ((xp = ProtStack)) ProtStack = xp->ProtLink;
859 else xp = new XrdCmsProtocol();
860 ProtMutex.UnLock();
861
862// Initialize the object if we actually got one
863//
864 if (!xp) Say.Emsg("Protocol","No more protocol objects.");
865 else xp->Init(theRole, uMan, theMan, thePort);
866
867// All done
868//
869 return xp;
870}
871
872/******************************************************************************/
873/* C o n f i g C h e c k */
874/******************************************************************************/
875
876void XrdCmsProtocol::ConfigCheck(unsigned char *theConfig)
877{
878 unsigned int ConfigID;
879 int tmp;
880
881// Compute the new configuration ID
882//
883 if (!theConfig) ConfigID = 1;
884 else ConfigID = XrdOucCRC::CRC32(theConfig, strlen((char *)theConfig));
885
886// If the configuration chaged or a new node, then we must bounce this node
887//
888 if (ConfigID != myNode->ConfigID)
889 {if (myNode->ConfigID) Say.Emsg("Protocol",Link->Name(),"reconfigured.");
890 Cache.Paths.Remove(myNode->Mask());
891 Cache.Bounce(myNode->Mask(), myNode->ID(tmp));
892 myNode->ConfigID = ConfigID;
893 }
894}
895
896/******************************************************************************/
897/* D i s p a t c h */
898/******************************************************************************/
899
900// Dispatch is provided with three key pieces of information:
901// 1) The connection bearing (isUp, isDown, isLateral) that determines how
902// timeouts are to be handled.
903// 2) The maximum amount to wait for data to arrive.
904// 3) The number of successive timeouts we can have before we give up.
905
906const char *XrdCmsProtocol::Dispatch(Bearing cDir, int maxWait, int maxTries)
907{
908 EPNAME("Dispatch");
909 static const int ReqSize = sizeof(CmsRRHdr);
910 XrdCmsRRData *Data = XrdCmsRRData::Objectify();
911 XrdCmsJob *jp;
912 const char *toRC = (cDir == isUp ? "manager not active"
913 : "server not responding");
914 const char *myArgs, *myArgt;
915 char buff[8];
916 int rc, toLeft = maxTries, lastPing = Config.PingTick;
917
918// Dispatch runs with the current thread bound to the link.
919//
920// Link->Bind(XrdSysThread::ID());
921
922// Read in the request header
923//
924do{if ((rc = Link->RecvAll((char *)&Data->Request, ReqSize, maxWait)) < 0)
925 {if (rc != -ETIMEDOUT) return (myNode->isBad & XrdCmsNode::isBlisted ?
926 "blacklisted" : "request read failed");
927 if (!toLeft--) return toRC;
928 if (cDir == isDown)
929 {if (myNode->isBad & XrdCmsNode::isDoomed)
930 return "server blacklisted w/ redirect";
931 if (!SendPing()) return "server unreachable";
932 lastPing = Config.PingTick;
933 }
934 continue;
935 }
936
937// Check if we need to ping as non-response activity may cause ping misses
938//
939 if (cDir == isDown && lastPing != Config.PingTick)
940 {if (myNode->isBad & XrdCmsNode::isDoomed)
941 return "server blacklisted w/ redirect";
942 if (!SendPing()) return "server unreachable";
943 lastPing = Config.PingTick;
944 }
945
946// Decode the length and get the rest of the data
947//
948 toLeft = maxTries;
949 Data->Dlen = static_cast<int>(ntohs(Data->Request.datalen));
950 if ((QTRACE(Debug))
951 && Data->Request.rrCode != kYR_ping && Data->Request.rrCode != kYR_pong)
952 DEBUG(myNode->Ident <<" for " <<Router.getName(Data->Request.rrCode)
953 <<" dlen=" <<Data->Dlen);
954 if (!(Data->Dlen)) {myArgs = myArgt = 0;}
955 else {if (Data->Dlen > maxReqSize)
956 {Say.Emsg("Protocol","Request args too long from",Link->Name());
957 return "protocol error";
958 }
959 if ((!Data->Buff || Data->Blen < Data->Dlen)
960 && !Data->getBuff(Data->Dlen))
961 {Say.Emsg("Protocol", "No buffers to serve", Link->Name());
962 return "insufficient buffers";
963 }
964 if ((rc = Link->RecvAll(Data->Buff, Data->Dlen, maxWait)) < 0)
965 return (rc == -ETIMEDOUT ? "read timed out" : "read failed");
966 myArgs = Data->Buff; myArgt = Data->Buff + Data->Dlen;
967 }
968
969// Check if request is actually valid
970//
971 if (!(Data->Routing = Routing->getRoute(int(Data->Request.rrCode))))
972 {sprintf(buff, "%d", Data->Request.rrCode);
973 Say.Emsg("Protocol",Link->Name(),"sent an invalid request -", buff);
974 continue;
975 }
976
977// Parse the arguments (we do this in the main thread to avoid overruns)
978//
979 if (!(Data->Routing & XrdCmsRouting::noArgs))
980 {if (Data->Request.modifier & kYR_raw)
981 {Data->Path = Data->Buff; Data->PathLen = Data->Dlen;}
982 else if (!myArgs
983 || !ProtArgs.Parse(int(Data->Request.rrCode),myArgs,myArgt,Data))
984 {Reply_Error(*Data, kYR_EINVAL, "badly formed request");
985 continue;
986 }
987 }
988
989// Insert correct identification
990//
991 if (!(Data->Ident) || !(*Data->Ident)) Data->Ident = myNode->Ident;
992
993// Schedule this request if async. Otherwise, do this inline. Note that
994// synchronous requests are allowed to return status changes (e.g., redirect)
995//
996 if (Data->Routing & XrdCmsRouting::isSync)
997 {if ((rc = Execute(*Data)) && rc == -ECONNABORTED)
998 return "disconnected";
999 Data->Ident = 0; // Execute() undefines Ident so zero for reuse
1000 }
1001 else if ((jp = XrdCmsJob::Alloc(this, Data)))
1002 {Ref(1);
1003 Sched->Schedule((XrdJob *)jp);
1004 Data = XrdCmsRRData::Objectify();
1005 }
1006 else Say.Emsg("Protocol", "No jobs to serve", Link->Name());
1007 } while(1);
1008
1009// We should never get here
1010//
1011 return "logic error";
1012}
1013
1014/******************************************************************************/
1015/* D o I t */
1016/******************************************************************************/
1017
1018// Determine how we should proceed here
1019//
1021{
1022
1023// If we have a role, then we should simply pander it
1024//
1025 if (myRole) Pander(myMan, myManPort);
1026}
1027
1028/******************************************************************************/
1029/* Private: I n i t */
1030/******************************************************************************/
1031
1032void XrdCmsProtocol::Init(const char *iRole, XrdCmsManager *uMan,
1033 const char *iMan, int iPort)
1034{
1035 myRole = iRole;
1036 myMan = iMan;
1037 myManPort = iPort;
1038 Manager = uMan;
1039 myNode = 0;
1040 loggedIn = 0;
1041 RSlot = 0;
1042 ProtLink = 0;
1043 refCount = 0;
1044 refWait = 0;
1045 isNBSQ = false;
1046}
1047
1048/******************************************************************************/
1049/* L o g i n _ F a i l e d */
1050/******************************************************************************/
1051
1052XrdCmsRouting *XrdCmsProtocol::Login_Failed(const char *reason)
1053{
1054 Link->setEtext(reason);
1055 return (XrdCmsRouting *)0;
1056}
1057
1058/******************************************************************************/
1059/* Private: R e f */
1060/******************************************************************************/
1061
1063{
1064// Update the reference counter
1065//
1066 refMutex.Lock();
1067 refCount += rcnt;
1068
1069// Check if someone is waiting for the count to drop to zero
1070//
1071 if (refWait && refCount <= 0) {refWait->Post(); refWait = 0;}
1072
1073// All done
1074//
1075 refMutex.UnLock();
1076}
1077
1078/******************************************************************************/
1079/* R e i s s u e */
1080/******************************************************************************/
1081
1082void XrdCmsProtocol::Reissue(XrdCmsRRData &Data)
1083{
1084 EPNAME("Resisue");
1085 XrdCmsPInfo pinfo;
1086 SMask_t amask;
1087 struct iovec ioB[2] = {{(char *)&Data.Request, sizeof(Data.Request)},
1088 { Data.Buff, (size_t)Data.Dlen}
1089 };
1090
1091// Check if we can really reissue the command
1092//
1093 if (!((Data.Request.modifier += kYR_hopincr) & kYR_hopcount))
1094 {Say.Emsg("Job", Router.getName(Data.Request.rrCode),
1095 "msg TTL exceeded for", Data.Path);
1096 return;
1097 }
1098
1099// We do not support 2way re-issued messages
1100//
1101 Data.Request.streamid = 0;
1102
1103// Find all the nodes that might be able to do somthing on this path
1104//
1105 if (!Cache.Paths.Find(Data.Path, pinfo)
1106 || (amask = pinfo.rwvec | pinfo.rovec) == 0)
1107 {Say.Emsg(epname, Router.getName(Data.Request.rrCode),
1108 "aborted; no servers handling", Data.Path);
1109 return;
1110 }
1111
1112// While destructive operations should only go to r/w servers
1113//
1114 if (Data.Request.rrCode != kYR_prepdel)
1115 {if (!(amask = pinfo.rwvec))
1116 {Say.Emsg(epname, Router.getName(Data.Request.rrCode),
1117 "aborted; no r/w servers handling", Data.Path);
1118 return;
1119 }
1120 }
1121
1122// Do some debugging
1123//
1124 DEBUG("FWD " <<Router.getName(Data.Request.rrCode) <<' ' <<Data.Path);
1125
1126// Check for selective sending since DFS setups need only one notification.
1127// Otherwise, send this message to all nodes.
1128//
1129 if (baseFS.isDFS() && Data.Request.rrCode != kYR_prepdel)
1130 {Cluster.Broadsend(amask, Data.Request, Data.Buff, Data.Dlen);
1131 } else {
1132 Cluster.Broadcast(amask, ioB, 2, sizeof(Data.Request)+Data.Dlen);
1133 }
1134}
1135
1136/******************************************************************************/
1137/* R e p l y _ D e l a y */
1138/******************************************************************************/
1139
1140void XrdCmsProtocol::Reply_Delay(XrdCmsRRData &Data, kXR_unt32 theDelay)
1141{
1142 EPNAME("Reply_Delay");
1143 const char *act;
1144
1145 if (Data.Request.streamid && (Data.Routing & XrdCmsRouting::Repliable))
1146 {CmsResponse Resp = {{Data.Request.streamid, kYR_wait, 0,
1147 htons(sizeof(kXR_unt32))}, theDelay};
1148 act = " sent";
1149 Link->Send((char *)&Resp, sizeof(Resp));
1150 } else act = " skip";
1151
1152 DEBUG(myNode->Ident <<act <<" delay " <<ntohl(theDelay));
1153}
1154
1155/******************************************************************************/
1156/* R e p l y _ E r r o r */
1157/******************************************************************************/
1158
1159void XrdCmsProtocol::Reply_Error(XrdCmsRRData &Data, int ecode, const char *etext)
1160{
1161 EPNAME("Reply_Error");
1162 const char *act;
1163 int n = strlen(etext)+1;
1164
1165 if (Data.Request.streamid && (Data.Routing & XrdCmsRouting::Repliable))
1166 {CmsResponse Resp = {{Data.Request.streamid, kYR_error, 0,
1167 htons((unsigned short int)(sizeof(kXR_unt32)+n))},
1168 htonl(static_cast<unsigned int>(ecode))};
1169 struct iovec ioV[2] = {{(char *)&Resp, sizeof(Resp)},
1170 {(char *)etext, (size_t)n}};
1171 act = " sent";
1172 Link->Send(ioV, 2);
1173 } else act = " skip";
1174
1175 DEBUG(myNode->Ident <<act <<" err " <<ecode <<' ' <<etext);
1176}
1177
1178/******************************************************************************/
1179/* Private: S e n d P i n g */
1180/******************************************************************************/
1181
1182bool XrdCmsProtocol::SendPing()
1183{
1184 static CmsPingRequest Ping = {{0, kYR_ping, 0, 0}};
1185
1186// We would like not send ping requests to servers that are backlogged but if
1187// we don't (currently) it will look to one of the parties that the other party
1188// if not functional. We should try to fix this, sigh.
1189//
1190// if (isNBSQ && Link->Backlog()) return true;
1191
1192// Send the ping
1193//
1194 if (Link->Send((char *)&Ping, sizeof(Ping)) < 0) return false;
1195 return true;
1196}
1197
1198/******************************************************************************/
1199/* Private: S y n c */
1200/******************************************************************************/
1201
1202void XrdCmsProtocol::Sync()
1203{
1204 EPNAME("Sync");
1205 XrdSysSemaphore mySem(0);
1206
1207// Make sure that all threads that we started have completed
1208//
1209 refMutex.Lock();
1210 if (refCount <= 0) refMutex.UnLock();
1211 else {refWait = &mySem;
1212 DEBUG("Waiting for " <<refCount <<' ' <<myNode->Ident
1213 <<" thread(s) to end.");
1214 refMutex.UnLock();
1215 mySem.Wait();
1216 }
1217}
unsigned int kXR_unt32
Definition XPtypes.hh:90
unsigned short kXR_unt16
Definition XPtypes.hh:67
unsigned char kXR_char
Definition XPtypes.hh:65
#define DEBUG(x)
#define EPNAME(x)
XrdProtocol * XrdgetProtocol(const char *pname, char *parms, XrdProtocol_Config *pi)
XrdVERSIONINFO(XrdgetProtocol, cmsd)
int XrdgetProtocolPort(const char *pname, char *parms, XrdProtocol_Config *pi)
#define DEBUGR(y)
#define TRACE_Debug
#define QTRACE(act)
unsigned long long SMask_t
#define XRDNET_NOEMSG
Definition XrdNetOpts.hh:71
XrdProtocol * XrdgetProtocol(const char *pname, char *parms, XrdProtocol_Config *pi)
int XrdgetProtocolPort(const char *pname, char *parms, XrdProtocol_Config *pi)
int Mode
bool Debug
XrdOucString Path
if(ec< 0) ec
XrdCmsPList_Anchor Paths
int Broadsend(SMask_t smask, XrdCms::CmsRRHdr &Hdr, void *Data, int Dlen)
void ResetRef(SMask_t smask, bool isLocked=false)
SMask_t Broadcast(SMask_t, const struct iovec *, int, int tot=0)
XrdCmsNode * Add(XrdLink *lp, int dport, int Status, int sport, const char *theNID, const char *theIF)
XrdInet * NetTCP
const char * mySID
static XrdCmsJob * Alloc(XrdCmsProtocol *, XrdCmsRRData *)
Definition XrdCmsJob.cc:64
static int Login(XrdLink *Link, XrdCms::CmsLoginData &Data, int timeout=-1)
XrdCmsManTree * ManTree
int FreeSpace(int &tutil)
void setVirtUpdt()
static const char allowsRW
Definition XrdCmsNode.hh:84
unsigned int ConfigID
Definition XrdCmsNode.hh:92
static const char allowsSS
Definition XrdCmsNode.hh:85
SMask_t Mask()
static const char isSuspend
Definition XrdCmsNode.hh:81
int ID(int &INum)
static const char isDoomed
Definition XrdCmsNode.hh:82
static const char isBlisted
Definition XrdCmsNode.hh:79
SMask_t ssvec
SMask_t rovec
SMask_t rwvec
int Find(const char *pname, XrdCmsPInfo &masks)
SMask_t Insert(const char *pname, XrdCmsPInfo *pinfo)
friend class XrdCmsJob
void Recycle(XrdLink *lp, int consec, const char *reason)
int Process(XrdLink *lp)
void Ref(int rcnt)
int Execute(XrdCmsRRData &Data)
int Stats(char *buff, int blen, int do_sync=0)
static XrdCmsProtocol * Alloc(const char *theRole="", XrdCmsManager *mP=0, const char *theMan=0, int thePort=0)
XrdProtocol * Match(XrdLink *lp)
static XrdCmsRRData * Objectify(XrdCmsRRData *op=0)
int getBuff(size_t bsz)
XrdCms::CmsRRHdr Request
short Add(XrdCmsNode *nP)
static const char * Name(RoleID rid)
Definition XrdCmsRole.hh:63
const char * getName(int Code)
const char *(XrdCmsNode::* NodeMethod_t)(XrdCmsRRData &)
static const char FES_Suspend
static const char All_Suspend
XrdLink * Connect(const char *host, int port, int opts=0, int timeout=-1)
Definition XrdInet.cc:185
void Secure(XrdNetSecurity *secp)
Definition XrdInet.cc:244
XrdJob(const char *desc="")
Definition XrdJob.hh:51
static bool DynDNS()
Definition XrdNetAddr.hh:52
int Port()
Definition XrdNet.hh:191
static uint32_t CRC32(const unsigned char *data, int count)
Definition XrdOucCRC.cc:171
XrdProtocol(const char *jname)
void Schedule(XrdJob *jp)
int Emsg(const char *esfx, int ecode, const char *text1, const char *text2=0)
static void Snooze(int seconds)
PingImpl< false > Ping
XrdCmsRouting manVOps
XrdCmsMeter Meter
XrdCmsCache Cache
XrdCmsRouter Router
XrdCmsRouting supVOps
static const unsigned char kYR_Version
Definition YProtocol.hh:80
kXR_unt16 datalen
Definition YProtocol.hh:86
@ kYR_EINVAL
Definition YProtocol.hh:153
static const int CMS_isSuper
static const int CMS_noStage
kXR_char modifier
Definition YProtocol.hh:85
XrdScheduler * Sched
XrdCmsRouting rspVOps
XrdCmsCluster Cluster
XrdCmsBaseFS baseFS
@ kYR_redirect
Definition YProtocol.hh:143
@ kYR_error
Definition YProtocol.hh:142
XrdSysError Say
XrdCmsRTable RTable
XrdCmsRouting srvVOps
kXR_char rrCode
Definition YProtocol.hh:84
XrdSysTrace Trace("cms")
XrdCmsState CmsState
XrdCmsRouting rdrVOps
kXR_unt32 streamid
Definition YProtocol.hh:83
static const int CMS_isMan
XrdCmsConfig Config
@ kYR_prepdel
Definition YProtocol.hh:97
@ kYR_login
Definition YProtocol.hh:90
@ kYR_status
Definition YProtocol.hh:112
static const int CMS_isPeer
@ kYR_hopincr
Definition YProtocol.hh:128
@ kYR_hopcount
Definition YProtocol.hh:127
static const int CMS_isProxy
XrdOucEnv theEnv
static const int CMS_Suspend