XRootD
XrdPfc::Cache Class Reference

Attaches/creates and detaches/deletes cache-io objects for disk based cache. More...

#include <XrdPfc.hh>

+ Inheritance diagram for XrdPfc::Cache:
+ Collaboration diagram for XrdPfc::Cache:

Public Member Functions

 Cache (XrdSysLogger *logger, XrdOucEnv *env)
 Constructor. More...
 
void AddWriteTask (Block *b, bool from_read)
 Add downloaded block in write queue. More...
 
virtual XrdOucCacheIOAttach (XrdOucCacheIO *, int Options=0)
 
virtual XrdOucCacheIOAttach (XrdOucCacheIO *ioP, int opts=0)=0
 Obtain a new IO object that fronts existing XrdOucCacheIO. More...
 
bool blocksize_str2value (const char *from, const char *str, long long &val, long long min, long long max) const
 
void ClearPurgeProtectedSet ()
 
bool Config (const char *config_filename, const char *parameters, XrdOucEnv *env)
 Parse configuration file. More...
 
virtual int ConsiderCached (const char *url)
 
bool Decide (XrdOucCacheIO *)
 Makes decision if the original XrdOucCacheIO should be cached. More...
 
bool DecideIfConsideredCached (long long file_size, long long bytes_on_disk)
 
void DeRegisterPrefetchFile (File *)
 
long long DetermineFullFileSize (const std::string &cinfo_fname)
 
void ExecuteCommandUrl (const std::string &command_url)
 
void FileSyncDone (File *, bool high_debug)
 
FileGetFile (const std::string &, IO *, long long off=0, long long filesize=0)
 
XrdXrootdGStreamGetGStream ()
 
XrdSysErrorGetLog () const
 
FileGetNextFileToPrefetch ()
 
XrdOssGetOss () const
 
PurgePinGetPurgePin () const
 
XrdSysTraceGetTrace () const
 
bool is_prefetch_enabled () const
 
bool IsFileActiveOrPurgeProtected (const std::string &) const
 
virtual int LocalFilePath (const char *url, char *buff=0, int blen=0, LFP_Reason why=ForAccess, bool forall=false)
 
void Prefetch ()
 
bool prefetch_str2value (const char *from, const char *str, int &val, int min, int max) const
 
virtual int Prepare (const char *url, int oflags, mode_t mode)
 
void ProcessWriteTasks ()
 Separate task which writes blocks from ram to disk. More...
 
const ConfigurationRefConfiguration () const
 Reference XrdPfc configuration. More...
 
ResourceMonitorRefResMon ()
 
void RegisterPrefetchFile (File *)
 
void ReleaseFile (File *, IO *)
 
void ReleaseRAM (char *buf, long long size)
 
void RemoveWriteQEntriesFor (File *f)
 Remove blocks from write queue which belong to given prefetch. This method is used at the time of File destruction. More...
 
char * RequestRAM (long long size)
 
void ScheduleFileSync (File *f)
 
virtual int Stat (const char *url, struct stat &sbuff)
 
virtual int Unlink (const char *url)
 
int UnlinkFile (const std::string &f_name, bool fail_if_open)
 Remove cinfo and data files from cache. More...
 
void WriteFileSizeXAttr (int cinfo_fd, long long file_size)
 
long long WritesSinceLastCall ()
 
- Public Member Functions inherited from XrdOucCache
 XrdOucCache (const char *ctype)
 
virtual ~XrdOucCache ()
 Destructor. More...
 
virtual int Rename (const char *oldp, const char *newp)
 
virtual int Rmdir (const char *dirp)
 
virtual int Truncate (const char *path, off_t size)
 
virtual int Xeq (XeqCmd cmd, char *arg, int arglen)
 

Static Public Member Functions

static const ConfigurationConf ()
 
static CacheCreateInstance (XrdSysLogger *logger, XrdOucEnv *env)
 Singleton creation. More...
 
static CacheGetInstance ()
 Singleton access. More...
 
static ResourceMonitorResMon ()
 
static const CacheTheOne ()
 
static bool VCheck (XrdVersionInfo &urVersion)
 Version check. More...
 

Static Public Attributes

static XrdSchedulerschedP = nullptr
 
- Static Public Attributes inherited from XrdOucCache
static const int optFIS = 0x0001
 File is structured (e.g. root file) More...
 
static const int optNEW = 0x0014
 File is new -> optRW (o/w read or write) More...
 
static const int optRW = 0x0004
 File is read/write (o/w read/only) More...
 
static const int optWIN = 0x0024
 File is new -> optRW use write-in cache. More...
 

Additional Inherited Members

- Public Types inherited from XrdOucCache
enum  LFP_Reason {
  ForAccess =0 ,
  ForInfo ,
  ForPath
}
 
enum  XeqCmd { xeqNoop = 0 }
 
- Public Attributes inherited from XrdOucCache
const char CacheType [8]
 A 1-to-7 character cache type identifier (usually pfc or rmc). More...
 
XrdOucCacheStats Statistics
 

Detailed Description

Attaches/creates and detaches/deletes cache-io objects for disk based cache.

Definition at line 164 of file XrdPfc.hh.

Constructor & Destructor Documentation

◆ Cache()

Cache::Cache ( XrdSysLogger logger,
XrdOucEnv env 
)

Constructor.

Definition at line 158 of file XrdPfc.cc.

158  :
159  XrdOucCache("pfc"),
160  m_env(env),
161  m_log(logger, "XrdPfc_"),
162  m_trace(new XrdSysTrace("XrdPfc", logger)),
163  m_traceID("Cache"),
164  m_oss(0),
165  m_gstream(0),
166  m_purge_pin(0),
167  m_prefetch_condVar(0),
168  m_prefetch_enabled(false),
169  m_RAM_used(0),
170  m_RAM_write_queue(0),
171  m_RAM_std_size(0),
172  m_isClient(false),
173  m_active_cond(0)
174 {
175  // Default log level is Warning.
176  m_trace->What = 2;
177 }
XrdOucCache(const char *ctype)
Definition: XrdOucCache.hh:700

References XrdSysTrace::What.

Referenced by CreateInstance().

+ Here is the caller graph for this function:

Member Function Documentation

◆ AddWriteTask()

void Cache::AddWriteTask ( Block b,
bool  from_read 
)

Add downloaded block in write queue.

Definition at line 225 of file XrdPfc.cc.

226 {
227  TRACE(Dump, "AddWriteTask() offset=" << b->m_offset << ". file " << b->get_file()->GetLocalPath());
228 
229  {
230  XrdSysMutexHelper lock(&m_RAM_mutex);
231  m_RAM_write_queue += b->get_size();
232  }
233 
234  m_writeQ.condVar.Lock();
235  if (fromRead)
236  m_writeQ.queue.push_back(b);
237  else
238  m_writeQ.queue.push_front(b);
239  m_writeQ.size++;
240  m_writeQ.condVar.Signal();
241  m_writeQ.condVar.UnLock();
242 }
#define TRACE(act, x)
Definition: XrdTrace.hh:63
int get_size() const
Definition: XrdPfcFile.hh:136
File * get_file() const
Definition: XrdPfcFile.hh:140
long long m_offset
Definition: XrdPfcFile.hh:114
const std::string & GetLocalPath() const
Definition: XrdPfcFile.hh:262

References XrdPfc::Block::get_file(), XrdPfc::Block::get_size(), XrdPfc::File::GetLocalPath(), XrdPfc::Block::m_offset, and TRACE.

+ Here is the call graph for this function:

◆ Attach() [1/2]

XrdOucCacheIO * Cache::Attach ( XrdOucCacheIO io,
int  Options = 0 
)
virtual

Implements XrdOucCache.

Definition at line 179 of file XrdPfc.cc.

180 {
181  const char* tpfx = "Attach() ";
182 
183  if (Options & XrdOucCache::optRW)
184  {
185  TRACE(Info, tpfx << "passing through write operation" << obfuscateAuth(io->Path()));
186  }
187  else if (Cache::GetInstance().Decide(io))
188  {
189  TRACE(Info, tpfx << obfuscateAuth(io->Path()));
190 
191  IO *cio;
192 
193  if (Cache::GetInstance().RefConfiguration().m_hdfsmode)
194  {
195  cio = new IOFileBlock(io, *this);
196  }
197  else
198  {
199  IOFile *iof = new IOFile(io, *this);
200 
201  if ( ! iof->HasFile())
202  {
203  delete iof;
204  // TODO - redirect instead. But this is kind of an awkward place for it.
205  // errno is set during IOFile construction.
206  TRACE(Error, tpfx << "Failed opening local file, falling back to remote access " << io->Path());
207  return io;
208  }
209 
210  cio = iof;
211  }
212 
213  TRACE_PC(Debug, const char* loc = io->Location(), tpfx << io->Path() << " location: " <<
214  ((loc && loc[0] != 0) ? loc : "<deferred open>"));
215 
216  return cio;
217  }
218  else
219  {
220  TRACE(Info, tpfx << "decision decline " << io->Path());
221  }
222  return io;
223 }
std::string obfuscateAuth(const std::string &input)
#define TRACE_PC(act, pre_code, x)
Definition: XrdPfcTrace.hh:59
bool Debug
@ Error
virtual const char * Path()=0
virtual const char * Location(bool refresh=false)
Definition: XrdOucCache.hh:161
static const int optRW
File is read/write (o/w read/only)
Definition: XrdOucCache.hh:517
const Configuration & RefConfiguration() const
Reference XrdPfc configuration.
Definition: XrdPfc.hh:217
static Cache & GetInstance()
Singleton access.
Definition: XrdPfc.cc:132
bool Decide(XrdOucCacheIO *)
Makes decision if the original XrdOucCacheIO should be cached.
Definition: XrdPfc.cc:137
Downloads original file into multiple files, chunked into blocks. Only blocks that are asked for are ...
Downloads original file into a single file on local disk. Handles read requests as they come along.
Definition: XrdPfcIOFile.hh:39
bool HasFile() const
Check if File was opened successfully.
Definition: XrdPfcIOFile.hh:48
Base cache-io class that implements some XrdOucCacheIO abstract methods.
Definition: XrdPfcIO.hh:16
Status of cached file. Can be read from and written into a binary file.
Definition: XrdPfcInfo.hh:41

References Debug, Decide(), Error, GetInstance(), XrdPfc::IOFile::HasFile(), XrdOucCacheIO::Location(), obfuscateAuth(), XrdOucCache::optRW, XrdOucCacheIO::Path(), RefConfiguration(), TRACE, and TRACE_PC.

+ Here is the call graph for this function:

◆ Attach() [2/2]

virtual XrdOucCacheIO* XrdOucCache::Attach

Obtain a new IO object that fronts existing XrdOucCacheIO.

◆ blocksize_str2value()

bool Cache::blocksize_str2value ( const char *  from,
const char *  str,
long long &  val,
long long  min,
long long  max 
) const

Definition at line 104 of file XrdPfcConfiguration.cc.

106 {
107  if (XrdOuca2x::a2sz(m_log, "Error parsing block-size", str, &val, min, max))
108  return false;
109 
110  if (val & 0xFFF) {
111  val &= ~0x0FFF;
112  val += 0x1000;
113  m_log.Emsg(from, "blocksize must be a multiple of 4 kB. Rounded up.");
114  }
115 
116  return true;
117 }
static int a2sz(XrdSysError &, const char *emsg, const char *item, long long *val, long long minv=-1, long long maxv=-1)
Definition: XrdOuca2x.cc:257
int Emsg(const char *esfx, int ecode, const char *text1, const char *text2=0)
Definition: XrdSysError.cc:95

References XrdOuca2x::a2sz(), and XrdSysError::Emsg().

+ Here is the call graph for this function:

◆ ClearPurgeProtectedSet()

void Cache::ClearPurgeProtectedSet ( )

Definition at line 689 of file XrdPfc.cc.

690 {
691  XrdSysCondVarHelper lock(&m_active_cond);
692  m_purge_delay_set.clear();
693 }

Referenced by XrdPfc::ResourceMonitor::perform_purge_check(), and XrdPfc::ResourceMonitor::perform_purge_task_cleanup().

+ Here is the caller graph for this function:

◆ Conf()

const Configuration & Cache::Conf ( )
static

Definition at line 134 of file XrdPfc.cc.

134 { return m_instance->RefConfiguration(); }

References RefConfiguration().

Referenced by XrdPfc::ResourceMonitor::heart_beat(), XrdPfc::OldStylePurgeDriver(), XrdPfc::ResourceMonitor::perform_purge_check(), Proto_ResourceMonitorHeartBeat(), XrdPfc::ResourceMonitor::update_vs_and_file_usage_info(), and XrdPfc::DataFsSnapshot::write_json_file().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ Config()

bool Cache::Config ( const char *  config_filename,
const char *  parameters,
XrdOucEnv env 
)

Parse configuration file.

Parameters
config_filenamepath to configuration file
parametersoptional parameters to be passed
envoptional environment to use for configuration
Returns
parse status

Definition at line 432 of file XrdPfcConfiguration.cc.

433 {
434  // Indicate whether or not we are a client instance
435  const char *theINS = getenv("XRDINSTANCE");
436  m_isClient = (theINS != 0 && strncmp("*client ", theINS, 8) == 0);
437 
438  // Tell everyone else we are a caching proxy
439  XrdOucEnv::Export("XRDPFC", 1);
440 
441  XrdOucEnv emptyEnv;
442  XrdOucEnv *myEnv = env ? env : &emptyEnv;
443 
444  XrdOucStream Config(&m_log, theINS, myEnv, "=====> ");
445 
446  if (! config_filename || ! *config_filename)
447  {
448  TRACE(Error, "Config() configuration file not specified.");
449  return false;
450  }
451 
452  int fd;
453  if ( (fd = open(config_filename, O_RDONLY, 0)) < 0)
454  {
455  TRACE( Error, "Config() can't open configuration file " << config_filename);
456  return false;
457  }
458 
459  Config.Attach(fd);
460  static const char *cvec[] = { "*** pfc plugin config:", 0 };
461  Config.Capture(cvec);
462 
463  // Obtain OFS configurator for OSS plugin.
464  XrdOfsConfigPI *ofsCfg = XrdOfsConfigPI::New(config_filename,&Config,&m_log,
465  &XrdVERSIONINFOVAR(XrdOucGetCache));
466  if (! ofsCfg) return false;
467 
468  TmpConfiguration tmpc;
469 
470  Configuration &CFG = m_configuration;
471 
472  // Adjust default parameters for client/serverless caching
473  if (m_isClient)
474  {
475  m_configuration.m_bufferSize = 128 * 1024; // same as normal.
476  m_configuration.m_wqueue_blocks = 8;
477  m_configuration.m_wqueue_threads = 1;
478  }
479 
480  // If network checksum processing is the default, indicate so.
481  if (m_configuration.is_cschk_net()) m_env->Put("psx.CSNet", m_configuration.m_cs_ChkTLS ? "2" : "1");
482 
483  // Actual parsing of the config file.
484  bool retval = true, aOK = true;
485  char *var;
486  while ((var = Config.GetMyFirstWord()))
487  {
488  if (! strcmp(var,"pfc.osslib"))
489  {
490  retval = ofsCfg->Parse(XrdOfsConfigPI::theOssLib);
491  }
492  else if (! strcmp(var,"pfc.cschk"))
493  {
494  retval = xcschk(Config);
495  }
496  else if (! strcmp(var,"pfc.decisionlib"))
497  {
498  retval = xdlib(Config);
499  }
500  else if (! strcmp(var,"pfc.purgelib"))
501  {
502  retval = xplib(Config);
503  }
504  else if (! strcmp(var,"pfc.trace"))
505  {
506  retval = xtrace(Config);
507  }
508  else if (! strcmp(var,"pfc.allow_xrdpfc_command"))
509  {
510  m_configuration.m_allow_xrdpfc_command = true;
511  }
512  else if (! strncmp(var,"pfc.", 4))
513  {
514  retval = ConfigParameters(std::string(var+4), Config, tmpc);
515  }
516 
517  if ( ! retval)
518  {
519  TRACE(Error, "Config() error in parsing");
520  aOK = false;
521  }
522  }
523 
524  Config.Close();
525 
526  // Load OSS plugin.
527  auto orig_runmode = myEnv->Get("oss.runmode");
528  myEnv->Put("oss.runmode", "pfc");
529  if (m_configuration.is_cschk_cache())
530  {
531  char csi_conf[128];
532  if (snprintf(csi_conf, 128, "space=%s nofill", m_configuration.m_meta_space.c_str()) < 128)
533  {
534  ofsCfg->Push(XrdOfsConfigPI::theOssLib, "libXrdOssCsi.so", csi_conf);
535  } else {
536  TRACE(Error, "Config() buffer too small for libXrdOssCsi params.");
537  return false;
538  }
539  }
540  if (ofsCfg->Load(XrdOfsConfigPI::theOssLib, myEnv))
541  {
542  ofsCfg->Plugin(m_oss);
543  }
544  else
545  {
546  TRACE(Error, "Config() Unable to create an OSS object");
547  return false;
548  }
549  if (orig_runmode) myEnv->Put("oss.runmode", orig_runmode);
550  else myEnv->Put("oss.runmode", "");
551 
552  // Test if OSS is operational, determine optional features.
553  aOK &= test_oss_basics_and_features();
554 
555  // sets default value for disk usage
556  XrdOssVSInfo sP;
557  {
558  if (m_configuration.m_meta_space != m_configuration.m_data_space &&
559  m_oss->StatVS(&sP, m_configuration.m_meta_space.c_str(), 1) < 0)
560  {
561  m_log.Emsg("ConfigParameters()", "error obtaining stat info for meta space ", m_configuration.m_meta_space.c_str());
562  return false;
563  }
564  if (m_configuration.m_meta_space != m_configuration.m_data_space && sP.Total < 10ll << 20)
565  {
566  m_log.Emsg("ConfigParameters()", "available data space is less than 10 MB (can be due to a mistake in oss.localroot directive) for space ",
567  m_configuration.m_meta_space.c_str());
568  return false;
569  }
570  if (m_oss->StatVS(&sP, m_configuration.m_data_space.c_str(), 1) < 0)
571  {
572  m_log.Emsg("ConfigParameters()", "error obtaining stat info for data space ", m_configuration.m_data_space.c_str());
573  return false;
574  }
575  if (sP.Total < 10ll << 20)
576  {
577  m_log.Emsg("ConfigParameters()", "available data space is less than 10 MB (can be due to a mistake in oss.localroot directive) for space ",
578  m_configuration.m_data_space.c_str());
579  return false;
580  }
581 
582  m_configuration.m_diskTotalSpace = sP.Total;
583 
584  if (cfg2bytes(tmpc.m_diskUsageLWM, m_configuration.m_diskUsageLWM, sP.Total, "lowWatermark") &&
585  cfg2bytes(tmpc.m_diskUsageHWM, m_configuration.m_diskUsageHWM, sP.Total, "highWatermark"))
586  {
587  if (m_configuration.m_diskUsageLWM >= m_configuration.m_diskUsageHWM) {
588  m_log.Emsg("ConfigParameters()", "pfc.diskusage should have lowWatermark < highWatermark.");
589  aOK = false;
590  }
591  }
592  else aOK = false;
593 
594  if ( ! tmpc.m_fileUsageMax.empty())
595  {
596  if (cfg2bytes(tmpc.m_fileUsageBaseline, m_configuration.m_fileUsageBaseline, sP.Total, "files baseline") &&
597  cfg2bytes(tmpc.m_fileUsageNominal, m_configuration.m_fileUsageNominal, sP.Total, "files nominal") &&
598  cfg2bytes(tmpc.m_fileUsageMax, m_configuration.m_fileUsageMax, sP.Total, "files max"))
599  {
600  if (m_configuration.m_fileUsageBaseline >= m_configuration.m_fileUsageNominal ||
601  m_configuration.m_fileUsageBaseline >= m_configuration.m_fileUsageMax ||
602  m_configuration.m_fileUsageNominal >= m_configuration.m_fileUsageMax)
603  {
604  m_log.Emsg("ConfigParameters()", "pfc.diskusage files should have baseline < nominal < max.");
605  aOK = false;
606  }
607 
608 
609  if (aOK && m_configuration.m_fileUsageMax >= m_configuration.m_diskUsageLWM)
610  {
611  m_log.Emsg("ConfigParameters()", "pfc.diskusage files values must be below lowWatermark");
612  aOK = false;
613  }
614  }
615  else aOK = false;
616  }
617  }
618 
619  // sets flush frequency
620  if ( ! tmpc.m_flushRaw.empty())
621  {
622  if (::isalpha(*(tmpc.m_flushRaw.rbegin())))
623  {
624  if (XrdOuca2x::a2sz(m_log, "Error getting number of bytes written before flush", tmpc.m_flushRaw.c_str(),
625  &m_configuration.m_flushCnt,
626  100 * m_configuration.m_bufferSize , 100000 * m_configuration.m_bufferSize))
627  {
628  return false;
629  }
630  m_configuration.m_flushCnt /= m_configuration.m_bufferSize;
631  }
632  else
633  {
634  if (XrdOuca2x::a2ll(m_log, "Error getting number of blocks written before flush", tmpc.m_flushRaw.c_str(),
635  &m_configuration.m_flushCnt, 100, 100000))
636  {
637  return false;
638  }
639  }
640  }
641 
642  // set the write mode
643  if ( ! tmpc.m_writemodeRaw.empty())
644  {
645  if (tmpc.m_writemodeRaw == "writethrough")
646  {
647  m_configuration.m_write_through = true;
648  }
649  else if (tmpc.m_writemodeRaw != "off")
650  {
651  m_log.Emsg("ConfigParameters()", "Unknown value for pfc.writemode (valid values are `writethrough` or `off`): %s",
652  tmpc.m_writemodeRaw.c_str());
653  return false;
654  }
655  }
656 
657  // get number of available RAM blocks after process configuration
658  if (m_configuration.m_RamAbsAvailable == 0)
659  {
660  m_configuration.m_RamAbsAvailable = m_isClient ? 256ll * 1024 * 1024 : 1024ll * 1024 * 1024;
661  char buff[1024];
662  snprintf(buff, sizeof(buff), "RAM usage pfc.ram is not specified. Default value %s is used.", m_isClient ? "256m" : "1g");
663  m_log.Say("Config info: ", buff);
664  }
665  // Setup number of standard-size blocks not released back to the system to 5% of total RAM.
666  m_configuration.m_RamKeepStdBlocks = (m_configuration.m_RamAbsAvailable / m_configuration.m_bufferSize + 1) * 5 / 100;
667 
668  // Set tracing to debug if this is set in environment
669  char* cenv = getenv("XRDDEBUG");
670  if (cenv && ! strcmp(cenv,"1") && m_trace->What < 4) m_trace->What = 4;
671 
672  if (aOK)
673  {
674 // 000 001 010
675  const char *csc[] = {"off", "cache nonet", "nocache net notls",
676 // 011
677  "cache net notls",
678 // 100 101 110
679  "off", "cache nonet", "nocache net tls",
680 // 111
681  "cache net tls"};
682  char uvk[32];
683  if (m_configuration.m_cs_UVKeep < 0)
684  strcpy(uvk, "lru");
685  else
686  sprintf(uvk, "%lld", (long long) m_configuration.m_cs_UVKeep);
687  float ram_gb = (m_configuration.m_RamAbsAvailable) / float(1024*1024*1024);
688 
689  char urlcgi_blks[64] = "ignore", urlcgi_npref[32] = "ignore";
690  if (CFG.m_cgi_blocksize_allowed)
691  snprintf(urlcgi_blks, sizeof(urlcgi_blks), "%lldk %lldk",
692  CFG.m_cgi_min_bufferSize >> 10, CFG.m_cgi_max_bufferSize >> 10);
693  if (CFG.m_cgi_prefetch_allowed)
694  snprintf(urlcgi_npref, sizeof(urlcgi_npref), "%d %d",
696 
697  char buff[8192];
698  int loff = 0;
699  loff = snprintf(buff, sizeof(buff), "Config effective %s pfc configuration:\n"
700  " pfc.cschk %s uvkeep %s\n"
701  " pfc.blocksize %lldk\n"
702  " pfc.prefetch %d\n"
703  " pfc.urlcgi blocksize %s prefetch %s\n"
704  " pfc.ram %.fg\n"
705  " pfc.writequeue %d %d\n"
706  " # Total available disk: %lld\n"
707  " pfc.diskusage %lld %lld files %lld %lld %lld purgeinterval %d purgecoldfiles %d\n"
708  " pfc.spaces %s %s\n"
709  " pfc.trace %d\n"
710  " pfc.flush %lld\n"
711  " pfc.acchistorysize %d\n"
712  " pfc.onlyIfCachedMinBytes %lld\n"
713  " pfc.onlyIfCachedMinFrac %.2f\n",
714  config_filename,
715  csc[int(m_configuration.m_cs_Chk)], uvk,
716  m_configuration.m_bufferSize >> 10,
717  m_configuration.m_prefetch_max_blocks,
718  urlcgi_blks, urlcgi_npref,
719  ram_gb,
720  m_configuration.m_wqueue_blocks, m_configuration.m_wqueue_threads,
721  sP.Total,
722  m_configuration.m_diskUsageLWM, m_configuration.m_diskUsageHWM,
723  m_configuration.m_fileUsageBaseline, m_configuration.m_fileUsageNominal, m_configuration.m_fileUsageMax,
724  m_configuration.m_purgeInterval, m_configuration.m_purgeColdFilesAge,
725  m_configuration.m_data_space.c_str(),
726  m_configuration.m_meta_space.c_str(),
727  m_trace->What,
728  m_configuration.m_flushCnt,
729  m_configuration.m_accHistorySize,
730  m_configuration.m_onlyIfCachedMinSize,
731  m_configuration.m_onlyIfCachedMinFrac);
732 
733  if (m_configuration.is_dir_stat_reporting_on())
734  {
735  loff += snprintf(buff + loff, sizeof(buff) - loff,
736  " pfc.dirstats interval %d maxdepth %d (internal: size_of_dirlist %d, size_of_globlist %d)\n",
737  m_configuration.m_dirStatsInterval, m_configuration.m_dirStatsStoreDepth,
738  (int) m_configuration.m_dirStatsDirs.size(), (int) m_configuration.m_dirStatsDirGlobs.size());
739  loff += snprintf(buff + loff, sizeof(buff) - loff, " dirlist:\n");
740  for (std::set<std::string>::iterator i = m_configuration.m_dirStatsDirs.begin(); i != m_configuration.m_dirStatsDirs.end(); ++i)
741  loff += snprintf(buff + loff, sizeof(buff) - loff, " %s\n", i->c_str());
742  loff += snprintf(buff + loff, sizeof(buff) - loff, " globlist:\n");
743  for (std::set<std::string>::iterator i = m_configuration.m_dirStatsDirGlobs.begin(); i != m_configuration.m_dirStatsDirGlobs.end(); ++i)
744  loff += snprintf(buff + loff, sizeof(buff) - loff, " %s/*\n", i->c_str());
745  }
746 
747  if (m_configuration.m_hdfsmode)
748  {
749  loff += snprintf(buff + loff, sizeof(buff) - loff, " pfc.hdfsmode hdfsbsize %lld\n", m_configuration.m_hdfsbsize);
750  }
751  loff += snprintf(buff + loff, sizeof(buff) - loff, " pfc.writemode %s\n", m_configuration.m_write_through ? "writethrough" : "off");
752 
753  if (m_configuration.m_username.empty())
754  {
755  char unameBuff[256];
756  XrdOucUtils::UserName(getuid(), unameBuff, sizeof(unameBuff));
757  m_configuration.m_username = unameBuff;
758  }
759  else
760  {
761  loff += snprintf(buff + loff, sizeof(buff) - loff, " pfc.user %s\n", m_configuration.m_username.c_str());
762  }
763 
764  m_log.Say(buff);
765 
766  m_env->Put("XRDPFC.SEGSIZE", std::to_string(m_configuration.m_bufferSize).c_str());
767  }
768 
769  // Derived settings
770  m_prefetch_enabled = CFG.m_prefetch_max_blocks > 0 || CFG.m_cgi_max_prefetch_max_blocks > 0;
772 
773  m_gstream = (XrdXrootdGStream*) m_env->GetPtr("pfc.gStream*");
774 
775  m_log.Say(" pfc g-stream has", m_gstream ? "" : " NOT", " been configured via xrootd.monitor directive\n");
776 
777  // Create the ResourceMonitor and get it ready for starting the main thread function.
778  if (aOK)
779  {
780  m_res_mon = new ResourceMonitor(*m_oss);
781  m_res_mon->init_before_main();
782  }
783 
784  m_log.Say("=====> Proxy file cache configuration parsing ", aOK ? "completed" : "failed");
785 
786  if (ofsCfg) delete ofsCfg;
787 
788  // XXXX-CKSUM Testing. To be removed after OssPgi is also merged and valildated.
789  // Building of xrdpfc_print fails when this is enabled.
790 #ifdef XRDPFC_CKSUM_TEST
791  {
792  int xxx = m_configuration.m_cs_Chk;
793 
794  for (m_configuration.m_cs_Chk = CSChk_None; m_configuration.m_cs_Chk <= CSChk_Both; ++m_configuration.m_cs_Chk)
795  {
796  Info::TestCksumStuff();
797  }
798 
799  m_configuration.m_cs_Chk = xxx;
800  }
801 #endif
802 
803  return aOK;
804 }
XrdOucCache * XrdOucGetCache(XrdSysLogger *logger, const char *config_filename, const char *parameters, XrdOucEnv *env)
Definition: XrdPfc.cc:76
#define open
Definition: XrdPosix.hh:76
bool Parse(TheLib what)
bool Plugin(XrdAccAuthorize *&piP)
Get Authorization plugin.
static XrdOfsConfigPI * New(const char *cfn, XrdOucStream *cfgP, XrdSysError *errP, XrdVersionInfo *verP=0, XrdSfsFileSystem *sfsP=0)
bool Load(int what, XrdOucEnv *envP=0)
bool Push(TheLib what, const char *plugP, const char *parmP=0)
@ theOssLib
Oss plugin.
long long Total
Definition: XrdOssVS.hh:90
virtual int StatVS(XrdOssVSInfo *vsP, const char *sname=0, int updt=0)
Definition: XrdOss.cc:117
static int Export(const char *Var, const char *Val)
Definition: XrdOucEnv.cc:188
void * GetPtr(const char *varname)
Definition: XrdOucEnv.cc:281
char * Get(const char *varname)
Definition: XrdOucEnv.hh:69
void Put(const char *varname, const char *value)
Definition: XrdOucEnv.hh:85
static int UserName(uid_t uID, char *uName, int uNsz)
static int a2ll(XrdSysError &, const char *emsg, const char *item, long long *val, long long minv=-1, long long maxv=-1)
Definition: XrdOuca2x.cc:70
bool Config(const char *config_filename, const char *parameters, XrdOucEnv *env)
Parse configuration file.
static size_t s_maxNumAccess
Definition: XrdPfcInfo.hh:311
void Say(const char *text1, const char *text2=0, const char *txt3=0, const char *text4=0, const char *text5=0, const char *txt6=0)
Definition: XrdSysError.cc:141
@ CSChk_Both
Definition: XrdPfcTypes.hh:27
@ CSChk_None
Definition: XrdPfcTypes.hh:27
Contains parameters configurable from the xrootd config file.
Definition: XrdPfc.hh:64
long long m_hdfsbsize
used with m_hdfsmode, default 128MB
Definition: XrdPfc.hh:122
long long m_RamAbsAvailable
available from configuration
Definition: XrdPfc.hh:109
long long m_flushCnt
nuber of unsynced blcoks on disk before flush is called
Definition: XrdPfc.hh:123
long long m_cgi_max_bufferSize
max buffer size allowed in pfc.blocksize
Definition: XrdPfc.hh:116
int m_accHistorySize
max number of entries in access history part of cinfo file
Definition: XrdPfc.hh:101
int m_cgi_min_prefetch_max_blocks
min prefetch block count allowed in pfc.prefetch
Definition: XrdPfc.hh:117
bool m_cgi_prefetch_allowed
allow cgi setting of prefetch
Definition: XrdPfc.hh:120
int m_wqueue_threads
number of threads writing blocks to disk
Definition: XrdPfc.hh:112
bool m_write_through
flag indicating write-through mode is enabled
Definition: XrdPfc.hh:84
long long m_diskTotalSpace
total disk space on configured partition or oss space
Definition: XrdPfc.hh:92
long long m_fileUsageMax
cache purge - files usage maximum
Definition: XrdPfc.hh:97
long long m_fileUsageBaseline
cache purge - files usage baseline
Definition: XrdPfc.hh:95
int m_dirStatsStoreDepth
maximum depth for statistics write out
Definition: XrdPfc.hh:106
bool m_allow_xrdpfc_command
flag for enabling access to /xrdpfc-command/ functionality.
Definition: XrdPfc.hh:86
long long m_diskUsageHWM
cache purge - disk usage high water mark
Definition: XrdPfc.hh:94
bool is_cschk_cache() const
Definition: XrdPfc.hh:75
std::set< std::string > m_dirStatsDirGlobs
directory globs for which stat reporting was requested
Definition: XrdPfc.hh:104
int m_prefetch_max_blocks
default maximum number of blocks to prefetch per file
Definition: XrdPfc.hh:113
bool m_cs_ChkTLS
Allow TLS.
Definition: XrdPfc.hh:127
long long m_fileUsageNominal
cache purge - files usage nominal
Definition: XrdPfc.hh:96
int m_cs_Chk
Checksum check.
Definition: XrdPfc.hh:126
bool m_hdfsmode
flag for enabling block-level operation
Definition: XrdPfc.hh:85
int m_purgeColdFilesAge
purge files older than this age
Definition: XrdPfc.hh:99
std::string m_data_space
oss space for data files
Definition: XrdPfc.hh:89
std::set< std::string > m_dirStatsDirs
directories for which stat reporting was requested
Definition: XrdPfc.hh:103
long long m_diskUsageLWM
cache purge - disk usage low water mark
Definition: XrdPfc.hh:93
int m_RamKeepStdBlocks
number of standard-sized blocks kept after release
Definition: XrdPfc.hh:110
long long m_bufferSize
cache block size, default 128 kB
Definition: XrdPfc.hh:108
long long m_cgi_min_bufferSize
min buffer size allowed in pfc.blocksize
Definition: XrdPfc.hh:115
int m_dirStatsInterval
time between resource monitor statistics dump in seconds
Definition: XrdPfc.hh:105
std::string m_meta_space
oss space for metadata files (cinfo)
Definition: XrdPfc.hh:90
int m_wqueue_blocks
maximum number of blocks written per write-queue loop
Definition: XrdPfc.hh:111
int m_cgi_max_prefetch_max_blocks
max prefetch block count allowed in pfc.prefetch
Definition: XrdPfc.hh:118
std::string m_username
username passed to oss plugin
Definition: XrdPfc.hh:88
bool m_cgi_blocksize_allowed
allow cgi setting of blocksize
Definition: XrdPfc.hh:119
bool is_cschk_net() const
Definition: XrdPfc.hh:76
double m_onlyIfCachedMinFrac
minimum fraction of downloaded file, used by only-if-cached CGI option
Definition: XrdPfc.hh:130
time_t m_cs_UVKeep
unverified checksum cache keep
Definition: XrdPfc.hh:125
int m_purgeInterval
sleep interval between cache purges
Definition: XrdPfc.hh:98
long long m_onlyIfCachedMinSize
minumum size of downloaded file, used by only-if-cached CGI option
Definition: XrdPfc.hh:129
bool is_dir_stat_reporting_on() const
Definition: XrdPfc.hh:70
std::string m_writemodeRaw
Definition: XrdPfc.hh:148
std::string m_diskUsageLWM
Definition: XrdPfc.hh:142
std::string m_diskUsageHWM
Definition: XrdPfc.hh:143
std::string m_fileUsageBaseline
Definition: XrdPfc.hh:144
std::string m_fileUsageNominal
Definition: XrdPfc.hh:145
std::string m_flushRaw
Definition: XrdPfc.hh:147
std::string m_fileUsageMax
Definition: XrdPfc.hh:146

References XrdOuca2x::a2ll(), XrdOuca2x::a2sz(), XrdPfc::CSChk_Both, XrdPfc::CSChk_None, XrdSysError::Emsg(), Error, XrdOucEnv::Export(), XrdOucEnv::Get(), XrdOucEnv::GetPtr(), XrdPfc::ResourceMonitor::init_before_main(), XrdPfc::Configuration::is_cschk_cache(), XrdPfc::Configuration::is_cschk_net(), XrdPfc::Configuration::is_dir_stat_reporting_on(), XrdOfsConfigPI::Load(), XrdPfc::Configuration::m_accHistorySize, XrdPfc::Configuration::m_allow_xrdpfc_command, XrdPfc::Configuration::m_bufferSize, XrdPfc::Configuration::m_cgi_blocksize_allowed, XrdPfc::Configuration::m_cgi_max_bufferSize, XrdPfc::Configuration::m_cgi_max_prefetch_max_blocks, XrdPfc::Configuration::m_cgi_min_bufferSize, XrdPfc::Configuration::m_cgi_min_prefetch_max_blocks, XrdPfc::Configuration::m_cgi_prefetch_allowed, XrdPfc::Configuration::m_cs_Chk, XrdPfc::Configuration::m_cs_ChkTLS, XrdPfc::Configuration::m_cs_UVKeep, XrdPfc::Configuration::m_data_space, XrdPfc::Configuration::m_dirStatsDirGlobs, XrdPfc::Configuration::m_dirStatsDirs, XrdPfc::Configuration::m_dirStatsInterval, XrdPfc::Configuration::m_dirStatsStoreDepth, XrdPfc::Configuration::m_diskTotalSpace, XrdPfc::Configuration::m_diskUsageHWM, XrdPfc::TmpConfiguration::m_diskUsageHWM, XrdPfc::Configuration::m_diskUsageLWM, XrdPfc::TmpConfiguration::m_diskUsageLWM, XrdPfc::Configuration::m_fileUsageBaseline, XrdPfc::TmpConfiguration::m_fileUsageBaseline, XrdPfc::Configuration::m_fileUsageMax, XrdPfc::TmpConfiguration::m_fileUsageMax, XrdPfc::Configuration::m_fileUsageNominal, XrdPfc::TmpConfiguration::m_fileUsageNominal, XrdPfc::Configuration::m_flushCnt, XrdPfc::TmpConfiguration::m_flushRaw, XrdPfc::Configuration::m_hdfsbsize, XrdPfc::Configuration::m_hdfsmode, XrdPfc::Configuration::m_meta_space, XrdPfc::Configuration::m_onlyIfCachedMinFrac, XrdPfc::Configuration::m_onlyIfCachedMinSize, XrdPfc::Configuration::m_prefetch_max_blocks, XrdPfc::Configuration::m_purgeColdFilesAge, XrdPfc::Configuration::m_purgeInterval, XrdPfc::Configuration::m_RamAbsAvailable, XrdPfc::Configuration::m_RamKeepStdBlocks, XrdPfc::Configuration::m_username, XrdPfc::Configuration::m_wqueue_blocks, XrdPfc::Configuration::m_wqueue_threads, XrdPfc::Configuration::m_write_through, XrdPfc::TmpConfiguration::m_writemodeRaw, XrdOfsConfigPI::New(), open, XrdOfsConfigPI::Parse(), XrdOfsConfigPI::Plugin(), XrdOfsConfigPI::Push(), XrdOucEnv::Put(), XrdPfc::Info::s_maxNumAccess, XrdSysError::Say(), XrdOss::StatVS(), XrdOfsConfigPI::theOssLib, XrdOssVSInfo::Total, TRACE, XrdOucUtils::UserName(), XrdSysTrace::What, and XrdOucGetCache().

Referenced by XrdOucGetCache().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ ConsiderCached()

int Cache::ConsiderCached ( const char *  curl)
virtual
Returns
0 - the file is complete and the local path to the file is in the buffer, if it has been supllied.
<0 - the request could not be fulfilled. The return value is -errno describing why.
>0 - Reserved for future use.

Definition at line 1004 of file XrdPfc.cc.

1005 {
1006  static const char* tpfx = "ConsiderCached ";
1007 
1008  TRACE(Debug, tpfx << curl);
1009 
1010  XrdCl::URL url(curl);
1011  std::string f_name = url.GetPath();
1012 
1013  File *file = nullptr;
1014  {
1015  XrdSysCondVarHelper lock(&m_active_cond);
1016  auto it = m_active.find(f_name);
1017  if (it != m_active.end()) {
1018  file = it->second;
1019  // If the file-open is in progress, `file` is a nullptr
1020  // so we cannot increase the reference count. For now,
1021  // simply treat it as if the file open doesn't exist instead
1022  // of trying to wait and see if it succeeds.
1023  if (file) {
1024  inc_ref_cnt(file, false, false);
1025  }
1026  }
1027  }
1028  if (file) {
1029  struct stat sbuff;
1030  int res = file->Fstat(sbuff);
1031  dec_ref_cnt(file, false);
1032  if (res)
1033  return res;
1034  // DecideIfConsideredCached() already called in File::Fstat().
1035  return sbuff.st_atime > 0 ? 0 : -EREMOTE;
1036  }
1037 
1038  struct stat sbuff;
1039  int res = m_oss->Stat(f_name.c_str(), &sbuff);
1040  if (res != XrdOssOK) {
1041  TRACE(Debug, tpfx << curl << " -> " << res);
1042  return res;
1043  }
1044  if (S_ISDIR(sbuff.st_mode))
1045  {
1046  TRACE(Debug, tpfx << curl << " -> EISDIR");
1047  return -EISDIR;
1048  }
1049 
1050  long long file_size = DetermineFullFileSize(f_name + Info::s_infoExtension);
1051  if (file_size < 0) {
1052  TRACE(Debug, tpfx << curl << " -> " << file_size);
1053  return (int) file_size;
1054  }
1055  bool is_cached = DecideIfConsideredCached(file_size, sbuff.st_blocks * 512ll);
1056 
1057  return is_cached ? 0 : -EREMOTE;
1058 }
#define XrdOssOK
Definition: XrdOss.hh:50
#define stat(a, b)
Definition: XrdPosix.hh:101
URL representation.
Definition: XrdClURL.hh:31
virtual int Stat(const char *path, struct stat *buff, int opts=0, XrdOucEnv *envP=0)=0
long long DetermineFullFileSize(const std::string &cinfo_fname)
Definition: XrdPfc.cc:930
bool DecideIfConsideredCached(long long file_size, long long bytes_on_disk)
Definition: XrdPfc.cc:971
int Fstat(struct stat &sbuff)
Definition: XrdPfcFile.cc:632
static const char * s_infoExtension
Definition: XrdPfcInfo.hh:309

References Debug, DecideIfConsideredCached(), DetermineFullFileSize(), XrdPfc::File::Fstat(), XrdCl::URL::GetPath(), XrdPfc::Info::s_infoExtension, stat, XrdOss::Stat(), TRACE, and XrdOssOK.

Referenced by XrdPfcFSctl::FSctl().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ CreateInstance()

Cache & Cache::CreateInstance ( XrdSysLogger logger,
XrdOucEnv env 
)
static

Singleton creation.

Definition at line 125 of file XrdPfc.cc.

126 {
127  assert (m_instance == 0);
128  m_instance = new Cache(logger, env);
129  return *m_instance;
130 }
Cache(XrdSysLogger *logger, XrdOucEnv *env)
Constructor.
Definition: XrdPfc.cc:158

References Cache().

Referenced by XrdOucGetCache().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ Decide()

bool Cache::Decide ( XrdOucCacheIO io)

Makes decision if the original XrdOucCacheIO should be cached.

Parameters
&URL of file
Returns
decision if IO object will be cached.

Definition at line 137 of file XrdPfc.cc.

138 {
139  if (! m_decisionpoints.empty())
140  {
141  XrdCl::URL url(io->Path());
142  std::string filename = url.GetPath();
143  std::vector<Decision*>::const_iterator it;
144  for (it = m_decisionpoints.begin(); it != m_decisionpoints.end(); ++it)
145  {
146  XrdPfc::Decision *d = *it;
147  if (! d) continue;
148  if (! d->Decide(filename, *m_oss))
149  {
150  return false;
151  }
152  }
153  }
154 
155  return true;
156 }
Base class for selecting which files should be cached.
virtual bool Decide(const std::string &, XrdOss &) const =0

References XrdPfc::Decision::Decide(), XrdCl::URL::GetPath(), and XrdOucCacheIO::Path().

Referenced by Attach().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ DecideIfConsideredCached()

bool Cache::DecideIfConsideredCached ( long long  file_size,
long long  bytes_on_disk 
)

Definition at line 971 of file XrdPfc.cc.

972 {
973  if (file_size == 0 || bytes_on_disk >= file_size)
974  return true;
975 
976  double frac_on_disk = (double) bytes_on_disk / file_size;
977 
978  if (file_size <= m_configuration.m_onlyIfCachedMinSize)
979  {
980  if (frac_on_disk >= m_configuration.m_onlyIfCachedMinFrac)
981  return true;
982  }
983  else
984  {
985  if (bytes_on_disk >= m_configuration.m_onlyIfCachedMinSize &&
986  frac_on_disk >= m_configuration.m_onlyIfCachedMinFrac)
987  return true;
988  }
989  return false;
990 }

References XrdPfc::Configuration::m_onlyIfCachedMinFrac, and XrdPfc::Configuration::m_onlyIfCachedMinSize.

Referenced by ConsiderCached(), and Stat().

+ Here is the caller graph for this function:

◆ DeRegisterPrefetchFile()

void Cache::DeRegisterPrefetchFile ( File file)

Definition at line 715 of file XrdPfc.cc.

716 {
717  // Can be called with other locks held.
718 
719  if ( ! m_prefetch_enabled)
720  {
721  return;
722  }
723 
724  m_prefetch_condVar.Lock();
725  for (PrefetchList::iterator it = m_prefetchList.begin(); it != m_prefetchList.end(); ++it)
726  {
727  if (*it == file)
728  {
729  m_prefetchList.erase(it);
730  break;
731  }
732  }
733  m_prefetch_condVar.UnLock();
734 }

References XrdSysCondVar::Lock(), and XrdSysCondVar::UnLock().

+ Here is the call graph for this function:

◆ DetermineFullFileSize()

long long Cache::DetermineFullFileSize ( const std::string &  cinfo_fname)

Definition at line 930 of file XrdPfc.cc.

931 {
932  if (m_metaXattr) {
933  char pfn[4096];
934  m_oss->Lfn2Pfn(cinfo_fname.c_str(), pfn, 4096);
935  long long fsize = -1ll;
936  int res = XrdSysXAttrActive->Get("pfc.fsize", &fsize, sizeof(long long), pfn);
937  if (res == sizeof(long long))
938  {
939  return fsize;
940  }
941  else
942  {
943  TRACE(Debug, "DetermineFullFileSize error getting xattr " << res);
944  }
945  }
946 
947  XrdOssDF *infoFile = m_oss->newFile(m_configuration.m_username.c_str());
948  XrdOucEnv env;
949  long long ret;
950  int res = infoFile->Open(cinfo_fname.c_str(), O_RDONLY, 0600, env);
951  if (res < 0) {
952  ret = res;
953  } else {
954  Info info(m_trace, 0);
955  if ( ! info.Read(infoFile, cinfo_fname.c_str())) {
956  ret = -EBADF;
957  } else {
958  ret = info.GetFileSize();
959  }
960  infoFile->Close();
961  }
962  delete infoFile;
963  return ret;
964 }
XrdSysXAttr * XrdSysXAttrActive
Definition: XrdSysFAttr.cc:61
virtual int Close(long long *retsz=0)=0
virtual int Open(const char *path, int Oflag, mode_t Mode, XrdOucEnv &env)
Definition: XrdOss.hh:200
virtual int Lfn2Pfn(const char *Path, char *buff, int blen)
Definition: XrdOss.hh:873
virtual XrdOssDF * newFile(const char *tident)=0
virtual int Get(const char *Aname, void *Aval, int Avsz, const char *Path, int fd=-1)=0

References XrdOssDF::Close(), Debug, XrdSysXAttr::Get(), XrdPfc::Info::GetFileSize(), XrdOss::Lfn2Pfn(), XrdPfc::Configuration::m_username, XrdOss::newFile(), XrdOssDF::Open(), XrdPfc::Info::Read(), TRACE, and XrdSysXAttrActive.

Referenced by ConsiderCached(), and Stat().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ ExecuteCommandUrl()

void Cache::ExecuteCommandUrl ( const std::string &  command_url)

Definition at line 51 of file XrdPfcCommand.cc.

52 {
53  static const char *top_epfx = "ExecuteCommandUrl ";
54 
55  SplitParser cp(command_url, "/");
56 
57  std::string token = cp.get_token();
58 
59  if (token != "xrdpfc_command")
60  {
61  TRACE(Error, top_epfx << "First token is NOT xrdpfc_command.");
62  return;
63  }
64 
65  // Get the command
66  token = cp.get_token_as_string();
67 
68  auto get_opt = [](SplitParser &sp) -> char {
69  const char *t = sp.get_token();
70  if (t)
71  return (t[0] == '-' && t[1] != 0) ? t[1] : 0;
72  else
73  return -1;
74  };
75 
76  //================================================================
77  // create_file
78  //================================================================
79 
80  if (token == "create_file")
81  {
82  static const char* err_prefix = "ExecuteCommandUrl: /xrdpfc_command/create_file: ";
83  static const char* usage =
84  "Usage: create_file/ [-h] [-s filesize] [-b blocksize] [-t access_time] [-d access_duration]/<path>\n"
85  " Creates a cache file with given parameters. Data in file is random.\n"
86  " Useful for cache purge testing.\n"
87  "Notes:\n"
88  " . If no options are needed one should still leave a space between / separators, ie., '/ /'\n"
89  " . Default filesize=1G, blocksize=<as configured>, access_time=-10, access_duration=10.\n"
90  " . -t and -d can be given multiple times to record several accesses.\n"
91  " . Negative arguments given to -t are interpreted as relative to now.\n";
92 
93  const Configuration &conf = m_configuration;
94 
95  token = cp.get_token_as_string();
96  TRACE(Debug, err_prefix << "Entered with argument string '" << token <<"'.");
97  if (token.empty()) {
98  TRACE(Error, err_prefix << "Options section must not be empty, a single space character is OK.");
99  return;
100  }
101  TRACE(Debug, err_prefix << "File path (reminder of URL) is '" << cp.get_reminder() <<"'.");
102  if ( ! cp.has_reminder()) {
103  TRACE(Error, err_prefix << "Path section must not be empty.");
104  return;
105  }
106 
107  long long file_size = ONE_GB;
108  long long block_size = conf.m_bufferSize;
109  int access_time [MAX_ACCESSES];
110  int access_duration[MAX_ACCESSES];
111  int at_count = 0, ad_count = 0;
112 
113  time_t time_now = time(0);
114 
115  SplitParser ap(token, " ");
116  char theOpt;
117 
118  while ((theOpt = get_opt(ap)) != (char) -1)
119  {
120  switch (theOpt)
121  {
122  case 'h': {
123  m_log.Say(err_prefix, " -- printing help, no action will be taken\n", usage);
124  return;
125  }
126  case 's': {
127  if (XrdOuca2x::a2sz(m_log, "Error getting filesize", ap.get_token(),
128  &file_size, 0ll, 32 * ONE_GB))
129  return;
130  break;
131  }
132  case 'b': {
133  if (XrdOuca2x::a2sz(m_log, "Error getting blocksize", ap.get_token(),
134  &block_size, 0ll, 64 * ONE_MB))
135  return;
136  break;
137  }
138  case 't': {
139  if (XrdOuca2x::a2i(m_log, "Error getting access time", ap.get_token(),
140  &access_time[at_count++], INT_MIN, INT_MAX))
141  return;
142  break;
143  }
144  case 'd': {
145  if (XrdOuca2x::a2i(m_log, "Error getting access duration", ap.get_token(),
146  &access_duration[ad_count++], 0, 24 * 3600))
147  return;
148  break;
149  }
150  default: {
151  TRACE(Error, err_prefix << "Unhandled command argument.");
152  return;
153  }
154  }
155  }
156 
157  if (at_count < 1) access_time [at_count++] = time_now - 10;
158  if (ad_count < 1) access_duration[ad_count++] = 10;
159 
160  if (at_count != ad_count)
161  {
162  TRACE(Error, err_prefix << "Options -t and -d must be given the same number of times.");
163  return;
164  }
165 
166  std::string file_path (cp.get_reminder_with_delim());
167  std::string cinfo_path(file_path + Info::s_infoExtension);
168 
169  TRACE(Debug, err_prefix << "Command arguments parsed successfully. Proceeding to create file " << file_path);
170 
171  // Check if cinfo exists ... bail out if it does.
172  {
173  struct stat infoStat;
174  if (GetOss()->Stat(cinfo_path.c_str(), &infoStat) == XrdOssOK)
175  {
176  TRACE(Error, err_prefix << "cinfo file already exists for '" << file_path << "'. Refusing to overwrite.");
177  return;
178  }
179  }
180 
181  TRACE(Debug, err_prefix << "Command arguments parsed successfully, proceeding to execution.");
182 
183  {
184  const char *myUser = conf.m_username.c_str();
185  XrdOucEnv myEnv;
186 
187  // Create the data file.
188 
189  char size_str[32]; sprintf(size_str, "%lld", file_size);
190  myEnv.Put("oss.asize", size_str);
191  myEnv.Put("oss.cgroup", conf.m_data_space.c_str());
192  int cret;
193  if ((cret = GetOss()->Create(myUser, file_path.c_str(), 0600, myEnv, XRDOSS_mkpath)) != XrdOssOK)
194  {
195  TRACE(Error, err_prefix << "Create failed for data file " << file_path << ERRNO_AND_ERRSTR(-cret));
196  return;
197  }
198 
199  XrdOssDF *myFile = GetOss()->newFile(myUser);
200  if ((cret = myFile->Open(file_path.c_str(), O_RDWR, 0600, myEnv)) != XrdOssOK)
201  {
202  TRACE(Error, err_prefix << "Open failed for data file " << file_path << ERRNO_AND_ERRSTR(-cret));
203  delete myFile;
204  return;
205  }
206 
207  // Create the info file.
208 
209  myEnv.Put("oss.asize", "64k"); // TODO: Calculate? Get it from configuration? Do not know length of access lists ...
210  myEnv.Put("oss.cgroup", conf.m_meta_space.c_str());
211  if ((cret = GetOss()->Create(myUser, cinfo_path.c_str(), 0600, myEnv, XRDOSS_mkpath)) != XrdOssOK)
212  {
213  TRACE(Error, err_prefix << "Create failed for info file " << cinfo_path << ERRNO_AND_ERRSTR(-cret));
214  myFile->Close(); delete myFile;
215  return;
216  }
217 
218  XrdOssDF *myInfoFile = GetOss()->newFile(myUser);
219  if ((cret = myInfoFile->Open(cinfo_path.c_str(), O_RDWR, 0600, myEnv)) != XrdOssOK)
220  {
221  TRACE(Error, err_prefix << "Open failed for info file " << cinfo_path << ERRNO_AND_ERRSTR(-cret));
222  delete myInfoFile;
223  myFile->Close(); delete myFile;
224  return;
225  }
226 
227  // Allocate space for the data file.
228 
229  if ((cret = posix_fallocate(myFile->getFD(), 0, file_size)))
230  {
231  TRACE(Error, err_prefix << "posix_fallocate failed for data file " << file_path << ERRNO_AND_ERRSTR(cret));
232  }
233 
234  // Fill up cinfo.
235 
236  Info myInfo(m_trace, false);
237  myInfo.SetBufferSizeFileSizeAndCreationTime(block_size, file_size);
238  myInfo.SetAllBitsSynced();
239 
240  for (int i = 0; i < at_count; ++i)
241  {
242  time_t att_time = access_time[i] >= 0 ? access_time[i] : time_now + access_time[i];
243 
244  myInfo.WriteIOStatSingle(file_size, att_time, att_time + access_duration[i]);
245  }
246 
247  myInfo.Write(myInfoFile, cinfo_path.c_str());
248 
249  // Fake last modified time to the last access_time
250  {
251  time_t last_detach;
252  myInfo.GetLatestDetachTime(last_detach);
253  struct timespec acc_mod_time[2] = { {last_detach, UTIME_OMIT}, {last_detach, 0} };
254 
255  futimens(myInfoFile->getFD(), acc_mod_time);
256  }
257 
258  myInfoFile->Close(); delete myInfoFile;
259  myFile->Close(); delete myFile;
260 
261  struct stat dstat;
262  GetOss()->Stat(file_path.c_str(), &dstat);
263  TRACE(Info, err_prefix << "Created file '" << file_path << "', size=" << (file_size>>20) << "MB, "
264  << "st_blocks=" << dstat.st_blocks);
265 
266  {
267  XrdSysCondVarHelper lock(&m_writeQ.condVar);
268 
269  m_writeQ.writes_between_purges += file_size;
270  }
271  {
272  int token = m_res_mon->register_file_open(file_path, time_now, false);
273  XrdPfc::Stats stats;
274  stats.m_BytesWritten = file_size;
275  stats.m_StBlocksAdded = dstat.st_blocks;
276  m_res_mon->register_file_update_stats(token, stats);
277  m_res_mon->register_file_close(token, time(0), stats);
278  }
279  }
280  }
281 
282  //================================================================
283  // remove_file
284  //================================================================
285 
286  else if (token == "remove_file")
287  {
288  static const char* err_prefix = "ExecuteCommandUrl: /xrdpfc_command/remove_file: ";
289  static const char* usage =
290  "Usage: remove_file/ [-h] /<path>\n"
291  " Removes given file from the cache unless it is currently open.\n"
292  " Useful for removal of stale files or duplicate files in a caching cluster.\n"
293  "Notes:\n"
294  " . If no options are needed one should still leave a space between / separators, ie., '/ /'\n";
295 
296  token = cp.get_token_as_string();
297  TRACE(Debug, err_prefix << "Entered with argument string '" << token <<"'.");
298  if (token.empty()) {
299  TRACE(Error, err_prefix << "Options section must not be empty, a single space character is OK.");
300  return;
301  }
302  TRACE(Debug, err_prefix << "File path (reminder of URL) is '" << cp.get_reminder() <<"'.");
303  if ( ! cp.has_reminder()) {
304  TRACE(Error, err_prefix << "Path section must not be empty.");
305  return;
306  }
307 
308  SplitParser ap(token, " ");
309  char theOpt;
310 
311  while ((theOpt = get_opt(ap)) != (char) -1)
312  {
313  switch (theOpt)
314  {
315  case 'h': {
316  m_log.Say(err_prefix, " -- printing help, no action will be taken\n", usage);
317  return;
318  }
319  default: {
320  TRACE(Error, err_prefix << "Unhandled command argument.");
321  return;
322  }
323  }
324  }
325 
326  std::string f_name(cp.get_reminder_with_delim());
327 
328  TRACE(Debug, err_prefix << "file argument '" << f_name << "'.");
329 
330  int ret = UnlinkFile(f_name, true);
331 
332  TRACE(Info, err_prefix << "returned with status " << ret);
333  }
334 
335  //================================================================
336  // unknown command
337  //================================================================
338 
339  else
340  {
341  TRACE(Error, top_epfx << "Unknown or empty command '" << token << "'");
342  }
343 }
void usage()
#define XRDOSS_mkpath
Definition: XrdOss.hh:466
#define ERRNO_AND_ERRSTR(err_code)
Definition: XrdPfcTrace.hh:46
bool Create
virtual int getFD()
Definition: XrdOss.hh:426
static int a2i(XrdSysError &, const char *emsg, const char *item, int *val, int minv=-1, int maxv=-1)
Definition: XrdOuca2x.cc:45
XrdOss * GetOss() const
Definition: XrdPfc.hh:282
virtual int Stat(const char *url, struct stat &sbuff)
Definition: XrdPfc.cc:1125
int UnlinkFile(const std::string &f_name, bool fail_if_open)
Remove cinfo and data files from cache.
Definition: XrdPfc.cc:1196
int register_file_open(const std::string &filename, time_t open_timestamp, bool existing_file)
void register_file_update_stats(int token_id, const Stats &stats)
void register_file_close(int token_id, time_t close_timestamp, const Stats &full_stats)
Statistics of cache utilisation by a File object.
Definition: XrdPfcStats.hh:35
long long m_StBlocksAdded
number of 512-byte blocks the file has grown by
Definition: XrdPfcStats.hh:43
long long m_BytesWritten
number of bytes written to disk
Definition: XrdPfcStats.hh:42

References XrdOuca2x::a2i(), XrdOuca2x::a2sz(), XrdOssDF::Close(), Create, Debug, ERRNO_AND_ERRSTR, Error, XrdPfc::SplitParser::get_reminder(), XrdPfc::SplitParser::get_reminder_with_delim(), XrdPfc::SplitParser::get_token(), XrdPfc::SplitParser::get_token_as_string(), XrdOssDF::getFD(), XrdPfc::Info::GetLatestDetachTime(), GetOss(), XrdPfc::SplitParser::has_reminder(), XrdPfc::Configuration::m_bufferSize, XrdPfc::Stats::m_BytesWritten, XrdPfc::Configuration::m_data_space, XrdPfc::Configuration::m_meta_space, XrdPfc::Stats::m_StBlocksAdded, XrdPfc::Configuration::m_username, XrdOss::newFile(), XrdOssDF::Open(), XrdOucEnv::Put(), XrdPfc::ResourceMonitor::register_file_close(), XrdPfc::ResourceMonitor::register_file_open(), XrdPfc::ResourceMonitor::register_file_update_stats(), XrdPfc::Info::s_infoExtension, XrdSysError::Say(), XrdPfc::Info::SetAllBitsSynced(), XrdPfc::Info::SetBufferSizeFileSizeAndCreationTime(), stat, XrdOss::Stat(), Stat(), TRACE, UnlinkFile(), usage(), XrdPfc::Info::Write(), XrdPfc::Info::WriteIOStatSingle(), XRDOSS_mkpath, and XrdOssOK.

+ Here is the call graph for this function:

◆ FileSyncDone()

void Cache::FileSyncDone ( File f,
bool  high_debug 
)

Definition at line 546 of file XrdPfc.cc.

547 {
548  dec_ref_cnt(f, high_debug);
549 }

◆ GetFile()

File * Cache::GetFile ( const std::string &  path,
IO io,
long long  off = 0,
long long  filesize = 0 
)

Definition at line 393 of file XrdPfc.cc.

394 {
395  // Called from virtual IOFile constructor.
396 
397  TRACE(Debug, "GetFile " << path << ", io " << io);
398 
399  ActiveMap_i it;
400 
401  {
402  XrdSysCondVarHelper lock(&m_active_cond);
403 
404  while (true)
405  {
406  it = m_active.find(path);
407 
408  // File is not open or being opened. Mark it as being opened and
409  // proceed to opening it outside of while loop.
410  if (it == m_active.end())
411  {
412  it = m_active.insert(std::make_pair(path, (File*) 0)).first;
413  break;
414  }
415 
416  if (it->second != 0)
417  {
418  it->second->AddIO(io);
419  inc_ref_cnt(it->second, false, true);
420 
421  return it->second;
422  }
423  else
424  {
425  // Wait for some change in m_active, then recheck.
426  m_active_cond.Wait();
427  }
428  }
429  }
430 
431  // This is always true, now that IOFileBlock is unsupported.
432  if (filesize == 0)
433  {
434  struct stat st;
435  int res = io->Fstat(st);
436  if (res < 0) {
437  errno = res;
438  TRACE(Error, "GetFile, could not get valid stat");
439  } else if (res > 0) {
440  errno = ENOTSUP;
441  TRACE(Error, "GetFile, stat returned positive value, this should NOT happen here");
442  } else {
443  filesize = st.st_size;
444  }
445  }
446 
447  File *file = 0;
448 
449  if (filesize >= 0)
450  {
451  file = File::FileOpen(path, off, filesize, io->GetInput());
452  }
453 
454  {
455  XrdSysCondVarHelper lock(&m_active_cond);
456 
457  if (file)
458  {
459  inc_ref_cnt(file, false, true);
460  it->second = file;
461 
462  file->AddIO(io);
463  }
464  else
465  {
466  m_active.erase(it);
467  }
468 
469  m_active_cond.Broadcast();
470  }
471 
472  return file;
473 }
virtual int Fstat(struct stat &sbuff)
Definition: XrdOucCache.hh:148
void AddIO(IO *io)
Definition: XrdPfcFile.cc:349
static File * FileOpen(const std::string &path, long long offset, long long fileSize, XrdOucCacheIO *inputIO)
Static constructor that also does Open. Returns null ptr if Open fails.
Definition: XrdPfcFile.cc:141
XrdOucCacheIO * GetInput()
Definition: XrdPfcIO.cc:31

References XrdPfc::File::AddIO(), XrdSysCondVar::Broadcast(), Debug, Error, XrdPfc::File::FileOpen(), XrdOucCacheIO::Fstat(), XrdPfc::IO::GetInput(), stat, TRACE, and XrdSysCondVar::Wait().

Referenced by XrdPfc::IOFile::IOFile().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ GetGStream()

XrdXrootdGStream* XrdPfc::Cache::GetGStream ( )
inline

Definition at line 300 of file XrdPfc.hh.

300 { return m_gstream; }

◆ GetInstance()

Cache & Cache::GetInstance ( )
static

Singleton access.

Definition at line 132 of file XrdPfc.cc.

132 { return *m_instance; }

Referenced by XrdPfc::IOFile::IOFile(), XrdPfc::IOFileBlock::IOFileBlock(), Attach(), XrdPfc::IOFile::DetachFinalize(), XrdPfc::ResourceMonitor::perform_purge_check(), XrdPfc::ResourceMonitor::perform_purge_task_cleanup(), PrefetchThread(), Prepare(), ProcessWriteTaskThread(), Proto_ResourceMonitorHeartBeat(), XrdPfc::File::Sync(), and XrdPfc::File::WriteBlockToDisk().

+ Here is the caller graph for this function:

◆ GetLog()

XrdSysError* XrdPfc::Cache::GetLog ( ) const
inline

Definition at line 296 of file XrdPfc.hh.

296 { return &m_log; }

Referenced by XrdPfc::File::GetLog().

+ Here is the caller graph for this function:

◆ GetNextFileToPrefetch()

File * Cache::GetNextFileToPrefetch ( )

Definition at line 737 of file XrdPfc.cc.

738 {
739  m_prefetch_condVar.Lock();
740  while (m_prefetchList.empty())
741  {
742  m_prefetch_condVar.Wait();
743  }
744 
745  // std::sort(m_prefetchList.begin(), m_prefetchList.end(), myobject);
746 
747  size_t l = m_prefetchList.size();
748  int idx = rand() % l;
749  File* f = m_prefetchList[idx];
750 
751  m_prefetch_condVar.UnLock();
752  return f;
753 }

References XrdSysCondVar::Lock(), XrdSysCondVar::UnLock(), and XrdSysCondVar::Wait().

Referenced by Prefetch().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ GetOss()

XrdOss* XrdPfc::Cache::GetOss ( ) const
inline

Definition at line 282 of file XrdPfc.hh.

282 { return m_oss; }

Referenced by ExecuteCommandUrl().

+ Here is the caller graph for this function:

◆ GetPurgePin()

PurgePin* XrdPfc::Cache::GetPurgePin ( ) const
inline

Definition at line 286 of file XrdPfc.hh.

286 { return m_purge_pin; }

Referenced by XrdPfc::ResourceMonitor::perform_purge_check().

+ Here is the caller graph for this function:

◆ GetTrace()

XrdSysTrace* XrdPfc::Cache::GetTrace ( ) const
inline

Definition at line 297 of file XrdPfc.hh.

297 { return m_trace; }

Referenced by XrdPfc::IO::GetTrace(), and XrdPfc::File::GetTrace().

+ Here is the caller graph for this function:

◆ is_prefetch_enabled()

bool XrdPfc::Cache::is_prefetch_enabled ( ) const
inline

Definition at line 309 of file XrdPfc.hh.

309 { return m_prefetch_enabled; }

Referenced by XrdOucGetCache().

+ Here is the caller graph for this function:

◆ IsFileActiveOrPurgeProtected()

bool Cache::IsFileActiveOrPurgeProtected ( const std::string &  path) const

Definition at line 681 of file XrdPfc.cc.

682 {
683  XrdSysCondVarHelper lock(&m_active_cond);
684 
685  return m_active.find(path) != m_active.end() ||
686  m_purge_delay_set.find(path) != m_purge_delay_set.end();
687 }

◆ LocalFilePath()

int Cache::LocalFilePath ( const char *  curl,
char *  buff = 0,
int  blen = 0,
LFP_Reason  why = ForAccess,
bool  forall = false 
)
virtual

Get the path to a file that is complete in the local cache. By default, the file must be complete in the cache (i.e. no blocks are missing). This can be overridden. This path can be used to access the file on the local node.

Returns
0 - the file is complete and the local path to the file is in the buffer, if it has been supllied.
<0 - the request could not be fulfilled. The return value is -errno describing why. If a buffer was supplied and a path could be generated it is returned only if "why" is ForCheck or ForInfo. Otherwise, a null path is returned.
>0 - Reserved for future use.

Reimplemented from XrdOucCache.

Definition at line 798 of file XrdPfc.cc.

800 {
801  static const mode_t groupReadable = S_IRUSR | S_IWUSR | S_IRGRP;
802  static const mode_t worldReadable = S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH;
803  static const char *lfpReason[] = { "ForAccess", "ForInfo", "ForPath" };
804 
805  TRACE(Debug, "LocalFilePath '" << curl << "', why=" << lfpReason[why]);
806 
807  if (buff && blen > 0) buff[0] = 0;
808 
809  XrdCl::URL url(curl);
810  std::string f_name = url.GetPath();
811  std::string i_name = f_name + Info::s_infoExtension;
812 
813  if (why == ForPath)
814  {
815  int ret = m_oss->Lfn2Pfn(f_name.c_str(), buff, blen);
816  TRACE(Info, "LocalFilePath '" << curl << "', why=" << lfpReason[why] << " -> " << ret);
817  return ret;
818  }
819 
820  {
821  XrdSysCondVarHelper lock(&m_active_cond);
822  m_purge_delay_set.insert(f_name);
823  }
824 
825  struct stat sbuff, sbuff2;
826  if (m_oss->Stat(f_name.c_str(), &sbuff) == XrdOssOK &&
827  m_oss->Stat(i_name.c_str(), &sbuff2) == XrdOssOK)
828  {
829  if (S_ISDIR(sbuff.st_mode))
830  {
831  TRACE(Info, "LocalFilePath '" << curl << "', why=" << lfpReason[why] << " -> EISDIR");
832  return -EISDIR;
833  }
834  else
835  {
836  bool read_ok = false;
837  bool is_complete = false;
838 
839  // Lock and check if the file is active. If NOT, keep the lock
840  // and add dummy access after successful reading of info file.
841  // If it IS active, just release the lock, this ongoing access will
842  // assure the file continues to exist.
843 
844  // XXXX How can I just loop over the cinfo file when active?
845  // Can I not get is_complete from the existing file?
846  // Do I still want to inject access record?
847  // Oh, it writes only if not active .... still let's try to use existing File.
848 
849  m_active_cond.Lock();
850 
851  bool is_active = m_active.find(f_name) != m_active.end();
852 
853  if (is_active) m_active_cond.UnLock();
854 
855  XrdOssDF* infoFile = m_oss->newFile(m_configuration.m_username.c_str());
856  XrdOucEnv myEnv;
857  int res = infoFile->Open(i_name.c_str(), O_RDWR, 0600, myEnv);
858  if (res >= 0)
859  {
860  Info info(m_trace, 0);
861  if (info.Read(infoFile, i_name.c_str()))
862  {
863  read_ok = true;
864 
865  is_complete = info.IsComplete();
866 
867  // Add full-size access if reason is for access.
868  if ( ! is_active && is_complete && why == ForAccess)
869  {
870  info.WriteIOStatSingle(info.GetFileSize());
871  info.Write(infoFile, i_name.c_str());
872  }
873  }
874  infoFile->Close();
875  }
876  delete infoFile;
877 
878  if ( ! is_active) m_active_cond.UnLock();
879 
880  if (read_ok)
881  {
882  if ((is_complete || why == ForInfo) && buff != 0)
883  {
884  int res2 = m_oss->Lfn2Pfn(f_name.c_str(), buff, blen);
885  if (res2 < 0)
886  return res2;
887 
888  // Normally, files are owned by us but when direct cache access
889  // is wanted and possible, make sure the file is world readable.
890  if (why == ForAccess)
891  {mode_t mode = (forall ? worldReadable : groupReadable);
892  if (((sbuff.st_mode & worldReadable) != mode)
893  && (m_oss->Chmod(f_name.c_str(), mode) != XrdOssOK))
894  {is_complete = false;
895  *buff = 0;
896  }
897  }
898  }
899 
900  TRACE(Info, "LocalFilePath '" << curl << "', why=" << lfpReason[why] <<
901  (is_complete ? " -> FILE_COMPLETE_IN_CACHE" : " -> EREMOTE"));
902 
903  return is_complete ? 0 : -EREMOTE;
904  }
905  }
906  }
907 
908  TRACE(Info, "LocalFilePath '" << curl << "', why=" << lfpReason[why] << " -> ENOENT");
909  return -ENOENT;
910 }
virtual int Chmod(const char *path, mode_t mode, XrdOucEnv *envP=0)=0

References XrdOss::Chmod(), XrdOssDF::Close(), Debug, XrdOucCache::ForAccess, XrdOucCache::ForInfo, XrdOucCache::ForPath, XrdPfc::Info::GetFileSize(), XrdCl::URL::GetPath(), XrdPfc::Info::IsComplete(), XrdOss::Lfn2Pfn(), XrdSysCondVar::Lock(), XrdPfc::Configuration::m_username, XrdOss::newFile(), XrdOssDF::Open(), XrdPfc::Info::Read(), XrdPfc::Info::s_infoExtension, stat, XrdOss::Stat(), TRACE, XrdSysCondVar::UnLock(), XrdPfc::Info::Write(), XrdPfc::Info::WriteIOStatSingle(), and XrdOssOK.

+ Here is the call graph for this function:

◆ Prefetch()

void Cache::Prefetch ( )

Definition at line 756 of file XrdPfc.cc.

757 {
758  const long long limit_RAM = m_configuration.m_RamAbsAvailable * 7 / 10;
759 
760  while (true)
761  {
762  m_RAM_mutex.Lock();
763  bool doPrefetch = (m_RAM_used < limit_RAM);
764  m_RAM_mutex.UnLock();
765 
766  if (doPrefetch)
767  {
769  f->Prefetch();
770  }
771  else
772  {
774  }
775  }
776 }
File * GetNextFileToPrefetch()
Definition: XrdPfc.cc:737
void Prefetch()
Definition: XrdPfcFile.cc:1624
static void Wait(int milliseconds)
Definition: XrdSysTimer.cc:227

References GetNextFileToPrefetch(), XrdSysMutex::Lock(), XrdPfc::Configuration::m_RamAbsAvailable, XrdPfc::File::Prefetch(), XrdSysMutex::UnLock(), and XrdSysTimer::Wait().

Referenced by PrefetchThread().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ prefetch_str2value()

bool Cache::prefetch_str2value ( const char *  from,
const char *  str,
int &  val,
int  min,
int  max 
) const

Definition at line 119 of file XrdPfcConfiguration.cc.

121 {
122  if (XrdOuca2x::a2i(m_log, "Error parsing prefetch block count", str, &val, min, max))
123  return false;
124 
125  return true;
126 }

References XrdOuca2x::a2i().

+ Here is the call graph for this function:

◆ Prepare()

int Cache::Prepare ( const char *  curl,
int  oflags,
mode_t  mode 
)
virtual

Preapare the cache for a file open request. This method is called prior to actually opening a file. This method is meant to allow defering an open request or implementing the full I/O stack in the cache layer.

Returns
<0 Error has occurred, return value is -errno; fail open request. =0 Continue with open() request. >0 Defer open but treat the file as actually being open. Use the XrdOucCacheIO::Open() method to open the file at a later time.

Reimplemented from XrdOucCache.

Definition at line 1070 of file XrdPfc.cc.

1071 {
1072  XrdCl::URL url(curl);
1073  std::string f_name = url.GetPath();
1074  std::string i_name = f_name + Info::s_infoExtension;
1075 
1076  // Do not allow write access.
1077  if ((oflags & O_ACCMODE) != O_RDONLY)
1078  {
1079  if (Cache::GetInstance().RefConfiguration().m_write_through)
1080  {
1081  return 0;
1082  }
1083  TRACE(Warning, "Prepare write access requested on file " << f_name << ". Denying access.");
1084  return -EROFS;
1085  }
1086 
1087  // Intercept xrdpfc_command requests.
1088  if (m_configuration.m_allow_xrdpfc_command && strncmp("/xrdpfc_command/", f_name.c_str(), 16) == 0)
1089  {
1090  // Schedule a job to process command request.
1091  {
1092  CommandExecutor *ce = new CommandExecutor(f_name, "CommandExecutor");
1093 
1094  schedP->Schedule(ce);
1095  }
1096 
1097  return -EAGAIN;
1098  }
1099 
1100  {
1101  XrdSysCondVarHelper lock(&m_active_cond);
1102  m_purge_delay_set.insert(f_name);
1103  }
1104 
1105  struct stat sbuff;
1106  if (m_oss->Stat(i_name.c_str(), &sbuff) == XrdOssOK)
1107  {
1108  TRACE(Dump, "Prepare defer open " << f_name);
1109  return 1;
1110  }
1111  else
1112  {
1113  return 0;
1114  }
1115 }
static XrdScheduler * schedP
Definition: XrdPfc.hh:304
void Schedule(XrdJob *jp)
@ Warning

References GetInstance(), XrdCl::URL::GetPath(), XrdPfc::Configuration::m_allow_xrdpfc_command, RefConfiguration(), XrdPfc::Info::s_infoExtension, schedP, XrdScheduler::Schedule(), stat, XrdOss::Stat(), TRACE, TPC::Warning, and XrdOssOK.

+ Here is the call graph for this function:

◆ ProcessWriteTasks()

void Cache::ProcessWriteTasks ( )

Separate task which writes blocks from ram to disk.

Definition at line 277 of file XrdPfc.cc.

278 {
279  std::vector<Block*> blks_to_write(m_configuration.m_wqueue_blocks);
280 
281  while (true)
282  {
283  m_writeQ.condVar.Lock();
284  while (m_writeQ.size == 0)
285  {
286  m_writeQ.condVar.Wait();
287  }
288 
289  // MT -- optimize to pop several blocks if they are available (or swap the list).
290  // This makes sense especially for smallish block sizes.
291 
292  int n_pushed = std::min(m_writeQ.size, m_configuration.m_wqueue_blocks);
293  long long sum_size = 0;
294 
295  for (int bi = 0; bi < n_pushed; ++bi)
296  {
297  Block* block = m_writeQ.queue.front();
298  m_writeQ.queue.pop_front();
299  m_writeQ.writes_between_purges += block->get_size();
300  sum_size += block->get_size();
301 
302  blks_to_write[bi] = block;
303 
304  TRACE(Dump, "ProcessWriteTasks for block " << (void*)(block) << " path " << block->m_file->lPath());
305  }
306  m_writeQ.size -= n_pushed;
307 
308  m_writeQ.condVar.UnLock();
309 
310  {
311  XrdSysMutexHelper lock(&m_RAM_mutex);
312  m_RAM_write_queue -= sum_size;
313  }
314 
315  for (int bi = 0; bi < n_pushed; ++bi)
316  {
317  Block* block = blks_to_write[bi];
318 
319  block->m_file->WriteBlockToDisk(block);
320  }
321  }
322 }
const char * lPath() const
Log path.
Definition: XrdPfcFile.cc:1609
void WriteBlockToDisk(Block *b)
Definition: XrdPfcFile.cc:1137

References XrdPfc::Block::get_size(), XrdPfc::File::lPath(), XrdPfc::Block::m_file, XrdPfc::Configuration::m_wqueue_blocks, TRACE, and XrdPfc::File::WriteBlockToDisk().

Referenced by ProcessWriteTaskThread().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ RefConfiguration()

const Configuration& XrdPfc::Cache::RefConfiguration ( ) const
inline

Reference XrdPfc configuration.

Definition at line 217 of file XrdPfc.hh.

217 { return m_configuration; }

Referenced by XrdPfc::IOFileBlock::IOFileBlock(), Attach(), Conf(), Prepare(), XrdPfc::File::WriteBlockToDisk(), and XrdOucGetCache().

+ Here is the caller graph for this function:

◆ RefResMon()

ResourceMonitor& XrdPfc::Cache::RefResMon ( )
inline

Definition at line 299 of file XrdPfc.hh.

299 { return *m_res_mon; }

Referenced by ResMon().

+ Here is the caller graph for this function:

◆ RegisterPrefetchFile()

void Cache::RegisterPrefetchFile ( File file)

Definition at line 699 of file XrdPfc.cc.

700 {
701  // Can be called with other locks held.
702 
703  if ( ! m_prefetch_enabled)
704  {
705  return;
706  }
707 
708  m_prefetch_condVar.Lock();
709  m_prefetchList.push_back(file);
710  m_prefetch_condVar.Signal();
711  m_prefetch_condVar.UnLock();
712 }

References XrdSysCondVar::Lock(), XrdSysCondVar::Signal(), and XrdSysCondVar::UnLock().

+ Here is the call graph for this function:

◆ ReleaseFile()

void Cache::ReleaseFile ( File f,
IO io 
)

Definition at line 475 of file XrdPfc.cc.

476 {
477  // Called from virtual IO::DetachFinalize.
478 
479  TRACE(Debug, "ReleaseFile " << f->GetLocalPath() << ", io " << io);
480 
481  {
482  XrdSysCondVarHelper lock(&m_active_cond);
483 
484  f->RemoveIO(io);
485  }
486  dec_ref_cnt(f, true);
487 }
void RemoveIO(IO *io)
Definition: XrdPfcFile.cc:386

References Debug, XrdPfc::File::GetLocalPath(), XrdPfc::File::RemoveIO(), and TRACE.

Referenced by XrdPfc::IOFile::DetachFinalize(), and XrdPfc::IOFileBlock::DetachFinalize().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ ReleaseRAM()

void Cache::ReleaseRAM ( char *  buf,
long long  size 
)

Definition at line 375 of file XrdPfc.cc.

376 {
377  bool std_size = (size == m_configuration.m_bufferSize);
378  {
379  XrdSysMutexHelper lock(&m_RAM_mutex);
380 
381  m_RAM_used -= size;
382 
383  if (std_size && m_RAM_std_size < m_configuration.m_RamKeepStdBlocks)
384  {
385  m_RAM_std_blocks.push_back(buf);
386  ++m_RAM_std_size;
387  return;
388  }
389  }
390  free(buf);
391 }

References XrdPfc::Configuration::m_bufferSize, and XrdPfc::Configuration::m_RamKeepStdBlocks.

◆ RemoveWriteQEntriesFor()

void Cache::RemoveWriteQEntriesFor ( File f)

Remove blocks from write queue which belong to given prefetch. This method is used at the time of File destruction.

Definition at line 244 of file XrdPfc.cc.

245 {
246  std::list<Block*> removed_blocks;
247  long long sum_size = 0;
248 
249  m_writeQ.condVar.Lock();
250  std::list<Block*>::iterator i = m_writeQ.queue.begin();
251  while (i != m_writeQ.queue.end())
252  {
253  if ((*i)->m_file == file)
254  {
255  TRACE(Dump, "Remove entries for " << (void*)(*i) << " path " << file->lPath());
256  std::list<Block*>::iterator j = i++;
257  removed_blocks.push_back(*j);
258  sum_size += (*j)->get_size();
259  m_writeQ.queue.erase(j);
260  --m_writeQ.size;
261  }
262  else
263  {
264  ++i;
265  }
266  }
267  m_writeQ.condVar.UnLock();
268 
269  {
270  XrdSysMutexHelper lock(&m_RAM_mutex);
271  m_RAM_write_queue -= sum_size;
272  }
273 
274  file->BlocksRemovedFromWriteQ(removed_blocks);
275 }

References XrdPfc::File::BlocksRemovedFromWriteQ(), XrdPfc::File::lPath(), and TRACE.

Referenced by UnlinkFile().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ RequestRAM()

char * Cache::RequestRAM ( long long  size)

Definition at line 335 of file XrdPfc.cc.

336 {
337  static const size_t s_block_align = sysconf(_SC_PAGESIZE);
338 
339  bool std_size = (size == m_configuration.m_bufferSize);
340 
341  m_RAM_mutex.Lock();
342 
343  long long total = m_RAM_used + size;
344 
345  if (total <= m_configuration.m_RamAbsAvailable)
346  {
347  m_RAM_used = total;
348  if (std_size && m_RAM_std_size > 0)
349  {
350  char *buf = m_RAM_std_blocks.back();
351  m_RAM_std_blocks.pop_back();
352  --m_RAM_std_size;
353 
354  m_RAM_mutex.UnLock();
355 
356  return buf;
357  }
358  else
359  {
360  m_RAM_mutex.UnLock();
361  char *buf;
362  if (posix_memalign((void**) &buf, s_block_align, (size_t) size))
363  {
364  // Report out of mem? Probably should report it at least the first time,
365  // then periodically.
366  return 0;
367  }
368  return buf;
369  }
370  }
371  m_RAM_mutex.UnLock();
372  return 0;
373 }

References XrdSysMutex::Lock(), XrdPfc::Configuration::m_bufferSize, XrdPfc::Configuration::m_RamAbsAvailable, and XrdSysMutex::UnLock().

+ Here is the call graph for this function:

◆ ResMon()

ResourceMonitor & Cache::ResMon ( )
static

Definition at line 135 of file XrdPfc.cc.

135 { return m_instance->RefResMon(); }
ResourceMonitor & RefResMon()
Definition: XrdPfc.hh:299

References RefResMon().

Referenced by XrdPfc::ResourceMonitor::perform_purge_check(), ResourceMonitorThread(), and XrdPfc::UnlinkPurgeStateFilesInMap().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ ScheduleFileSync()

void XrdPfc::Cache::ScheduleFileSync ( File f)
inline

Definition at line 292 of file XrdPfc.hh.

292 { schedule_file_sync(f, false, false); }

◆ Stat()

int Cache::Stat ( const char *  curl,
struct stat sbuff 
)
virtual
Returns
<0 - Stat failed, value is -errno. =0 - Stat succeeded, sbuff holds stat information. >0 - Stat could not be done, forward operation to next level.

Reimplemented from XrdOucCache.

Definition at line 1125 of file XrdPfc.cc.

1126 {
1127  const char *tpfx = "Stat ";
1128 
1129  XrdCl::URL url(curl);
1130  std::string f_name = url.GetPath();
1131 
1132  File *file = nullptr;
1133  {
1134  XrdSysCondVarHelper lock(&m_active_cond);
1135  auto it = m_active.find(f_name);
1136  if (it != m_active.end()) {
1137  file = it->second;
1138  // If `file` is nullptr, the file-open is in progress; instead
1139  // of waiting for the file-open to finish, simply treat it as if
1140  // the file-open doesn't exist.
1141  if (file) {
1142  inc_ref_cnt(file, false, false);
1143  }
1144  }
1145  }
1146  if (file) {
1147  int res = file->Fstat(sbuff);
1148  dec_ref_cnt(file, false);
1149  TRACE(Debug, tpfx << "from active file " << curl << " -> " << res);
1150  return res;
1151  }
1152 
1153  int res = m_oss->Stat(f_name.c_str(), &sbuff);
1154  if (res != XrdOssOK) {
1155  TRACE(Debug, tpfx << curl << " -> " << res);
1156  return 1; // res; -- for only-if-cached
1157  }
1158  if (S_ISDIR(sbuff.st_mode))
1159  {
1160  TRACE(Debug, tpfx << curl << " -> EISDIR");
1161  return -EISDIR;
1162  }
1163 
1164  long long file_size = DetermineFullFileSize(f_name + Info::s_infoExtension);
1165  if (file_size < 0) {
1166  TRACE(Debug, tpfx << curl << " -> " << file_size);
1167  return 1; // (int) file_size; -- for only-if-cached
1168  }
1169  sbuff.st_size = file_size;
1170  bool is_cached = DecideIfConsideredCached(file_size, sbuff.st_blocks * 512ll);
1171  if ( ! is_cached)
1172  sbuff.st_atime = 0;
1173 
1174  TRACE(Debug, tpfx << "from disk " << curl << " -> " << res);
1175 
1176  return 0;
1177 }

References Debug, DecideIfConsideredCached(), DetermineFullFileSize(), XrdPfc::File::Fstat(), XrdCl::URL::GetPath(), XrdPfc::Info::s_infoExtension, XrdOss::Stat(), TRACE, and XrdOssOK.

Referenced by ExecuteCommandUrl().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ TheOne()

const Cache & Cache::TheOne ( )
static

Definition at line 133 of file XrdPfc.cc.

133 { return *m_instance; }

Referenced by XrdPfc::File::GetLog(), XrdPfc::File::GetTrace(), XrdPfc::OldStylePurgeDriver(), and XrdPfc::UnlinkPurgeStateFilesInMap().

+ Here is the caller graph for this function:

◆ Unlink()

int Cache::Unlink ( const char *  curl)
virtual
Returns
<0 - Stat failed, value is -errno. =0 - Stat succeeded, sbuff holds stat information.

Reimplemented from XrdOucCache.

Definition at line 1186 of file XrdPfc.cc.

1187 {
1188  XrdCl::URL url(curl);
1189  std::string f_name = url.GetPath();
1190 
1191  // printf("Unlink url=%s\n\t fname=%s\n", curl, f_name.c_str());
1192 
1193  return UnlinkFile(f_name, false);
1194 }

References XrdCl::URL::GetPath(), and UnlinkFile().

+ Here is the call graph for this function:

◆ UnlinkFile()

int Cache::UnlinkFile ( const std::string &  f_name,
bool  fail_if_open 
)

Remove cinfo and data files from cache.

Definition at line 1196 of file XrdPfc.cc.

1197 {
1198  static const char* trc_pfx = "UnlinkFile ";
1199  ActiveMap_i it;
1200  File *file = 0;
1201  long long st_blocks_to_purge = 0;
1202  {
1203  XrdSysCondVarHelper lock(&m_active_cond);
1204 
1205  it = m_active.find(f_name);
1206 
1207  if (it != m_active.end())
1208  {
1209  if (fail_if_open)
1210  {
1211  TRACE(Info, trc_pfx << f_name << ", file currently open and force not requested - denying request");
1212  return -EBUSY;
1213  }
1214 
1215  // Null File* in m_active map means an operation is ongoing, probably
1216  // Attach() with possible File::Open(). Ask for retry.
1217  if (it->second == 0)
1218  {
1219  TRACE(Info, trc_pfx << f_name << ", an operation on this file is ongoing - denying request");
1220  return -EAGAIN;
1221  }
1222 
1223  file = it->second;
1224  st_blocks_to_purge = file->initiate_emergency_shutdown();
1225  it->second = 0;
1226  }
1227  else
1228  {
1229  it = m_active.insert(std::make_pair(f_name, (File*) 0)).first;
1230  }
1231  }
1232 
1233  if (file) {
1234  RemoveWriteQEntriesFor(file);
1235  } else {
1236  struct stat f_stat;
1237  if (m_oss->Stat(f_name.c_str(), &f_stat) == XrdOssOK)
1238  st_blocks_to_purge = f_stat.st_blocks;
1239  }
1240 
1241  std::string i_name = f_name + Info::s_infoExtension;
1242 
1243  // Unlink file & cinfo
1244  int f_ret = m_oss->Unlink(f_name.c_str());
1245  int i_ret = m_oss->Unlink(i_name.c_str());
1246 
1247  if (st_blocks_to_purge)
1248  m_res_mon->register_file_purge(f_name, st_blocks_to_purge);
1249 
1250  TRACE(Debug, trc_pfx << f_name << ", f_ret=" << f_ret << ", i_ret=" << i_ret);
1251 
1252  {
1253  XrdSysCondVarHelper lock(&m_active_cond);
1254  m_active.erase(it);
1255  m_active_cond.Broadcast();
1256  }
1257 
1258  return std::min(f_ret, i_ret);
1259 }
virtual int Unlink(const char *path, int Opts=0, XrdOucEnv *envP=0)=0
void RemoveWriteQEntriesFor(File *f)
Remove blocks from write queue which belong to given prefetch. This method is used at the time of Fil...
Definition: XrdPfc.cc:244
long long initiate_emergency_shutdown()
Definition: XrdPfcFile.cc:154
void register_file_purge(DirState *target, long long size_in_st_blocks)

References XrdSysCondVar::Broadcast(), Debug, XrdPfc::File::initiate_emergency_shutdown(), XrdPfc::ResourceMonitor::register_file_purge(), RemoveWriteQEntriesFor(), XrdPfc::Info::s_infoExtension, stat, XrdOss::Stat(), TRACE, XrdOss::Unlink(), and XrdOssOK.

Referenced by ExecuteCommandUrl(), XrdPfcFSctl::FSctl(), XrdPfc::File::Sync(), and Unlink().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ VCheck()

static bool XrdPfc::Cache::VCheck ( XrdVersionInfo &  urVersion)
inlinestatic

Version check.

Definition at line 247 of file XrdPfc.hh.

247 { return true; }

◆ WriteFileSizeXAttr()

void Cache::WriteFileSizeXAttr ( int  cinfo_fd,
long long  file_size 
)

Definition at line 915 of file XrdPfc.cc.

916 {
917  if (m_metaXattr) {
918  int res = XrdSysXAttrActive->Set("pfc.fsize", &file_size, sizeof(long long), 0, cinfo_fd, 0);
919  if (res != 0) {
920  TRACE(Debug, "WriteFileSizeXAttr error setting xattr " << res);
921  }
922  }
923 }
virtual int Set(const char *Aname, const void *Aval, int Avsz, const char *Path, int fd=-1, int isNew=0)=0

References Debug, XrdSysXAttr::Set(), TRACE, and XrdSysXAttrActive.

+ Here is the call graph for this function:

◆ WritesSinceLastCall()

long long Cache::WritesSinceLastCall ( )

Definition at line 324 of file XrdPfc.cc.

325 {
326  // Called from ResourceMonitor for an alternative estimation of disk writes.
327  XrdSysCondVarHelper lock(&m_writeQ.condVar);
328  long long ret = m_writeQ.writes_between_purges;
329  m_writeQ.writes_between_purges = 0;
330  return ret;
331 }

Referenced by XrdPfc::ResourceMonitor::perform_purge_check().

+ Here is the caller graph for this function:

Member Data Documentation

◆ schedP

XrdScheduler * Cache::schedP = nullptr
static

The documentation for this class was generated from the following files: