XRootD
Loading...
Searching...
No Matches
XrdPfc.cc
Go to the documentation of this file.
1//----------------------------------------------------------------------------------
2// Copyright (c) 2014 by Board of Trustees of the Leland Stanford, Jr., University
3// Author: Alja Mrak-Tadel, Matevz Tadel, Brian Bockelman
4//----------------------------------------------------------------------------------
5// XRootD is free software: you can redistribute it and/or modify
6// it under the terms of the GNU Lesser General Public License as published by
7// the Free Software Foundation, either version 3 of the License, or
8// (at your option) any later version.
9//
10// XRootD is distributed in the hope that it will be useful,
11// but WITHOUT ANY WARRANTY; without even the implied warranty of
12// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13// GNU General Public License for more details.
14//
15// You should have received a copy of the GNU Lesser General Public License
16// along with XRootD. If not, see <http://www.gnu.org/licenses/>.
17//----------------------------------------------------------------------------------
18
19#include <fcntl.h>
20#include <sstream>
21#include <algorithm>
22#include <sys/statvfs.h>
23
24#include "XrdCl/XrdClURL.hh"
25
26#include "XrdOuc/XrdOucEnv.hh"
27#include "XrdOuc/XrdOucUtils.hh"
29
30#include "XrdSys/XrdSysTimer.hh"
31#include "XrdSys/XrdSysTrace.hh"
32#include "XrdSys/XrdSysXAttr.hh"
33
35
36#include "XrdOss/XrdOss.hh"
37
38#include "XrdPfc.hh"
39#include "XrdPfcTrace.hh"
40#include "XrdPfcFSctl.hh"
41#include "XrdPfcInfo.hh"
42#include "XrdPfcIOFile.hh"
43#include "XrdPfcIOFileBlock.hh"
45
47
48using namespace XrdPfc;
49
50Cache *Cache::m_instance = nullptr;
52
53
55{
57 return 0;
58}
59
61{
63 return 0;
64}
65
66void *PrefetchThread(void*)
67{
69 return 0;
70}
71
72//==============================================================================
73
74extern "C"
75{
77 const char *config_filename,
78 const char *parameters,
79 XrdOucEnv *env)
80{
81 XrdSysError err(logger, "");
82 err.Say("++++++ Proxy file cache initialization started.");
83
84 if ( ! env ||
85 ! (XrdPfc::Cache::schedP = (XrdScheduler*) env->GetPtr("XrdScheduler*")))
86 {
88 XrdPfc::Cache::schedP->Start();
89 }
90
91 Cache &instance = Cache::CreateInstance(logger, env);
92
93 if (! instance.Config(config_filename, parameters, env))
94 {
95 err.Say("Config Proxy file cache initialization failed.");
96 return 0;
97 }
98 err.Say("++++++ Proxy file cache initialization completed.");
99
100 {
101 pthread_t tid;
102
103 XrdSysThread::Run(&tid, ResourceMonitorThread, 0, 0, "XrdPfc ResourceMonitor");
104
105 for (int wti = 0; wti < instance.RefConfiguration().m_wqueue_threads; ++wti)
106 {
107 XrdSysThread::Run(&tid, ProcessWriteTaskThread, 0, 0, "XrdPfc WriteTasks ");
108 }
109
110 if (instance.is_prefetch_enabled())
111 {
112 XrdSysThread::Run(&tid, PrefetchThread, 0, 0, "XrdPfc Prefetch ");
113 }
114 }
115
116 XrdPfcFSctl* pfcFSctl = new XrdPfcFSctl(instance, logger);
117 env->PutPtr("XrdFSCtl_PC*", pfcFSctl);
118
119 return &instance;
120}
121}
122
123//==============================================================================
124
126{
127 assert (m_instance == 0);
128 m_instance = new Cache(logger, env);
129 return *m_instance;
130}
131
132 Cache& Cache::GetInstance() { return *m_instance; }
133const Cache& Cache::TheOne() { return *m_instance; }
134const Configuration& Cache::Conf() { return m_instance->RefConfiguration(); }
135 ResourceMonitor& Cache::ResMon() { return m_instance->RefResMon(); }
136
138{
139 if (! m_decisionpoints.empty())
140 {
141 XrdCl::URL url(io->Path());
142 std::string filename = url.GetPath();
143 std::vector<Decision*>::const_iterator it;
144 for (it = m_decisionpoints.begin(); it != m_decisionpoints.end(); ++it)
145 {
146 XrdPfc::Decision *d = *it;
147 if (! d) continue;
148 if (! d->Decide(filename, *m_oss))
149 {
150 return false;
151 }
152 }
153 }
154
155 return true;
156}
157
159 XrdOucCache("pfc"),
160 m_env(env),
161 m_log(logger, "XrdPfc_"),
162 m_trace(new XrdSysTrace("XrdPfc", logger)),
163 m_traceID("Cache"),
164 m_oss(0),
165 m_gstream(0),
166 m_purge_pin(0),
167 m_prefetch_condVar(0),
168 m_prefetch_enabled(false),
169 m_RAM_used(0),
170 m_RAM_write_queue(0),
171 m_RAM_std_size(0),
172 m_isClient(false),
173 m_active_cond(0)
174{
175 // Default log level is Warning.
176 m_trace->What = 2;
177}
178
180{
181 const char* tpfx = "Attach() ";
182
183 if (Options & XrdOucCache::optRW)
184 {
185 TRACE(Info, tpfx << "passing through write operation" << obfuscateAuth(io->Path()));
186 }
187 else if (Cache::GetInstance().Decide(io))
188 {
189 TRACE(Info, tpfx << obfuscateAuth(io->Path()));
190
191 IO *cio;
192
193 if (Cache::GetInstance().RefConfiguration().m_hdfsmode)
194 {
195 cio = new IOFileBlock(io, *this);
196 }
197 else
198 {
199 IOFile *iof = new IOFile(io, *this);
200
201 if ( ! iof->HasFile())
202 {
203 delete iof;
204 // TODO - redirect instead. But this is kind of an awkward place for it.
205 // errno is set during IOFile construction.
206 TRACE(Error, tpfx << "Failed opening local file, falling back to remote access " << io->Path());
207 return io;
208 }
209
210 cio = iof;
211 }
212
213 TRACE_PC(Debug, const char* loc = io->Location(), tpfx << io->Path() << " location: " <<
214 ((loc && loc[0] != 0) ? loc : "<deferred open>"));
215
216 return cio;
217 }
218 else
219 {
220 TRACE(Info, tpfx << "decision decline " << io->Path());
221 }
222 return io;
223}
224
225void Cache::AddWriteTask(Block* b, bool fromRead)
226{
227 TRACE(Dump, "AddWriteTask() offset=" << b->m_offset << ". file " << b->get_file()->GetLocalPath());
228
229 {
230 XrdSysMutexHelper lock(&m_RAM_mutex);
231 m_RAM_write_queue += b->get_size();
232 }
233
234 m_writeQ.condVar.Lock();
235 if (fromRead)
236 m_writeQ.queue.push_back(b);
237 else
238 m_writeQ.queue.push_front(b);
239 m_writeQ.size++;
240 m_writeQ.condVar.Signal();
241 m_writeQ.condVar.UnLock();
242}
243
245{
246 std::list<Block*> removed_blocks;
247 long long sum_size = 0;
248
249 m_writeQ.condVar.Lock();
250 std::list<Block*>::iterator i = m_writeQ.queue.begin();
251 while (i != m_writeQ.queue.end())
252 {
253 if ((*i)->m_file == file)
254 {
255 TRACE(Dump, "Remove entries for " << (void*)(*i) << " path " << file->lPath());
256 std::list<Block*>::iterator j = i++;
257 removed_blocks.push_back(*j);
258 sum_size += (*j)->get_size();
259 m_writeQ.queue.erase(j);
260 --m_writeQ.size;
261 }
262 else
263 {
264 ++i;
265 }
266 }
267 m_writeQ.condVar.UnLock();
268
269 {
270 XrdSysMutexHelper lock(&m_RAM_mutex);
271 m_RAM_write_queue -= sum_size;
272 }
273
274 file->BlocksRemovedFromWriteQ(removed_blocks);
275}
276
278{
279 std::vector<Block*> blks_to_write(m_configuration.m_wqueue_blocks);
280
281 while (true)
282 {
283 m_writeQ.condVar.Lock();
284 while (m_writeQ.size == 0)
285 {
286 m_writeQ.condVar.Wait();
287 }
288
289 // MT -- optimize to pop several blocks if they are available (or swap the list).
290 // This makes sense especially for smallish block sizes.
291
292 int n_pushed = std::min(m_writeQ.size, m_configuration.m_wqueue_blocks);
293 long long sum_size = 0;
294
295 for (int bi = 0; bi < n_pushed; ++bi)
296 {
297 Block* block = m_writeQ.queue.front();
298 m_writeQ.queue.pop_front();
299 m_writeQ.writes_between_purges += block->get_size();
300 sum_size += block->get_size();
301
302 blks_to_write[bi] = block;
303
304 TRACE(Dump, "ProcessWriteTasks for block " << (void*)(block) << " path " << block->m_file->lPath());
305 }
306 m_writeQ.size -= n_pushed;
307
308 m_writeQ.condVar.UnLock();
309
310 {
311 XrdSysMutexHelper lock(&m_RAM_mutex);
312 m_RAM_write_queue -= sum_size;
313 }
314
315 for (int bi = 0; bi < n_pushed; ++bi)
316 {
317 Block* block = blks_to_write[bi];
318
319 block->m_file->WriteBlockToDisk(block);
320 }
321 }
322}
323
325{
326 // Called from ResourceMonitor for an alternative estimation of disk writes.
327 XrdSysCondVarHelper lock(&m_writeQ.condVar);
328 long long ret = m_writeQ.writes_between_purges;
329 m_writeQ.writes_between_purges = 0;
330 return ret;
331}
332
333//==============================================================================
334
335char* Cache::RequestRAM(long long size)
336{
337 static const size_t s_block_align = sysconf(_SC_PAGESIZE);
338
339 bool std_size = (size == m_configuration.m_bufferSize);
340
341 m_RAM_mutex.Lock();
342
343 long long total = m_RAM_used + size;
344
345 if (total <= m_configuration.m_RamAbsAvailable)
346 {
347 m_RAM_used = total;
348 if (std_size && m_RAM_std_size > 0)
349 {
350 char *buf = m_RAM_std_blocks.back();
351 m_RAM_std_blocks.pop_back();
352 --m_RAM_std_size;
353
354 m_RAM_mutex.UnLock();
355
356 return buf;
357 }
358 else
359 {
360 m_RAM_mutex.UnLock();
361 char *buf;
362 if (posix_memalign((void**) &buf, s_block_align, (size_t) size))
363 {
364 // Report out of mem? Probably should report it at least the first time,
365 // then periodically.
366 return 0;
367 }
368 return buf;
369 }
370 }
371 m_RAM_mutex.UnLock();
372 return 0;
373}
374
375void Cache::ReleaseRAM(char* buf, long long size)
376{
377 bool std_size = (size == m_configuration.m_bufferSize);
378 {
379 XrdSysMutexHelper lock(&m_RAM_mutex);
380
381 m_RAM_used -= size;
382
383 if (std_size && m_RAM_std_size < m_configuration.m_RamKeepStdBlocks)
384 {
385 m_RAM_std_blocks.push_back(buf);
386 ++m_RAM_std_size;
387 return;
388 }
389 }
390 free(buf);
391}
392
393File* Cache::GetFile(const std::string& path, IO* io, long long off, long long filesize)
394{
395 // Called from virtual IOFile constructor.
396
397 TRACE(Debug, "GetFile " << path << ", io " << io);
398
399 ActiveMap_i it;
400
401 {
402 XrdSysCondVarHelper lock(&m_active_cond);
403
404 while (true)
405 {
406 it = m_active.find(path);
407
408 // File is not open or being opened. Mark it as being opened and
409 // proceed to opening it outside of while loop.
410 if (it == m_active.end())
411 {
412 it = m_active.insert(std::make_pair(path, (File*) 0)).first;
413 break;
414 }
415
416 if (it->second != 0)
417 {
418 it->second->AddIO(io);
419 inc_ref_cnt(it->second, false, true);
420
421 return it->second;
422 }
423 else
424 {
425 // Wait for some change in m_active, then recheck.
426 m_active_cond.Wait();
427 }
428 }
429 }
430
431 // This is always true, now that IOFileBlock is unsupported.
432 if (filesize == 0)
433 {
434 struct stat st;
435 int res = io->Fstat(st);
436 if (res < 0) {
437 errno = res;
438 TRACE(Error, "GetFile, could not get valid stat");
439 } else if (res > 0) {
440 errno = ENOTSUP;
441 TRACE(Error, "GetFile, stat returned positive value, this should NOT happen here");
442 } else {
443 filesize = st.st_size;
444 }
445 }
446
447 File *file = 0;
448
449 if (filesize >= 0)
450 {
451 file = File::FileOpen(path, off, filesize, io->GetInput());
452 }
453
454 {
455 XrdSysCondVarHelper lock(&m_active_cond);
456
457 if (file)
458 {
459 inc_ref_cnt(file, false, true);
460 it->second = file;
461
462 file->AddIO(io);
463 }
464 else
465 {
466 m_active.erase(it);
467 }
468
469 m_active_cond.Broadcast();
470 }
471
472 return file;
473}
474
476{
477 // Called from virtual IO::DetachFinalize.
478
479 TRACE(Debug, "ReleaseFile " << f->GetLocalPath() << ", io " << io);
480
481 {
482 XrdSysCondVarHelper lock(&m_active_cond);
483
484 f->RemoveIO(io);
485 }
486 dec_ref_cnt(f, true);
487}
488
489
490namespace
491{
492
493class DiskSyncer : public XrdJob
494{
495private:
496 File *m_file;
497 bool m_high_debug;
498
499public:
500 DiskSyncer(File *f, bool high_debug, const char *desc = "") :
501 XrdJob(desc),
502 m_file(f),
503 m_high_debug(high_debug)
504 {}
505
506 void DoIt()
507 {
508 m_file->Sync();
509 Cache::GetInstance().FileSyncDone(m_file, m_high_debug);
510 delete this;
511 }
512};
513
514
515class CommandExecutor : public XrdJob
516{
517private:
518 std::string m_command_url;
519
520public:
521 CommandExecutor(const std::string& command, const char *desc = "") :
522 XrdJob(desc),
523 m_command_url(command)
524 {}
525
526 void DoIt()
527 {
528 Cache::GetInstance().ExecuteCommandUrl(m_command_url);
529 delete this;
530 }
531};
532
533}
534
535//==============================================================================
536
537void Cache::schedule_file_sync(File* f, bool ref_cnt_already_set, bool high_debug)
538{
539 DiskSyncer* ds = new DiskSyncer(f, high_debug);
540
541 if ( ! ref_cnt_already_set) inc_ref_cnt(f, true, high_debug);
542
543 schedP->Schedule(ds);
544}
545
546void Cache::FileSyncDone(File* f, bool high_debug)
547{
548 dec_ref_cnt(f, high_debug);
549}
550
551void Cache::inc_ref_cnt(File* f, bool lock, bool high_debug)
552{
553 // called from GetFile() or SheduleFileSync();
554
555 int tlvl = high_debug ? TRACE_Debug : TRACE_Dump;
556
557 if (lock) m_active_cond.Lock();
558 int rc = f->inc_ref_cnt();
559 if (lock) m_active_cond.UnLock();
560
561 TRACE_INT(tlvl, "inc_ref_cnt " << f->GetLocalPath() << ", cnt at exit = " << rc);
562}
563
564void Cache::dec_ref_cnt(File* f, bool high_debug)
565{
566 // NOT under active lock.
567 // Called from ReleaseFile(), DiskSync callback and stat-like functions.
568
569 int tlvl = high_debug ? TRACE_Debug : TRACE_Dump;
570 int cnt;
571
572 bool emergency_close = false;
573 {
574 XrdSysCondVarHelper lock(&m_active_cond);
575
576 cnt = f->get_ref_cnt();
577 TRACE_INT(tlvl, "dec_ref_cnt " << f->GetLocalPath() << ", cnt at entry = " << cnt);
578
580 {
581 // In this case file has been already removed from m_active map and
582 // does not need to be synced.
583
584 if (cnt == 1)
585 {
586 TRACE_INT(tlvl, "dec_ref_cnt " << f->GetLocalPath() << " is in shutdown, ref_cnt = " << cnt
587 << " -- closing and deleting File object without further ado");
588 emergency_close = true;
589 }
590 else
591 {
592 TRACE_INT(tlvl, "dec_ref_cnt " << f->GetLocalPath() << " is in shutdown, ref_cnt = " << cnt
593 << " -- waiting");
594 f->dec_ref_cnt();
595 return;
596 }
597 }
598 if (cnt > 1)
599 {
600 f->dec_ref_cnt();
601 return;
602 }
603 }
604 if (emergency_close)
605 {
606 f->Close();
607 delete f;
608 return;
609 }
610
611 if (cnt == 1)
612 {
613 if (f->FinalizeSyncBeforeExit())
614 {
615 // Note, here we "reuse" the existing reference count for the
616 // final sync.
617
618 TRACE(Debug, "dec_ref_cnt " << f->GetLocalPath() << ", scheduling final sync");
619 schedule_file_sync(f, true, true);
620 return;
621 }
622 }
623
624 bool finished_p = false;
625 ActiveMap_i act_it;
626 {
627 XrdSysCondVarHelper lock(&m_active_cond);
628
629 cnt = f->dec_ref_cnt();
630 TRACE_INT(tlvl, "dec_ref_cnt " << f->GetLocalPath() << ", cnt after sync_check and dec_ref_cnt = " << cnt);
631 if (cnt == 0)
632 {
633 act_it = m_active.find(f->GetLocalPath());
634 act_it->second = 0;
635
636 finished_p = true;
637 }
638 }
639 if (finished_p)
640 {
641 f->Close();
642 {
643 XrdSysCondVarHelper lock(&m_active_cond);
644 m_active.erase(act_it);
645 m_active_cond.Broadcast();
646 }
647
648 if (m_gstream)
649 {
650 const Stats &st = f->RefStats();
651 const Info::AStat *as = f->GetLastAccessStats();
652
653 char buf[4096];
654 int len = snprintf(buf, 4096, "{\"event\":\"file_close\","
655 "\"lfn\":\"%s\",\"size\":%lld,\"blk_size\":%d,\"n_blks\":%d,\"n_blks_done\":%d,"
656 "\"access_cnt\":%lu,\"attach_t\":%lld,\"detach_t\":%lld,\"remotes\":%s,"
657 "\"b_hit\":%lld,\"b_miss\":%lld,\"b_bypass\":%lld,"
658 "\"b_todisk\":%lld,\"b_prefetch\":%lld,\"n_cks_errs\":%d}",
659 f->GetLocalPath().c_str(), f->GetFileSize(), f->GetBlockSize(),
661 (unsigned long) f->GetAccessCnt(), (long long) as->AttachTime, (long long) as->DetachTime,
662 f->GetRemoteLocations().c_str(),
663 as->BytesHit, as->BytesMissed, as->BytesBypassed,
665 );
666 bool suc = false;
667 if (len < 4096)
668 {
669 suc = m_gstream->Insert(buf, len + 1);
670 }
671 if ( ! suc)
672 {
673 TRACE(Error, "Failed g-stream insertion of file_close record, len=" << len);
674 }
675 }
676
677 delete f;
678 }
679}
680
681bool Cache::IsFileActiveOrPurgeProtected(const std::string& path) const
682{
683 XrdSysCondVarHelper lock(&m_active_cond);
684
685 return m_active.find(path) != m_active.end() ||
686 m_purge_delay_set.find(path) != m_purge_delay_set.end();
687}
688
690{
691 XrdSysCondVarHelper lock(&m_active_cond);
692 m_purge_delay_set.clear();
693}
694
695//==============================================================================
696//=== PREFETCH
697//==============================================================================
698
700{
701 // Can be called with other locks held.
702
703 if ( ! m_prefetch_enabled)
704 {
705 return;
706 }
707
708 m_prefetch_condVar.Lock();
709 m_prefetchList.push_back(file);
710 m_prefetch_condVar.Signal();
711 m_prefetch_condVar.UnLock();
712}
713
714
716{
717 // Can be called with other locks held.
718
719 if ( ! m_prefetch_enabled)
720 {
721 return;
722 }
723
724 m_prefetch_condVar.Lock();
725 for (PrefetchList::iterator it = m_prefetchList.begin(); it != m_prefetchList.end(); ++it)
726 {
727 if (*it == file)
728 {
729 m_prefetchList.erase(it);
730 break;
731 }
732 }
733 m_prefetch_condVar.UnLock();
734}
735
736
738{
739 m_prefetch_condVar.Lock();
740 while (m_prefetchList.empty())
741 {
742 m_prefetch_condVar.Wait();
743 }
744
745 // std::sort(m_prefetchList.begin(), m_prefetchList.end(), myobject);
746
747 size_t l = m_prefetchList.size();
748 int idx = rand() % l;
749 File* f = m_prefetchList[idx];
750
751 m_prefetch_condVar.UnLock();
752 return f;
753}
754
755
757{
758 const long long limit_RAM = m_configuration.m_RamAbsAvailable * 7 / 10;
759
760 while (true)
761 {
762 m_RAM_mutex.Lock();
763 bool doPrefetch = (m_RAM_used < limit_RAM);
764 m_RAM_mutex.UnLock();
765
766 if (doPrefetch)
767 {
769 f->Prefetch();
770 }
771 else
772 {
774 }
775 }
776}
777
778
779//==============================================================================
780//=== Virtuals from XrdOucCache
781//==============================================================================
782
783//------------------------------------------------------------------------------
797
798int Cache::LocalFilePath(const char *curl, char *buff, int blen,
799 LFP_Reason why, bool forall)
800{
801 static const mode_t groupReadable = S_IRUSR | S_IWUSR | S_IRGRP;
802 static const mode_t worldReadable = S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH;
803 static const char *lfpReason[] = { "ForAccess", "ForInfo", "ForPath" };
804
805 TRACE(Debug, "LocalFilePath '" << curl << "', why=" << lfpReason[why]);
806
807 if (buff && blen > 0) buff[0] = 0;
808
809 XrdCl::URL url(curl);
810 std::string f_name = url.GetPath();
811 std::string i_name = f_name + Info::s_infoExtension;
812
813 if (why == ForPath)
814 {
815 int ret = m_oss->Lfn2Pfn(f_name.c_str(), buff, blen);
816 TRACE(Info, "LocalFilePath '" << curl << "', why=" << lfpReason[why] << " -> " << ret);
817 return ret;
818 }
819
820 {
821 XrdSysCondVarHelper lock(&m_active_cond);
822 m_purge_delay_set.insert(f_name);
823 }
824
825 struct stat sbuff, sbuff2;
826 if (m_oss->Stat(f_name.c_str(), &sbuff) == XrdOssOK &&
827 m_oss->Stat(i_name.c_str(), &sbuff2) == XrdOssOK)
828 {
829 if (S_ISDIR(sbuff.st_mode))
830 {
831 TRACE(Info, "LocalFilePath '" << curl << "', why=" << lfpReason[why] << " -> EISDIR");
832 return -EISDIR;
833 }
834 else
835 {
836 bool read_ok = false;
837 bool is_complete = false;
838
839 // Lock and check if the file is active. If NOT, keep the lock
840 // and add dummy access after successful reading of info file.
841 // If it IS active, just release the lock, this ongoing access will
842 // assure the file continues to exist.
843
844 // XXXX How can I just loop over the cinfo file when active?
845 // Can I not get is_complete from the existing file?
846 // Do I still want to inject access record?
847 // Oh, it writes only if not active .... still let's try to use existing File.
848
849 m_active_cond.Lock();
850
851 bool is_active = m_active.find(f_name) != m_active.end();
852
853 if (is_active) m_active_cond.UnLock();
854
855 XrdOssDF* infoFile = m_oss->newFile(m_configuration.m_username.c_str());
856 XrdOucEnv myEnv;
857 int res = infoFile->Open(i_name.c_str(), O_RDWR, 0600, myEnv);
858 if (res >= 0)
859 {
860 Info info(m_trace, 0);
861 if (info.Read(infoFile, i_name.c_str()))
862 {
863 read_ok = true;
864
865 is_complete = info.IsComplete();
866
867 // Add full-size access if reason is for access.
868 if ( ! is_active && is_complete && why == ForAccess)
869 {
870 info.WriteIOStatSingle(info.GetFileSize());
871 info.Write(infoFile, i_name.c_str());
872 }
873 }
874 infoFile->Close();
875 }
876 delete infoFile;
877
878 if ( ! is_active) m_active_cond.UnLock();
879
880 if (read_ok)
881 {
882 if ((is_complete || why == ForInfo) && buff != 0)
883 {
884 int res2 = m_oss->Lfn2Pfn(f_name.c_str(), buff, blen);
885 if (res2 < 0)
886 return res2;
887
888 // Normally, files are owned by us but when direct cache access
889 // is wanted and possible, make sure the file is world readable.
890 if (why == ForAccess)
891 {mode_t mode = (forall ? worldReadable : groupReadable);
892 if (((sbuff.st_mode & worldReadable) != mode)
893 && (m_oss->Chmod(f_name.c_str(), mode) != XrdOssOK))
894 {is_complete = false;
895 *buff = 0;
896 }
897 }
898 }
899
900 TRACE(Info, "LocalFilePath '" << curl << "', why=" << lfpReason[why] <<
901 (is_complete ? " -> FILE_COMPLETE_IN_CACHE" : " -> EREMOTE"));
902
903 return is_complete ? 0 : -EREMOTE;
904 }
905 }
906 }
907
908 TRACE(Info, "LocalFilePath '" << curl << "', why=" << lfpReason[why] << " -> ENOENT");
909 return -ENOENT;
910}
911
912//______________________________________________________________________________
913// If supported, write file_size as xattr to cinfo file.
914//------------------------------------------------------------------------------
915void Cache::WriteFileSizeXAttr(int cinfo_fd, long long file_size)
916{
917 if (m_metaXattr) {
918 int res = XrdSysXAttrActive->Set("pfc.fsize", &file_size, sizeof(long long), 0, cinfo_fd, 0);
919 if (res != 0) {
920 TRACE(Debug, "WriteFileSizeXAttr error setting xattr " << res);
921 }
922 }
923}
924
925//______________________________________________________________________________
926// Determine full size of the data file from the corresponding cinfo-file name.
927// Attempts to read xattr first and falls back to reading of the cinfo file.
928// Returns -error on failure.
929//------------------------------------------------------------------------------
930long long Cache::DetermineFullFileSize(const std::string &cinfo_fname)
931{
932 if (m_metaXattr) {
933 char pfn[4096];
934 m_oss->Lfn2Pfn(cinfo_fname.c_str(), pfn, 4096);
935 long long fsize = -1ll;
936 int res = XrdSysXAttrActive->Get("pfc.fsize", &fsize, sizeof(long long), pfn);
937 if (res == sizeof(long long))
938 {
939 return fsize;
940 }
941 else
942 {
943 TRACE(Debug, "DetermineFullFileSize error getting xattr " << res);
944 }
945 }
946
947 XrdOssDF *infoFile = m_oss->newFile(m_configuration.m_username.c_str());
948 XrdOucEnv env;
949 long long ret;
950 int res = infoFile->Open(cinfo_fname.c_str(), O_RDONLY, 0600, env);
951 if (res < 0) {
952 ret = res;
953 } else {
954 Info info(m_trace, 0);
955 if ( ! info.Read(infoFile, cinfo_fname.c_str())) {
956 ret = -EBADF;
957 } else {
958 ret = info.GetFileSize();
959 }
960 infoFile->Close();
961 }
962 delete infoFile;
963 return ret;
964}
965
966//______________________________________________________________________________
967// Calculate if the file is to be considered cached for the purposes of
968// only-if-cached and setting of atime of the Stat() calls.
969// Returns true if the file is to be considered cached.
970//------------------------------------------------------------------------------
971bool Cache::DecideIfConsideredCached(long long file_size, long long bytes_on_disk)
972{
973 if (file_size == 0 || bytes_on_disk >= file_size)
974 return true;
975
976 double frac_on_disk = (double) bytes_on_disk / file_size;
977
978 if (file_size <= m_configuration.m_onlyIfCachedMinSize)
979 {
980 if (frac_on_disk >= m_configuration.m_onlyIfCachedMinFrac)
981 return true;
982 }
983 else
984 {
985 if (bytes_on_disk >= m_configuration.m_onlyIfCachedMinSize &&
986 frac_on_disk >= m_configuration.m_onlyIfCachedMinFrac)
987 return true;
988 }
989 return false;
990}
991
992//______________________________________________________________________________
993// Check if the file is cached including m_onlyIfCachedMinSize and m_onlyIfCachedMinFrac
994// pfc configuration parameters. The logic of accessing the Info file is the same
995// as in Cache::LocalFilePath.
1003//------------------------------------------------------------------------------
1004int Cache::ConsiderCached(const char *curl)
1005{
1006 static const char* tpfx = "ConsiderCached ";
1007
1008 TRACE(Debug, tpfx << curl);
1009
1010 XrdCl::URL url(curl);
1011 std::string f_name = url.GetPath();
1012
1013 File *file = nullptr;
1014 {
1015 XrdSysCondVarHelper lock(&m_active_cond);
1016 auto it = m_active.find(f_name);
1017 if (it != m_active.end()) {
1018 file = it->second;
1019 // If the file-open is in progress, `file` is a nullptr
1020 // so we cannot increase the reference count. For now,
1021 // simply treat it as if the file open doesn't exist instead
1022 // of trying to wait and see if it succeeds.
1023 if (file) {
1024 inc_ref_cnt(file, false, false);
1025 }
1026 }
1027 }
1028 if (file) {
1029 struct stat sbuff;
1030 int res = file->Fstat(sbuff);
1031 dec_ref_cnt(file, false);
1032 if (res)
1033 return res;
1034 // DecideIfConsideredCached() already called in File::Fstat().
1035 return sbuff.st_atime > 0 ? 0 : -EREMOTE;
1036 }
1037
1038 struct stat sbuff;
1039 int res = m_oss->Stat(f_name.c_str(), &sbuff);
1040 if (res != XrdOssOK) {
1041 TRACE(Debug, tpfx << curl << " -> " << res);
1042 return res;
1043 }
1044 if (S_ISDIR(sbuff.st_mode))
1045 {
1046 TRACE(Debug, tpfx << curl << " -> EISDIR");
1047 return -EISDIR;
1048 }
1049
1050 long long file_size = DetermineFullFileSize(f_name + Info::s_infoExtension);
1051 if (file_size < 0) {
1052 TRACE(Debug, tpfx << curl << " -> " << file_size);
1053 return (int) file_size;
1054 }
1055 bool is_cached = DecideIfConsideredCached(file_size, sbuff.st_blocks * 512ll);
1056
1057 return is_cached ? 0 : -EREMOTE;
1058}
1059
1060//______________________________________________________________________________
1068//------------------------------------------------------------------------------
1069
1070int Cache::Prepare(const char *curl, int oflags, mode_t mode)
1071{
1072 XrdCl::URL url(curl);
1073 std::string f_name = url.GetPath();
1074 std::string i_name = f_name + Info::s_infoExtension;
1075
1076 // Do not allow write access.
1077 if ((oflags & O_ACCMODE) != O_RDONLY)
1078 {
1079 if (Cache::GetInstance().RefConfiguration().m_write_through)
1080 {
1081 return 0;
1082 }
1083 TRACE(Warning, "Prepare write access requested on file " << f_name << ". Denying access.");
1084 return -EROFS;
1085 }
1086
1087 // Intercept xrdpfc_command requests.
1088 if (m_configuration.m_allow_xrdpfc_command && strncmp("/xrdpfc_command/", f_name.c_str(), 16) == 0)
1089 {
1090 // Schedule a job to process command request.
1091 {
1092 CommandExecutor *ce = new CommandExecutor(f_name, "CommandExecutor");
1093
1094 schedP->Schedule(ce);
1095 }
1096
1097 return -EAGAIN;
1098 }
1099
1100 {
1101 XrdSysCondVarHelper lock(&m_active_cond);
1102 m_purge_delay_set.insert(f_name);
1103 }
1104
1105 struct stat sbuff;
1106 if (m_oss->Stat(i_name.c_str(), &sbuff) == XrdOssOK)
1107 {
1108 TRACE(Dump, "Prepare defer open " << f_name);
1109 return 1;
1110 }
1111 else
1112 {
1113 return 0;
1114 }
1115}
1116
1117//______________________________________________________________________________
1118// virtual method of XrdOucCache.
1123//------------------------------------------------------------------------------
1124
1125int Cache::Stat(const char *curl, struct stat &sbuff)
1126{
1127 const char *tpfx = "Stat ";
1128
1129 XrdCl::URL url(curl);
1130 std::string f_name = url.GetPath();
1131
1132 File *file = nullptr;
1133 {
1134 XrdSysCondVarHelper lock(&m_active_cond);
1135 auto it = m_active.find(f_name);
1136 if (it != m_active.end()) {
1137 file = it->second;
1138 // If `file` is nullptr, the file-open is in progress; instead
1139 // of waiting for the file-open to finish, simply treat it as if
1140 // the file-open doesn't exist.
1141 if (file) {
1142 inc_ref_cnt(file, false, false);
1143 }
1144 }
1145 }
1146 if (file) {
1147 int res = file->Fstat(sbuff);
1148 dec_ref_cnt(file, false);
1149 TRACE(Debug, tpfx << "from active file " << curl << " -> " << res);
1150 return res;
1151 }
1152
1153 int res = m_oss->Stat(f_name.c_str(), &sbuff);
1154 if (res != XrdOssOK) {
1155 TRACE(Debug, tpfx << curl << " -> " << res);
1156 return 1; // res; -- for only-if-cached
1157 }
1158 if (S_ISDIR(sbuff.st_mode))
1159 {
1160 TRACE(Debug, tpfx << curl << " -> EISDIR");
1161 return -EISDIR;
1162 }
1163
1164 long long file_size = DetermineFullFileSize(f_name + Info::s_infoExtension);
1165 if (file_size < 0) {
1166 TRACE(Debug, tpfx << curl << " -> " << file_size);
1167 return 1; // (int) file_size; -- for only-if-cached
1168 }
1169 sbuff.st_size = file_size;
1170 bool is_cached = DecideIfConsideredCached(file_size, sbuff.st_blocks * 512ll);
1171 if ( ! is_cached)
1172 sbuff.st_atime = 0;
1173
1174 TRACE(Debug, tpfx << "from disk " << curl << " -> " << res);
1175
1176 return 0;
1177}
1178
1179//______________________________________________________________________________
1180// virtual method of XrdOucCache.
1184//------------------------------------------------------------------------------
1185
1186int Cache::Unlink(const char *curl)
1187{
1188 XrdCl::URL url(curl);
1189 std::string f_name = url.GetPath();
1190
1191 // printf("Unlink url=%s\n\t fname=%s\n", curl, f_name.c_str());
1192
1193 return UnlinkFile(f_name, false);
1194}
1195
1196int Cache::UnlinkFile(const std::string& f_name, bool fail_if_open)
1197{
1198 static const char* trc_pfx = "UnlinkFile ";
1199 ActiveMap_i it;
1200 File *file = 0;
1201 long long st_blocks_to_purge = 0;
1202 {
1203 XrdSysCondVarHelper lock(&m_active_cond);
1204
1205 it = m_active.find(f_name);
1206
1207 if (it != m_active.end())
1208 {
1209 if (fail_if_open)
1210 {
1211 TRACE(Info, trc_pfx << f_name << ", file currently open and force not requested - denying request");
1212 return -EBUSY;
1213 }
1214
1215 // Null File* in m_active map means an operation is ongoing, probably
1216 // Attach() with possible File::Open(). Ask for retry.
1217 if (it->second == 0)
1218 {
1219 TRACE(Info, trc_pfx << f_name << ", an operation on this file is ongoing - denying request");
1220 return -EAGAIN;
1221 }
1222
1223 file = it->second;
1224 st_blocks_to_purge = file->initiate_emergency_shutdown();
1225 it->second = 0;
1226 }
1227 else
1228 {
1229 it = m_active.insert(std::make_pair(f_name, (File*) 0)).first;
1230 }
1231 }
1232
1233 if (file) {
1235 } else {
1236 struct stat f_stat;
1237 if (m_oss->Stat(f_name.c_str(), &f_stat) == XrdOssOK)
1238 st_blocks_to_purge = f_stat.st_blocks;
1239 }
1240
1241 std::string i_name = f_name + Info::s_infoExtension;
1242
1243 // Unlink file & cinfo
1244 int f_ret = m_oss->Unlink(f_name.c_str());
1245 int i_ret = m_oss->Unlink(i_name.c_str());
1246
1247 if (st_blocks_to_purge)
1248 m_res_mon->register_file_purge(f_name, st_blocks_to_purge);
1249
1250 TRACE(Debug, trc_pfx << f_name << ", f_ret=" << f_ret << ", i_ret=" << i_ret);
1251
1252 {
1253 XrdSysCondVarHelper lock(&m_active_cond);
1254 m_active.erase(it);
1255 m_active_cond.Broadcast();
1256 }
1257
1258 return std::min(f_ret, i_ret);
1259}
int DoIt(int argpnt, int argc, char **argv, bool singleshot)
#define TRACE_Debug
#define XrdOssOK
Definition XrdOss.hh:50
std::string obfuscateAuth(const std::string &input)
#define TRACE_Dump
#define TRACE_PC(act, pre_code, x)
#define TRACE_INT(act, x)
void * ProcessWriteTaskThread(void *)
Definition XrdPfc.cc:60
XrdSysXAttr * XrdSysXAttrActive
void * ResourceMonitorThread(void *)
Definition XrdPfc.cc:54
void * PrefetchThread(void *)
Definition XrdPfc.cc:66
XrdOucCache * XrdOucGetCache(XrdSysLogger *logger, const char *config_filename, const char *parameters, XrdOucEnv *env)
Definition XrdPfc.cc:76
#define stat(a, b)
Definition XrdPosix.hh:101
bool Debug
#define TRACE(act, x)
Definition XrdTrace.hh:63
URL representation.
Definition XrdClURL.hh:31
const std::string & GetPath() const
Get the path.
Definition XrdClURL.hh:217
virtual int Close(long long *retsz=0)=0
virtual int Open(const char *path, int Oflag, mode_t Mode, XrdOucEnv &env)
Definition XrdOss.hh:200
virtual int Fstat(struct stat &sbuff)
virtual const char * Path()=0
virtual const char * Location(bool refresh=false)
static const int optRW
File is read/write (o/w read/only)
XrdOucCache(const char *ctype)
void * GetPtr(const char *varname)
Definition XrdOucEnv.cc:281
void PutPtr(const char *varname, void *value)
Definition XrdOucEnv.cc:316
int get_size() const
long long m_offset
File * get_file() const
Attaches/creates and detaches/deletes cache-io objects for disk based cache.
Definition XrdPfc.hh:165
long long DetermineFullFileSize(const std::string &cinfo_fname)
Definition XrdPfc.cc:930
void FileSyncDone(File *, bool high_debug)
Definition XrdPfc.cc:546
File * GetFile(const std::string &, IO *, long long off=0, long long filesize=0)
Definition XrdPfc.cc:393
static const Configuration & Conf()
Definition XrdPfc.cc:134
virtual int LocalFilePath(const char *url, char *buff=0, int blen=0, LFP_Reason why=ForAccess, bool forall=false)
Definition XrdPfc.cc:798
virtual int Stat(const char *url, struct stat &sbuff)
Definition XrdPfc.cc:1125
const Configuration & RefConfiguration() const
Reference XrdPfc configuration.
Definition XrdPfc.hh:217
static ResourceMonitor & ResMon()
Definition XrdPfc.cc:135
bool IsFileActiveOrPurgeProtected(const std::string &) const
Definition XrdPfc.cc:681
void ClearPurgeProtectedSet()
Definition XrdPfc.cc:689
void ReleaseRAM(char *buf, long long size)
Definition XrdPfc.cc:375
virtual int ConsiderCached(const char *url)
Definition XrdPfc.cc:1004
static Cache & GetInstance()
Singleton access.
Definition XrdPfc.cc:132
bool Config(const char *config_filename, const char *parameters, XrdOucEnv *env)
Parse configuration file.
void DeRegisterPrefetchFile(File *)
Definition XrdPfc.cc:715
void ExecuteCommandUrl(const std::string &command_url)
void RegisterPrefetchFile(File *)
Definition XrdPfc.cc:699
void WriteFileSizeXAttr(int cinfo_fd, long long file_size)
Definition XrdPfc.cc:915
void Prefetch()
Definition XrdPfc.cc:756
void ReleaseFile(File *, IO *)
Definition XrdPfc.cc:475
void AddWriteTask(Block *b, bool from_read)
Add downloaded block in write queue.
Definition XrdPfc.cc:225
Cache(XrdSysLogger *logger, XrdOucEnv *env)
Constructor.
Definition XrdPfc.cc:158
bool Decide(XrdOucCacheIO *)
Makes decision if the original XrdOucCacheIO should be cached.
Definition XrdPfc.cc:137
int UnlinkFile(const std::string &f_name, bool fail_if_open)
Remove cinfo and data files from cache.
Definition XrdPfc.cc:1196
static XrdScheduler * schedP
Definition XrdPfc.hh:304
File * GetNextFileToPrefetch()
Definition XrdPfc.cc:737
long long WritesSinceLastCall()
Definition XrdPfc.cc:324
void ProcessWriteTasks()
Separate task which writes blocks from ram to disk.
Definition XrdPfc.cc:277
virtual int Unlink(const char *url)
Definition XrdPfc.cc:1186
void RemoveWriteQEntriesFor(File *f)
Remove blocks from write queue which belong to given prefetch. This method is used at the time of Fil...
Definition XrdPfc.cc:244
virtual XrdOucCacheIO * Attach(XrdOucCacheIO *, int Options=0)
Definition XrdPfc.cc:179
static const Cache & TheOne()
Definition XrdPfc.cc:133
char * RequestRAM(long long size)
Definition XrdPfc.cc:335
virtual int Prepare(const char *url, int oflags, mode_t mode)
Definition XrdPfc.cc:1070
bool DecideIfConsideredCached(long long file_size, long long bytes_on_disk)
Definition XrdPfc.cc:971
static Cache & CreateInstance(XrdSysLogger *logger, XrdOucEnv *env)
Singleton creation.
Definition XrdPfc.cc:125
bool is_prefetch_enabled() const
Definition XrdPfc.hh:309
Base class for selecting which files should be cached.
virtual bool Decide(const std::string &, XrdOss &) const =0
bool FinalizeSyncBeforeExit()
Returns true if any of the blocks need sync. Called from Cache::dec_ref_cnt on zero ref cnt.
const char * lPath() const
Log path.
void WriteBlockToDisk(Block *b)
int GetNBlocks() const
std::string GetRemoteLocations() const
size_t GetAccessCnt() const
int Fstat(struct stat &sbuff)
void AddIO(IO *io)
static File * FileOpen(const std::string &path, long long offset, long long fileSize, XrdOucCacheIO *inputIO)
Static constructor that also does Open. Returns null ptr if Open fails.
long long GetPrefetchedBytes() const
int GetBlockSize() const
int GetNDownloadedBlocks() const
const Info::AStat * GetLastAccessStats() const
void BlocksRemovedFromWriteQ(std::list< Block * > &)
Handle removal of a set of blocks from Cache's write queue.
int inc_ref_cnt()
const Stats & RefStats() const
void Sync()
Sync file cache info and output data with disk.
int dec_ref_cnt()
int get_ref_cnt()
long long initiate_emergency_shutdown()
long long GetFileSize() const
const std::string & GetLocalPath() const
void RemoveIO(IO *io)
bool is_in_emergency_shutdown()
Downloads original file into multiple files, chunked into blocks. Only blocks that are asked for are ...
Downloads original file into a single file on local disk. Handles read requests as they come along.
bool HasFile() const
Check if File was opened successfully.
Base cache-io class that implements some XrdOucCacheIO abstract methods.
Definition XrdPfcIO.hh:16
XrdOucCacheIO * GetInput()
Definition XrdPfcIO.cc:31
Status of cached file. Can be read from and written into a binary file.
Definition XrdPfcInfo.hh:41
static const char * s_infoExtension
void WriteIOStatSingle(long long bytes_disk)
Write single open/close time for given bytes read from disk.
bool Write(XrdOssDF *fp, const char *dname, const char *fname=0)
bool IsComplete() const
Get complete status.
long long GetFileSize() const
Get file size.
bool Read(XrdOssDF *fp, const char *dname, const char *fname=0)
Read content of cinfo file into this object.
int m_NCksumErrors
number of checksum errors while getting data from remote
long long m_BytesWritten
number of bytes written to disk
void Say(const char *text1, const char *text2=0, const char *txt3=0, const char *text4=0, const char *text5=0, const char *txt6=0)
static int Run(pthread_t *, void *(*proc)(void *), void *arg, int opts=0, const char *desc=0)
static void Wait(int milliseconds)
XrdPosixStats Stats
Contains parameters configurable from the xrootd config file.
Definition XrdPfc.hh:64
long long BytesHit
read from cache
Definition XrdPfcInfo.hh:64
long long BytesBypassed
read from remote and dropped
Definition XrdPfcInfo.hh:66
time_t DetachTime
close time
Definition XrdPfcInfo.hh:59
long long BytesMissed
read from remote and cached
Definition XrdPfcInfo.hh:65
time_t AttachTime
open time
Definition XrdPfcInfo.hh:58