XRootD
Loading...
Searching...
No Matches
XrdPfc::Cache Class Reference

Attaches/creates and detaches/deletes cache-io objects for disk based cache. More...

#include <XrdPfc.hh>

+ Inheritance diagram for XrdPfc::Cache:
+ Collaboration diagram for XrdPfc::Cache:

Public Member Functions

 Cache (XrdSysLogger *logger, XrdOucEnv *env)
 Constructor.
 
void AddWriteTask (Block *b, bool from_read)
 Add downloaded block to the write queue.
 
virtual XrdOucCacheIO * Attach (XrdOucCacheIO *, int Options=0)
 
bool blocksize_str2value (const char *from, const char *str, long long &val, long long min, long long max) const
 
void ClearPurgeProtectedSet ()
 
bool Config (const char *config_filename, const char *parameters, XrdOucEnv *env)
 Parse configuration file.
 
virtual int ConsiderCached (const char *url)
 
bool Decide (XrdOucCacheIO *)
 Makes decision if the original XrdOucCacheIO should be cached.
 
bool DecideIfConsideredCached (long long file_size, long long bytes_on_disk)
 
void DeRegisterPrefetchFile (File *)
 
long long DetermineFullFileSize (const std::string &cinfo_fname)
 
void ExecuteCommandUrl (const std::string &command_url)
 
void FileSyncDone (File *, bool high_debug)
 
FileGetFile (const std::string &, IO *, long long off=0, long long filesize=0)
 
XrdXrootdGStreamGetGStream ()
 
XrdSysErrorGetLog () const
 
FileGetNextFileToPrefetch ()
 
XrdOssGetOss () const
 
PurgePinGetPurgePin () const
 
XrdSysTraceGetTrace () const
 
bool is_prefetch_enabled () const
 
bool IsFileActiveOrPurgeProtected (const std::string &) const
 
virtual int LocalFilePath (const char *url, char *buff=0, int blen=0, LFP_Reason why=ForAccess, bool forall=false)
 
void Prefetch ()
 
bool prefetch_str2value (const char *from, const char *str, int &val, int min, int max) const
 
virtual int Prepare (const char *url, int oflags, mode_t mode)
 
void ProcessWriteTasks ()
 Separate task which writes blocks from ram to disk.
 
const Configuration & RefConfiguration () const
 Reference XrdPfc configuration.
 
ResourceMonitorRefResMon ()
 
void RegisterPrefetchFile (File *)
 
void ReleaseFile (File *, IO *)
 
void ReleaseRAM (char *buf, long long size)
 
void RemoveWriteQEntriesFor (File *f)
 Remove blocks from write queue which belong to given prefetch. This method is used at the time of File destruction.
 
char * RequestRAM (long long size)
 
void ScheduleFileSync (File *f)
 
virtual int Stat (const char *url, struct stat &sbuff)
 
virtual int Unlink (const char *url)
 
int UnlinkFile (const std::string &f_name, bool fail_if_open)
 Remove cinfo and data files from cache.
 
void WriteFileSizeXAttr (int cinfo_fd, long long file_size)
 
long long WritesSinceLastCall ()
 
- Public Member Functions inherited from XrdOucCache
 XrdOucCache (const char *ctype)
 
virtual ~XrdOucCache ()
 Destructor.
 
virtual int Rename (const char *oldp, const char *newp)
 
virtual int Rmdir (const char *dirp)
 
virtual int Truncate (const char *path, off_t size)
 
virtual int Xeq (XeqCmd cmd, char *arg, int arglen)
 

Static Public Member Functions

static const Configuration & Conf ()
 
static Cache & CreateInstance (XrdSysLogger *logger, XrdOucEnv *env)
 Singleton creation.
 
static Cache & GetInstance ()
 Singleton access.
 
static ResourceMonitorResMon ()
 
static const CacheTheOne ()
 
static bool VCheck (XrdVersionInfo &urVersion)
 Version check.
 

Static Public Attributes

static XrdScheduler * schedP = nullptr
 
- Static Public Attributes inherited from XrdOucCache
static const int optFIS = 0x0001
 File is structured (e.g. root file)
 
static const int optNEW = 0x0014
 File is new -> optRW (o/w read or write)
 
static const int optRW = 0x0004
 File is read/write (o/w read/only)
 
static const int optWIN = 0x0024
 File is new -> optRW use write-in cache.
 

Additional Inherited Members

- Public Types inherited from XrdOucCache
enum  LFP_Reason {
  ForAccess =0 ,
  ForInfo ,
  ForPath
}
 
enum  XeqCmd { xeqNoop = 0 }
 
- Public Attributes inherited from XrdOucCache
const char CacheType [8]
 A 1-to-7 character cache type identifier (usually pfc or rmc).
 
XrdOucCacheStats Statistics
 

Detailed Description

Attaches/creates and detaches/deletes cache-io objects for disk based cache.

Definition at line 164 of file XrdPfc.hh.

Constructor & Destructor Documentation

◆ Cache()

Cache::Cache ( XrdSysLogger * logger,
XrdOucEnv * env )

Constructor.

Definition at line 158 of file XrdPfc.cc.

158 :
159 XrdOucCache("pfc"),
160 m_env(env),
161 m_log(logger, "XrdPfc_"),
162 m_trace(new XrdSysTrace("XrdPfc", logger)),
163 m_traceID("Cache"),
164 m_oss(0),
165 m_gstream(0),
166 m_purge_pin(0),
167 m_prefetch_condVar(0),
168 m_prefetch_enabled(false),
169 m_RAM_used(0),
170 m_RAM_write_queue(0),
171 m_RAM_std_size(0),
172 m_isClient(false),
173 m_active_cond(0)
174{
175 // Default log level is Warning.
176 m_trace->What = 2;
177}
XrdOucCache(const char *ctype)

References XrdOucCache::XrdOucCache().

Referenced by CreateInstance(), GetInstance(), and TheOne().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

Member Function Documentation

◆ AddWriteTask()

void Cache::AddWriteTask ( Block * b,
bool from_read )

Add downloaded block to the write queue.

Definition at line 225 of file XrdPfc.cc.

226{
227 TRACE(Dump, "AddWriteTask() offset=" << b->m_offset << ". file " << b->get_file()->GetLocalPath());
228
229 {
230 XrdSysMutexHelper lock(&m_RAM_mutex);
231 m_RAM_write_queue += b->get_size();
232 }
233
234 m_writeQ.condVar.Lock();
235 if (fromRead)
236 m_writeQ.queue.push_back(b);
237 else
238 m_writeQ.queue.push_front(b);
239 m_writeQ.size++;
240 m_writeQ.condVar.Signal();
241 m_writeQ.condVar.UnLock();
242}
#define TRACE(act, x)
Definition XrdTrace.hh:63
int get_size() const
long long m_offset
File * get_file() const
const std::string & GetLocalPath() const

References XrdPfc::Block::get_file(), XrdPfc::Block::get_size(), XrdPfc::File::GetLocalPath(), XrdPfc::Block::m_offset, and TRACE.

+ Here is the call graph for this function:

◆ Attach()

XrdOucCacheIO * Cache::Attach ( XrdOucCacheIO * io,
int Options = 0 )
virtual

Implements XrdOucCache.

Definition at line 179 of file XrdPfc.cc.

180{
181 const char* tpfx = "Attach() ";
182
183 if (Options & XrdOucCache::optRW)
184 {
185 TRACE(Info, tpfx << "passing through write operation" << obfuscateAuth(io->Path()));
186 }
187 else if (Cache::GetInstance().Decide(io))
188 {
189 TRACE(Info, tpfx << obfuscateAuth(io->Path()));
190
191 IO *cio;
192
193 if (Cache::GetInstance().RefConfiguration().m_hdfsmode)
194 {
195 cio = new IOFileBlock(io, *this);
196 }
197 else
198 {
199 IOFile *iof = new IOFile(io, *this);
200
201 if ( ! iof->HasFile())
202 {
203 delete iof;
204 // TODO - redirect instead. But this is kind of an awkward place for it.
205 // errno is set during IOFile construction.
206 TRACE(Error, tpfx << "Failed opening local file, falling back to remote access " << io->Path());
207 return io;
208 }
209
210 cio = iof;
211 }
212
213 TRACE_PC(Debug, const char* loc = io->Location(), tpfx << io->Path() << " location: " <<
214 ((loc && loc[0] != 0) ? loc : "<deferred open>"));
215
216 return cio;
217 }
218 else
219 {
220 TRACE(Info, tpfx << "decision decline " << io->Path());
221 }
222 return io;
223}
std::string obfuscateAuth(const std::string &input)
#define TRACE_PC(act, pre_code, x)
bool Debug
virtual const char * Path()=0
virtual const char * Location(bool refresh=false)
static const int optRW
File is read/write (o/w read/only)
const Configuration & RefConfiguration() const
Reference XrdPfc configuration.
Definition XrdPfc.hh:217
static Cache & GetInstance()
Singleton access.
Definition XrdPfc.cc:132
bool Decide(XrdOucCacheIO *)
Makes decision if the original XrdOucCacheIO should be cached.
Definition XrdPfc.cc:137
bool HasFile() const
Check if File was opened successfully.

References Debug, Decide(), Error, GetInstance(), XrdPfc::IOFile::HasFile(), XrdOucCacheIO::Location(), obfuscateAuth(), XrdOucCache::optRW, XrdOucCacheIO::Path(), RefConfiguration(), TRACE, and TRACE_PC.

+ Here is the call graph for this function:

◆ blocksize_str2value()

bool Cache::blocksize_str2value ( const char * from,
const char * str,
long long & val,
long long min,
long long max ) const

Definition at line 104 of file XrdPfcConfiguration.cc.

106{
107 if (XrdOuca2x::a2sz(m_log, "Error parsing block-size", str, &val, min, max))
108 return false;
109
110 if (val & 0xFFF) {
111 val &= ~0x0FFF;
112 val += 0x1000;
113 m_log.Emsg(from, "blocksize must be a multiple of 4 kB. Rounded up.");
114 }
115
116 return true;
117}
static int a2sz(XrdSysError &, const char *emsg, const char *item, long long *val, long long minv=-1, long long maxv=-1)
Definition XrdOuca2x.cc:257

References XrdOuca2x::a2sz().

+ Here is the call graph for this function:

◆ ClearPurgeProtectedSet()

void Cache::ClearPurgeProtectedSet ( )

Definition at line 689 of file XrdPfc.cc.

690{
691 XrdSysCondVarHelper lock(&m_active_cond);
692 m_purge_delay_set.clear();
693}

