39#if defined(__linux__) || defined(__GNU__)
40#include <netinet/tcp.h>
48#if defined(__solaris__) || defined(__linux__) || defined(__GNU__)
49#include <sys/sendfile.h>
116 strcpy(
Lname,
"somewhere");
159 if (nbsz <= 0)
return 0;
161 if ((ulen +
HNlen) >= nbsz) ulen = 0;
162 else {strncpy(nbuf,
ID, ulen);
175 int csec, fd, rc = 0;
197 sendQ->Terminate(
this);
282 if (fd >= 2) {
if (
KeepFD) rc = 0;
283 else rc = (
close(fd) < 0 ? errno : 0);
285 if (rc)
Log.Emsg(
"Link", rc,
"close",
ID);
305 do {rc =
Protocol->Process(
this);}
while (!rc &&
Sched.canStick());
306 else {
Log.Emsg(
"Link",
"Dispatch on closed link",
ID);
316 else if (rc != -EINPROGRESS) doCl =
true;
343 struct pollfd polltab = {
PollInfo.FD, POLLIN|POLLRDNORM, 0};
354 do {retc = poll(&polltab, 1, timeout);}
while(retc < 0 && errno == EINTR);
356 {
if (retc == 0)
return 0;
357 return Log.Emsg(
"Link", -errno,
"poll",
ID);
362 if (!(polltab.revents & (POLLIN|POLLRDNORM)))
369 do {mlen = recv(
LinkInfo.FD, Buff, Blen, MSG_PEEK);}
370 while(mlen < 0 && errno == EINTR);
374 if (mlen >= 0)
return int(mlen);
375 Log.Emsg(
"Link", errno,
"peek on",
ID);
392 do {rlen =
read(
LinkInfo.FD, Buff, Blen);}
while(rlen < 0 && errno == EINTR);
396 if (rlen >= 0)
return int(rlen);
397 if (
LinkInfo.FD >= 0)
Log.Emsg(
"Link", errno,
"receive from",
ID);
406 struct pollfd polltab = {
PollInfo.FD, POLLIN|POLLRDNORM, 0};
407 ssize_t rlen, totlen = 0;
418 {
do {retc = poll(&polltab,1,timeout);}
while(retc < 0 && errno == EINTR);
428 return (
LinkInfo.FD >= 0 ?
Log.Emsg(
"Link",-errno,
"poll",
ID) : -1);
433 if (!(polltab.revents & (POLLIN|POLLRDNORM)))
442 do {rlen = recv(
LinkInfo.FD, Buff, Blen, 0);}
443 while(rlen < 0 && errno == EINTR);
445 {
if (!rlen)
return -ENOMSG;
446 if (
LinkInfo.FD > 0)
Log.Emsg(
"Link", -errno,
"receive from",
ID);
449 totlen += rlen; Blen -= rlen; Buff += rlen;
461 struct pollfd polltab = {
PollInfo.FD, POLLIN|POLLRDNORM, 0};
471 do {retc = poll(&polltab,1,timeout);}
while(retc < 0 && errno == EINTR);
477 return (
LinkInfo.FD >= 0 ?
Log.Emsg(
"Link",-errno,
"poll",
ID) : -1);
482 if (!(polltab.revents & (POLLIN|POLLRDNORM)))
498 int seglen, segcnt =
maxIOV, totlen = 0;
500 for (
int i = 0; i < segcnt; i++) seglen +=
iov[i].iov_len;
501 if ((rlen =
RecvIOV(
iov, segcnt)) < 0)
return rlen;
503 if (rlen < seglen)
break;
506 if (iocnt <=
maxIOV) segcnt = iocnt;
521 struct pollfd polltab = {
PollInfo.FD, POLLIN|POLLRDNORM, 0};
529 {
do {retc = poll(&polltab,1,timeout);}
while(retc < 0 && errno == EINTR);
531 {
if (!retc)
return -ETIMEDOUT;
532 Log.Emsg(
"Link",errno,
"poll",
ID);
535 if (!(polltab.revents & (POLLIN|POLLRDNORM)))
545 do {rlen = recv(
LinkInfo.FD, Buff, Blen, MSG_WAITALL);}
546 while(rlen < 0 && errno == EINTR);
550 if (
int(rlen) == Blen)
return Blen;
551 if (!rlen) {
TRACEI(
DEBUG,
"No RecvAll() data; errno=" <<errno);}
552 else if (rlen > 0)
Log.Emsg(
"RecvAll",
"Premature end from",
ID);
553 else if (
LinkInfo.FD >= 0)
Log.Emsg(
"Link", errno,
"receive from",
ID);
571 while(retc < 0 && errno == EINTR);
575 if (retc < 0)
Log.Emsg(
"Link", errno,
"receive from",
ID);
600 ssize_t retc = 0, bytesleft = Blen;
611 {retc =
sendQ->Send(Buff, Blen);
620 {
if (errno == EINTR)
continue;
623 bytesleft -= retc; Buff += retc;
629 if (retc >= 0)
return Blen;
630 Log.Emsg(
"Link", errno,
"send to",
ID);
649 {retc =
sendQ->Send(
iov, iocnt, bytes);
664 int seglen, segcnt =
maxIOV, iolen = 0;
666 for (
int i = 0; i < segcnt; i++) seglen +=
iov[i].iov_len;
667 if ((retc =
SendIOV(
iov, segcnt, seglen)) < 0)
674 if (iocnt <=
maxIOV) segcnt = iocnt;
687#if !defined(HAVE_SENDFILE)
691#elif defined(__solaris__)
694 size_t xframt, totamt, bytes = 0;
700 for (i = 0; i < sfN; sfP++, i++)
702 {vecSF[i].sfv_fd = SFV_FD_SELF;
703 vecSF[i].sfv_off = (off_t)sfP->buffer;
705 vecSF[i].sfv_fd = sfP->
fdnum;
706 vecSF[i].sfv_off = sfP->offset;
708 vecSF[i].sfv_flag = 0;
709 vecSF[i].sfv_len = sfP->
sendsz;
720do{retc = sendfilev(
LinkInfo.FD, vecSFP, sfN, &xframt);
732 if (retc < 0 && errno != EINTR)
break;
738 while(xframt > 0 && sfN)
739 {
if ((ssize_t)xframt < (ssize_t)vecSFP->sfv_len)
740 {vecSFP->sfv_off += xframt; vecSFP->sfv_len -= xframt;
break;}
741 xframt -= vecSFP->sfv_len; vecSFP++; sfN--;
748 retc = (retc < 0 ? errno : ECANCELED);
750 Log.Emsg(
"Link", retc,
"send file to",
ID);
753#elif defined(__linux__) || defined(__GNU__)
755 static const int setON = 1, setOFF = 0;
756 ssize_t retc = 0, bytesleft;
758 int i, xfrbytes = 0, uncork = 1, xIntr = 0;
768 if (setsockopt(
PollInfo.FD, SOL_TCP, TCP_CORK, &setON,
sizeof(setON)) < 0)
769 {
Log.Emsg(
"Link", errno,
"cork socket for",
ID);
770 uncork = 0;
sfOK = 0;
775 for (i = 0; i < sfN; sfP++, i++)
777 else {myOffset = sfP->offset; bytesleft = sfP->
sendsz;
779 && (retc=sendfile(
LinkInfo.FD,sfP->
fdnum,&myOffset,bytesleft)) > 0)
780 {bytesleft -= retc; xIntr++;}
782 if (retc < 0 && errno == EINTR)
continue;
783 if (retc <= 0)
break;
790 {
if (retc == 0) errno = ECANCELED;
792 Log.Emsg(
"Link", errno,
"send file to",
ID);
799 && setsockopt(
PollInfo.FD, SOL_TCP, TCP_CORK, &setOFF,
sizeof(setOFF)) < 0)
800 Log.Emsg(
"Link", errno,
"uncork socket for",
ID);
804 if (xIntr > sfN)
SfIntr += (xIntr - sfN);
822 ssize_t retc = 0, bytesleft = Blen;
828 {
if (errno == EINTR)
continue;
831 bytesleft -= retc; Buff += retc;
845 ssize_t bytesleft, n, retc = 0;
855 bytesleft =
static_cast<ssize_t
>(bytes);
858 while(retc < 0 && errno == EINTR);
859 if (retc >= bytesleft || retc < 0)
break;
861 while(retc >= (n =
static_cast<ssize_t
>(
iov->iov_len)))
862 {retc -= n;
iov++; iocnt--;}
863 Buff = (
const char *)
iov->iov_base + retc; n -= retc;
iov++; iocnt--;
865 {
if (errno == EINTR)
continue;
868 n -= retc; Buff += retc; bytesleft -= retc;
870 if (retc < 0 || iocnt < 1)
break;
875 if (retc >= 0)
return bytes;
876 Log.Emsg(
"Link", errno,
"send to",
ID);
886 char buff[
sizeof(
Uname)], *bp, *sp;
889 snprintf(buff,
sizeof(buff),
"%s.%d:%d", userid, procid,
PollInfo.FD);
891 sp = buff + ulen - 1;
893 if (ulen > (
int)
sizeof(
Uname)) ulen =
sizeof(
Uname);
895 while(ulen--) {*bp = *sp; bp--; sp--;}
912#if !defined(__linux__)
960 Addr.SetDialect(name);
979 if (
isTLS == enable)
return true;
992 eNote =
tlsIO.Init(*ctx,
PollInfo.FD, rwMode, hsMode,
false,
false,
ID);
998 snprintf(buff,
sizeof(buff),
"Unable to enable tls for %s;",
ID);
999 Log.Emsg(
"LinkXeq", buff, eNote);
1011 else {
isTLS = enable;
1012 Addr.SetTLS(enable);
1013 Log.Emsg(
"LinkXeq",
ID,
"connection upgraded to",
verTLS());
1024 Log.Emsg(
"TLS", rc,
"send file to",
ID);
1038 TRACEI(
DEBUG, (getLock ?
"Async" :
"Sync") <<
" link shutdown in progress");
1042 if (getLock)
LinkInfo.opMutex.Lock();
1051 Log.Emsg(
"Link", errno,
"shutdown FD for",
ID);
1057 if (getLock)
LinkInfo.opMutex.UnLock();
1066 static const char statfmt[] =
"<stats id=\"link\"><num>%d</num>"
1067 "<maxn>%d</maxn><tot>%lld</tot><in>%lld</in><out>%lld</out>"
1068 "<ctime>%lld</ctime><tmo>%d</tmo><stall>%d</stall>"
1069 "<sfps>%d</sfps></stats>";
1074 if (!buff)
return sizeof(statfmt)+17*6;
1107 if (!ctime)
LinkInfo.opMutex.Lock();
1120 {*ctime = time(0) -
LinkInfo.conTime;
1150 if (!ctime)
LinkInfo.opMutex.UnLock();
1162 snprintf(msg,
sizeof(msg),
"Unable to %s %s;", act,
ID);
1163 Log.Emsg(
"TLS", msg, reason.c_str());
1186 if (rc < 1)
return rc;
1191 retc =
tlsIO.Peek(Buff, Blen, rlen);
1217 retc =
tlsIO.Read(Buff, Blen, rlen);
1229 int pend, rlen, totlen = 0;
1239 {pend =
tlsIO.Pending(
true);
1242 {
if (pend < 0)
return -1;
1256 retc =
tlsIO.Read(Buff, Blen, rlen);
1258 {
if (!totlen)
return -ENOMSG;
1262 if (rlen <= 0)
break;
1263 totlen += rlen; Blen -= rlen; Buff += rlen;
1276 int Blen, rlen, totlen = 0;
1285 for (
int i = 0; i < iocnt; i++)
1286 {Buff = (
char *)
iov[i].iov_base;
1287 Blen =
iov[i].iov_len;
1288 rlen =
TLS_Recv(Buff, Blen, timeout,
true);
1289 if (rlen <= 0)
break;
1291 if (rlen < Blen)
break;
1310 {retc =
tlsIO.Pending(
true);
1312 if (retc < 1)
return (retc ? -1 : -ETIMEDOUT);
1327 ssize_t bytesleft = Blen;
1343 {retc =
tlsIO.Write(Buff, bytesleft, byteswritten);
1345 bytesleft -= byteswritten; Buff += byteswritten;
1373 for (
int i = 0; i < iocnt; i++)
1374 {ssize_t bytesleft =
iov[i].iov_len;
1375 char *Buff = (
char *)
iov[i].iov_base;
1377 {retc =
tlsIO.Write(Buff, bytesleft, byteswritten);
1379 bytesleft -= byteswritten; Buff += byteswritten;
1393 int bytes, buffsz, fileFD, retc;
1402 for (
int i = 0; i < sfN; sfP++, i++)
1403 {
if (!(bytes = sfP->
sendsz))
continue;
1406 {
if (!
TLS_Write(sfP->buffer, bytes))
return -1;
1409 offset = sfP->offset;
1410 fileFD = sfP->
fdnum;
1411 buffsz = (bytes < (int)
sizeof(myBuff) ? bytes :
sizeof(myBuff));
1412 do {
do {retc =
pread(fileFD, myBuff, buffsz, offset);}
1413 while(retc < 0 && errno == EINTR);
1414 if (retc < 0)
return SFError(errno);
1416 if (!
TLS_Write(myBuff, buffsz))
return -1;
1417 offset += buffsz; bytes -= buffsz; totamt += retc;
1439 {retc =
tlsIO.Write(Buff, Blen, byteswritten);
1444 Blen -= byteswritten; Buff += byteswritten;
1458 return tlsIO.Version();
#define pread(a, b, c, d)
static void SyncAll()
Synchronize statistics for all links.
static void Unhook(int fd)
Unhook a link from the active table of links.
static const char * TraceID
bool(* CloseRequestCb)(void *)
int TLS_Send(const char *Buff, int Blen)
int TLS_Error(const char *act, XrdTls::RC rc)
int TLS_Peek(char *Buff, int Blen, int timeout)
int Client(char *buff, int blen)
XrdTlsPeerCerts * getPeerCerts()
int Close(bool defer=false)
int TLS_Recv(char *Buff, int Blen)
int sendData(const char *Buff, int Blen)
bool TLS_Write(const char *Buff, int Blen)
int SendIOV(const struct iovec *iov, int iocnt, int bytes)
XrdProtocol * setProtocol(XrdProtocol *pp, bool push)
static long long LinkCountTot
void Shutdown(bool getLock)
int Peek(char *buff, int blen, int timeout=-1)
static int Stats(char *buff, int blen, bool do_sync=false)
void setID(const char *userid, int procid)
int Recv(char *buff, int blen)
static long long LinkBytesIn
int TLS_RecvAll(char *Buff, int Blen, int timeout)
int Send(const char *buff, int blen)
int RecvIOV(const struct iovec *iov, int iocnt)
static long long LinkConTime
int RecvAll(char *buff, int blen, int timeout=-1)
bool Register(const char *hName)
static XrdSysMutex statsMutex
void setProtName(const char *name)
static long long LinkBytesOut
void syncStats(int *ctime=0)
bool setTLS(bool enable, XrdTlsContext *ctx=0)
bool RegisterCloseRequestCb(XrdProtocol *pp, bool(*cb)(void *), void *cbarg)
void Serialize()
Wait for all outstanding requests to be completed on the link.
int Wait4Data(int timeout)
char * ID
Pointer to the client's link identity.
static char * Poll2Text(short events)
static void Detach(XrdPollInfo &pInfo)
void Lock(XrdSysMutex *Mutex)
int fd
Socket file descriptor.
long long bytesOut
Bytes written to the socket.
int consec
Seconds connected.
long long bytesIn
Bytes read from the socket.
const char * tident
Pointer to the client's trace identifier.
@ TLS_HS_BLOCK
Always block during handshake.
@ TLS_RBL_WBL
blocking read blocking write
static std::string RC2Text(XrdTls::RC rc, bool dbg=false)
@ TLS_AOK
All went well, will always be zero.
int fdnum
File descriptor for data.
int sendsz
Length of data at offset.