34#include <netinet/in.h>
81int XrdCmsNode::LastFree = 0;
88const char *msrcmsg =
"Cluster does not support multi-source access.";
89int msrclen = strlen(msrcmsg)+1;
90const char *mtrymsg =
"Cluster retry limit exceeded.";
91int mtrylen = strlen(mtrymsg)+1;
99 int port,
int lvl,
int id)
102 static const SMask_t smask_1(1);
106 NodeMask = (
id < 0 ? 0 : smask_1 << id);
110 myNID = strdup(nid ? nid :
"?");
111 if ((myCID = index(myNID,
' '))) myCID++;
118 setName(lnkp, theIF, (nid ? port : 0));
135 if (cidP) {cidP->RemNode(
this); cidP = 0;}
137 if (myNID) free(myNID);
138 if (myName)free(myName);
148 const char *hname = lnkp->
Host();
153 {
if (!strcmp(myName,hname) && port == netIF.Port()
154 && netID.Same(lnkp->
NetAddr()))
return;
165 if (theIF && !netIF.InDomain(&netID)) theIF = 0;
166 netIF.SetIF(&netID, theIF, port);
169 netIF.SetPublicName(hname);
173 myName = strdup(hname);
174 myNlen = strlen(hname);
176 if (!port) strcpy(buff, lnkp->
ID);
177 else sprintf(buff,
"%s:%d", lnkp->
ID, port);
179 Ident = strdup(buff);
189 static const int warnIntvl = 60;
190 int totWait = 0, tmoWarn = 60;
191 int tmoWait (
Config.DELDelay < 3 ?
Config.DELDelay : 3);
213 {
if (totWait >=
Config.DELDelay) {doDel =
false;
break;}
215 if (totWait >= tmoWarn)
216 {
unsigned int theCnt = refCnt;
218 tmoWarn += warnIntvl;
228 if (doDel)
delete this;
229 else {
char eBuff[256];
230 snprintf(eBuff,
sizeof(eBuff),
231 " (%p) delete timeout; node object lost!", (
void*)
this);
240void XrdCmsNode::DeleteWarn(
unsigned int lkVal)
247 snprintf(eBuff,
sizeof(eBuff),
"delete sync stall; refs = %u", lkVal);
262 if (needLock) nodeMutex.Lock();
269 {Link->setEtext(reason);
276 if (needLock) nodeMutex.UnLock();
293 DiskUtil =
static_cast<int>(Arg.dskUtil);
320 if (!
Config.DiskOK)
return 0;
321 if (!mode && !getMode(Arg.
Mode, mode))
return "invalid mode";
326 else rc =
Config.ossFS->Chmod(Arg.
Path, mode);
330 return (rc ? fsFail(Arg.
Ident,
"chmod", Arg.
Path, rc) : 0);
346 Say.Emsg(
"Node", Link->Name(),
"requested a disconnect");
369 static const SMask_t allNodes(~0);
382 newgone =
Cache.DelFile(Sel,
baseFS.isDFS() ? allNodes : NodeMask);
411 static const SMask_t allNodes(~0);
422 Opts = (
Cache.Paths.Find(Arg.
Path, pinfo) && (pinfo.
rwvec & NodeMask)
431 if (!
Config.asManager()) isnew = 1;
436 isnew =
Cache.AddFile(Sel, allNodes);
437 }
else isnew =
Cache.AddFile(Sel, NodeMask);
462 uint32_t pcpu, pnet, pxeq, pmem, ppag, pdsk;
479 myLoad =
Meter.calcLoad(pcpu, pnet, pxeq, pmem, ppag);
480 myMass =
Meter.calcLoad(myLoad, pdsk);
486 DEBUGR(
"cpu=" <<pcpu <<
" net=" <<pnet <<
" xeq=" <<pxeq
487 <<
" mem=" <<pmem <<
" pag=" <<ppag <<
" dsk=" <<pdsk
488 <<
"% " <<
DiskFree <<
"MB load=" <<myLoad <<
" mass=" <<myMass);
495 {
Meter.Record(pcpu, pnet, pxeq, pmem, ppag, pdsk);
506 if (
Config.LogPerf && !logload)
508 long long tRefs =
Cluster.Refs();
509 long long nRefs =
static_cast<long long>(RefTotW + RefTotR)*100;
510 long long sRefs =
static_cast<long long>(Share) * Shrin * 100;
511 int myShr = (Share ? Share : 100);
512 if (tRefs) {nRefs /= tRefs; sRefs /= tRefs;}
513 else nRefs = sRefs = 0;
514 snprintf(buff,
sizeof(buff)-1,
515 "load=%d; cpu=%d net=%d inq=%d mem=%d pag=%d dsk=%d utl=%d "
516 "shr=[%d %lld %lld] ref=[%d %d]",
517 myLoad, pcpu, pnet, pxeq, pmem, ppag, Arg.
dskFree, pdsk,
518 myShr, nRefs, sRefs, RefTotR+RefR, RefTotW+RefW);
519 Say.Emsg(
"Node",
Name(), buff);
541 struct iovec ioV[2] = {{(
char *)&Arg.
Request,
sizeof(Arg.
Request)},
544 char eBuff[128], theopts[8], *toP = theopts;
548 bool lsuniq =
false, oksel =
false, lsall = (*Arg.
Path ==
'*');
563 {lsuniq =
true; *toP++=
'u';}
591 reqInfo.
lsLU =
static_cast<char>(lsopts);
602 if ((rc =
Cluster.Locate(Sel)))
605 bytes =
sizeof(Resp.Val); Why =
"delay ";
610 bytes =
strlcpy(Resp.outbuff,
"No servers have access to the file",
611 sizeof(Resp.outbuff)) +
sizeof(Resp.Val) + 1;
613 }
else {Why =
"?"; bytes = 0;}
618 {
if (!Sel.
Vec.hf || !(sP=
Cluster.List(Sel.
Vec.hf, lsopts, oksel)))
623 sprintf(eBuff,
"No servers are reachable via %s network",
628 eTxt =
"No servers have the file";
630 bytes =
strlcpy(Resp.outbuff, eTxt,
631 sizeof(Resp.outbuff)) +
sizeof(Resp.Val) + 1;
638 {Resp.Val = htonl(rc);
641 bytes =
do_LocFmt(Resp.outbuff, sP, Sel.
Vec.pf, Sel.
Vec.wf, lsall,lsuniq)
642 +
sizeof(Resp.Val) + 1;
650 ioV[1].iov_len = bytes;
651 Link->Send(ioV, 2, bytes+
sizeof(Arg.
Request));
677 {
if (haverw) pP->
Status |= Skip;
678 else {
if (xP) xP->
Status |= Skip;
680 haverw = (pP->
Mask & wfVec) != 0;
694 if (sP->
Status & Hung) *oP = tolower(*oP);
695 *(oP+1) = (sP->
Mask & wfVec ?
'w' :
'r');
697 if (sP->
next) *oP++ =
' ';
698 pP = sP; sP = sP->
next;
delete pP;
702 {
if (!(sP->
Status & Skip))
704 if (sP->
Mask & pfVec) *oP = tolower(*oP);
705 *(oP+1) = (sP->
Mask & wfVec ?
'w' :
'r');
707 if (sP->
next) *oP++ =
' ';
709 pP = sP; sP = sP->
next;
delete pP;
737 if (!
Config.DiskOK)
return 0;
738 if (!mode && !getMode(Arg.
Mode, mode))
return "invalid mode";
743 else rc =
Config.ossFS->Mkdir(Arg.
Path, mode);
747 return (rc ? fsFail(Arg.
Ident,
"mkdir", Arg.
Path, rc) : 0);
769 if (!
Config.DiskOK)
return 0;
770 if (!mode && !getMode(Arg.
Mode, mode))
return "invalid mode";
775 else rc =
Config.ossFS->Mkdir(Arg.
Path, mode, 1);
779 return (rc ? fsFail(Arg.
Ident,
"mkpath", Arg.
Path, rc) : 0);
791 static const SMask_t allNodes(~0);
815 if ((rc =
Cluster.Select(Sel2)))
816 {
if (rc > 0) {Arg.waitVal = rc;
return "!mv";}
817 else if (Sel2.
Vec.hf)
818 {
Say.Emsg(
"do_Mv",Arg.
Path2,
"exists; mv failed for",Arg.
Path);
819 return "target file exists";
822 Cache.DelFile(Sel2, allNodes);
823 Cache.DelFile(Sel1, allNodes);
835 return (rc ? fsFail(Arg.
Ident,
"mv", Arg.
Path, rc) : 0);
852 Link->Send((
char *)&pongIt,
sizeof(pongIt));
921 static const SMask_t allNodes(~0);
932 Cache.DelFile(Sel, allNodes);
944 return (rc ? fsFail(Arg.
Ident,
"rm", Arg.
Path, rc) : 0);
956 static const SMask_t allNodes(~0);
967 Cache.DelFile(Sel, allNodes);
979 return (rc ? fsFail(Arg.
Ident,
"rmdir", Arg.
Path, rc) : 0);
987 char *Avoid,
bool &doRedir)
996 do {
if ((Comma = index(Avoid,
','))) *Comma =
'\0';
997 if (*Avoid ==
'+') Sel.
nmask |=
Cluster.getMask(Avoid+1);
998 else if (!avoidAddr.
Set(Avoid,0))
1000 Avoid = Comma+1; avNum++;
1001 }
while(Comma && *Avoid);
1023 strncpy(Sel.
Resp.Data, msrcmsg,
sizeof(Sel.
Resp.Data));
1024 Sel.
Resp.DLen = msrclen;
1032 if (avNum >
Config.MaxRetries)
1039 strncpy(Sel.
Resp.Data, mtrymsg,
sizeof(Sel.
Resp.Data));
1040 Sel.
Resp.DLen = mtrylen;
1064 struct iovec ioV[2];
1065 char theopts[16], *toP = theopts;
1135 bool doRedir =
false;
1142 if (!doRedir && (rc || (rc =
Cluster.Select(Sel))))
1154 Sel.
Resp.Port = rtEC[rtRC];
1157 Sel.
Resp.DLen = sprintf(Sel.
Resp.Data,
"%s",
"Item not found.")+1;
1162 }
else if (!Sel.
Resp.DLen)
return 0;
1165 <<Sel.
Resp.Port <<
" for " <<Arg.
Path);
1170 bytes = Sel.
Resp.DLen+
sizeof(Sel.
Resp.Port);
1172 Sel.
Resp.Port = htonl(Sel.
Resp.Port);
1176 ioV[0].iov_base = (
char *)&Arg.
Request;
1177 ioV[0].iov_len =
sizeof(Arg.
Request);
1178 ioV[1].iov_base = (
char *)&Sel.
Resp;
1179 ioV[1].iov_len = bytes;
1183 Link->Send(ioV, 2, bytes+
sizeof(Arg.
Request));
1226 if (rc > 0) {
Sched->Schedule((
XrdJob *)&Arg, rc+time(0));
1227 DEBUGR(
"coloc to " <<Arg.clPath <<
" delayed " <<rc <<
" seconds");
1230 if (rc < 0)
Say.Emsg(
"SelPrep", Arg.
path,
"failed;", Sel.
Resp.Data);
1231 else Sel.
nmask = ~Scl.smask;
1236 if ((rc =
Cluster.Select(Sel)))
1240 DEBUGR(
"prep delayed " <<rc <<
" seconds");
1243 Say.Emsg(
"SelPrep", Arg.
path,
"failed;", Sel.
Resp.Data);
1244 PrepQ.Inform(
"unavail", &Arg);
1261 struct iovec xmsg[2];
1263 char buff[
sizeof(int)*2+2], *bp = buff;
1264 int blen, maxfr, tutil;
1269 maxfr =
Meter.FreeSpace(tutil);
1273 DEBUGR(maxfr <<
"MB free; " <<tutil <<
"% util");
1279 mySpace.Hdr.datalen = htons(
static_cast<unsigned short>(blen));
1285 else {xmsg[0].iov_base = (
char *)&mySpace;
1286 xmsg[0].iov_len =
sizeof(mySpace);
1287 xmsg[1].iov_base = buff;
1288 xmsg[1].iov_len = blen;
1289 mySpace.Hdr.datalen = htons(
static_cast<unsigned short>(blen));
1290 Link->Send(xmsg, 2);
1304 struct iovec xmsg[2];
1320 else if (!
Config.DiskOK && !
Config.asProxy())
return 0;
1323 pinfo.
rovec = NodeMask;
1334 {
TRACER(Files,Arg.
Path <<
" responding have!");
1335 xmsg[0].iov_base = (
char *)&Arg.
Request;
1336 xmsg[0].iov_len =
sizeof(Arg.
Request);
1337 xmsg[1].iov_base = Arg.
Buff;
1338 xmsg[1].iov_len = Arg.
Dlen;
1341 Link->Send(xmsg, 2);
1353 static const SMask_t allNodes(~0);
1361 <<
int(rP->
Mod) <<
" rc=" <<rc <<
" path=" <<rP->
Path);
1362 Sel.Path.Hash = rP->
Sid;
1406 static const SMask_t allNodes(~0);
1414 {
DEBUGR(
"Path find failed for state " <<Arg.
Path);
1420 Sel.
Vec.hf = Sel.
Vec.pf = Sel.
Vec.bf = 0;
1428 if (!retc && !
Config.asServer())
1438 {
if (retc < 0)
return 0;
1441 {
Cache.AddFile(Sel, 0);
1445 if ((retc =
baseFS.Exists(Arg, pinfo)) <= 0)
1446 {
if (retc < 0)
Cache.AddFile(Sel, 0);
1451 Cache.AddFile(Sel, allNodes);
1462 if (!retc || Sel.
Vec.bf != 0)
1463 {
if (!retc)
Cache.AddFile(Sel, 0);
1485 struct iovec ioV[3] = {{(
char *)&Arg.
Request,
sizeof(Arg.
Request)},
1486 {(
char *)&Zero,
sizeof(Zero)},
1487 {(
char *)&buff, 0}};
1497 {bytes = sprintf(buff,
"A %lld %lld %d",
1500 : theSpace.
wFree)) + 1;
1502 bytes = sprintf(buff,
"%d %d %d %d %d %d",
1506 }
else bytes =
strlcpy(buff,
"-1 -1 -1 -1 -1 -1",
sizeof(buff)) + 1;
1510 ioV[2].iov_len = bytes;
1511 bytes +=
sizeof(Zero);
1514 Link->Send(ioV, 3, bytes+
sizeof(Arg.
Request));
1526 static const unsigned short szLen =
sizeof(
kXR_unt32);
1528 static int statsz = 0;
1529 static int statln = 0;
1530 static char *statbuff = 0;
1531 static time_t statlast = 0;
1534 struct iovec ioV[3] = {{(
char *)&Arg.
Request,
sizeof(Arg.
Request)},
1535 {(
char *)&theSize,
sizeof(theSize)},
1543 if (!statsz || !statbuff)
1545 statbuff = (
char *)malloc(statsz);
1546 theSize = htonl(statsz);
1552 {ioV[1].iov_len =
sizeof(theSize);
1563 if (statlast+9 >= tNow)
1564 {statln =
Cluster.Stats(statbuff, statsz); statlast = tNow;}
1568 ioV[2].iov_base = statbuff;
1569 ioV[2].iov_len = statln;
1570 Arg.
Request.
datalen = htons(
static_cast<unsigned short>(szLen+statln));
1592 const char *srvMsg, *stgMsg;
1598 int add2Activ, add2Stage, port;
1602 DEBUGR( (Reset ?
"reset " :
"")
1603 <<(Resume ?
"resume " : (Suspend ?
"suspend " :
""))
1604 <<(Stage ?
"stage " : (noStage ?
"nostage " :
"")));
1610 Cache.Bounce(NodeMask, NodeID);
1616 if (noStage) {add2Stage = -1;
isNoStage = 1; stgMsg=
"staging suspended";}
1617 else {add2Stage = 1;
isNoStage = 0; stgMsg=
"staging resumed";}
1618 else {add2Stage = 0; stgMsg = 0;}
1623 if (Suspend) {add2Activ = -1;
1627 srvMsg=
"service suspended";
1630 else {add2Activ = 1;
1634 srvMsg=
"service resumed";
1635 stgMsg = (
isNoStage ?
"(no staging)" :
"(staging)");
1637 if (port && port != netIF.Port())
1639 DEBUGR(
"set data port to " <<port);
1642 else {add2Activ = 0; srvMsg = 0;}
1646 if (
isOffline) {srvMsg =
"service offline"; stgMsg = 0;}
1647 else if (
isBad &
isDisabled) {srvMsg =
"service disabled"; stgMsg = 0;}
1648 else if (
isBad &
isBlisted ) {srvMsg =
"service blacklisted"; stgMsg = 0;}
1652 if (add2Activ || add2Stage)
1654 Say.Emsg(
"Node",
Name(), srvMsg, stgMsg);
1669 long long Size = -1;
1679 if (!
Config.DiskOK)
return 0;
1680 if (Size < 0 && !getSize(Arg.
Mode, Size))
return "invalid size";
1685 else rc =
Config.ossFS->Truncate(Arg.
Path, Size);
1689 return (rc ? fsFail(Arg.
Ident,
"trunc", Arg.
Path, rc) : 0);
1710 if (Manager) Manager->myMans->Add(&netID, Arg.
Path,
Config.PortTCP, myLevel);
1715 return ".redirected";
1756 struct iovec xmsg[2];
1758 char respbuff[
sizeof(loadbuff)+2+
sizeof(
int)+2], *bp = respbuff;
1759 int blen, maxfr, pcpu, pnet, pxeq, pmem, ppag, pdsk;
1763 maxfr =
Meter.Report(pcpu, pnet, pxeq, pmem, ppag, pdsk);
1774 myLoad.Hdr.datalen = htons(
static_cast<unsigned short>(blen));
1776 xmsg[0].iov_base = (
char *)&myLoad;
1777 xmsg[0].iov_len =
sizeof(myLoad);
1778 xmsg[1].iov_base = respbuff;
1779 xmsg[1].iov_len = blen;
1780 if (lp) lp->
Send(xmsg, 2);
1785 DEBUG(
"cpu=" <<pcpu <<
" net=" <<pnet <<
" xeq=" <<pxeq
1786 <<
" mem=" <<pmem <<
" pag=" <<ppag <<
" dsk=" <<pdsk <<
' ' <<maxfr);
1802 {old_free = LastFree; LastFree =
DiskFree;}
1823int XrdCmsNode::fsExec(
XrdOucProg *Prog,
char *Arg1,
char *Arg2)
1826 char Pfn1[PfnSZ], Pfn2[PfnSZ];
1833 {
if (
Config.lcl_N2N->lfn2pfn(Arg1,Pfn1,PfnSZ-1))
return fsL2PFail1;
1844 return Prog->
Run(Arg1, Arg2);
1851const char *XrdCmsNode::fsFail(
const char *Who,
const char *What,
1852 const char *
Path,
int rc)
1859 if (rc == fsL2PFail1) return
"lfn2pfn path1 failed";
1860 if (rc == fsL2PFail2) return
"lfn2pfn path2 failed";
1861 if (rc != ENOENT)
Say.Emsg(
"Node", rc, What,
Path);
1862 else {
struct {
const char *Ident;} Arg = {Who};
1863 DEBUGR(
"rc=" <<rc <<
' ' <<What <<
' ' <<
Path);
1872int XrdCmsNode::getMode(
const char *theMode, mode_t &
Mode)
1878 if (!(
Mode = strtol(theMode, &eP, 8)) || *eP || (
Mode >> 9))
return 0;
1886int XrdCmsNode::getSize(
const char *theSize,
long long &Size)
1892 if (!(Size = strtoll(theSize, &eP, 10)) || *eP)
return 0;
1907 char *slash =
nullptr;
1909 {
if (!(slash = index(spos,
'/')))
return;
1910 acount--; spos = slash+1;
1923 if (Sel.
Path.
Val[i] ==
'/') i--;
1926 {
if (Sel.
Path.
Val[i] ==
'/' && !(++acount))
break;}
unsigned long long SMask_t
#define XrdCmsMAX_PATH_LEN
const char * XrdSysE2T(int errcode)
int AddFile(XrdCmsSelect &Sel, SMask_t mask)
int Broadsend(SMask_t smask, XrdCms::CmsRRHdr &Hdr, void *Data, int Dlen)
static const int EReplete
static const int RetryErr
static const int Wait4CBk
XrdOucName2Name * lcl_N2N
static void Inform(const char *What, const char *Data, int Dlen)
const char * do_PrepDel(XrdCmsRRData &Arg)
int do_StateFWD(XrdCmsRRData &Arg)
const char * do_Gone(XrdCmsRRData &Arg)
const char * do_Locate(XrdCmsRRData &Arg)
const char * do_Update(XrdCmsRRData &Arg)
const char * do_Try(XrdCmsRRData &Arg)
const char * do_State(XrdCmsRRData &Arg)
void Delete(XrdSysRWLock &gMutex)
static void do_StateDFS(XrdCmsBaseFR *rP, int rc)
const char * do_Space(XrdCmsRRData &Arg)
int do_SelAvoid(XrdCmsRRData &Arg, XrdCmsSelect &Sel, char *Avoid, bool &doRedir)
const char * do_Select(XrdCmsRRData &Arg)
const char * do_Mv(XrdCmsRRData &Arg)
const char * do_Trunc(XrdCmsRRData &Arg)
static void Report_Usage(XrdLink *lp)
const char * do_Usage(XrdCmsRRData &Arg)
const char * do_Chmod(XrdCmsRRData &Arg)
static const char isDisabled
const char * do_Load(XrdCmsRRData &Arg)
static int do_SelPrep(XrdCmsPrepArgs &Arg)
const char * do_Rm(XrdCmsRRData &Arg)
const char * do_PrepAdd(XrdCmsRRData &Arg)
const char * do_Ping(XrdCmsRRData &Arg)
const char * do_Have(XrdCmsRRData &Arg)
static const char isSuspend
const char * do_Stats(XrdCmsRRData &Arg)
const char * do_Disc(XrdCmsRRData &Arg)
const char * do_Avail(XrdCmsRRData &Arg)
static int do_LocFmt(char *buff, XrdCmsSelected *sP, SMask_t pf, SMask_t wf, bool lsall=false, bool lsuniq=false)
void Disc(const char *reason=0, int needLock=1)
const char * do_Mkpath(XrdCmsRRData &Arg)
XrdCmsNode(XrdLink *lnkp, const char *theIF=0, const char *sid=0, int port=0, int lvl=0, int id=-1)
const char * do_Pong(XrdCmsRRData &Arg)
void setName(XrdLink *lnkp, const char *theIF, int port)
const char * do_Mkdir(XrdCmsRRData &Arg)
const char * do_StatFS(XrdCmsRRData &Arg)
const char * do_Rmdir(XrdCmsRRData &Arg)
static const char isDoomed
static const char isBlisted
const char * do_Status(XrdCmsRRData &Arg)
struct XrdCmsSelect::@372246154012353073336061155127145056202367365025 Resp
struct XrdCmsSelect::@234326324321301042052133314024110005315140201317 Vec
const XrdNetAddr * NetAddr() const
char * ID
Pointer to the client's link identity.
int Send(const char *buff, int blen)
const char * Host() const
const char * Set(const char *hSpec, int pNum=PortInSpec)
static const char * Name(ifType ifT)
ifType
The enum that is used to index into ifData to get appropriate interface.
static void Privatize(ifType &x)
static uint32_t Calc32C(const void *data, size_t count, uint32_t prevcs=0)
virtual int lfn2pfn(const char *lfn, char *buff, int blen)=0
int Run(XrdOucStream *Sp, const char *argV[], int argc=0, const char *envV[]=0) const
static int Pack(struct iovec **, const char *, unsigned short &buff)
static void Snooze(int seconds)
static const unsigned char kYR_Version