XRootD
XrdPfcFile.cc
1 //----------------------------------------------------------------------------------
2 // Copyright (c) 2014 by Board of Trustees of the Leland Stanford, Jr., University
3 // Author: Alja Mrak-Tadel, Matevz Tadel
4 //----------------------------------------------------------------------------------
5 // XRootD is free software: you can redistribute it and/or modify
6 // it under the terms of the GNU Lesser General Public License as published by
7 // the Free Software Foundation, either version 3 of the License, or
8 // (at your option) any later version.
9 //
10 // XRootD is distributed in the hope that it will be useful,
11 // but WITHOUT ANY WARRANTY; without even the implied warranty of
12 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 // GNU General Public License for more details.
14 //
15 // You should have received a copy of the GNU Lesser General Public License
16 // along with XRootD. If not, see <http://www.gnu.org/licenses/>.
17 //----------------------------------------------------------------------------------
18 
19 
20 #include "XrdPfcFile.hh"
21 #include "XrdPfcIO.hh"
22 #include "XrdPfcTrace.hh"
23 #include <cstdio>
24 #include <sstream>
25 #include <fcntl.h>
26 #include <assert.h>
27 #include "XrdCl/XrdClLog.hh"
28 #include "XrdCl/XrdClConstants.hh"
29 #include "XrdCl/XrdClFile.hh"
30 #include "XrdSys/XrdSysPthread.hh"
31 #include "XrdSys/XrdSysTimer.hh"
32 #include "XrdOss/XrdOss.hh"
33 #include "XrdOuc/XrdOucEnv.hh"
35 #include "XrdPfc.hh"
36 
37 
38 using namespace XrdPfc;
39 
40 namespace
41 {
42 
43 const int BLOCK_WRITE_MAX_ATTEMPTS = 4;
44 
45 Cache* cache() { return &Cache::GetInstance(); }
46 
47 }
48 
49 const char *File::m_traceID = "File";
50 
51 //------------------------------------------------------------------------------
52 
53 File::File(const std::string& path, long long iOffset, long long iFileSize) :
54  m_ref_cnt(0),
55  m_data_file(0),
56  m_info_file(0),
57  m_cfi(Cache::GetInstance().GetTrace(), Cache::GetInstance().RefConfiguration().m_prefetch_max_blocks > 0),
58  m_filename(path),
59  m_offset(iOffset),
60  m_file_size(iFileSize),
61  m_current_io(m_io_set.end()),
62  m_ios_in_detach(0),
63  m_non_flushed_cnt(0),
64  m_in_sync(false),
65  m_detach_time_logged(false),
66  m_in_shutdown(false),
67  m_state_cond(0),
68  m_block_size(0),
69  m_num_blocks(0),
70  m_prefetch_state(kOff),
71  m_prefetch_bytes(0),
72  m_prefetch_read_cnt(0),
73  m_prefetch_hit_cnt(0),
74  m_prefetch_score(0)
75 {}
76 
77 File::~File()
78 {
79  if (m_info_file)
80  {
81  TRACEF(Debug, "~File() close info ");
82  m_info_file->Close();
83  delete m_info_file;
84  m_info_file = NULL;
85  }
86 
87  if (m_data_file)
88  {
89  TRACEF(Debug, "~File() close output ");
90  m_data_file->Close();
91  delete m_data_file;
92  m_data_file = NULL;
93  }
94 
95  TRACEF(Debug, "~File() ended, prefetch score = " << m_prefetch_score);
96 }
97 
98 //------------------------------------------------------------------------------
99 
100 File* File::FileOpen(const std::string &path, long long offset, long long fileSize)
101 {
102  File *file = new File(path, offset, fileSize);
103  if ( ! file->Open())
104  {
105  delete file;
106  file = 0;
107  }
108  return file;
109 }
110 
111 //------------------------------------------------------------------------------
112 
113 void File::initiate_emergency_shutdown()
114 {
115  // Called from Cache::Unlink() when the file is currently open.
116  // Cache::Unlink is also called on FSync error and when wrong number of bytes
117  // is received from a remote read.
118  //
119  // From this point onward the file will not be written to, cinfo file will
120  // not be updated, and all new read requests will return -ENOENT.
121  //
122  // File's entry in the Cache's active map is set to nullptr and will be
123  // removed from there shortly, in any case, well before this File object
124  // shuts down. So we do not communicate to Cache about our destruction when
125  // it happens.
126 
127  {
128  XrdSysCondVarHelper _lck(m_state_cond);
129 
130  m_in_shutdown = true;
131 
132  if (m_prefetch_state != kStopped && m_prefetch_state != kComplete)
133  {
134  m_prefetch_state = kStopped;
135  cache()->DeRegisterPrefetchFile(this);
136  }
137  }
138 
139 }
140 
141 //------------------------------------------------------------------------------
142 
143 Stats File::DeltaStatsFromLastCall()
144 {
145  // Not locked, only used from Cache / Purge thread.
146 
147  Stats delta = m_last_stats;
148 
149  m_last_stats = m_stats.Clone();
150 
151  delta.DeltaToReference(m_last_stats);
152 
153  return delta;
154 }
155 
156 //------------------------------------------------------------------------------
157 
158 void File::BlockRemovedFromWriteQ(Block* b)
159 {
160  TRACEF(Dump, "BlockRemovedFromWriteQ() block = " << (void*) b << " idx= " << b->m_offset/m_block_size);
161 
162  XrdSysCondVarHelper _lck(m_state_cond);
163  dec_ref_count(b);
164 }
165 
166 void File::BlocksRemovedFromWriteQ(std::list<Block*>& blocks)
167 {
168  TRACEF(Dump, "BlocksRemovedFromWriteQ() n_blocks = " << blocks.size());
169 
170  XrdSysCondVarHelper _lck(m_state_cond);
171 
172  for (std::list<Block*>::iterator i = blocks.begin(); i != blocks.end(); ++i)
173  {
174  dec_ref_count(*i);
175  }
176 }
177 
178 //------------------------------------------------------------------------------
179 
180 void File::ioUpdated(IO *io)
181 {
182  std::string loc(io->GetLocation());
183  XrdSysCondVarHelper _lck(m_state_cond);
184  insert_remote_location(loc);
185 }
186 
187 //------------------------------------------------------------------------------
188 
189 bool File::ioActive(IO *io)
190 {
191  // Returns true if delay is needed.
192 
193  TRACEF(Debug, "ioActive start for io " << io);
194 
195  std::string loc(io->GetLocation());
196 
197  {
198  XrdSysCondVarHelper _lck(m_state_cond);
199 
200  IoSet_i mi = m_io_set.find(io);
201 
202  if (mi != m_io_set.end())
203  {
204  unsigned int n_active_reads = io->m_active_read_reqs;
205 
206  TRACE(Info, "ioActive for io " << io <<
207  ", active_reads " << n_active_reads <<
208  ", active_prefetches " << io->m_active_prefetches <<
209  ", allow_prefetching " << io->m_allow_prefetching <<
210  ", ios_in_detach " << m_ios_in_detach);
211  TRACEF(Info,
212  "\tio_map.size() " << m_io_set.size() <<
213  ", block_map.size() " << m_block_map.size() << ", file");
214 
215  insert_remote_location(loc);
216 
217  io->m_allow_prefetching = false;
218  io->m_in_detach = true;
219 
220  // Check if any IO is still available for prefetching. If not, stop it.
221  if (m_prefetch_state == kOn || m_prefetch_state == kHold)
222  {
223  if ( ! select_current_io_or_disable_prefetching(false) )
224  {
225  TRACEF(Debug, "ioActive stopping prefetching after io " << io << " retreat.");
226  }
227  }
228 
229  // On last IO, consider write queue blocks. Note, this also contains
230  // blocks being prefetched.
231 
232  bool io_active_result;
233 
234  if (n_active_reads > 0)
235  {
236  io_active_result = true;
237  }
238  else if (m_io_set.size() - m_ios_in_detach == 1)
239  {
240  io_active_result = ! m_block_map.empty();
241  }
242  else
243  {
244  io_active_result = io->m_active_prefetches > 0;
245  }
246 
247  if ( ! io_active_result)
248  {
249  ++m_ios_in_detach;
250  }
251 
252  TRACEF(Info, "ioActive for io " << io << " returning " << io_active_result << ", file");
253 
254  return io_active_result;
255  }
256  else
257  {
258  TRACEF(Error, "ioActive io " << io <<" not found in IoSet. This should not happen.");
259  return false;
260  }
261  }
262 }
263 
264 //------------------------------------------------------------------------------
265 
266 void File::RequestSyncOfDetachStats()
267 {
268  XrdSysCondVarHelper _lck(m_state_cond);
269  m_detach_time_logged = false;
270 }
271 
272 bool File::FinalizeSyncBeforeExit()
273 {
274  // Returns true if sync is required.
275  // This method is called after corresponding IO is detached from PosixCache.
276 
277  XrdSysCondVarHelper _lck(m_state_cond);
278  if ( ! m_in_shutdown)
279  {
280  if ( ! m_writes_during_sync.empty() || m_non_flushed_cnt > 0 || ! m_detach_time_logged)
281  {
282  Stats loc_stats = m_stats.Clone();
283  m_cfi.WriteIOStatDetach(loc_stats);
284  m_detach_time_logged = true;
285  m_in_sync = true;
286  TRACEF(Debug, "FinalizeSyncBeforeExit requesting sync to write detach stats");
287  return true;
288  }
289  }
290  TRACEF(Debug, "FinalizeSyncBeforeExit sync not required");
291  return false;
292 }
293 
294 //------------------------------------------------------------------------------
295 
296 void File::AddIO(IO *io)
297 {
298  // Called from Cache::GetFile() when a new IO asks for the file.
299 
300  TRACEF(Debug, "AddIO() io = " << (void*)io);
301 
302  time_t now = time(0);
303  std::string loc(io->GetLocation());
304 
305  m_state_cond.Lock();
306 
307  IoSet_i mi = m_io_set.find(io);
308 
309  if (mi == m_io_set.end())
310  {
311  m_io_set.insert(io);
312  io->m_attach_time = now;
313  m_stats.IoAttach();
314 
315  insert_remote_location(loc);
316 
317  if (m_prefetch_state == kStopped)
318  {
319  m_prefetch_state = kOn;
320  cache()->RegisterPrefetchFile(this);
321  }
322  }
323  else
324  {
325  TRACEF(Error, "AddIO() io = " << (void*)io << " already registered.");
326  }
327 
328  m_state_cond.UnLock();
329 }
330 
331 //------------------------------------------------------------------------------
332 
333 void File::RemoveIO(IO *io)
334 {
335  // Called from Cache::ReleaseFile.
336 
337  TRACEF(Debug, "RemoveIO() io = " << (void*)io);
338 
339  time_t now = time(0);
340 
341  m_state_cond.Lock();
342 
343  IoSet_i mi = m_io_set.find(io);
344 
345  if (mi != m_io_set.end())
346  {
347  if (mi == m_current_io)
348  {
349  ++m_current_io;
350  }
351 
352  m_stats.IoDetach(now - io->m_attach_time);
353  m_io_set.erase(mi);
354  --m_ios_in_detach;
355 
356  if (m_io_set.empty() && m_prefetch_state != kStopped && m_prefetch_state != kComplete)
357  {
358  TRACEF(Error, "RemoveIO() io = " << (void*)io << " Prefetching is not stopped/complete -- it should be by now.");
359  m_prefetch_state = kStopped;
360  cache()->DeRegisterPrefetchFile(this);
361  }
362  }
363  else
364  {
365  TRACEF(Error, "RemoveIO() io = " << (void*)io << " is NOT registered.");
366  }
367 
368  m_state_cond.UnLock();
369 }
370 
371 //------------------------------------------------------------------------------
372 
373 bool File::Open()
374 {
375  // Sets errno accordingly.
376 
377  static const char *tpfx = "Open() ";
378 
379  TRACEF(Dump, tpfx << "open file for disk cache");
380 
381  const Configuration &conf = Cache::GetInstance().RefConfiguration();
382 
383  XrdOss &myOss = * Cache::GetInstance().GetOss();
384  const char *myUser = conf.m_username.c_str();
385  XrdOucEnv myEnv;
386  struct stat data_stat, info_stat;
387 
388  std::string ifn = m_filename + Info::s_infoExtension;
389 
390  bool data_existed = (myOss.Stat(m_filename.c_str(), &data_stat) == XrdOssOK);
391  bool info_existed = (myOss.Stat(ifn.c_str(), &info_stat) == XrdOssOK);
392 
393  // Create the data file itself.
394  char size_str[32]; sprintf(size_str, "%lld", m_file_size);
395  myEnv.Put("oss.asize", size_str);
396  myEnv.Put("oss.cgroup", conf.m_data_space.c_str());
397 
398  int res;
399 
400  if ((res = myOss.Create(myUser, m_filename.c_str(), 0600, myEnv, XRDOSS_mkpath)) != XrdOssOK)
401  {
402  TRACEF(Error, tpfx << "Create failed " << ERRNO_AND_ERRSTR(-res));
403  errno = -res;
404  return false;
405  }
406 
407  m_data_file = myOss.newFile(myUser);
408  if ((res = m_data_file->Open(m_filename.c_str(), O_RDWR, 0600, myEnv)) != XrdOssOK)
409  {
410  TRACEF(Error, tpfx << "Open failed " << ERRNO_AND_ERRSTR(-res));
411  errno = -res;
412  delete m_data_file; m_data_file = 0;
413  return false;
414  }
415 
416  myEnv.Put("oss.asize", "64k"); // TODO: Calculate? Get it from configuration? Do not know length of access lists ...
417  myEnv.Put("oss.cgroup", conf.m_meta_space.c_str());
418  if ((res = myOss.Create(myUser, ifn.c_str(), 0600, myEnv, XRDOSS_mkpath)) != XrdOssOK)
419  {
420  TRACE(Error, tpfx << "Create failed for info file " << ifn << ERRNO_AND_ERRSTR(-res));
421  errno = -res;
422  m_data_file->Close(); delete m_data_file; m_data_file = 0;
423  return false;
424  }
425 
426  m_info_file = myOss.newFile(myUser);
427  if ((res = m_info_file->Open(ifn.c_str(), O_RDWR, 0600, myEnv)) != XrdOssOK)
428  {
429  TRACEF(Error, tpfx << "Failed for info file " << ifn << ERRNO_AND_ERRSTR(-res));
430  errno = -res;
431  delete m_info_file; m_info_file = 0;
432  m_data_file->Close(); delete m_data_file; m_data_file = 0;
433  return false;
434  }
435 
436  bool initialize_info_file = true;
437 
438  if (info_existed && m_cfi.Read(m_info_file, ifn.c_str()))
439  {
440  TRACEF(Debug, tpfx << "Reading existing info file. (data_existed=" << data_existed <<
441  ", data_size_stat=" << (data_existed ? data_stat.st_size : -1ll) <<
442  ", data_size_from_last_block=" << m_cfi.GetExpectedDataFileSize() << ")");
443 
444  // Check if data file exists and is of reasonable size.
445  if (data_existed && data_stat.st_size >= m_cfi.GetExpectedDataFileSize())
446  {
447  initialize_info_file = false;
448  } else {
449  TRACEF(Warning, tpfx << "Basic sanity checks on data file failed, resetting info file, truncating data file.");
450  m_cfi.ResetAllAccessStats();
451  m_data_file->Ftruncate(0);
452  }
453  }
454 
455  if ( ! initialize_info_file && m_cfi.GetCkSumState() != conf.get_cs_Chk())
456  {
457  if (conf.does_cschk_have_missing_bits(m_cfi.GetCkSumState()) &&
458  conf.should_uvkeep_purge(time(0) - m_cfi.GetNoCkSumTimeForUVKeep()))
459  {
460  TRACEF(Info, tpfx << "Cksum state of file insufficient, uvkeep test failed, resetting info file, truncating data file.");
461  initialize_info_file = true;
462  m_cfi.ResetAllAccessStats();
463  m_data_file->Ftruncate(0);
464  } else {
465  // TODO: If the file is complete, we don't need to reset net cksums.
466  m_cfi.DowngradeCkSumState(conf.get_cs_Chk());
467  }
468  }
469 
470  if (initialize_info_file)
471  {
472  m_cfi.SetBufferSizeFileSizeAndCreationTime(conf.m_bufferSize, m_file_size);
473  m_cfi.SetCkSumState(conf.get_cs_Chk());
474  m_cfi.ResetNoCkSumTime();
475  m_cfi.Write(m_info_file, ifn.c_str());
476  m_info_file->Fsync();
477  cache()->WriteFileSizeXAttr(m_info_file->getFD(), m_file_size);
478  TRACEF(Debug, tpfx << "Creating new file info, data size = " << m_file_size << " num blocks = " << m_cfi.GetNBlocks());
479  }
480 
481  m_cfi.WriteIOStatAttach();
482  m_state_cond.Lock();
483  m_block_size = m_cfi.GetBufferSize();
484  m_num_blocks = m_cfi.GetNBlocks();
485  m_prefetch_state = (m_cfi.IsComplete()) ? kComplete : kStopped; // Will engage in AddIO().
486  m_state_cond.UnLock();
487 
488  return true;
489 }
490 
491 int File::Fstat(struct stat &sbuff)
492 {
493  // Stat on an open file.
494  // Corrects size to actual full size of the file.
495  // Sets atime to 0 if the file is only partially downloaded, in accordance
496  // with pfc.onlyifcached settings.
497  // Called from IO::Fstat() and Cache::Stat() when the file is active.
498  // Returns 0 on success, -errno on error.
499 
500  int res;
501 
502  if ((res = m_data_file->Fstat(&sbuff))) return res;
503 
504  sbuff.st_size = m_file_size;
505 
506  bool is_cached = cache()->DecideIfConsideredCached(m_file_size, sbuff.st_blocks * 512ll);
507  if ( ! is_cached)
508  sbuff.st_atime = 0;
509 
510  return 0;
511 }
512 
513 //==============================================================================
514 // Read and helpers
515 //==============================================================================
516 
517 bool File::overlap(int blk, // block to query
518  long long blk_size, //
519  long long req_off, // offset of user request
520  int req_size, // size of user request
521  // output:
522  long long &off, // offset in user buffer
523  long long &blk_off, // offset in block
524  int &size) // size to copy
525 {
526  const long long beg = blk * blk_size;
527  const long long end = beg + blk_size;
528  const long long req_end = req_off + req_size;
529 
530  if (req_off < end && req_end > beg)
531  {
532  const long long ovlp_beg = std::max(beg, req_off);
533  const long long ovlp_end = std::min(end, req_end);
534 
535  off = ovlp_beg - req_off;
536  blk_off = ovlp_beg - beg;
537  size = (int) (ovlp_end - ovlp_beg);
538 
539  assert(size <= blk_size);
540  return true;
541  }
542  else
543  {
544  return false;
545  }
546 }
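A worked example of the overlap arithmetic above, with illustrative values only (the 1 MiB block size is an assumption, not taken from this file): a 1 MiB user request starting at req_off = 1.5 MiB intersects two blocks.

// blk = 1 covers [1 MiB, 2 MiB): off = 0,      blk_off = 524288, size = 524288
// blk = 2 covers [2 MiB, 3 MiB): off = 524288, blk_off = 0,      size = 524288
// overlap() returns true for both; for any block index outside [1, 2] it returns false.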
547 
548 //------------------------------------------------------------------------------
549 
550 Block* File::PrepareBlockRequest(int i, IO *io, void *req_id, bool prefetch)
551 {
552  // Must be called w/ state_cond locked.
553  // Checks on size etc should be done before.
554  //
555  // Reference count is 0 so increase it in calling function if you want to
556  // catch the block while still in memory.
557 
558  const long long off = i * m_block_size;
559  const int last_block = m_num_blocks - 1;
560  const bool cs_net = cache()->RefConfiguration().is_cschk_net();
561 
562  int blk_size, req_size;
563  if (i == last_block) {
564  blk_size = req_size = m_file_size - off;
565  if (cs_net && req_size & 0xFFF) req_size = (req_size & ~0xFFF) + 0x1000;
566  } else {
567  blk_size = req_size = m_block_size;
568  }
569 
570  Block *b = 0;
571  char *buf = cache()->RequestRAM(req_size);
572 
573  if (buf)
574  {
575  b = new (std::nothrow) Block(this, io, req_id, buf, off, blk_size, req_size, prefetch, cs_net);
576 
577  if (b)
578  {
579  m_block_map[i] = b;
580 
581  // Actual Read request is issued in ProcessBlockRequests().
582 
583  if (m_prefetch_state == kOn && (int) m_block_map.size() >= Cache::GetInstance().RefConfiguration().m_prefetch_max_blocks)
584  {
585  m_prefetch_state = kHold;
586  cache()->DeRegisterPrefetchFile(this);
587  }
588  }
589  else
590  {
591  TRACEF(Dump, "PrepareBlockRequest() " << i << " prefetch " << prefetch << ", allocation failed.");
592  }
593  }
594 
595  return b;
596 }
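A sketch of the last-block handling above, with assumed numbers (a final block holding 123456 bytes): when network checksumming (pgRead) is in use, the request size is padded up to a whole number of 4 KiB pages while the block itself keeps its true size.

// i == last_block, cs_net == true
// blk_size = m_file_size - off = 123456           // bytes actually kept in the block
// 123456 & 0xFFF == 576                           // not 4 KiB aligned
// req_size = (123456 & ~0xFFF) + 0x1000 = 126976  // 31 * 4096, used for RequestRAM() and pgRead()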
597 
598 void File::ProcessBlockRequest(Block *b)
599 {
600  // This *must not* be called with block_map locked.
601 
602  BlockResponseHandler *brh = new BlockResponseHandler(b);
603 
604  if (XRD_TRACE What >= TRACE_Dump) {
605  char buf[256];
606  snprintf(buf, 256, "idx=%lld, block=%p, prefetch=%d, off=%lld, req_size=%d, buff=%p, resp_handler=%p ",
607  b->get_offset()/m_block_size, b, b->m_prefetch, b->get_offset(), b->get_req_size(), b->get_buff(), brh);
608  TRACEF(Dump, "ProcessBlockRequest() " << buf);
609  }
610 
611  if (b->req_cksum_net())
612  {
613  b->get_io()->GetInput()->pgRead(*brh, b->get_buff(), b->get_offset(), b->get_req_size(),
614  b->ref_cksum_vec(), 0, b->ptr_n_cksum_errors());
615  } else {
616  b->get_io()->GetInput()-> Read(*brh, b->get_buff(), b->get_offset(), b->get_size());
617  }
618 }
619 
620 void File::ProcessBlockRequests(BlockList_t& blks)
621 {
622  // This *must not* be called with block_map locked.
623 
624  for (BlockList_i bi = blks.begin(); bi != blks.end(); ++bi)
625  {
626  ProcessBlockRequest(*bi);
627  }
628 }
629 
630 //------------------------------------------------------------------------------
631 
632 void File::RequestBlocksDirect(IO *io, ReadRequest *read_req, std::vector<XrdOucIOVec>& ioVec, int expected_size)
633 {
634  int n_chunks = ioVec.size();
635  int n_vec_reads = (n_chunks - 1) / XrdProto::maxRvecsz + 1;
636 
637  TRACEF(DumpXL, "RequestBlocksDirect() issuing ReadV for n_chunks = " << n_chunks <<
638  ", total_size = " << expected_size << ", n_vec_reads = " << n_vec_reads);
639 
640  DirectResponseHandler *handler = new DirectResponseHandler(this, read_req, n_vec_reads);
641 
642  int pos = 0;
643  while (n_chunks > XrdProto::maxRvecsz) {
644  io->GetInput()->ReadV( *handler, ioVec.data() + pos, XrdProto::maxRvecsz);
645  pos += XrdProto::maxRvecsz;
646  n_chunks -= XrdProto::maxRvecsz;
647  }
648  io->GetInput()->ReadV( *handler, ioVec.data() + pos, n_chunks);
649 }
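How the splitting above plays out, assuming the protocol limit XrdProto::maxRvecsz is 1024 vector elements (taken from current XProtocol headers; treat the exact value as an assumption here):

// n_chunks = 2600  ->  n_vec_reads = (2600 - 1) / 1024 + 1 = 3
// ReadV #1: chunks [0, 1023]
// ReadV #2: chunks [1024, 2047]
// ReadV #3: chunks [2048, 2599]  (552 elements)
// DirectResponseHandler::Done() is thus expected 3 times before
// ProcessDirectReadFinished() is invoked.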
650 
651 //------------------------------------------------------------------------------
652 
653 int File::ReadBlocksFromDisk(std::vector<XrdOucIOVec>& ioVec, int expected_size)
654 {
655  TRACEF(DumpXL, "ReadBlocksFromDisk() issuing ReadV for n_chunks = " << (int) ioVec.size() << ", total_size = " << expected_size);
656 
657  long long rs = m_data_file->ReadV(ioVec.data(), (int) ioVec.size());
658 
659  if (rs < 0)
660  {
661  TRACEF(Error, "ReadBlocksFromDisk neg retval = " << rs);
662  return rs;
663  }
664 
665  if (rs != expected_size)
666  {
667  TRACEF(Error, "ReadBlocksFromDisk incomplete size = " << rs);
668  return -EIO;
669  }
670 
671  return (int) rs;
672 }
673 
674 //------------------------------------------------------------------------------
675 
676 int File::Read(IO *io, char* iUserBuff, long long iUserOff, int iUserSize, ReadReqRH *rh)
677 {
678  // The response handler rh is ONLY invoked from async processing.
679  // If this function returns anything other than -EWOULDBLOCK, the caller must invoke rh->Done() itself.
680  // This streamlines implementation of synchronous IO::Read().
681 
682  TRACEF(Dump, "Read() sid: " << Xrd::hex1 << rh->m_seq_id << " size: " << iUserSize);
683 
684  m_state_cond.Lock();
685 
686  if (m_in_shutdown || io->m_in_detach)
687  {
688  m_state_cond.UnLock();
689  return m_in_shutdown ? -ENOENT : -EBADF;
690  }
691 
692  // Shortcut -- file is fully downloaded.
693 
694  if (m_cfi.IsComplete())
695  {
696  m_state_cond.UnLock();
697  int ret = m_data_file->Read(iUserBuff, iUserOff, iUserSize);
698  if (ret > 0) m_stats.AddBytesHit(ret);
699  return ret;
700  }
701 
702  XrdOucIOVec readV( { iUserOff, iUserSize, 0, iUserBuff } );
703 
704  return ReadOpusCoalescere(io, &readV, 1, rh, "Read() ");
705 }
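A minimal caller sketch of the contract documented at the top of Read(). The names my_io, my_rh, buff, offset and length are hypothetical; the pattern assumes a ReadReqRH handler whose Done() is otherwise called by the cache only after -EWOULDBLOCK was returned.

// Hypothetical caller of File::Read() -- not part of this file.
int rc = file->Read(my_io, buff, offset, length, my_rh);
if (rc != -EWOULDBLOCK)
{
   // Synchronous completion: rc is bytes read or a negative errno,
   // and the caller itself completes the response handler.
   my_rh->Done(rc);
}
// else the result arrives later through my_rh->Done(), once all
// outstanding block / direct reads for this request have finished.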
706 
707 //------------------------------------------------------------------------------
708 
709 int File::ReadV(IO *io, const XrdOucIOVec *readV, int readVnum, ReadReqRH *rh)
710 {
711  TRACEF(Dump, "ReadV() for " << readVnum << " chunks.");
712 
713  m_state_cond.Lock();
714 
715  if (m_in_shutdown || io->m_in_detach)
716  {
717  m_state_cond.UnLock();
718  return m_in_shutdown ? -ENOENT : -EBADF;
719  }
720 
721  // Shortcut -- file is fully downloaded.
722 
723  if (m_cfi.IsComplete())
724  {
725  m_state_cond.UnLock();
726  int ret = m_data_file->ReadV(const_cast<XrdOucIOVec*>(readV), readVnum);
727  if (ret > 0) m_stats.AddBytesHit(ret);
728  return ret;
729  }
730 
731  return ReadOpusCoalescere(io, readV, readVnum, rh, "ReadV() ");
732 }
733 
734 //------------------------------------------------------------------------------
735 
736 int File::ReadOpusCoalescere(IO *io, const XrdOucIOVec *readV, int readVnum,
737  ReadReqRH *rh, const char *tpfx)
738 {
739  // Non-trivial processing for Read and ReadV.
740  // Entered under lock.
741  //
742  // loop over required blocks:
743  // - if on disk, ok;
744  // - if in ram or incoming, inc ref-count
745  // - otherwise request and inc ref count (unless RAM full => request direct)
746  // unlock
747 
748  int prefetch_cnt = 0;
749 
750  ReadRequest *read_req = nullptr;
751  BlockList_t blks_to_request; // blocks we are issuing a new remote request for
752 
753  std::unordered_map<Block*, std::vector<ChunkRequest>> blks_ready;
754 
755  std::vector<XrdOucIOVec> iovec_disk;
756  std::vector<XrdOucIOVec> iovec_direct;
757  int iovec_disk_total = 0;
758  int iovec_direct_total = 0;
759 
760  for (int iov_idx = 0; iov_idx < readVnum; ++iov_idx)
761  {
762  const XrdOucIOVec &iov = readV[iov_idx];
763  long long iUserOff = iov.offset;
764  int iUserSize = iov.size;
765  char *iUserBuff = iov.data;
766 
767  const int idx_first = iUserOff / m_block_size;
768  const int idx_last = (iUserOff + iUserSize - 1) / m_block_size;
769 
770  TRACEF(DumpXL, tpfx << "sid: " << Xrd::hex1 << rh->m_seq_id << " idx_first: " << idx_first << " idx_last: " << idx_last);
771 
772  enum LastBlock_e { LB_other, LB_disk, LB_direct };
773 
774  LastBlock_e lbe = LB_other;
775 
776  for (int block_idx = idx_first; block_idx <= idx_last; ++block_idx)
777  {
778  TRACEF(DumpXL, tpfx << "sid: " << Xrd::hex1 << rh->m_seq_id << " idx: " << block_idx);
779  BlockMap_i bi = m_block_map.find(block_idx);
780 
781  // overlap and read
782  long long off; // offset in user buffer
783  long long blk_off; // offset in block
784  int size; // size to copy
785 
786  overlap(block_idx, m_block_size, iUserOff, iUserSize, off, blk_off, size);
787 
788  // In RAM or incoming?
789  if (bi != m_block_map.end())
790  {
791  inc_ref_count(bi->second);
792  TRACEF(Dump, tpfx << (void*) iUserBuff << " inc_ref_count for existing block " << bi->second << " idx = " << block_idx);
793 
794  if (bi->second->is_finished())
795  {
796  // note, blocks with error should not be here !!!
797  // they should be either removed or reissued in ProcessBlockResponse()
798  assert(bi->second->is_ok());
799 
800  blks_ready[bi->second].emplace_back( ChunkRequest(nullptr, iUserBuff + off, blk_off, size) );
801 
802  if (bi->second->m_prefetch)
803  ++prefetch_cnt;
804  }
805  else
806  {
807  if ( ! read_req)
808  read_req = new ReadRequest(io, rh);
809 
810  // We have a lock on state_cond --> as we register the request before releasing the lock,
811  // we are sure to get a call-in via the ChunkRequest handling when this block arrives.
812 
813  bi->second->m_chunk_reqs.emplace_back( ChunkRequest(read_req, iUserBuff + off, blk_off, size) );
814  ++read_req->m_n_chunk_reqs;
815  }
816 
817  lbe = LB_other;
818  }
819  // On disk?
820  else if (m_cfi.TestBitWritten(offsetIdx(block_idx)))
821  {
822  TRACEF(DumpXL, tpfx << "read from disk " << (void*)iUserBuff << " idx = " << block_idx);
823 
824  if (lbe == LB_disk)
825  iovec_disk.back().size += size;
826  else
827  iovec_disk.push_back( { block_idx * m_block_size + blk_off, size, 0, iUserBuff + off } );
828  iovec_disk_total += size;
829 
830  if (m_cfi.TestBitPrefetch(offsetIdx(block_idx)))
831  ++prefetch_cnt;
832 
833  lbe = LB_disk;
834  }
835  // Neither ... then we have to go get it ...
836  else
837  {
838  if ( ! read_req)
839  read_req = new ReadRequest(io, rh);
840 
841  // Is there room for one more RAM Block?
842  Block *b = PrepareBlockRequest(block_idx, io, read_req, false);
843  if (b)
844  {
845  TRACEF(Dump, tpfx << "inc_ref_count new " << (void*)iUserBuff << " idx = " << block_idx);
846  inc_ref_count(b);
847  blks_to_request.push_back(b);
848 
849  b->m_chunk_reqs.emplace_back(ChunkRequest(read_req, iUserBuff + off, blk_off, size));
850  ++read_req->m_n_chunk_reqs;
851 
852  lbe = LB_other;
853  }
854  else // Nope ... read this directly without caching.
855  {
856  TRACEF(DumpXL, tpfx << "direct block " << block_idx << ", blk_off " << blk_off << ", size " << size);
857 
858  iovec_direct_total += size;
859  read_req->m_direct_done = false;
860 
861  // Make sure we do not issue a ReadV with chunk size above XrdProto::maxRVdsz.
862  // Number of actual ReadVs issued so as to not exceed the XrdProto::maxRvecsz limit
863  // is determined in RequestBlocksDirect().
864  if (lbe == LB_direct && iovec_direct.back().size + size <= XrdProto::maxRVdsz) {
865  iovec_direct.back().size += size;
866  } else {
867  long long in_offset = block_idx * m_block_size + blk_off;
868  char *out_pos = iUserBuff + off;
869  while (size > XrdProto::maxRVdsz) {
870  iovec_direct.push_back( { in_offset, XrdProto::maxRVdsz, 0, out_pos } );
871  in_offset += XrdProto::maxRVdsz;
872  out_pos += XrdProto::maxRVdsz;
873  size -= XrdProto::maxRVdsz;
874  }
875  iovec_direct.push_back( { in_offset, size, 0, out_pos } );
876  }
877 
878  lbe = LB_direct;
879  }
880  }
881  } // end for over blocks in an IOVec
882  } // end for over readV IOVec
883 
884  inc_prefetch_hit_cnt(prefetch_cnt);
885 
886  m_state_cond.UnLock();
887 
888  // First, send out remote requests for new blocks.
889  if ( ! blks_to_request.empty())
890  {
891  ProcessBlockRequests(blks_to_request);
892  blks_to_request.clear();
893  }
894 
895  // Second, send out remote direct read requests.
896  if ( ! iovec_direct.empty())
897  {
898  RequestBlocksDirect(io, read_req, iovec_direct, iovec_direct_total);
899 
900  TRACEF(Dump, tpfx << "direct read requests sent out, n_chunks = " << (int) iovec_direct.size() << ", total_size = " << iovec_direct_total);
901  }
902 
903  // Begin synchronous part where we process data that is already in RAM or on disk.
904 
905  long long bytes_read = 0;
906  int error_cond = 0; // to be set to -errno
907 
908  // Third, process blocks that are available in RAM.
909  if ( ! blks_ready.empty())
910  {
911  for (auto &bvi : blks_ready)
912  {
913  for (auto &cr : bvi.second)
914  {
915  TRACEF(DumpXL, tpfx << "ub=" << (void*)cr.m_buf << " from pre-finished block " << bvi.first->m_offset/m_block_size << " size " << cr.m_size);
916  memcpy(cr.m_buf, bvi.first->m_buff + cr.m_off, cr.m_size);
917  bytes_read += cr.m_size;
918  }
919  }
920  }
921 
922  // Fourth, read blocks from disk.
923  if ( ! iovec_disk.empty())
924  {
925  int rc = ReadBlocksFromDisk(iovec_disk, iovec_disk_total);
926  TRACEF(DumpXL, tpfx << "from disk finished size = " << rc);
927  if (rc >= 0)
928  {
929  bytes_read += rc;
930  }
931  else
932  {
933  error_cond = rc;
934  TRACEF(Error, tpfx << "failed read from disk");
935  }
936  }
937 
938  // End synchronous part -- update with sync stats and determine actual state of this read.
939  // Note: remote reads might have already finished during disk-read!
940 
941  m_state_cond.Lock();
942 
943  for (auto &bvi : blks_ready)
944  dec_ref_count(bvi.first, (int) bvi.second.size());
945 
946  if (read_req)
947  {
948  read_req->m_bytes_read += bytes_read;
949  if (error_cond)
950  read_req->update_error_cond(error_cond);
951  read_req->m_stats.m_BytesHit += bytes_read;
952  read_req->m_sync_done = true;
953 
954  if (read_req->is_complete())
955  {
956  // Almost like FinalizeReadRequest(read_req) -- but no callout!
957  m_state_cond.UnLock();
958 
959  m_stats.AddReadStats(read_req->m_stats);
960 
961  int ret = read_req->return_value();
962  delete read_req;
963  return ret;
964  }
965  else
966  {
967  m_state_cond.UnLock();
968  return -EWOULDBLOCK;
969  }
970  }
971  else
972  {
973  m_stats.m_BytesHit += bytes_read;
974  m_state_cond.UnLock();
975 
976  // !!! No callout.
977 
978  return error_cond ? error_cond : bytes_read;
979  }
980 }
981 
982 
983 //==============================================================================
984 // WriteBlock and Sync
985 //==============================================================================
986 
988 {
989  // write block buffer into disk file
990  long long offset = b->m_offset - m_offset;
991  long long size = b->get_size();
992  ssize_t retval;
993 
994  if (m_cfi.IsCkSumCache())
995  if (b->has_cksums())
996  retval = m_data_file->pgWrite(b->get_buff(), offset, size, b->ref_cksum_vec().data(), 0);
997  else
998  retval = m_data_file->pgWrite(b->get_buff(), offset, size, 0, 0);
999  else
1000  retval = m_data_file->Write(b->get_buff(), offset, size);
1001 
1002  if (retval < size)
1003  {
1004  if (retval < 0)
1005  {
1006  GetLog()->Emsg("WriteToDisk()", -retval, "write block to disk", GetLocalPath().c_str());
1007  }
1008  else
1009  {
1010  TRACEF(Error, "WriteToDisk() incomplete block write ret=" << retval << " (should be " << size << ")");
1011  }
1012 
1013  XrdSysCondVarHelper _lck(m_state_cond);
1014 
1015  dec_ref_count(b);
1016 
1017  return;
1018  }
1019 
1020  const int blk_idx = (b->m_offset - m_offset) / m_block_size;
1021 
1022  // Set written bit.
1023  TRACEF(Dump, "WriteToDisk() success set bit for block " << b->m_offset << " size=" << size);
1024 
1025  bool schedule_sync = false;
1026  {
1027  XrdSysCondVarHelper _lck(m_state_cond);
1028 
1029  m_cfi.SetBitWritten(blk_idx);
1030 
1031  if (b->m_prefetch)
1032  {
1033  m_cfi.SetBitPrefetch(blk_idx);
1034  }
1035  if (b->req_cksum_net() && ! b->has_cksums() && m_cfi.IsCkSumNet())
1036  {
1037  m_cfi.ResetCkSumNet();
1038  }
1039 
1040  dec_ref_count(b);
1041 
1042  // Set synced bit or stash block index if in actual sync.
1043  // Synced state is only written out to cinfo file when data file is synced.
1044  if (m_in_sync)
1045  {
1046  m_writes_during_sync.push_back(blk_idx);
1047  }
1048  else
1049  {
1050  m_cfi.SetBitSynced(blk_idx);
1051  ++m_non_flushed_cnt;
1052  if ((m_cfi.IsComplete() || m_non_flushed_cnt >= Cache::GetInstance().RefConfiguration().m_flushCnt) &&
1053  ! m_in_shutdown)
1054  {
1055  schedule_sync = true;
1056  m_in_sync = true;
1057  m_non_flushed_cnt = 0;
1058  }
1059  }
1060  }
1061 
1062  if (schedule_sync)
1063  {
1064  cache()->ScheduleFileSync(this);
1065  }
1066 }
1067 
1068 //------------------------------------------------------------------------------
1069 
1070 void File::Sync()
1071 {
1072  TRACEF(Dump, "Sync()");
1073 
1074  int ret = m_data_file->Fsync();
1075  bool errorp = false;
1076  if (ret == XrdOssOK)
1077  {
1078  Stats loc_stats = m_stats.Clone();
1079  m_cfi.WriteIOStat(loc_stats);
1080  m_cfi.Write(m_info_file, m_filename.c_str());
1081  int cret = m_info_file->Fsync();
1082  if (cret != XrdOssOK)
1083  {
1084  TRACEF(Error, "Sync cinfo file sync error " << cret);
1085  errorp = true;
1086  }
1087  }
1088  else
1089  {
1090  TRACEF(Error, "Sync data file sync error " << ret << ", cinfo file has not been updated");
1091  errorp = true;
1092  }
1093 
1094  if (errorp)
1095  {
1096  TRACEF(Error, "Sync failed, unlinking local files and initiating shutdown of File object");
1097 
1098  // Unlink will also call this->initiate_emergency_shutdown()
1099  Cache::GetInstance().UnlinkFile(m_filename, false);
1100 
1101  XrdSysCondVarHelper _lck(&m_state_cond);
1102 
1103  m_writes_during_sync.clear();
1104  m_in_sync = false;
1105 
1106  return;
1107  }
1108 
1109  int written_while_in_sync;
1110  bool resync = false;
1111  {
1112  XrdSysCondVarHelper _lck(&m_state_cond);
1113  for (std::vector<int>::iterator i = m_writes_during_sync.begin(); i != m_writes_during_sync.end(); ++i)
1114  {
1115  m_cfi.SetBitSynced(*i);
1116  }
1117  written_while_in_sync = m_non_flushed_cnt = (int) m_writes_during_sync.size();
1118  m_writes_during_sync.clear();
1119 
1120  // If there were writes during sync and the file is now complete,
1121  // let us call Sync again without resetting the m_in_sync flag.
1122  if (written_while_in_sync > 0 && m_cfi.IsComplete() && ! m_in_shutdown)
1123  resync = true;
1124  else
1125  m_in_sync = false;
1126  }
1127  TRACEF(Dump, "Sync "<< written_while_in_sync << " blocks written during sync." << (resync ? " File is now complete - resyncing." : ""));
1128 
1129  if (resync)
1130  Sync();
1131 }
1132 
1133 
1134 //==============================================================================
1135 // Block processing
1136 //==============================================================================
1137 
1138 void File::free_block(Block* b)
1139 {
1140  // Method always called under lock.
1141  int i = b->m_offset / m_block_size;
1142  TRACEF(Dump, "free_block block " << b << " idx = " << i);
1143  size_t ret = m_block_map.erase(i);
1144  if (ret != 1)
1145  {
1146  // assert might be a better option than a warning
1147  TRACEF(Error, "free_block did not erase " << i << " from map");
1148  }
1149  else
1150  {
1151  cache()->ReleaseRAM(b->m_buff, b->m_req_size);
1152  delete b;
1153  }
1154 
1155  if (m_prefetch_state == kHold && (int) m_block_map.size() < Cache::GetInstance().RefConfiguration().m_prefetch_max_blocks)
1156  {
1157  m_prefetch_state = kOn;
1158  cache()->RegisterPrefetchFile(this);
1159  }
1160 }
1161 
1162 //------------------------------------------------------------------------------
1163 
1164 bool File::select_current_io_or_disable_prefetching(bool skip_current)
1165 {
1166  // Method always called under lock. It also expects prefetch to be active.
1167 
1168  int io_size = (int) m_io_set.size();
1169  bool io_ok = false;
1170 
1171  if (io_size == 1)
1172  {
1173  io_ok = (*m_io_set.begin())->m_allow_prefetching;
1174  if (io_ok)
1175  {
1176  m_current_io = m_io_set.begin();
1177  }
1178  }
1179  else if (io_size > 1)
1180  {
1181  IoSet_i mi = m_current_io;
1182  if (skip_current && mi != m_io_set.end()) ++mi;
1183 
1184  for (int i = 0; i < io_size; ++i)
1185  {
1186  if (mi == m_io_set.end()) mi = m_io_set.begin();
1187 
1188  if ((*mi)->m_allow_prefetching)
1189  {
1190  m_current_io = mi;
1191  io_ok = true;
1192  break;
1193  }
1194  ++mi;
1195  }
1196  }
1197 
1198  if ( ! io_ok)
1199  {
1200  m_current_io = m_io_set.end();
1201  m_prefetch_state = kStopped;
1202  cache()->DeRegisterPrefetchFile(this);
1203  }
1204 
1205  return io_ok;
1206 }
1207 
1208 //------------------------------------------------------------------------------
1209 
1210 void File::ProcessDirectReadFinished(ReadRequest *rreq, int bytes_read, int error_cond)
1211 {
1212  // Called from DirectResponseHandler.
1213  // NOT under lock.
1214 
1215  if (error_cond)
1216  TRACEF(Error, "Read(), direct read finished with error " << -error_cond << " " << XrdSysE2T(-error_cond));
1217 
1218  m_state_cond.Lock();
1219 
1220  if (error_cond)
1221  rreq->update_error_cond(error_cond);
1222  else {
1223  rreq->m_stats.m_BytesBypassed += bytes_read;
1224  rreq->m_bytes_read += bytes_read;
1225  }
1226 
1227  rreq->m_direct_done = true;
1228 
1229  bool rreq_complete = rreq->is_complete();
1230 
1231  m_state_cond.UnLock();
1232 
1233  if (rreq_complete)
1234  FinalizeReadRequest(rreq);
1235 }
1236 
1237 void File::ProcessBlockError(Block *b, ReadRequest *rreq)
1238 {
1239  // Called from ProcessBlockResponse().
1240  // YES under lock -- we have to protect m_block_map for recovery through multiple IOs.
1241  // Does not manage m_read_req.
1242  // Will not complete the request.
1243 
1244  TRACEF(Debug, "ProcessBlockError() io " << b->m_io << ", block "<< b->m_offset/m_block_size <<
1245  " finished with error " << -b->get_error() << " " << XrdSysE2T(-b->get_error()));
1246 
1247  rreq->update_error_cond(b->get_error());
1248  --rreq->m_n_chunk_reqs;
1249 
1250  dec_ref_count(b);
1251 }
1252 
1253 void File::ProcessBlockSuccess(Block *b, ChunkRequest &creq)
1254 {
1255  // Called from ProcessBlockResponse().
1256  // NOT under lock as it does a memcpy of existing block data.
1257  // Acquires lock for block, m_read_req and rreq state update.
1258 
1259  ReadRequest *rreq = creq.m_read_req;
1260 
1261  TRACEF(Dump, "ProcessBlockSuccess() ub=" << (void*)creq.m_buf << " from finished block " << b->m_offset/m_block_size << " size " << creq.m_size);
1262  memcpy(creq.m_buf, b->m_buff + creq.m_off, creq.m_size);
1263 
1264  m_state_cond.Lock();
1265 
1266  rreq->m_bytes_read += creq.m_size;
1267 
1268  if (b->get_req_id() == (void*) rreq)
1269  rreq->m_stats.m_BytesMissed += creq.m_size;
1270  else
1271  rreq->m_stats.m_BytesHit += creq.m_size;
1272 
1273  --rreq->m_n_chunk_reqs;
1274 
1275  if (b->m_prefetch)
1276  inc_prefetch_hit_cnt(1);
1277 
1278  dec_ref_count(b);
1279 
1280  bool rreq_complete = rreq->is_complete();
1281 
1282  m_state_cond.UnLock();
1283 
1284  if (rreq_complete)
1285  FinalizeReadRequest(rreq);
1286 }
1287 
1288 void File::FinalizeReadRequest(ReadRequest *rreq)
1289 {
1290  // called from ProcessBlockResponse()
1291  // NOT under lock -- does callout
1292 
1293  m_stats.AddReadStats(rreq->m_stats);
1294 
1295  rreq->m_rh->Done(rreq->return_value());
1296  delete rreq;
1297 }
1298 
1299 void File::ProcessBlockResponse(Block *b, int res)
1300 {
1301  static const char* tpfx = "ProcessBlockResponse ";
1302 
1303  TRACEF(Dump, tpfx << "block=" << b << ", idx=" << b->m_offset/m_block_size << ", off=" << b->m_offset << ", res=" << res);
1304 
1305  if (res >= 0 && res != b->get_size())
1306  {
1307  // Incorrect number of bytes received, apparently size of the file on the remote
1308  // is different than what the cache expects it to be.
1309  TRACEF(Error, tpfx << "Wrong number of bytes received, assuming remote/local file size mismatch, unlinking local files and initiating shutdown of File object");
1310  Cache::GetInstance().UnlinkFile(m_filename, false);
1311  }
1312 
1313  m_state_cond.Lock();
1314 
1315  // Deregister block from IO's prefetch count, if needed.
1316  if (b->m_prefetch)
1317  {
1318  IO *io = b->get_io();
1319  IoSet_i mi = m_io_set.find(io);
1320  if (mi != m_io_set.end())
1321  {
1322  --io->m_active_prefetches;
1323 
1324  // If failed and IO is still prefetching -- disable prefetching on this IO.
1325  if (res < 0 && io->m_allow_prefetching)
1326  {
1327  TRACEF(Debug, tpfx << "after failed prefetch on io " << io << " disabling prefetching on this io.");
1328  io->m_allow_prefetching = false;
1329 
1330  // Check if any IO is still available for prefetching. If not, stop it.
1331  if (m_prefetch_state == kOn || m_prefetch_state == kHold)
1332  {
1333  if ( ! select_current_io_or_disable_prefetching(false) )
1334  {
1335  TRACEF(Debug, tpfx << "stopping prefetching after io " << b->get_io() << " marked as bad.");
1336  }
1337  }
1338  }
1339 
1340  // If failed with no subscribers -- delete the block and exit.
1341  if (b->m_refcnt == 0 && (res < 0 || m_in_shutdown))
1342  {
1343  free_block(b);
1344  m_state_cond.UnLock();
1345  return;
1346  }
1347  m_prefetch_bytes += b->get_size();
1348  }
1349  else
1350  {
1351  TRACEF(Error, tpfx << "io " << b->get_io() << " not found in IoSet.");
1352  }
1353  }
1354 
1355  if (res == b->get_size())
1356  {
1357  b->set_downloaded();
1358  TRACEF(Dump, tpfx << "inc_ref_count idx=" << b->m_offset/m_block_size);
1359  if ( ! m_in_shutdown)
1360  {
1361  // Increase ref-count for the writer.
1362  inc_ref_count(b);
1363  m_stats.AddWriteStats(b->get_size(), b->get_n_cksum_errors());
1364  cache()->AddWriteTask(b, true);
1365  }
1366 
1367  // Swap chunk-reqs vector out of Block, it will be processed outside of lock.
1368  vChunkRequest_t creqs_to_notify;
1369  creqs_to_notify.swap( b->m_chunk_reqs );
1370 
1371  m_state_cond.UnLock();
1372 
1373  for (auto &creq : creqs_to_notify)
1374  {
1375  ProcessBlockSuccess(b, creq);
1376  }
1377  }
1378  else
1379  {
1380  if (res < 0) {
1381  bool new_error = b->get_io()->register_block_error(res);
1382  int tlvl = new_error ? TRACE_Error : TRACE_Debug;
1383  TRACEF_INT(tlvl, tpfx << "block " << b << ", idx=" << b->m_offset/m_block_size << ", off=" << b->m_offset
1384  << ", io=" << b->get_io() << ", error=" << res);
1385  } else {
1386  bool first_p = b->get_io()->register_incomplete_read();
1387  int tlvl = first_p ? TRACE_Error : TRACE_Debug;
1388  TRACEF_INT(tlvl, tpfx << "block " << b << ", idx=" << b->m_offset/m_block_size << ", off=" << b->m_offset
1389  << ", io=" << b->get_io() << " incomplete, got " << res << " expected " << b->get_size());
1390 #if defined(__APPLE__) || defined(__GNU__) || (defined(__FreeBSD_kernel__) && defined(__GLIBC__)) || defined(__FreeBSD__)
1391  res = -EIO;
1392 #else
1393  res = -EREMOTEIO;
1394 #endif
1395  }
1396  b->set_error(res);
1397 
1398  // Loop over Block's chunk-reqs vector, error out ones with the same IO.
1399  // Collect others with a different IO, the first of them will be used to reissue the request.
1400  // This is then done outside of lock.
1401  std::list<ReadRequest*> rreqs_to_complete;
1402  vChunkRequest_t creqs_to_keep;
1403 
1404  for(ChunkRequest &creq : b->m_chunk_reqs)
1405  {
1406  ReadRequest *rreq = creq.m_read_req;
1407 
1408  if (rreq->m_io == b->get_io())
1409  {
1410  ProcessBlockError(b, rreq);
1411  if (rreq->is_complete())
1412  {
1413  rreqs_to_complete.push_back(rreq);
1414  }
1415  }
1416  else
1417  {
1418  creqs_to_keep.push_back(creq);
1419  }
1420  }
1421 
1422  bool reissue = false;
1423  if ( ! creqs_to_keep.empty())
1424  {
1425  ReadRequest *rreq = creqs_to_keep.front().m_read_req;
1426 
1427  TRACEF(Debug, "ProcessBlockResponse() requested block " << (void*)b << " failed with another io " <<
1428  b->get_io() << " - reissuing request with my io " << rreq->m_io);
1429 
1430  b->reset_error_and_set_io(rreq->m_io, rreq);
1431  b->m_chunk_reqs.swap( creqs_to_keep );
1432  reissue = true;
1433  }
1434 
1435  m_state_cond.UnLock();
1436 
1437  for (auto rreq : rreqs_to_complete)
1438  FinalizeReadRequest(rreq);
1439 
1440  if (reissue)
1441  ProcessBlockRequest(b);
1442  }
1443 }
1444 
1445 //------------------------------------------------------------------------------
1446 
1447 const char* File::lPath() const
1448 {
1449  return m_filename.c_str();
1450 }
1451 
1452 //------------------------------------------------------------------------------
1453 
1454 int File::offsetIdx(int iIdx) const
1455 {
1456  return iIdx - m_offset/m_block_size;
1457 }
1458 
1459 
1460 //------------------------------------------------------------------------------
1461 
1462 void File::Prefetch()
1463 {
1464  // Check that block is not on disk and not in RAM.
1465  // TODO: Could prefetch several blocks at once!
1466  // blks_max could be an argument
1467 
1468  BlockList_t blks;
1469 
1470  TRACEF(DumpXL, "Prefetch() entering.");
1471  {
1472  XrdSysCondVarHelper _lck(m_state_cond);
1473 
1474  if (m_prefetch_state != kOn)
1475  {
1476  return;
1477  }
1478 
1479  if ( ! select_current_io_or_disable_prefetching(true) )
1480  {
1481  TRACEF(Error, "Prefetch no available IO object found, prefetching stopped. This should not happen, i.e., prefetching should be stopped before.");
1482  return;
1483  }
1484 
1485  // Select block(s) to fetch.
1486  for (int f = 0; f < m_num_blocks; ++f)
1487  {
1488  if ( ! m_cfi.TestBitWritten(f))
1489  {
1490  int f_act = f + m_offset / m_block_size;
1491 
1492  BlockMap_i bi = m_block_map.find(f_act);
1493  if (bi == m_block_map.end())
1494  {
1495  Block *b = PrepareBlockRequest(f_act, *m_current_io, nullptr, true);
1496  if (b)
1497  {
1498  TRACEF(Dump, "Prefetch take block " << f_act);
1499  blks.push_back(b);
1500  // Note: block ref_cnt not increased, it will be when placed into write queue.
1501 
1502  inc_prefetch_read_cnt(1);
1503  }
1504  else
1505  {
1506  // This shouldn't happen as prefetching stops when RAM is 70% full.
1507  TRACEF(Warning, "Prefetch allocation failed for block " << f_act);
1508  }
1509  break;
1510  }
1511  }
1512  }
1513 
1514  if (blks.empty())
1515  {
1516  TRACEF(Debug, "Prefetch file is complete, stopping prefetch.");
1517  m_prefetch_state = kComplete;
1518  cache()->DeRegisterPrefetchFile(this);
1519  }
1520  else
1521  {
1522  (*m_current_io)->m_active_prefetches += (int) blks.size();
1523  }
1524  }
1525 
1526  if ( ! blks.empty())
1527  {
1528  ProcessBlockRequests(blks);
1529  }
1530 }
1531 
1532 
1533 //------------------------------------------------------------------------------
1534 
1535 float File::GetPrefetchScore() const
1536 {
1537  return m_prefetch_score;
1538 }
1539 
1540 XrdSysError* File::GetLog()
1541 {
1542  return Cache::GetInstance().GetLog();
1543 }
1544 
1545 XrdSysTrace* File::GetTrace()
1546 {
1547  return Cache::GetInstance().GetTrace();
1548 }
1549 
1550 void File::insert_remote_location(const std::string &loc)
1551 {
1552  if ( ! loc.empty())
1553  {
1554  size_t p = loc.find_first_of('@');
1555  m_remote_locations.insert(&loc[p != std::string::npos ? p + 1 : 0]);
1556  }
1557 }
1558 
1559 std::string File::GetRemoteLocations() const
1560 {
1561  std::string s;
1562  if ( ! m_remote_locations.empty())
1563  {
1564  size_t sl = 0;
1565  int nl = 0;
1566  for (std::set<std::string>::iterator i = m_remote_locations.begin(); i != m_remote_locations.end(); ++i, ++nl)
1567  {
1568  sl += i->size();
1569  }
1570  s.reserve(2 + sl + 2*nl + nl - 1 + 1);
1571  s = '[';
1572  int j = 1;
1573  for (std::set<std::string>::iterator i = m_remote_locations.begin(); i != m_remote_locations.end(); ++i, ++j)
1574  {
1575  s += '"'; s += *i; s += '"';
1576  if (j < nl) s += ',';
1577  }
1578  s += ']';
1579  }
1580  else
1581  {
1582  s = "[]";
1583  }
1584  return s;
1585 }
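Example behaviour of the two location helpers above, using hypothetical host names: insert_remote_location() keeps only the part after '@' (if present), and GetRemoteLocations() renders the collected set as a JSON-style array.

// insert_remote_location("user.42:17@srv1.example.org:1094") -> stores "srv1.example.org:1094"
// insert_remote_location("srv2.example.org:1094")            -> stores "srv2.example.org:1094"
// GetRemoteLocations()  ->  ["srv1.example.org:1094","srv2.example.org:1094"]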
1586 
1587 //==============================================================================
1588 //======================= RESPONSE HANDLERS ==============================
1589 //==============================================================================
1590 
1591 void BlockResponseHandler::Done(int res)
1592 {
1593  m_block->m_file->ProcessBlockResponse(m_block, res);
1594  delete this;
1595 }
1596 
1597 //------------------------------------------------------------------------------
1598 
1599 void DirectResponseHandler::Done(int res)
1600 {
1601  m_mutex.Lock();
1602 
1603  int n_left = --m_to_wait;
1604 
1605  if (res < 0) {
1606  if (m_errno == 0) m_errno = res; // store first reported error
1607  } else {
1608  m_bytes_read += res;
1609  }
1610 
1611  m_mutex.UnLock();
1612 
1613  if (n_left == 0)
1614  {
1615  m_file->ProcessDirectReadFinished(m_read_req, m_bytes_read, m_errno);
1616  delete this;
1617  }
1618 }
#define TRACE_Debug
Definition: XrdCmsTrace.hh:37
#define XrdOssOK
Definition: XrdOss.hh:50
#define XRDOSS_mkpath
Definition: XrdOss.hh:466
#define TRACE_Error
Definition: XrdPfcTrace.hh:7
#define TRACE_Dump
Definition: XrdPfcTrace.hh:11
#define TRACEF(act, x)
Definition: XrdPfcTrace.hh:67
#define ERRNO_AND_ERRSTR(err_code)
Definition: XrdPfcTrace.hh:46
#define TRACEF_INT(act, x)
Definition: XrdPfcTrace.hh:71
int stat(const char *path, struct stat *buf)
#define XRD_TRACE
Definition: XrdScheduler.cc:48
bool Debug
const char * XrdSysE2T(int errcode)
Definition: XrdSysE2T.cc:104
@ Error
#define TRACE(act, x)
Definition: XrdTrace.hh:63
virtual int Fsync()
Definition: XrdOss.hh:144
virtual int Ftruncate(unsigned long long flen)
Definition: XrdOss.hh:164
virtual int Fstat(struct stat *buf)
Definition: XrdOss.hh:136
virtual int Close(long long *retsz=0)=0
virtual int getFD()
Definition: XrdOss.hh:426
virtual int Open(const char *path, int Oflag, mode_t Mode, XrdOucEnv &env)
Definition: XrdOss.hh:200
virtual ssize_t Read(off_t offset, size_t size)
Definition: XrdOss.hh:281
virtual ssize_t pgWrite(void *buffer, off_t offset, size_t wrlen, uint32_t *csvec, uint64_t opts)
Definition: XrdOss.cc:198
virtual ssize_t ReadV(XrdOucIOVec *readV, int rdvcnt)
Definition: XrdOss.cc:236
virtual ssize_t Write(const void *buffer, off_t offset, size_t size)
Definition: XrdOss.hh:345
virtual int Create(const char *tid, const char *path, mode_t mode, XrdOucEnv &env, int opts=0)=0
virtual XrdOssDF * newFile(const char *tident)=0
virtual int Stat(const char *path, struct stat *buff, int opts=0, XrdOucEnv *envP=0)=0
virtual void Done(int result)=0
virtual int pgRead(char *buff, long long offs, int rdlen, std::vector< uint32_t > &csvec, uint64_t opts=0, int *csfix=0)
Definition: XrdOucCache.cc:39
virtual int ReadV(const XrdOucIOVec *readV, int rnum)
Definition: XrdOucCache.cc:86
void Put(const char *varname, const char *value)
Definition: XrdOucEnv.hh:85
void Done(int result) override
Definition: XrdPfcFile.cc:1591
int * ptr_n_cksum_errors()
Definition: XrdPfcFile.hh:174
int get_size() const
Definition: XrdPfcFile.hh:147
int get_error() const
Definition: XrdPfcFile.hh:161
int get_n_cksum_errors()
Definition: XrdPfcFile.hh:173
void * get_req_id() const
Definition: XrdPfcFile.hh:153
long long get_offset() const
Definition: XrdPfcFile.hh:149
vChunkRequest_t m_chunk_reqs
Definition: XrdPfcFile.hh:136
void set_error(int err)
Definition: XrdPfcFile.hh:160
vCkSum_t & ref_cksum_vec()
Definition: XrdPfcFile.hh:172
char * get_buff() const
Definition: XrdPfcFile.hh:146
IO * get_io() const
Definition: XrdPfcFile.hh:152
void set_downloaded()
Definition: XrdPfcFile.hh:159
bool req_cksum_net() const
Definition: XrdPfcFile.hh:170
bool has_cksums() const
Definition: XrdPfcFile.hh:171
long long m_offset
Definition: XrdPfcFile.hh:125
void reset_error_and_set_io(IO *io, void *rid)
Definition: XrdPfcFile.hh:163
int get_req_size() const
Definition: XrdPfcFile.hh:148
Attaches/creates and detaches/deletes cache-io objects for disk based cache.
Definition: XrdPfc.hh:267
XrdOss * GetOss() const
Definition: XrdPfc.hh:389
XrdSysTrace * GetTrace()
Definition: XrdPfc.hh:402
const Configuration & RefConfiguration() const
Reference XrdPfc configuration.
Definition: XrdPfc.hh:319
XrdSysError * GetLog()
Definition: XrdPfc.hh:401
static Cache & GetInstance()
Singleton access.
Definition: XrdPfc.cc:163
int UnlinkFile(const std::string &f_name, bool fail_if_open)
Remove cinfo and data files from cache.
Definition: XrdPfc.cc:1171
void Done(int result) override
Definition: XrdPfcFile.cc:1599
bool FinalizeSyncBeforeExit()
Returns true if any of the blocks need sync. Called from Cache::dec_ref_cnt on zero ref cnt.
Definition: XrdPfcFile.cc:272
const char * lPath() const
Log path.
Definition: XrdPfcFile.cc:1447
int ReadV(IO *io, const XrdOucIOVec *readV, int readVnum, ReadReqRH *rh)
Vector read.
Definition: XrdPfcFile.cc:709
XrdSysTrace * GetTrace()
Definition: XrdPfcFile.cc:1545
void WriteBlockToDisk(Block *b)
Definition: XrdPfcFile.cc:987
static File * FileOpen(const std::string &path, long long offset, long long fileSize)
Static constructor that also does Open. Returns null ptr if Open fails.
Definition: XrdPfcFile.cc:100
float GetPrefetchScore() const
Definition: XrdPfcFile.cc:1535
friend class BlockResponseHandler
Definition: XrdPfcFile.hh:215
XrdSysError * GetLog()
Definition: XrdPfcFile.cc:1540
void Prefetch()
Definition: XrdPfcFile.cc:1462
std::string GetRemoteLocations() const
Definition: XrdPfcFile.cc:1559
int Fstat(struct stat &sbuff)
Definition: XrdPfcFile.cc:491
void AddIO(IO *io)
Definition: XrdPfcFile.cc:296
void RequestSyncOfDetachStats()
Flags that detach stats should be written out in final sync. Called from CacheIO upon Detach.
Definition: XrdPfcFile.cc:266
void BlocksRemovedFromWriteQ(std::list< Block * > &)
Handle removal of a set of blocks from Cache's write queue.
Definition: XrdPfcFile.cc:166
friend class DirectResponseHandler
Definition: XrdPfcFile.hh:216
void initiate_emergency_shutdown()
Definition: XrdPfcFile.cc:113
void Sync()
Sync file cache info and output data with disk.
Definition: XrdPfcFile.cc:1070
int Read(IO *io, char *buff, long long offset, int size, ReadReqRH *rh)
Normal read.
Definition: XrdPfcFile.cc:676
void ioUpdated(IO *io)
Notification from IO that it has been updated (remote open).
Definition: XrdPfcFile.cc:180
void RemoveIO(IO *io)
Definition: XrdPfcFile.cc:333
std::string & GetLocalPath()
Definition: XrdPfcFile.hh:275
void BlockRemovedFromWriteQ(Block *)
Handle removal of a block from Cache's write queue.
Definition: XrdPfcFile.cc:158
Stats DeltaStatsFromLastCall()
Definition: XrdPfcFile.cc:143
bool ioActive(IO *io)
Initiate close. Return true if still IO active. Used in XrdPosixXrootd::Close()
Definition: XrdPfcFile.cc:189
Base cache-io class that implements some XrdOucCacheIO abstract methods.
Definition: XrdPfcIO.hh:18
bool register_incomplete_read()
Definition: XrdPfcIO.hh:92
XrdOucCacheIO * GetInput()
Definition: XrdPfcIO.cc:30
bool register_block_error(int res)
Definition: XrdPfcIO.hh:95
RAtomic_int m_active_read_reqs
number of active read requests
Definition: XrdPfcIO.hh:72
const char * GetLocation()
Definition: XrdPfcIO.hh:46
Status of cached file. Can be read from and written into a binary file.
Definition: XrdPfcInfo.hh:45
void SetBitPrefetch(int i)
Mark block as obtained through prefetch.
Definition: XrdPfcInfo.hh:369
static const char * s_infoExtension
Definition: XrdPfcInfo.hh:313
void SetBitSynced(int i)
Mark block as synced to disk.
Definition: XrdPfcInfo.hh:391
time_t GetNoCkSumTimeForUVKeep() const
Definition: XrdPfcInfo.hh:305
CkSumCheck_e GetCkSumState() const
Definition: XrdPfcInfo.hh:290
void WriteIOStatAttach()
Write open time in the last entry of access statistics.
Definition: XrdPfcInfo.cc:422
void ResetCkSumNet()
Definition: XrdPfcInfo.cc:215
bool Write(XrdOssDF *fp, const char *dname, const char *fname=0)
Definition: XrdPfcInfo.cc:268
void DowngradeCkSumState(CkSumCheck_e css_ref)
Definition: XrdPfcInfo.hh:299
bool IsCkSumNet() const
Definition: XrdPfcInfo.hh:294
void ResetAllAccessStats()
Reset IO Stats.
Definition: XrdPfcInfo.cc:361
bool TestBitPrefetch(int i) const
Test if block at the given index has been prefetched.
Definition: XrdPfcInfo.hh:380
bool IsComplete() const
Get complete status.
Definition: XrdPfcInfo.hh:451
bool IsCkSumCache() const
Definition: XrdPfcInfo.hh:293
void SetBitWritten(int i)
Mark block as written to disk.
Definition: XrdPfcInfo.hh:356
long long GetBufferSize() const
Get prefetch buffer size.
Definition: XrdPfcInfo.hh:473
void WriteIOStat(Stats &s)
Write bytes missed, hits, and disk.
Definition: XrdPfcInfo.cc:431
long long GetExpectedDataFileSize() const
Get expected data file size.
Definition: XrdPfcInfo.hh:424
bool TestBitWritten(int i) const
Test if block at the given index is written to disk.
Definition: XrdPfcInfo.hh:347
bool Read(XrdOssDF *fp, const char *dname, const char *fname=0)
Read content of cinfo file into this object.
Definition: XrdPfcInfo.cc:296
void SetCkSumState(CkSumCheck_e css)
Definition: XrdPfcInfo.hh:298
void ResetNoCkSumTime()
Definition: XrdPfcInfo.hh:306
void SetBufferSizeFileSizeAndCreationTime(long long bs, long long fs)
Definition: XrdPfcInfo.cc:163
void WriteIOStatDetach(Stats &s)
Write close time together with bytes missed, hits, and disk.
Definition: XrdPfcInfo.cc:440
int GetNBlocks() const
Get number of blocks represented in download-state bit-vector.
Definition: XrdPfcInfo.hh:441
Statistics of cache utilisation by a File object.
Definition: XrdPfcStats.hh:31
void IoAttach()
Definition: XrdPfcStats.hh:83
Stats Clone()
Definition: XrdPfcStats.hh:97
void AddReadStats(const Stats &s)
Definition: XrdPfcStats.hh:59
long long m_BytesBypassed
number of bytes served directly through XrdCl
Definition: XrdPfcStats.hh:37
void AddWriteStats(long long bytes_written, int n_cks_errs)
Definition: XrdPfcStats.hh:75
void AddBytesHit(long long bh)
Definition: XrdPfcStats.hh:68
long long m_BytesHit
number of bytes served from disk
Definition: XrdPfcStats.hh:35
void IoDetach(int duration)
Definition: XrdPfcStats.hh:90
void DeltaToReference(const Stats &ref)
Definition: XrdPfcStats.hh:106
int Emsg(const char *esfx, int ecode, const char *text1, const char *text2=0)
Definition: XrdSysError.cc:95
Definition: XrdPfc.hh:41
std::vector< ChunkRequest > vChunkRequest_t
Definition: XrdPfcFile.hh:112
std::list< Block * > BlockList_t
Definition: XrdPfcFile.hh:177
XrdSysTrace * GetTrace()
Definition: XrdPfcPurge.cc:16
std::list< Block * >::iterator BlockList_i
Definition: XrdPfcFile.hh:178
static const int maxRVdsz
Definition: XProtocol.hh:688
static const int maxRvecsz
Definition: XProtocol.hh:686
@ hex1
Definition: XrdSysTrace.hh:42
long long offset
Definition: XrdOucIOVec.hh:42
char * data
Definition: XrdOucIOVec.hh:45
ReadRequest * m_read_req
Definition: XrdPfcFile.hh:102
Contains parameters configurable from the xrootd config file.
Definition: XrdPfc.hh:56
long long m_flushCnt
number of unsynced blocks on disk before flush is called
Definition: XrdPfc.hh:109
bool does_cschk_have_missing_bits(CkSumCheck_e cks_on_file) const
Definition: XrdPfc.hh:74
CkSumCheck_e get_cs_Chk() const
Definition: XrdPfc.hh:67
int m_prefetch_max_blocks
maximum number of blocks to prefetch per file
Definition: XrdPfc.hh:106
bool should_uvkeep_purge(time_t delta) const
Definition: XrdPfc.hh:76
std::string m_data_space
oss space for data files
Definition: XrdPfc.hh:82
long long m_bufferSize
prefetch buffer size, default 1MB
Definition: XrdPfc.hh:101
std::string m_meta_space
oss space for metadata files (cinfo)
Definition: XrdPfc.hh:83
std::string m_username
username passed to oss plugin
Definition: XrdPfc.hh:81
unsigned short m_seq_id
Definition: XrdPfcFile.hh:64
void update_error_cond(int ec)
Definition: XrdPfcFile.hh:92
ReadReqRH * m_rh
Definition: XrdPfcFile.hh:77
bool is_complete() const
Definition: XrdPfcFile.hh:94
int return_value() const
Definition: XrdPfcFile.hh:95
long long m_bytes_read
Definition: XrdPfcFile.hh:79