Referenced by XrdPfc::ResourceMonitor::perform_purge_check(), and XrdPfc::ResourceMonitor::perform_purge_task_cleanup().

+ Here is the caller graph for this function:

◆ Conf()

const Configuration & Cache::Conf ( )
static

Definition at line 134 of file XrdPfc.cc.

134{ return m_instance->RefConfiguration(); }

Referenced by XrdPfc::ResourceMonitor::heart_beat(), XrdPfc::OldStylePurgeDriver(), XrdPfc::ResourceMonitor::perform_purge_check(), Proto_ResourceMonitorHeartBeat(), XrdPfc::ResourceMonitor::update_vs_and_file_usage_info(), and XrdPfc::DataFsSnapshot::write_json_file().

+ Here is the caller graph for this function:

◆ Config()

bool Cache::Config ( const char * config_filename,
const char * parameters,
XrdOucEnv * env )

Parse configuration file.

Parameters
config_filename: path to configuration file
parameters: optional parameters to be passed
env: optional environment to use for configuration
Returns
parse status

Definition at line 432 of file XrdPfcConfiguration.cc.

433{
434 // Indicate whether or not we are a client instance
435 const char *theINS = getenv("XRDINSTANCE");
436 m_isClient = (theINS != 0 && strncmp("*client ", theINS, 8) == 0);
437
438 // Tell everyone else we are a caching proxy
439 XrdOucEnv::Export("XRDPFC", 1);
440
441 XrdOucEnv emptyEnv;
442 XrdOucEnv *myEnv = env ? env : &emptyEnv;
443
444 XrdOucStream Config(&m_log, theINS, myEnv, "=====> ");
445
446 if (! config_filename || ! *config_filename)
447 {
448 TRACE(Error, "Config() configuration file not specified.");
449 return false;
450 }
451
452 int fd;
453 if ( (fd = open(config_filename, O_RDONLY, 0)) < 0)
454 {
455 TRACE( Error, "Config() can't open configuration file " << config_filename);
456 return false;
457 }
458
459 Config.Attach(fd);
460 static const char *cvec[] = { "*** pfc plugin config:", 0 };
461 Config.Capture(cvec);
462
463 // Obtain OFS configurator for OSS plugin.
464 XrdOfsConfigPI *ofsCfg = XrdOfsConfigPI::New(config_filename,&Config,&m_log,
465 &XrdVERSIONINFOVAR(XrdOucGetCache));
466 if (! ofsCfg) return false;
467
468 TmpConfiguration tmpc;
469
470 Configuration &CFG = m_configuration;
471
472 // Adjust default parameters for client/serverless caching
473 if (m_isClient)
474 {
475 m_configuration.m_bufferSize = 128 * 1024; // same as normal.
476 m_configuration.m_wqueue_blocks = 8;
477 m_configuration.m_wqueue_threads = 1;
478 }
479
480 // If network checksum processing is the default, indicate so.
481 if (m_configuration.is_cschk_net()) m_env->Put("psx.CSNet", m_configuration.m_cs_ChkTLS ? "2" : "1");
482
483 // Actual parsing of the config file.
484 bool retval = true, aOK = true;
485 char *var;
486 while ((var = Config.GetMyFirstWord()))
487 {
488 if (! strcmp(var,"pfc.osslib"))
489 {
490 retval = ofsCfg->Parse(XrdOfsConfigPI::theOssLib);
491 }
492 else if (! strcmp(var,"pfc.cschk"))
493 {
494 retval = xcschk(Config);
495 }
496 else if (! strcmp(var,"pfc.decisionlib"))
497 {
498 retval = xdlib(Config);
499 }
500 else if (! strcmp(var,"pfc.purgelib"))
501 {
502 retval = xplib(Config);
503 }
504 else if (! strcmp(var,"pfc.trace"))
505 {
506 retval = xtrace(Config);
507 }
508 else if (! strcmp(var,"pfc.allow_xrdpfc_command"))
509 {
510 m_configuration.m_allow_xrdpfc_command = true;
511 }
512 else if (! strncmp(var,"pfc.", 4))
513 {
514 retval = ConfigParameters(std::string(var+4), Config, tmpc);
515 }
516
517 if ( ! retval)
518 {
519 TRACE(Error, "Config() error in parsing");
520 aOK = false;
521 }
522 }
523
524 Config.Close();
525
526 // Load OSS plugin.
527 auto orig_runmode = myEnv->Get("oss.runmode");
528 myEnv->Put("oss.runmode", "pfc");
529 if (m_configuration.is_cschk_cache())
530 {
531 char csi_conf[128];
532 if (snprintf(csi_conf, 128, "space=%s nofill", m_configuration.m_meta_space.c_str()) < 128)
533 {
534 ofsCfg->Push(XrdOfsConfigPI::theOssLib, "libXrdOssCsi.so", csi_conf);
535 } else {
536 TRACE(Error, "Config() buffer too small for libXrdOssCsi params.");
537 return false;
538 }
539 }
540 if (ofsCfg->Load(XrdOfsConfigPI::theOssLib, myEnv))
541 {
542 ofsCfg->Plugin(m_oss);
543 }
544 else
545 {
546 TRACE(Error, "Config() Unable to create an OSS object");
547 return false;
548 }
549 if (orig_runmode) myEnv->Put("oss.runmode", orig_runmode);
550 else myEnv->Put("oss.runmode", "");
551
552 // Test if OSS is operational, determine optional features.
553 aOK &= test_oss_basics_and_features();
554
555 // sets default value for disk usage
556 XrdOssVSInfo sP;
557 {
558 if (m_configuration.m_meta_space != m_configuration.m_data_space &&
559 m_oss->StatVS(&sP, m_configuration.m_meta_space.c_str(), 1) < 0)
560 {
561 m_log.Emsg("ConfigParameters()", "error obtaining stat info for meta space ", m_configuration.m_meta_space.c_str());
562 return false;
563 }
564 if (m_configuration.m_meta_space != m_configuration.m_data_space && sP.Total < 10ll << 20)
565 {
566 m_log.Emsg("ConfigParameters()", "available data space is less than 10 MB (can be due to a mistake in oss.localroot directive) for space ",
567 m_configuration.m_meta_space.c_str());
568 return false;
569 }
570 if (m_oss->StatVS(&sP, m_configuration.m_data_space.c_str(), 1) < 0)
571 {
572 m_log.Emsg("ConfigParameters()", "error obtaining stat info for data space ", m_configuration.m_data_space.c_str());
573 return false;
574 }
575 if (sP.Total < 10ll << 20)
576 {
577 m_log.Emsg("ConfigParameters()", "available data space is less than 10 MB (can be due to a mistake in oss.localroot directive) for space ",
578 m_configuration.m_data_space.c_str());
579 return false;
580 }
581
582 m_configuration.m_diskTotalSpace = sP.Total;
583
584 if (cfg2bytes(tmpc.m_diskUsageLWM, m_configuration.m_diskUsageLWM, sP.Total, "lowWatermark") &&
585 cfg2bytes(tmpc.m_diskUsageHWM, m_configuration.m_diskUsageHWM, sP.Total, "highWatermark"))
586 {
587 if (m_configuration.m_diskUsageLWM >= m_configuration.m_diskUsageHWM) {
588 m_log.Emsg("ConfigParameters()", "pfc.diskusage should have lowWatermark < highWatermark.");
589 aOK = false;
590 }
591 }
592 else aOK = false;
593
594 if ( ! tmpc.m_fileUsageMax.empty())
595 {
596 if (cfg2bytes(tmpc.m_fileUsageBaseline, m_configuration.m_fileUsageBaseline, sP.Total, "files baseline") &&
597 cfg2bytes(tmpc.m_fileUsageNominal, m_configuration.m_fileUsageNominal, sP.Total, "files nominal") &&
598 cfg2bytes(tmpc.m_fileUsageMax, m_configuration.m_fileUsageMax, sP.Total, "files max"))
599 {
600 if (m_configuration.m_fileUsageBaseline >= m_configuration.m_fileUsageNominal ||
601 m_configuration.m_fileUsageBaseline >= m_configuration.m_fileUsageMax ||
602 m_configuration.m_fileUsageNominal >= m_configuration.m_fileUsageMax)
603 {
604 m_log.Emsg("ConfigParameters()", "pfc.diskusage files should have baseline < nominal < max.");
605 aOK = false;
606 }
607
608
609 if (aOK && m_configuration.m_fileUsageMax >= m_configuration.m_diskUsageLWM)
610 {
611 m_log.Emsg("ConfigParameters()", "pfc.diskusage files values must be below lowWatermark");
612 aOK = false;
613 }
614 }
615 else aOK = false;
616 }
617 }
618
619 // sets flush frequency
620 if ( ! tmpc.m_flushRaw.empty())
621 {
622 if (::isalpha(*(tmpc.m_flushRaw.rbegin())))
623 {
624 if (XrdOuca2x::a2sz(m_log, "Error getting number of bytes written before flush", tmpc.m_flushRaw.c_str(),
625 &m_configuration.m_flushCnt,
626 100 * m_configuration.m_bufferSize , 100000 * m_configuration.m_bufferSize))
627 {
628 return false;
629 }
630 m_configuration.m_flushCnt /= m_configuration.m_bufferSize;
631 }
632 else
633 {
634 if (XrdOuca2x::a2ll(m_log, "Error getting number of blocks written before flush", tmpc.m_flushRaw.c_str(),
635 &m_configuration.m_flushCnt, 100, 100000))
636 {
637 return false;
638 }
639 }
640 }
641
642 // set the write mode
643 if ( ! tmpc.m_writemodeRaw.empty())
644 {
645 if (tmpc.m_writemodeRaw == "writethrough")
646 {
647 m_configuration.m_write_through = true;
648 }
649 else if (tmpc.m_writemodeRaw != "off")
650 {
651 m_log.Emsg("ConfigParameters()", "Unknown value for pfc.writemode (valid values are `writethrough` or `off`): %s",
652 tmpc.m_writemodeRaw.c_str());
653 return false;
654 }
655 }
656
657 // get number of available RAM blocks after process configuration
658 if (m_configuration.m_RamAbsAvailable == 0)
659 {
660 m_configuration.m_RamAbsAvailable = m_isClient ? 256ll * 1024 * 1024 : 1024ll * 1024 * 1024;
661 char buff[1024];
662 snprintf(buff, sizeof(buff), "RAM usage pfc.ram is not specified. Default value %s is used.", m_isClient ? "256m" : "1g");
663 m_log.Say("Config info: ", buff);
664 }
665 // Setup number of standard-size blocks not released back to the system to 5% of total RAM.
666 m_configuration.m_RamKeepStdBlocks = (m_configuration.m_RamAbsAvailable / m_configuration.m_bufferSize + 1) * 5 / 100;
667
668 // Set tracing to debug if this is set in environment
669 char* cenv = getenv("XRDDEBUG");
670 if (cenv && ! strcmp(cenv,"1") && m_trace->What < 4) m_trace->What = 4;
671
672 if (aOK)
673 {
674// 000 001 010
675 const char *csc[] = {"off", "cache nonet", "nocache net notls",
676// 011
677 "cache net notls",
678// 100 101 110
679 "off", "cache nonet", "nocache net tls",
680// 111
681 "cache net tls"};
682 char uvk[32];
683 if (m_configuration.m_cs_UVKeep < 0)
684 strcpy(uvk, "lru");
685 else
686 sprintf(uvk, "%lld", (long long) m_configuration.m_cs_UVKeep);
687 float ram_gb = (m_configuration.m_RamAbsAvailable) / float(1024*1024*1024);
688
689 char urlcgi_blks[64] = "ignore", urlcgi_npref[32] = "ignore";
691 snprintf(urlcgi_blks, sizeof(urlcgi_blks), "%lldk %lldk",
692 CFG.m_cgi_min_bufferSize >> 10, CFG.m_cgi_max_bufferSize >> 10);
694 snprintf(urlcgi_npref, sizeof(urlcgi_npref), "%d %d",
696
697 char buff[8192];
698 int loff = 0;
699 loff = snprintf(buff, sizeof(buff), "Config effective %s pfc configuration:\n"
700 " pfc.cschk %s uvkeep %s\n"
701 " pfc.blocksize %lldk\n"
702 " pfc.prefetch %d\n"
703 " pfc.urlcgi blocksize %s prefetch %s\n"
704 " pfc.ram %.fg\n"
705 " pfc.writequeue %d %d\n"
706 " # Total available disk: %lld\n"
707 " pfc.diskusage %lld %lld files %lld %lld %lld purgeinterval %d purgecoldfiles %d\n"
708 " pfc.spaces %s %s\n"
709 " pfc.trace %d\n"
710 " pfc.flush %lld\n"
711 " pfc.acchistorysize %d\n"
712 " pfc.onlyIfCachedMinBytes %lld\n"
713 " pfc.onlyIfCachedMinFrac %.2f\n",
714 config_filename,
715 csc[int(m_configuration.m_cs_Chk)], uvk,
716 m_configuration.m_bufferSize >> 10,
717 m_configuration.m_prefetch_max_blocks,
718 urlcgi_blks, urlcgi_npref,
719 ram_gb,
720 m_configuration.m_wqueue_blocks, m_configuration.m_wqueue_threads,
721 sP.Total,
722 m_configuration.m_diskUsageLWM, m_configuration.m_diskUsageHWM,
723 m_configuration.m_fileUsageBaseline, m_configuration.m_fileUsageNominal, m_configuration.m_fileUsageMax,
724 m_configuration.m_purgeInterval, m_configuration.m_purgeColdFilesAge,
725 m_configuration.m_data_space.c_str(),
726 m_configuration.m_meta_space.c_str(),
727 m_trace->What,
728 m_configuration.m_flushCnt,
729 m_configuration.m_accHistorySize,
730 m_configuration.m_onlyIfCachedMinSize,
731 m_configuration.m_onlyIfCachedMinFrac);
732
733 if (m_configuration.is_dir_stat_reporting_on())
734 {
735 loff += snprintf(buff + loff, sizeof(buff) - loff,
736 " pfc.dirstats interval %d maxdepth %d (internal: size_of_dirlist %d, size_of_globlist %d)\n",
737 m_configuration.m_dirStatsInterval, m_configuration.m_dirStatsStoreDepth,
738 (int) m_configuration.m_dirStatsDirs.size(), (int) m_configuration.m_dirStatsDirGlobs.size());
739 loff += snprintf(buff + loff, sizeof(buff) - loff, " dirlist:\n");
740 for (std::set<std::string>::iterator i = m_configuration.m_dirStatsDirs.begin(); i != m_configuration.m_dirStatsDirs.end(); ++i)
741 loff += snprintf(buff + loff, sizeof(buff) - loff, " %s\n", i->c_str());
742 loff += snprintf(buff + loff, sizeof(buff) - loff, " globlist:\n");
743 for (std::set<std::string>::iterator i = m_configuration.m_dirStatsDirGlobs.begin(); i != m_configuration.m_dirStatsDirGlobs.end(); ++i)
744 loff += snprintf(buff + loff, sizeof(buff) - loff, " %s/*\n", i->c_str());
745 }
746
747 if (m_configuration.m_hdfsmode)
748 {
749 loff += snprintf(buff + loff, sizeof(buff) - loff, " pfc.hdfsmode hdfsbsize %lld\n", m_configuration.m_hdfsbsize);
750 }
751 loff += snprintf(buff + loff, sizeof(buff) - loff, " pfc.writemode %s\n", m_configuration.m_write_through ? "writethrough" : "off");
752
753 if (m_configuration.m_username.empty())
754 {
755 char unameBuff[256];
756 XrdOucUtils::UserName(getuid(), unameBuff, sizeof(unameBuff));
757 m_configuration.m_username = unameBuff;
758 }
759 else
760 {
761 loff += snprintf(buff + loff, sizeof(buff) - loff, " pfc.user %s\n", m_configuration.m_username.c_str());
762 }
763
764 m_log.Say(buff);
765
766 m_env->Put("XRDPFC.SEGSIZE", std::to_string(m_configuration.m_bufferSize).c_str());
767 }
768
769 // Derived settings
770 m_prefetch_enabled = CFG.m_prefetch_max_blocks > 0 || CFG.m_cgi_max_prefetch_max_blocks > 0;
772
773 m_gstream = (XrdXrootdGStream*) m_env->GetPtr("pfc.gStream*");
774
775 m_log.Say(" pfc g-stream has", m_gstream ? "" : " NOT", " been configured via xrootd.monitor directive\n");
776
777 // Create the ResourceMonitor and get it ready for starting the main thread function.
778 if (aOK)
779 {
780 m_res_mon = new ResourceMonitor(*m_oss);
781 m_res_mon->init_before_main();
782 }
783
784 m_log.Say("=====> Proxy file cache configuration parsing ", aOK ? "completed" : "failed");
785
786 if (ofsCfg) delete ofsCfg;
787
788 // XXXX-CKSUM Testing. To be removed after OssPgi is also merged and validated.
789 // Building of xrdpfc_print fails when this is enabled.
790#ifdef XRDPFC_CKSUM_TEST
791 {
792 int xxx = m_configuration.m_cs_Chk;
793
794 for (m_configuration.m_cs_Chk = CSChk_None; m_configuration.m_cs_Chk <= CSChk_Both; ++m_configuration.m_cs_Chk)
795 {
796 Info::TestCksumStuff();
797 }
798
799 m_configuration.m_cs_Chk = xxx;
800 }
801#endif
802
803 return aOK;
804}
XrdOucCache * XrdOucGetCache(XrdSysLogger *logger, const char *config_filename, const char *parameters, XrdOucEnv *env)
Definition XrdPfc.cc:76
#define open
Definition XrdPosix.hh:76
bool Parse(TheLib what)
bool Plugin(XrdAccAuthorize *&piP)
Get Authorization plugin.
static XrdOfsConfigPI * New(const char *cfn, XrdOucStream *cfgP, XrdSysError *errP, XrdVersionInfo *verP=0, XrdSfsFileSystem *sfsP=0)
bool Load(int what, XrdOucEnv *envP=0)
bool Push(TheLib what, const char *plugP, const char *parmP=0)
@ theOssLib
Oss plugin.
long long Total
Definition XrdOssVS.hh:90
char * Get(const char *varname)
Definition XrdOucEnv.hh:69
static int Export(const char *Var, const char *Val)
Definition XrdOucEnv.cc:188
void Put(const char *varname, const char *value)
Definition XrdOucEnv.hh:85
static int UserName(uid_t uID, char *uName, int uNsz)
static int a2ll(XrdSysError &, const char *emsg, const char *item, long long *val, long long minv=-1, long long maxv=-1)
Definition XrdOuca2x.cc:70
bool Config(const char *config_filename, const char *parameters, XrdOucEnv *env)
Parse configuration file.
static size_t s_maxNumAccess
long long m_cgi_max_bufferSize
max buffer size allowed in pfc.blocksize
Definition XrdPfc.hh:116
int m_accHistorySize
max number of entries in access history part of cinfo file
Definition XrdPfc.hh:101
int m_cgi_min_prefetch_max_blocks
min prefetch block count allowed in pfc.prefetch
Definition XrdPfc.hh:117
bool m_cgi_prefetch_allowed
allow cgi setting of prefetch
Definition XrdPfc.hh:120
int m_prefetch_max_blocks
default maximum number of blocks to prefetch per file
Definition XrdPfc.hh:113
long long m_cgi_min_bufferSize
min buffer size allowed in pfc.blocksize
Definition XrdPfc.hh:115
int m_cgi_max_prefetch_max_blocks
max prefetch block count allowed in pfc.prefetch
Definition XrdPfc.hh:118
bool m_cgi_blocksize_allowed
allow cgi setting of blocksize
Definition XrdPfc.hh:119
std::string m_writemodeRaw
Definition XrdPfc.hh:148
std::string m_diskUsageLWM
Definition XrdPfc.hh:142
std::string m_diskUsageHWM
Definition XrdPfc.hh:143
std::string m_fileUsageBaseline
Definition XrdPfc.hh:144
std::string m_fileUsageNominal
Definition XrdPfc.hh:145
std::string m_flushRaw
Definition XrdPfc.hh:147
std::string m_fileUsageMax
Definition XrdPfc.hh:146

References XrdOuca2x::a2ll(), XrdOuca2x::a2sz(), Config(), XrdPfc::CSChk_Both, XrdPfc::CSChk_None, Error, XrdOucEnv::Export(), XrdOucEnv::Get(), XrdOfsConfigPI::Load(), XrdPfc::Configuration::m_accHistorySize, XrdPfc::Configuration::m_cgi_blocksize_allowed, XrdPfc::Configuration::m_cgi_max_bufferSize, XrdPfc::Configuration::m_cgi_max_prefetch_max_blocks, XrdPfc::Configuration::m_cgi_min_bufferSize, XrdPfc::Configuration::m_cgi_min_prefetch_max_blocks, XrdPfc::Configuration::m_cgi_prefetch_allowed, XrdPfc::TmpConfiguration::m_diskUsageHWM, XrdPfc::TmpConfiguration::m_diskUsageLWM, XrdPfc::TmpConfiguration::m_fileUsageBaseline, XrdPfc::TmpConfiguration::m_fileUsageMax, XrdPfc::TmpConfiguration::m_fileUsageNominal, XrdPfc::TmpConfiguration::m_flushRaw, XrdPfc::Configuration::m_prefetch_max_blocks, XrdPfc::TmpConfiguration::m_writemodeRaw, XrdOfsConfigPI::New(), open, XrdOfsConfigPI::Parse(), XrdOfsConfigPI::Plugin(), XrdOfsConfigPI::Push(), XrdOucEnv::Put(), XrdPfc::Info::s_maxNumAccess, XrdOfsConfigPI::theOssLib, XrdOssVSInfo::Total, TRACE, XrdOucUtils::UserName(), and XrdOucGetCache().

Referenced by Config(), and XrdOucGetCache().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ ConsiderCached()

int Cache::ConsiderCached ( const char * curl)
virtual
Returns
0 - the file is complete and the local path to the file is in the buffer, if it has been supplied.
<0 - the request could not be fulfilled. The return value is -errno describing why.
>0 - Reserved for future use.

Definition at line 1004 of file XrdPfc.cc.

1005{
1006 static const char* tpfx = "ConsiderCached ";
1007
1008 TRACE(Debug, tpfx << curl);
1009
1010 XrdCl::URL url(curl);
1011 std::string f_name = url.GetPath();
1012
1013 File *file = nullptr;
1014 {
1015 XrdSysCondVarHelper lock(&m_active_cond);
1016 auto it = m_active.find(f_name);
1017 if (it != m_active.end()) {
1018 file = it->second;
1019 // If the file-open is in progress, `file` is a nullptr
1020 // so we cannot increase the reference count. For now,
1021 // simply treat it as if the file open doesn't exist instead
1022 // of trying to wait and see if it succeeds.
1023 if (file) {
1024 inc_ref_cnt(file, false, false);
1025 }
1026 }
1027 }
1028 if (file) {
1029 struct stat sbuff;
1030 int res = file->Fstat(sbuff);
1031 dec_ref_cnt(file, false);
1032 if (res)
1033 return res;
1034 // DecideIfConsideredCached() already called in File::Fstat().
1035 return sbuff.st_atime > 0 ? 0 : -EREMOTE;
1036 }
1037
1038 struct stat sbuff;
1039 int res = m_oss->Stat(f_name.c_str(), &sbuff);
1040 if (res != XrdOssOK) {
1041 TRACE(Debug, tpfx << curl << " -> " << res);
1042 return res;
1043 }
1044 if (S_ISDIR(sbuff.st_mode))
1045 {
1046 TRACE(Debug, tpfx << curl << " -> EISDIR");
1047 return -EISDIR;
1048 }
1049
1050 long long file_size = DetermineFullFileSize(f_name + Info::s_infoExtension);
1051 if (file_size < 0) {
1052 TRACE(Debug, tpfx << curl << " -> " << file_size);
1053 return (int) file_size;
1054 }
1055 bool is_cached = DecideIfConsideredCached(file_size, sbuff.st_blocks * 512ll);
1056
1057 return is_cached ? 0 : -EREMOTE;
1058}
#define XrdOssOK
Definition XrdOss.hh:50
#define stat(a, b)
Definition XrdPosix.hh:101
XrdOucString File
long long DetermineFullFileSize(const std::string &cinfo_fname)
Definition XrdPfc.cc:930
bool DecideIfConsideredCached(long long file_size, long long bytes_on_disk)
Definition XrdPfc.cc:971
int Fstat(struct stat &sbuff)
static const char * s_infoExtension

References Debug, DecideIfConsideredCached(), DetermineFullFileSize(), XrdPfc::File::Fstat(), XrdCl::URL::GetPath(), XrdPfc::Info::s_infoExtension, stat, TRACE, and XrdOssOK.

+ Here is the call graph for this function:

◆ CreateInstance()

Cache & Cache::CreateInstance ( XrdSysLogger * logger,
XrdOucEnv * env )
static

Singleton creation.

Definition at line 125 of file XrdPfc.cc.

126{
127 assert (m_instance == 0);
128 m_instance = new Cache(logger, env);
129 return *m_instance;
130}
Cache(XrdSysLogger *logger, XrdOucEnv *env)
Constructor.
Definition XrdPfc.cc:158

References Cache().

Referenced by XrdOucGetCache().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ Decide()

bool Cache::Decide ( XrdOucCacheIO * io)

Makes decision if the original XrdOucCacheIO should be cached.

Parameters
io: cache-IO object whose path is the URL of the file
Returns
decision if IO object will be cached.

Definition at line 137 of file XrdPfc.cc.

138{
139 if (! m_decisionpoints.empty())
140 {
141 XrdCl::URL url(io->Path());
142 std::string filename = url.GetPath();
143 std::vector<Decision*>::const_iterator it;
144 for (it = m_decisionpoints.begin(); it != m_decisionpoints.end(); ++it)
145 {
146 XrdPfc::Decision *d = *it;
147 if (! d) continue;
148 if (! d->Decide(filename, *m_oss))
149 {
150 return false;
151 }
152 }
153 }
154
155 return true;
156}
virtual bool Decide(const std::string &, XrdOss &) const =0

References XrdPfc::Decision::Decide(), XrdCl::URL::GetPath(), and XrdOucCacheIO::Path().

Referenced by Attach().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ DecideIfConsideredCached()

bool Cache::DecideIfConsideredCached ( long long file_size,
long long bytes_on_disk )

Definition at line 971 of file XrdPfc.cc.

972{
973 if (file_size == 0 || bytes_on_disk >= file_size)
974 return true;
975
976 double frac_on_disk = (double) bytes_on_disk / file_size;
977
978 if (file_size <= m_configuration.m_onlyIfCachedMinSize)
979 {
980 if (frac_on_disk >= m_configuration.m_onlyIfCachedMinFrac)
981 return true;
982 }
983 else
984 {
985 if (bytes_on_disk >= m_configuration.m_onlyIfCachedMinSize &&
986 frac_on_disk >= m_configuration.m_onlyIfCachedMinFrac)
987 return true;
988 }
989 return false;
990}

Referenced by ConsiderCached(), and Stat().

+ Here is the caller graph for this function:

◆ DeRegisterPrefetchFile()

void Cache::DeRegisterPrefetchFile ( File * file)

Definition at line 715 of file XrdPfc.cc.

716{
717 // Can be called with other locks held.
718
719 if ( ! m_prefetch_enabled)
720 {
721 return;
722 }
723
724 m_prefetch_condVar.Lock();
725 for (PrefetchList::iterator it = m_prefetchList.begin(); it != m_prefetchList.end(); ++it)
726 {
727 if (*it == file)
728 {
729 m_prefetchList.erase(it);
730 break;
731 }
732 }
733 m_prefetch_condVar.UnLock();
734}

◆ DetermineFullFileSize()

long long Cache::DetermineFullFileSize ( const std::string & cinfo_fname)

Definition at line 930 of file XrdPfc.cc.

931{
932 if (m_metaXattr) {
933 char pfn[4096];
934 m_oss->Lfn2Pfn(cinfo_fname.c_str(), pfn, 4096);
935 long long fsize = -1ll;
936 int res = XrdSysXAttrActive->Get("pfc.fsize", &fsize, sizeof(long long), pfn);
937 if (res == sizeof(long long))
938 {
939 return fsize;
940 }
941 else
942 {
943 TRACE(Debug, "DetermineFullFileSize error getting xattr " << res);
944 }
945 }
946
947 XrdOssDF *infoFile = m_oss->newFile(m_configuration.m_username.c_str());
948 XrdOucEnv env;
949 long long ret;
950 int res = infoFile->Open(cinfo_fname.c_str(), O_RDONLY, 0600, env);
951 if (res < 0) {
952 ret = res;
953 } else {
954 Info info(m_trace, 0);
955 if ( ! info.Read(infoFile, cinfo_fname.c_str())) {
956 ret = -EBADF;
957 } else {
958 ret = info.GetFileSize();
959 }
960 infoFile->Close();
961 }
962 delete infoFile;
963 return ret;
964}
XrdSysXAttr * XrdSysXAttrActive
virtual int Close(long long *retsz=0)=0
virtual int Open(const char *path, int Oflag, mode_t Mode, XrdOucEnv &env)
Definition XrdOss.hh:200
virtual int Get(const char *Aname, void *Aval, int Avsz, const char *Path, int fd=-1)=0

References XrdOssDF::Close(), Debug, XrdPfc::Info::GetFileSize(), XrdOssDF::Open(), XrdPfc::Info::Read(), TRACE, and XrdSysXAttrActive.

Referenced by ConsiderCached(), and Stat().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ ExecuteCommandUrl()

void Cache::ExecuteCommandUrl ( const std::string & command_url)

Definition at line 51 of file XrdPfcCommand.cc.

52{
53 static const char *top_epfx = "ExecuteCommandUrl ";
54
55 SplitParser cp(command_url, "/");
56
57 std::string token = cp.get_token();
58
59 if (token != "xrdpfc_command")
60 {
61 TRACE(Error, top_epfx << "First token is NOT xrdpfc_command.");
62 return;
63 }
64
65 // Get the command
66 token = cp.get_token_as_string();
67
68 auto get_opt = [](SplitParser &sp) -> char {
69 const char *t = sp.get_token();
70 if (t)
71 return (t[0] == '-' && t[1] != 0) ? t[1] : 0;
72 else
73 return -1;
74 };
75
76 //================================================================
77 // create_file
78 //================================================================
79
80 if (token == "create_file")
81 {
82 static const char* err_prefix = "ExecuteCommandUrl: /xrdpfc_command/create_file: ";
83 static const char* usage =
84 "Usage: create_file/ [-h] [-s filesize] [-b blocksize] [-t access_time] [-d access_duration]/<path>\n"
85 " Creates a cache file with given parameters. Data in file is random.\n"
86 " Useful for cache purge testing.\n"
87 "Notes:\n"
88 " . If no options are needed one should still leave a space between / separators, ie., '/ /'\n"
89 " . Default filesize=1G, blocksize=<as configured>, access_time=-10, access_duration=10.\n"
90 " . -t and -d can be given multiple times to record several accesses.\n"
91 " . Negative arguments given to -t are interpreted as relative to now.\n";
92
93 const Configuration &conf = m_configuration;
94
95 token = cp.get_token_as_string();
96 TRACE(Debug, err_prefix << "Entered with argument string '" << token <<"'.");
97 if (token.empty()) {
98 TRACE(Error, err_prefix << "Options section must not be empty, a single space character is OK.");
99 return;
100 }
101 TRACE(Debug, err_prefix << "File path (reminder of URL) is '" << cp.get_reminder() <<"'.");
102 if ( ! cp.has_reminder()) {
103 TRACE(Error, err_prefix << "Path section must not be empty.");
104 return;
105 }
106
107 long long file_size = ONE_GB;
108 long long block_size = conf.m_bufferSize;
109 int access_time [MAX_ACCESSES];
110 int access_duration[MAX_ACCESSES];
111 int at_count = 0, ad_count = 0;
112
113 time_t time_now = time(0);
114
115 SplitParser ap(token, " ");
116 char theOpt;
117
118 while ((theOpt = get_opt(ap)) != (char) -1)
119 {
120 switch (theOpt)
121 {
122 case 'h': {
123 m_log.Say(err_prefix, " -- printing help, no action will be taken\n", usage);
124 return;
125 }
126 case 's': {
127 if (XrdOuca2x::a2sz(m_log, "Error getting filesize", ap.get_token(),
128 &file_size, 0ll, 32 * ONE_GB))
129 return;
130 break;
131 }
132 case 'b': {
133 if (XrdOuca2x::a2sz(m_log, "Error getting blocksize", ap.get_token(),
134 &block_size, 0ll, 64 * ONE_MB))
135 return;
136 break;
137 }
138 case 't': {
139 if (XrdOuca2x::a2i(m_log, "Error getting access time", ap.get_token(),
140 &access_time[at_count++], INT_MIN, INT_MAX))
141 return;
142 break;
143 }
144 case 'd': {
145 if (XrdOuca2x::a2i(m_log, "Error getting access duration", ap.get_token(),
146 &access_duration[ad_count++], 0, 24 * 3600))
147 return;
148 break;
149 }
150 default: {
151 TRACE(Error, err_prefix << "Unhandled command argument.");
152 return;
153 }
154 }
155 }
156
157 if (at_count < 1) access_time [at_count++] = time_now - 10;
158 if (ad_count < 1) access_duration[ad_count++] = 10;
159
160 if (at_count != ad_count)
161 {
162 TRACE(Error, err_prefix << "Options -t and -d must be given the same number of times.");
163 return;
164 }
165
166 std::string file_path (cp.get_reminder_with_delim());
167 std::string cinfo_path(file_path + Info::s_infoExtension);
168
169 TRACE(Debug, err_prefix << "Command arguments parsed successfully. Proceeding to create file " << file_path);
170
171 // Check if cinfo exists ... bail out if it does.
172 {
173 struct stat infoStat;
174 if (GetOss()->Stat(cinfo_path.c_str(), &infoStat) == XrdOssOK)
175 {
176 TRACE(Error, err_prefix << "cinfo file already exists for '" << file_path << "'. Refusing to overwrite.");
177 return;
178 }
179 }
180
181 TRACE(Debug, err_prefix << "Command arguments parsed successfully, proceeding to execution.");
182
183 {
184 const char *myUser = conf.m_username.c_str();
185 XrdOucEnv myEnv;
186
187 // Create the data file.
188
189 char size_str[32]; sprintf(size_str, "%lld", file_size);
190 myEnv.Put("oss.asize", size_str);
191 myEnv.Put("oss.cgroup", conf.m_data_space.c_str());
192 int cret;
193 if ((cret = GetOss()->Create(myUser, file_path.c_str(), 0600, myEnv, XRDOSS_mkpath)) != XrdOssOK)
194 {
195 TRACE(Error, err_prefix << "Create failed for data file " << file_path << ERRNO_AND_ERRSTR(-cret));
196 return;
197 }
198
199 XrdOssDF *myFile = GetOss()->newFile(myUser);
200 if ((cret = myFile->Open(file_path.c_str(), O_RDWR, 0600, myEnv)) != XrdOssOK)
201 {
202 TRACE(Error, err_prefix << "Open failed for data file " << file_path << ERRNO_AND_ERRSTR(-cret));
203 delete myFile;
204 return;
205 }
206
207 // Create the info file.
208
209 myEnv.Put("oss.asize", "64k"); // TODO: Calculate? Get it from configuration? Do not know length of access lists ...
210 myEnv.Put("oss.cgroup", conf.m_meta_space.c_str());
211 if ((cret = GetOss()->Create(myUser, cinfo_path.c_str(), 0600, myEnv, XRDOSS_mkpath)) != XrdOssOK)
212 {
213 TRACE(Error, err_prefix << "Create failed for info file " << cinfo_path << ERRNO_AND_ERRSTR(-cret));
214 myFile->Close(); delete myFile;
215 return;
216 }
217
218 XrdOssDF *myInfoFile = GetOss()->newFile(myUser);
219 if ((cret = myInfoFile->Open(cinfo_path.c_str(), O_RDWR, 0600, myEnv)) != XrdOssOK)
220 {
221 TRACE(Error, err_prefix << "Open failed for info file " << cinfo_path << ERRNO_AND_ERRSTR(-cret));
222 delete myInfoFile;
223 myFile->Close(); delete myFile;
224 return;
225 }
226
227 // Allocate space for the data file.
228
229 if ((cret = posix_fallocate(myFile->getFD(), 0, file_size)))
230 {
231 TRACE(Error, err_prefix << "posix_fallocate failed for data file " << file_path << ERRNO_AND_ERRSTR(cret));
232 }
233
234 // Fill up cinfo.
235
236 Info myInfo(m_trace, false);
237 myInfo.SetBufferSizeFileSizeAndCreationTime(block_size, file_size);
238 myInfo.SetAllBitsSynced();
239
240 for (int i = 0; i < at_count; ++i)
241 {
242 time_t att_time = access_time[i] >= 0 ? access_time[i] : time_now + access_time[i];
243
244 myInfo.WriteIOStatSingle(file_size, att_time, att_time + access_duration[i]);
245 }
246
247 myInfo.Write(myInfoFile, cinfo_path.c_str());
248
249 // Fake last modified time to the last access_time
250 {
251 time_t last_detach;
252 myInfo.GetLatestDetachTime(last_detach);
253 struct timespec acc_mod_time[2] = { {last_detach, UTIME_OMIT}, {last_detach, 0} };
254
255 futimens(myInfoFile->getFD(), acc_mod_time);
256 }
257
258 myInfoFile->Close(); delete myInfoFile;
259 myFile->Close(); delete myFile;
260
261 struct stat dstat;
262 GetOss()->Stat(file_path.c_str(), &dstat);
263 TRACE(Info, err_prefix << "Created file '" << file_path << "', size=" << (file_size>>20) << "MB, "
264 << "st_blocks=" << dstat.st_blocks);
265
266 {
267 XrdSysCondVarHelper lock(&m_writeQ.condVar);
268
269 m_writeQ.writes_between_purges += file_size;
270 }
271 {
272 int token = m_res_mon->register_file_open(file_path, time_now, false);
273 XrdPfc::Stats stats;
274 stats.m_BytesWritten = file_size;
275 stats.m_StBlocksAdded = dstat.st_blocks;
276 m_res_mon->register_file_update_stats(token, stats);
277 m_res_mon->register_file_close(token, time(0), stats);
278 }
279 }
280 }
281
282 //================================================================
283 // remove_file
284 //================================================================
285
286 else if (token == "remove_file")
287 {
288 static const char* err_prefix = "ExecuteCommandUrl: /xrdpfc_command/remove_file: ";
289 static const char* usage =
290 "Usage: remove_file/ [-h] /<path>\n"
291 " Removes given file from the cache unless it is currently open.\n"
292 " Useful for removal of stale files or duplicate files in a caching cluster.\n"
293 "Notes:\n"
294 " . If no options are needed one should still leave a space between / separators, ie., '/ /'\n";
295
296 token = cp.get_token_as_string();
297 TRACE(Debug, err_prefix << "Entered with argument string '" << token <<"'.");
298 if (token.empty()) {
299 TRACE(Error, err_prefix << "Options section must not be empty, a single space character is OK.");
300 return;
301 }
302 TRACE(Debug, err_prefix << "File path (reminder of URL) is '" << cp.get_reminder() <<"'.");
303 if ( ! cp.has_reminder()) {
304 TRACE(Error, err_prefix << "Path section must not be empty.");
305 return;
306 }
307
308 SplitParser ap(token, " ");
309 char theOpt;
310
311 while ((theOpt = get_opt(ap)) != (char) -1)
312 {
313 switch (theOpt)
314 {
315 case 'h': {
316 m_log.Say(err_prefix, " -- printing help, no action will be taken\n", usage);
317 return;
318 }
319 default: {
320 TRACE(Error, err_prefix << "Unhandled command argument.");
321 return;
322 }
323 }
324 }
325
326 std::string f_name(cp.get_reminder_with_delim());
327
328 TRACE(Debug, err_prefix << "file argument '" << f_name << "'.");
329
330 int ret = UnlinkFile(f_name, true);
331
332 TRACE(Info, err_prefix << "returned with status " << ret);
333 }
334
335 //================================================================
336 // unknown command
337 //================================================================
338
339 else
340 {
341 TRACE(Error, top_epfx << "Unknown or empty command '" << token << "'");
342 }
343}
struct stat Stat
Definition XrdCks.cc:49
void usage()
#define XRDOSS_mkpath
Definition XrdOss.hh:466
#define ERRNO_AND_ERRSTR(err_code)
bool Create
virtual int getFD()
Definition XrdOss.hh:426
virtual XrdOssDF * newFile(const char *tident)=0
virtual int Stat(const char *path, struct stat *buff, int opts=0, XrdOucEnv *envP=0)=0
static int a2i(XrdSysError &, const char *emsg, const char *item, int *val, int minv=-1, int maxv=-1)
Definition XrdOuca2x.cc:45
int UnlinkFile(const std::string &f_name, bool fail_if_open)
Remove cinfo and data files from cache.
Definition XrdPfc.cc:1196
XrdOss * GetOss() const
Definition XrdPfc.hh:282
long long m_StBlocksAdded
number of 512-byte blocks the file has grown by
long long m_BytesWritten
number of bytes written to disk
std::string m_data_space
oss space for data files
Definition XrdPfc.hh:89
long long m_bufferSize
cache block size, default 128 kB
Definition XrdPfc.hh:108
std::string m_meta_space
oss space for metadata files (cinfo)
Definition XrdPfc.hh:90
std::string m_username
username passed to oss plugin
Definition XrdPfc.hh:88

References XrdOuca2x::a2i(), XrdOuca2x::a2sz(), XrdOssDF::Close(), Create, Debug, ERRNO_AND_ERRSTR, Error, XrdPfc::SplitParser::get_reminder(), XrdPfc::SplitParser::get_reminder_with_delim(), XrdPfc::SplitParser::get_token(), XrdPfc::SplitParser::get_token_as_string(), XrdOssDF::getFD(), XrdPfc::Info::GetLatestDetachTime(), GetOss(), XrdPfc::SplitParser::has_reminder(), XrdPfc::Configuration::m_bufferSize, XrdPfc::Stats::m_BytesWritten, XrdPfc::Configuration::m_data_space, XrdPfc::Configuration::m_meta_space, XrdPfc::Stats::m_StBlocksAdded, XrdPfc::Configuration::m_username, XrdOss::newFile(), XrdOssDF::Open(), XrdOucEnv::Put(), XrdPfc::Info::s_infoExtension, XrdPfc::Info::SetAllBitsSynced(), XrdPfc::Info::SetBufferSizeFileSizeAndCreationTime(), Stat, XrdOss::Stat(), stat, TRACE, UnlinkFile(), usage(), XrdPfc::Info::Write(), XrdPfc::Info::WriteIOStatSingle(), XRDOSS_mkpath, and XrdOssOK.

+ Here is the call graph for this function:

◆ FileSyncDone()

void Cache::FileSyncDone ( File * f,
bool high_debug )

Definition at line 546 of file XrdPfc.cc.

547{
548 dec_ref_cnt(f, high_debug);
549}

◆ GetFile()

File * Cache::GetFile ( const std::string & path,
IO * io,
long long off = 0,
long long filesize = 0 )

Definition at line 393 of file XrdPfc.cc.

394{
395 // Called from virtual IOFile constructor.
396
397 TRACE(Debug, "GetFile " << path << ", io " << io);
398
399 ActiveMap_i it;
400
401 {
402 XrdSysCondVarHelper lock(&m_active_cond);
403
404 while (true)
405 {
406 it = m_active.find(path);
407
408 // File is not open or being opened. Mark it as being opened and
409 // proceed to opening it outside of while loop.
410 if (it == m_active.end())
411 {
412 it = m_active.insert(std::make_pair(path, (File*) 0)).first;
413 break;
414 }
415
416 if (it->second != 0)
417 {
418 it->second->AddIO(io);
419 inc_ref_cnt(it->second, false, true);
420
421 return it->second;
422 }
423 else
424 {
425 // Wait for some change in m_active, then recheck.
426 m_active_cond.Wait();
427 }
428 }
429 }
430
431 // This is always true, now that IOFileBlock is unsupported.
432 if (filesize == 0)
433 {
434 struct stat st;
435 int res = io->Fstat(st);
436 if (res < 0) {
437 errno = res;
438 TRACE(Error, "GetFile, could not get valid stat");
439 } else if (res > 0) {
440 errno = ENOTSUP;
441 TRACE(Error, "GetFile, stat returned positive value, this should NOT happen here");
442 } else {
443 filesize = st.st_size;
444 }
445 }
446
447 File *file = 0;
448
449 if (filesize >= 0)
450 {
451 file = File::FileOpen(path, off, filesize, io->GetInput());
452 }
453
454 {
455 XrdSysCondVarHelper lock(&m_active_cond);
456
457 if (file)
458 {
459 inc_ref_cnt(file, false, true);
460 it->second = file;
461
462 file->AddIO(io);
463 }
464 else
465 {
466 m_active.erase(it);
467 }
468
469 m_active_cond.Broadcast();
470 }
471
472 return file;
473}
virtual int Fstat(struct stat &sbuff)
void AddIO(IO *io)
static File * FileOpen(const std::string &path, long long offset, long long fileSize, XrdOucCacheIO *inputIO)
Static constructor that also does Open. Returns null ptr if Open fails.
XrdOucCacheIO * GetInput()
Definition XrdPfcIO.cc:31

References XrdPfc::File::AddIO(), Debug, Error, XrdPfc::File::FileOpen(), XrdOucCacheIO::Fstat(), XrdPfc::IO::GetInput(), stat, and TRACE.

Referenced by XrdPfc::IOFile::IOFile().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ GetGStream()

XrdXrootdGStream * XrdPfc::Cache::GetGStream ( )
inline

Definition at line 300 of file XrdPfc.hh.

300{ return m_gstream; }

◆ GetInstance()

Cache & Cache::GetInstance ( )
static

Singleton access.

Definition at line 132 of file XrdPfc.cc.

132{ return *m_instance; }

References Cache().

Referenced by XrdPfc::IOFile::IOFile(), XrdPfc::IOFileBlock::IOFileBlock(), Attach(), XrdPfc::IOFile::DetachFinalize(), XrdPfc::ResourceMonitor::perform_purge_check(), XrdPfc::ResourceMonitor::perform_purge_task_cleanup(), PrefetchThread(), Prepare(), ProcessWriteTaskThread(), Proto_ResourceMonitorHeartBeat(), XrdPfc::File::Sync(), and XrdPfc::File::WriteBlockToDisk().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ GetLog()

XrdSysError * XrdPfc::Cache::GetLog ( ) const
inline

Definition at line 296 of file XrdPfc.hh.

296{ return &m_log; }

Referenced by XrdPfc::File::GetLog().

+ Here is the caller graph for this function:

◆ GetNextFileToPrefetch()

File * Cache::GetNextFileToPrefetch ( )

Definition at line 737 of file XrdPfc.cc.

738{
739 m_prefetch_condVar.Lock();
740 while (m_prefetchList.empty())
741 {
742 m_prefetch_condVar.Wait();
743 }
744
745 // std::sort(m_prefetchList.begin(), m_prefetchList.end(), myobject);
746
747 size_t l = m_prefetchList.size();
748 int idx = rand() % l;
749 File* f = m_prefetchList[idx];
750
751 m_prefetch_condVar.UnLock();
752 return f;
753}

Referenced by Prefetch().

+ Here is the caller graph for this function:

◆ GetOss()

XrdOss * XrdPfc::Cache::GetOss ( ) const
inline

Definition at line 282 of file XrdPfc.hh.

282{ return m_oss; }

Referenced by ExecuteCommandUrl().

+ Here is the caller graph for this function:

◆ GetPurgePin()

PurgePin * XrdPfc::Cache::GetPurgePin ( ) const
inline

Definition at line 286 of file XrdPfc.hh.

286{ return m_purge_pin; }

Referenced by XrdPfc::ResourceMonitor::perform_purge_check().

+ Here is the caller graph for this function:

◆ GetTrace()

XrdSysTrace * XrdPfc::Cache::GetTrace ( ) const
inline

Definition at line 297 of file XrdPfc.hh.

297{ return m_trace; }

Referenced by XrdPfc::File::GetTrace().

+ Here is the caller graph for this function:

◆ is_prefetch_enabled()

bool XrdPfc::Cache::is_prefetch_enabled ( ) const
inline

Definition at line 309 of file XrdPfc.hh.

309{ return m_prefetch_enabled; }

Referenced by XrdOucGetCache().

+ Here is the caller graph for this function:

◆ IsFileActiveOrPurgeProtected()

bool Cache::IsFileActiveOrPurgeProtected ( const std::string & path) const

Definition at line 681 of file XrdPfc.cc.

682{
683 XrdSysCondVarHelper lock(&m_active_cond);
684
685 return m_active.find(path) != m_active.end() ||
686 m_purge_delay_set.find(path) != m_purge_delay_set.end();
687}

◆ LocalFilePath()

int Cache::LocalFilePath ( const char * curl,
char * buff = 0,
int blen = 0,
LFP_Reason why = ForAccess,
bool forall = false )
virtual

Get the path to a file that is complete in the local cache. By default, the file must be complete in the cache (i.e. no blocks are missing). This can be overridden. This path can be used to access the file on the local node.

Returns
0 - the file is complete and the local path to the file is in the buffer, if it has been supplied.
<0 - the request could not be fulfilled. The return value is -errno describing why. If a buffer was supplied and a path could be generated it is returned only if "why" is ForCheck or ForInfo. Otherwise, a null path is returned.
>0 - Reserved for future use.

Reimplemented from XrdOucCache.

Definition at line 798 of file XrdPfc.cc.

800{
801 static const mode_t groupReadable = S_IRUSR | S_IWUSR | S_IRGRP;
802 static const mode_t worldReadable = S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH;
803 static const char *lfpReason[] = { "ForAccess", "ForInfo", "ForPath" };
804
805 TRACE(Debug, "LocalFilePath '" << curl << "', why=" << lfpReason[why]);
806
807 if (buff && blen > 0) buff[0] = 0;
808
809 XrdCl::URL url(curl);
810 std::string f_name = url.GetPath();
811 std::string i_name = f_name + Info::s_infoExtension;
812
813 if (why == ForPath)
814 {
815 int ret = m_oss->Lfn2Pfn(f_name.c_str(), buff, blen);
816 TRACE(Info, "LocalFilePath '" << curl << "', why=" << lfpReason[why] << " -> " << ret);
817 return ret;
818 }
819
820 {
821 XrdSysCondVarHelper lock(&m_active_cond);
822 m_purge_delay_set.insert(f_name);
823 }
824
825 struct stat sbuff, sbuff2;
826 if (m_oss->Stat(f_name.c_str(), &sbuff) == XrdOssOK &&
827 m_oss->Stat(i_name.c_str(), &sbuff2) == XrdOssOK)
828 {
829 if (S_ISDIR(sbuff.st_mode))
830 {
831 TRACE(Info, "LocalFilePath '" << curl << "', why=" << lfpReason[why] << " -> EISDIR");
832 return -EISDIR;
833 }
834 else
835 {
836 bool read_ok = false;
837 bool is_complete = false;
838
839 // Lock and check if the file is active. If NOT, keep the lock
840 // and add dummy access after successful reading of info file.
841 // If it IS active, just release the lock, this ongoing access will
842 // assure the file continues to exist.
843
844 // XXXX How can I just loop over the cinfo file when active?
845 // Can I not get is_complete from the existing file?
846 // Do I still want to inject access record?
847 // Oh, it writes only if not active .... still let's try to use existing File.
848
849 m_active_cond.Lock();
850
851 bool is_active = m_active.find(f_name) != m_active.end();
852
853 if (is_active) m_active_cond.UnLock();
854
855 XrdOssDF* infoFile = m_oss->newFile(m_configuration.m_username.c_str());
856 XrdOucEnv myEnv;
857 int res = infoFile->Open(i_name.c_str(), O_RDWR, 0600, myEnv);
858 if (res >= 0)
859 {
860 Info info(m_trace, 0);
861 if (info.Read(infoFile, i_name.c_str()))
862 {
863 read_ok = true;
864
865 is_complete = info.IsComplete();
866
867 // Add full-size access if reason is for access.
868 if ( ! is_active && is_complete && why == ForAccess)
869 {
870 info.WriteIOStatSingle(info.GetFileSize());
871 info.Write(infoFile, i_name.c_str());
872 }
873 }
874 infoFile->Close();
875 }
876 delete infoFile;
877
878 if ( ! is_active) m_active_cond.UnLock();
879
880 if (read_ok)
881 {
882 if ((is_complete || why == ForInfo) && buff != 0)
883 {
884 int res2 = m_oss->Lfn2Pfn(f_name.c_str(), buff, blen);
885 if (res2 < 0)
886 return res2;
887
888 // Normally, files are owned by us but when direct cache access
889 // is wanted and possible, make sure the file is world readable.
890 if (why == ForAccess)
891 {mode_t mode = (forall ? worldReadable : groupReadable);
892 if (((sbuff.st_mode & worldReadable) != mode)
893 && (m_oss->Chmod(f_name.c_str(), mode) != XrdOssOK))
894 {is_complete = false;
895 *buff = 0;
896 }
897 }
898 }
899
900 TRACE(Info, "LocalFilePath '" << curl << "', why=" << lfpReason[why] <<
901 (is_complete ? " -> FILE_COMPLETE_IN_CACHE" : " -> EREMOTE"));
902
903 return is_complete ? 0 : -EREMOTE;
904 }
905 }
906 }
907
908 TRACE(Info, "LocalFilePath '" << curl << "', why=" << lfpReason[why] << " -> ENOENT");
909 return -ENOENT;
910}

References XrdOssDF::Close(), Debug, XrdOucCache::ForAccess, XrdOucCache::ForInfo, XrdOucCache::ForPath, XrdPfc::Info::GetFileSize(), XrdCl::URL::GetPath(), XrdPfc::Info::IsComplete(), XrdOssDF::Open(), XrdPfc::Info::Read(), XrdPfc::Info::s_infoExtension, stat, TRACE, XrdPfc::Info::Write(), XrdPfc::Info::WriteIOStatSingle(), and XrdOssOK.

+ Here is the call graph for this function:

◆ Prefetch()

void Cache::Prefetch ( )

Definition at line 756 of file XrdPfc.cc.

757{
758 const long long limit_RAM = m_configuration.m_RamAbsAvailable * 7 / 10;
759
760 while (true)
761 {
762 m_RAM_mutex.Lock();
763 bool doPrefetch = (m_RAM_used < limit_RAM);
764 m_RAM_mutex.UnLock();
765
766 if (doPrefetch)
767 {
769 f->Prefetch();
770 }
771 else
772 {
774 }
775 }
776}
File * GetNextFileToPrefetch()
Definition XrdPfc.cc:737
static void Wait(int milliseconds)

References GetNextFileToPrefetch(), XrdPfc::File::Prefetch(), and XrdSysTimer::Wait().

Referenced by PrefetchThread().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ prefetch_str2value()

bool Cache::prefetch_str2value ( const char * from,
const char * str,
int & val,
int min,
int max ) const

Definition at line 119 of file XrdPfcConfiguration.cc.

121{
122 if (XrdOuca2x::a2i(m_log, "Error parsing prefetch block count", str, &val, min, max))
123 return false;
124
125 return true;
126}

References XrdOuca2x::a2i().

+ Here is the call graph for this function:

◆ Prepare()

int Cache::Prepare ( const char * curl,
int oflags,
mode_t mode )
virtual

Prepare the cache for a file open request. This method is called prior to actually opening a file. This method is meant to allow deferring an open request or implementing the full I/O stack in the cache layer.

Returns
<0 Error has occurred, return value is -errno; fail open request. =0 Continue with open() request. >0 Defer open but treat the file as actually being open. Use the XrdOucCacheIO::Open() method to open the file at a later time.

Reimplemented from XrdOucCache.

Definition at line 1070 of file XrdPfc.cc.

1071{
1072 XrdCl::URL url(curl);
1073 std::string f_name = url.GetPath();
1074 std::string i_name = f_name + Info::s_infoExtension;
1075
1076 // Do not allow write access.
1077 if ((oflags & O_ACCMODE) != O_RDONLY)
1078 {
1079 if (Cache::GetInstance().RefConfiguration().m_write_through)
1080 {
1081 return 0;
1082 }
1083 TRACE(Warning, "Prepare write access requested on file " << f_name << ". Denying access.");
1084 return -EROFS;
1085 }
1086
1087 // Intercept xrdpfc_command requests.
1088 if (m_configuration.m_allow_xrdpfc_command && strncmp("/xrdpfc_command/", f_name.c_str(), 16) == 0)
1089 {
1090 // Schedule a job to process command request.
1091 {
1092 CommandExecutor *ce = new CommandExecutor(f_name, "CommandExecutor");
1093
1094 schedP->Schedule(ce);
1095 }
1096
1097 return -EAGAIN;
1098 }
1099
1100 {
1101 XrdSysCondVarHelper lock(&m_active_cond);
1102 m_purge_delay_set.insert(f_name);
1103 }
1104
1105 struct stat sbuff;
1106 if (m_oss->Stat(i_name.c_str(), &sbuff) == XrdOssOK)
1107 {
1108 TRACE(Dump, "Prepare defer open " << f_name);
1109 return 1;
1110 }
1111 else
1112 {
1113 return 0;
1114 }
1115}
static XrdScheduler * schedP
Definition XrdPfc.hh:304

References GetInstance(), XrdCl::URL::GetPath(), RefConfiguration(), XrdPfc::Info::s_infoExtension, schedP, stat, TRACE, and XrdOssOK.

+ Here is the call graph for this function:

◆ ProcessWriteTasks()

void Cache::ProcessWriteTasks ( )

Separate task which writes blocks from ram to disk.

Definition at line 277 of file XrdPfc.cc.

278{
279 std::vector<Block*> blks_to_write(m_configuration.m_wqueue_blocks);
280
281 while (true)
282 {
283 m_writeQ.condVar.Lock();
284 while (m_writeQ.size == 0)
285 {
286 m_writeQ.condVar.Wait();
287 }
288
289 // MT -- optimize to pop several blocks if they are available (or swap the list).
290 // This makes sense especially for smallish block sizes.
291
292 int n_pushed = std::min(m_writeQ.size, m_configuration.m_wqueue_blocks);
293 long long sum_size = 0;
294
295 for (int bi = 0; bi < n_pushed; ++bi)
296 {
297 Block* block = m_writeQ.queue.front();
298 m_writeQ.queue.pop_front();
299 m_writeQ.writes_between_purges += block->get_size();
300 sum_size += block->get_size();
301
302 blks_to_write[bi] = block;
303
304 TRACE(Dump, "ProcessWriteTasks for block " << (void*)(block) << " path " << block->m_file->lPath());
305 }
306 m_writeQ.size -= n_pushed;
307
308 m_writeQ.condVar.UnLock();
309
310 {
311 XrdSysMutexHelper lock(&m_RAM_mutex);
312 m_RAM_write_queue -= sum_size;
313 }
314
315 for (int bi = 0; bi < n_pushed; ++bi)
316 {
317 Block* block = blks_to_write[bi];
318
319 block->m_file->WriteBlockToDisk(block);
320 }
321 }
322}
const char * lPath() const
Log path.
void WriteBlockToDisk(Block *b)

References XrdPfc::Block::get_size(), XrdPfc::File::lPath(), XrdPfc::Block::m_file, TRACE, and XrdPfc::File::WriteBlockToDisk().

Referenced by ProcessWriteTaskThread().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ RefConfiguration()

const Configuration & XrdPfc::Cache::RefConfiguration ( ) const
inline

Reference XrdPfc configuration.

Definition at line 217 of file XrdPfc.hh.

217{ return m_configuration; }

Referenced by XrdPfc::IOFileBlock::IOFileBlock(), Attach(), Prepare(), and XrdOucGetCache().

+ Here is the caller graph for this function:

◆ RefResMon()

ResourceMonitor & XrdPfc::Cache::RefResMon ( )
inline

Definition at line 299 of file XrdPfc.hh.

299{ return *m_res_mon; }

◆ RegisterPrefetchFile()

void Cache::RegisterPrefetchFile ( File * file)

Definition at line 699 of file XrdPfc.cc.

700{
701 // Can be called with other locks held.
702
703 if ( ! m_prefetch_enabled)
704 {
705 return;
706 }
707
708 m_prefetch_condVar.Lock();
709 m_prefetchList.push_back(file);
710 m_prefetch_condVar.Signal();
711 m_prefetch_condVar.UnLock();
712}

◆ ReleaseFile()

void Cache::ReleaseFile ( File * f,
IO * io )

Definition at line 475 of file XrdPfc.cc.

476{
477 // Called from virtual IO::DetachFinalize.
478
479 TRACE(Debug, "ReleaseFile " << f->GetLocalPath() << ", io " << io);
480
481 {
482 XrdSysCondVarHelper lock(&m_active_cond);
483
484 f->RemoveIO(io);
485 }
486 dec_ref_cnt(f, true);
487}
void RemoveIO(IO *io)

References Debug, XrdPfc::File::GetLocalPath(), XrdPfc::File::RemoveIO(), and TRACE.

Referenced by XrdPfc::IOFile::DetachFinalize().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ ReleaseRAM()

void Cache::ReleaseRAM ( char * buf,
long long size )

Definition at line 375 of file XrdPfc.cc.

376{
377 bool std_size = (size == m_configuration.m_bufferSize);
378 {
379 XrdSysMutexHelper lock(&m_RAM_mutex);
380
381 m_RAM_used -= size;
382
383 if (std_size && m_RAM_std_size < m_configuration.m_RamKeepStdBlocks)
384 {
385 m_RAM_std_blocks.push_back(buf);
386 ++m_RAM_std_size;
387 return;
388 }
389 }
390 free(buf);
391}

◆ RemoveWriteQEntriesFor()

void Cache::RemoveWriteQEntriesFor ( File * f)

Remove blocks from the write queue which belong to the given file. This method is used at the time of File destruction.

Definition at line 244 of file XrdPfc.cc.

245{
246 std::list<Block*> removed_blocks;
247 long long sum_size = 0;
248
249 m_writeQ.condVar.Lock();
250 std::list<Block*>::iterator i = m_writeQ.queue.begin();
251 while (i != m_writeQ.queue.end())
252 {
253 if ((*i)->m_file == file)
254 {
255 TRACE(Dump, "Remove entries for " << (void*)(*i) << " path " << file->lPath());
256 std::list<Block*>::iterator j = i++;
257 removed_blocks.push_back(*j);
258 sum_size += (*j)->get_size();
259 m_writeQ.queue.erase(j);
260 --m_writeQ.size;
261 }
262 else
263 {
264 ++i;
265 }
266 }
267 m_writeQ.condVar.UnLock();
268
269 {
270 XrdSysMutexHelper lock(&m_RAM_mutex);
271 m_RAM_write_queue -= sum_size;
272 }
273
274 file->BlocksRemovedFromWriteQ(removed_blocks);
275}

References XrdPfc::File::BlocksRemovedFromWriteQ(), XrdPfc::File::lPath(), and TRACE.

Referenced by UnlinkFile().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ RequestRAM()

char * Cache::RequestRAM ( long long size)

Definition at line 335 of file XrdPfc.cc.

336{
337 static const size_t s_block_align = sysconf(_SC_PAGESIZE);
338
339 bool std_size = (size == m_configuration.m_bufferSize);
340
341 m_RAM_mutex.Lock();
342
343 long long total = m_RAM_used + size;
344
345 if (total <= m_configuration.m_RamAbsAvailable)
346 {
347 m_RAM_used = total;
348 if (std_size && m_RAM_std_size > 0)
349 {
350 char *buf = m_RAM_std_blocks.back();
351 m_RAM_std_blocks.pop_back();
352 --m_RAM_std_size;
353
354 m_RAM_mutex.UnLock();
355
356 return buf;
357 }
358 else
359 {
360 m_RAM_mutex.UnLock();
361 char *buf;
362 if (posix_memalign((void**) &buf, s_block_align, (size_t) size))
363 {
364 // Report out of mem? Probably should report it at least the first time,
365 // then periodically.
366 return 0;
367 }
368 return buf;
369 }
370 }
371 m_RAM_mutex.UnLock();
372 return 0;
373}

◆ ResMon()

ResourceMonitor & Cache::ResMon ( )
static

Definition at line 135 of file XrdPfc.cc.

135{ return m_instance->RefResMon(); }

Referenced by XrdPfc::ResourceMonitor::perform_purge_check(), ResourceMonitorThread(), and XrdPfc::UnlinkPurgeStateFilesInMap().

+ Here is the caller graph for this function:

◆ ScheduleFileSync()

void XrdPfc::Cache::ScheduleFileSync ( File * f)
inline

Definition at line 292 of file XrdPfc.hh.

292{ schedule_file_sync(f, false, false); }

◆ Stat()

int Cache::Stat ( const char * curl,
struct stat & sbuff )
virtual
Returns
<0 - Stat failed, value is -errno. =0 - Stat succeeded, sbuff holds stat information. >0 - Stat could not be done, forward operation to next level.

Reimplemented from XrdOucCache.

Definition at line 1125 of file XrdPfc.cc.

1126{
1127 const char *tpfx = "Stat ";
1128
1129 XrdCl::URL url(curl);
1130 std::string f_name = url.GetPath();
1131
1132 File *file = nullptr;
1133 {
1134 XrdSysCondVarHelper lock(&m_active_cond);
1135 auto it = m_active.find(f_name);
1136 if (it != m_active.end()) {
1137 file = it->second;
1138 // If `file` is nullptr, the file-open is in progress; instead
1139 // of waiting for the file-open to finish, simply treat it as if
1140 // the file-open doesn't exist.
1141 if (file) {
1142 inc_ref_cnt(file, false, false);
1143 }
1144 }
1145 }
1146 if (file) {
1147 int res = file->Fstat(sbuff);
1148 dec_ref_cnt(file, false);
1149 TRACE(Debug, tpfx << "from active file " << curl << " -> " << res);
1150 return res;
1151 }
1152
1153 int res = m_oss->Stat(f_name.c_str(), &sbuff);
1154 if (res != XrdOssOK) {
1155 TRACE(Debug, tpfx << curl << " -> " << res);
1156 return 1; // res; -- for only-if-cached
1157 }
1158 if (S_ISDIR(sbuff.st_mode))
1159 {
1160 TRACE(Debug, tpfx << curl << " -> EISDIR");
1161 return -EISDIR;
1162 }
1163
1164 long long file_size = DetermineFullFileSize(f_name + Info::s_infoExtension);
1165 if (file_size < 0) {
1166 TRACE(Debug, tpfx << curl << " -> " << file_size);
1167 return 1; // (int) file_size; -- for only-if-cached
1168 }
1169 sbuff.st_size = file_size;
1170 bool is_cached = DecideIfConsideredCached(file_size, sbuff.st_blocks * 512ll);
1171 if ( ! is_cached)
1172 sbuff.st_atime = 0;
1173
1174 TRACE(Debug, tpfx << "from disk " << curl << " -> " << res);
1175
1176 return 0;
1177}

References Debug, DecideIfConsideredCached(), DetermineFullFileSize(), XrdPfc::File::Fstat(), XrdCl::URL::GetPath(), XrdPfc::Info::s_infoExtension, stat, TRACE, and XrdOssOK.

+ Here is the call graph for this function:

◆ TheOne()

const Cache & Cache::TheOne ( )
static

Definition at line 133 of file XrdPfc.cc.

133{ return *m_instance; }

References Cache().

Referenced by XrdPfc::File::GetLog(), XrdPfc::File::GetTrace(), XrdPfc::OldStylePurgeDriver(), and XrdPfc::UnlinkPurgeStateFilesInMap().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ Unlink()

int Cache::Unlink ( const char * curl)
virtual
Returns
<0 - Unlink failed, value is -errno (e.g. -EAGAIN when another operation on the file is in progress). =0 - Unlink succeeded.

Reimplemented from XrdOucCache.

Definition at line 1186 of file XrdPfc.cc.

1187{
1188 XrdCl::URL url(curl);
1189 std::string f_name = url.GetPath();
1190
1191 // printf("Unlink url=%s\n\t fname=%s\n", curl, f_name.c_str());
1192
1193 return UnlinkFile(f_name, false);
1194}

References XrdCl::URL::GetPath(), and UnlinkFile().

+ Here is the call graph for this function:

◆ UnlinkFile()

int Cache::UnlinkFile ( const std::string & f_name,
bool fail_if_open )

Remove cinfo and data files from cache.

Definition at line 1196 of file XrdPfc.cc.

1197{
1198 static const char* trc_pfx = "UnlinkFile ";
1199 ActiveMap_i it;
1200 File *file = 0;
1201 long long st_blocks_to_purge = 0;
1202 {
1203 XrdSysCondVarHelper lock(&m_active_cond);
1204
1205 it = m_active.find(f_name);
1206
1207 if (it != m_active.end())
1208 {
1209 if (fail_if_open)
1210 {
1211 TRACE(Info, trc_pfx << f_name << ", file currently open and force not requested - denying request");
1212 return -EBUSY;
1213 }
1214
1215 // Null File* in m_active map means an operation is ongoing, probably
1216 // Attach() with possible File::Open(). Ask for retry.
1217 if (it->second == 0)
1218 {
1219 TRACE(Info, trc_pfx << f_name << ", an operation on this file is ongoing - denying request");
1220 return -EAGAIN;
1221 }
1222
1223 file = it->second;
1224 st_blocks_to_purge = file->initiate_emergency_shutdown();
1225 it->second = 0;
1226 }
1227 else
1228 {
1229 it = m_active.insert(std::make_pair(f_name, (File*) 0)).first;
1230 }
1231 }
1232
1233 if (file) {
1234 RemoveWriteQEntriesFor(file);
1235 } else {
1236 struct stat f_stat;
1237 if (m_oss->Stat(f_name.c_str(), &f_stat) == XrdOssOK)
1238 st_blocks_to_purge = f_stat.st_blocks;
1239 }
1240
1241 std::string i_name = f_name + Info::s_infoExtension;
1242
1243 // Unlink file & cinfo
1244 int f_ret = m_oss->Unlink(f_name.c_str());
1245 int i_ret = m_oss->Unlink(i_name.c_str());
1246
1247 if (st_blocks_to_purge)
1248 m_res_mon->register_file_purge(f_name, st_blocks_to_purge);
1249
1250 TRACE(Debug, trc_pfx << f_name << ", f_ret=" << f_ret << ", i_ret=" << i_ret);
1251
1252 {
1253 XrdSysCondVarHelper lock(&m_active_cond);
1254 m_active.erase(it);
1255 m_active_cond.Broadcast();
1256 }
1257
1258 return std::min(f_ret, i_ret);
1259}
void RemoveWriteQEntriesFor(File *f)
Remove blocks from write queue which belong to given prefetch. This method is used at the time of Fil...
Definition XrdPfc.cc:244
long long initiate_emergency_shutdown()

References Debug, XrdPfc::File::initiate_emergency_shutdown(), RemoveWriteQEntriesFor(), XrdPfc::Info::s_infoExtension, stat, TRACE, and XrdOssOK.

Referenced by ExecuteCommandUrl(), XrdPfc::File::Sync(), and Unlink().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ VCheck()

static bool XrdPfc::Cache::VCheck ( XrdVersionInfo & urVersion)
inlinestatic

Version check.

Definition at line 247 of file XrdPfc.hh.

247{ return true; }

◆ WriteFileSizeXAttr()

void Cache::WriteFileSizeXAttr ( int cinfo_fd,
long long file_size )

Definition at line 915 of file XrdPfc.cc.

916{
917 if (m_metaXattr) {
918 int res = XrdSysXAttrActive->Set("pfc.fsize", &file_size, sizeof(long long), 0, cinfo_fd, 0);
919 if (res != 0) {
920 TRACE(Debug, "WriteFileSizeXAttr error setting xattr " << res);
921 }
922 }
923}
virtual int Set(const char *Aname, const void *Aval, int Avsz, const char *Path, int fd=-1, int isNew=0)=0

References Debug, TRACE, and XrdSysXAttrActive.

◆ WritesSinceLastCall()

long long Cache::WritesSinceLastCall ( )

Definition at line 324 of file XrdPfc.cc.

325{
326 // Called from ResourceMonitor for an alternative estimation of disk writes.
327 XrdSysCondVarHelper lock(&m_writeQ.condVar);
328 long long ret = m_writeQ.writes_between_purges;
329 m_writeQ.writes_between_purges = 0;
330 return ret;
331}

Referenced by XrdPfc::ResourceMonitor::perform_purge_check().

+ Here is the caller graph for this function:

Member Data Documentation

◆ schedP

XrdScheduler * Cache::schedP = nullptr
static

The documentation for this class was generated from the following files: