Skip to content

Commit

Permalink
XrdApps::JCache: add cleaner thread functionality which is enabled in…
Browse files Browse the repository at this point in the history
… the plug-in using 'size=<bytes>'.
  • Loading branch information
apeters1971 committed Jun 11, 2024
1 parent 44fbdec commit d0e22c7
Show file tree
Hide file tree
Showing 7 changed files with 300 additions and 4 deletions.
136 changes: 136 additions & 0 deletions src/XrdApps/XrdClJCachePlugin/cleaner/Cleaner.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
//------------------------------------------------------------------------------
// Copyright (c) 2024 by European Organization for Nuclear Research (CERN)
// Author: Andreas-Joachim Peters <[email protected]>
//------------------------------------------------------------------------------
// This file is part of the XRootD software suite.
//
// XRootD is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// XRootD is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with XRootD. If not, see <http://www.gnu.org/licenses/>.
//
// In applying this licence, CERN does not waive the privileges and immunities
// granted to it by virtue of its status as an Intergovernmental Organization
// or submit itself to any jurisdiction.

/*----------------------------------------------------------------------------*/
#include "cleaner/Cleaner.hh"
#include <sys/statfs.h>
/*----------------------------------------------------------------------------*/

namespace fs = std::filesystem;
namespace JCache {
/*----------------------------------------------------------------------------*/

//----------------------------------------------------------------------------
// @brief getLastAccessTime() returns the last access time of a file.
//
// @param filePath - path to the file
// @return last access time of the file in seconds
//----------------------------------------------------------------------------
time_t Cleaner::getLastAccessTime(const fs::path &filePath) {
struct stat fileInfo;
if (stat(filePath.c_str(), &fileInfo) != 0) {
return -1; // Error occurred
}
return fileInfo.st_atime;
}

//----------------------------------------------------------------------------
// @brief getDirectorySize() returns the total size of all files in a
// directory.
//
// @param directory - path to the directory
// @return total size of all files in the directory in bytes
//----------------------------------------------------------------------------
long long Cleaner::getDirectorySize(const fs::path &directory, bool scan) {

long long totalSize = 0;
if (scan) {
for (const auto &entry : fs::recursive_directory_iterator(directory)) {
if (stopFlag.load()) {
return 0;
}
if (fs::is_regular_file(entry)) {
totalSize += fs::file_size(entry);
}
}
} else {
struct statfs stat;

if (statfs(directory.c_str(), &stat) != 0) {
mLog->Error(1,"JCache:Cleaner: failed to get directory size using statfs.");
return 0;
}
}
return totalSize;
}

//----------------------------------------------------------------------------
// @brief getFilesByAccessTime() returns a list of files sorted by their
// last access time.
//
// @param directory - path to the directory
// @return list of files sorted by their last access time
//----------------------------------------------------------------------------
std::vector<std::pair<long long, fs::path>>
Cleaner::getFilesByAccessTime(const fs::path &directory) {
std::vector<std::pair<long long, fs::path>> fileList;
for (const auto &entry : fs::recursive_directory_iterator(directory)) {
if (fs::is_regular_file(entry)) {
auto accessTime = getLastAccessTime(entry.path());
fileList.emplace_back(accessTime, entry.path());
}
}
std::sort(fileList.begin(), fileList.end());
return fileList;
}

//----------------------------------------------------------------------------
// @brief cleanDirectory() deletes files from a directory that are older than
// a given threshold.
//
// @param directory - path to the directory
// @param highWatermark - threshold for high watermark in bytes
// @param lowWatermark - threshold for low watermark in bytes
// @return none
//----------------------------------------------------------------------------
void Cleaner::cleanDirectory(const fs::path &directory, long long highWatermark,
long long lowWatermark) {
long long currentSize = getDirectorySize(directory);
if (currentSize <= highWatermark) {
/*----------------------------------------------------------------------------*/
mLog->Info(1,"JCache:Cleaner: Directory size is within the limit (%lu/%lu). No action needed.",
currentSize, highWatermark);
return;
}

auto files = getFilesByAccessTime(directory);

for (const auto &[accessTime, filePath] : files) {
if (stopFlag.load()) {
return;
}
if (currentSize <= lowWatermark) {
break;
}
long long fileSize = fs::file_size(filePath);
try {
fs::remove(filePath);
currentSize -= fileSize;
mLog->Info(1, "JCache:Cleaner : deleted '%s' (Size: %lu bytes)", filePath.c_str(),
fileSize);
} catch (const std::exception &e) {
mLog->Error(1,"JCache::Cleaner error deleting '%'", filePath.c_str());
}
}
}
} // namespace JCache
145 changes: 145 additions & 0 deletions src/XrdApps/XrdClJCachePlugin/cleaner/Cleaner.hh
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
//------------------------------------------------------------------------------
// Copyright (c) 2024 by European Organization for Nuclear Research (CERN)
// Author: Andreas-Joachim Peters <[email protected]>
//------------------------------------------------------------------------------
// This file is part of the XRootD software suite.
//
// XRootD is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// XRootD is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with XRootD. If not, see <http://www.gnu.org/licenses/>.
//
// In applying this licence, CERN does not waive the privileges and immunities
// granted to it by virtue of its status as an Intergovernmental Organization
// or submit itself to any jurisdiction.

#pragma once
/*----------------------------------------------------------------------------*/
#include <algorithm>
#include <atomic>
#include <chrono>
#include <ctime>
#include <filesystem>
#include <iomanip>
#include <iostream>
#include <string>
#include <sys/stat.h>
#include <sys/types.h>
#include <thread>
#include <unistd.h>
#include <vector>
/*----------------------------------------------------------------------------*/
#include "XrdCl/XrdClDefaultEnv.hh"
#include "XrdCl/XrdClLog.hh"
/*----------------------------------------------------------------------------*/
namespace fs = std::filesystem;

namespace JCache {}
namespace JCache {
class Cleaner {
public:
// Constructor to initialize the configuration variables
Cleaner(double lowWatermark, double highWatermark, const std::string &path,
bool scan, size_t interval)
: lowWatermark(lowWatermark), highWatermark(highWatermark), subtree(path),
scan(scan), interval(interval), stopFlag(false) {
mLog = XrdCl::DefaultEnv::GetLog();
}

Cleaner()
: lowWatermark(0), highWatermark(0), subtree(""),
scan(true), interval(60), stopFlag(false) {
mLog = XrdCl::DefaultEnv::GetLog();
}

// Method to start the cleaning process in a separate thread
void run() {
stopFlag = false;
cleanerThread = std::thread(&Cleaner::cleanLoop, this);
cleanerThread.detach();
}

// Method to stop the cleaning process and join the thread
void stop() {
stopFlag = true;
if (cleanerThread.joinable()) {
cleanerThread.join();
}
}

// Destructor to ensure the thread is properly joined
~Cleaner() { stop(); }

// Method to Define Cleaner size
void SetSize(uint64_t size, const std::string& path) {
stop();
if (size > 1024ll*1024ll*1024ll) {
subtree = path;
highWatermark = size;
lowWatermark = size * 0.9;
run();
} else {
mLog->Error(1, "JCache:Cleaner : the size given to the cleaner is less than 1GB - cleaning is disabled!");
}
}

// Method to set the scan option (true means scan, false means don't scan but use statfs!)
void SetScan(bool sc) {
scan = sc;
}

private:
// Private methods
time_t getLastAccessTime(const fs::path &filePath);
long long getDirectorySize(const fs::path &directory, bool scan = true);
std::vector<std::pair<long long, fs::path>>
getFilesByAccessTime(const fs::path &directory);
void cleanDirectory(const fs::path &directory, long long highWatermark,
long long lowWatermark);

// Configuration variables
double lowWatermark;
double highWatermark;
fs::path subtree;
std::atomic<bool> scan;
size_t interval;

// Thread-related variables
std::thread cleanerThread;
std::atomic<bool> stopFlag;

// XRootD logger
XrdCl::Log *mLog;

// The method that runs in a loop, calling the clean method every interval
// seconds
void cleanLoop() {

while (!stopFlag.load()) {
auto start = std::chrono::steady_clock::now();

cleanDirectory(subtree, highWatermark, lowWatermark);

auto end = std::chrono::steady_clock::now();
auto duration =
std::chrono::duration_cast<std::chrono::seconds>(end - start);

if ( (size_t)duration.count() < interval) {
auto s = std::chrono::seconds(interval) - duration;
std::this_thread::sleep_for(s);
}
}
}

// The clean method to be called periodically
void clean() {}
};
} // namespace JCache
1 change: 1 addition & 0 deletions src/XrdApps/XrdClJCachePlugin/file/CacheStats.hh
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ struct CacheStats {
JCache::Art art;
if (XrdCl::JCacheFile::sEnableSummary) {
std::cerr << "# IO Timeprofile " << std::endl;
std::cerr << "# --------------" << std::endl;
art.drawCurve(bins,
XrdCl::JCacheFile::sStats.bench.GetTimePerBin().count() /
1000000.0,
Expand Down
2 changes: 1 addition & 1 deletion src/XrdApps/XrdClJCachePlugin/file/XrdClJCacheFile.cc
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ bool XrdCl::JCacheFile::sEnableJournalCache = true;
bool XrdCl::JCacheFile::sEnableVectorCache = false;
bool XrdCl::JCacheFile::sEnableSummary = true;
JCache::CacheStats XrdCl::JCacheFile::sStats(true);

JCache::Cleaner XrdCl::JCacheFile::sCleaner;
JournalManager XrdCl::JCacheFile::sJournalManager;

namespace XrdCl {
Expand Down
5 changes: 5 additions & 0 deletions src/XrdApps/XrdClJCachePlugin/file/XrdClJCacheFile.hh
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
#include "XrdCl/XrdClPlugInInterface.hh"
/*----------------------------------------------------------------------------*/
#include "cache/Journal.hh"
#include "cleaner/Cleaner.hh"
#include "file/Art.hh"
#include "file/TimeBench.hh"
#include "handler/XrdClJCachePgReadHandler.hh"
Expand Down Expand Up @@ -220,6 +221,7 @@ public:
static void SetVector(const bool &value) { sEnableVectorCache = value; }
static void SetJsonPath(const std::string &path) { sJsonPath = path; }
static void SetSummary(const bool &value) { sEnableSummary = value; }
static void SetSize(uint64_t size) { sCleaner.SetSize(size,sCachePath);}

//----------------------------------------------------------------------------
//! @brief static members pointing to cache settings
Expand All @@ -239,6 +241,9 @@ public:
//! @brief global plugin cache hit statistics
static JCache::CacheStats sStats;

//! @brief cleaner instance
static JCache::Cleaner sCleaner;

private:
//! @brief attach for read
bool AttachForRead();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ class JCachePgReadHandler : public XrdCl::ResponseHandler
public:
JCachePgReadHandler() {}

JCachePgReadHandler(JCacheReadHandler *other) {
JCachePgReadHandler(JCachePgReadHandler *other) {
rbytes = other->rbytes;
journal = other->journal;
}
Expand Down
13 changes: 11 additions & 2 deletions src/XrdApps/XrdClJCachePlugin/plugin/XrdClJCachePlugin.hh
Original file line number Diff line number Diff line change
Expand Up @@ -49,14 +49,19 @@ public:
if (config) {
auto itc = config->find("cache");
JCacheFile::SetCache(itc != config->end() ? itc->second : "");

auto itsz = config->find("size");
JCacheFile::SetSize(itsz != config->end() ? std::stoll(std::string(itsz->second),0,10) : 0);

auto itv = config->find("vector");
JCacheFile::SetVector(itv != config->end() ? itv->second == "true"
: false);
auto itj = config->find("journal");
JCacheFile::SetJournal(itj != config->end() ? itj->second == "true"
: false);
JCacheFile::SetJournal(itj != config->end() ? itj->second == "false"
: true);
auto itjson = config->find("json");
JCacheFile::SetJsonPath(itjson != config->end() ? itjson->second : "./");

auto its = config->find("summary");
JCacheFile::SetSummary(its != config->end() ? its->second != "false"
: true);
Expand All @@ -65,6 +70,10 @@ public:
JCacheFile::SetCache((std::string(v).length()) ? std::string(v) : "");
}

if (const char *v = getenv("XRD_JCACHE_SIZE")) {
JCacheFile::SetSize((std::string(v).length()) ? std::stoll(std::string(v),0,10) : 0);
}

if (const char *v = getenv("XRD_JCACHE_SUMMARY")) {
JCacheFile::SetSummary((std::string(v) == "true") ? true : false);
}
Expand Down

0 comments on commit d0e22c7

Please sign in to comment.