Skip to content

Commit

Permalink
XrdApps::JCache: make JSON dump and Summary output configurable via
Browse files Browse the repository at this point in the history
config file - add max rate to Summary and put more bins in the
timeseries
  • Loading branch information
Andreas Joachim Peters committed Jun 10, 2024
1 parent bdc3609 commit 0d14b81
Show file tree
Hide file tree
Showing 5 changed files with 105 additions and 26 deletions.
18 changes: 11 additions & 7 deletions src/XrdApps/XrdClJCachePlugin/file/Art.hh
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,8 @@ public:
virtual ~Art() {}

void drawCurve(const std::vector<double>& dataPoints, double runtime) {
if (dataPoints.size() != 10) {
std::cerr << "Error: Exactly 10 data points are required." << std::endl;
if (dataPoints.size() != 40) {
std::cerr << "Error: Exactly 40 data points are required." << std::endl;
return;
}

Expand Down Expand Up @@ -69,16 +69,20 @@ public:
for (int i = 0; i < plotHeight; ++i) {
double yValue = minValue + (maxValue - minValue) * (plotHeight - 1 - i) / (plotHeight - 1);
if (i==0) {
std::cout << std::setw(yLegendWidth) << std::fixed << std::setprecision(2) << yValue << " MB/s | ";
std::cerr << std::setw(yLegendWidth) << std::fixed << std::setprecision(2) << yValue << " MB/s | ";
} else {
std::cout << std::setw(yLegendWidth) << std::fixed << std::setprecision(2) << yValue << " | ";
std::cerr << std::setw(yLegendWidth) << std::fixed << std::setprecision(2) << yValue << " | ";
}
std::cout << plot[i] << std::endl;
std::cerr << plot[i] << std::endl;
}

// Print the X axis
std::cout << std::string(yLegendWidth + 7, ' ') << std::string(plotWidth, '-') << std::endl;
std::cout << std::string(yLegendWidth + 7, ' ') << " 0 1 2 3 4 5 6 7 8 9 [ 10 :=" << std::fixed << std::setprecision(2) << runtime << "s ]"<< std::endl;
std::cerr << std::string(yLegendWidth + 7, ' ') << std::string(plotWidth, '-') << std::endl;
std::cerr << std::string(yLegendWidth + 7, ' ');
for (size_t i = 0 ; i < dataPoints.size()/4; ++i) {
std::cerr << std::fixed << std::setw(4) << std::left << (i*10);
}
std::cerr << "[ " << 100 << " % = " << std::fixed << std::setprecision(2) << runtime << "s ]"<< std::endl;
}

void drawCurve(const std::vector<long unsigned int>& data, double interval, double runtime) {
Expand Down
19 changes: 9 additions & 10 deletions src/XrdApps/XrdClJCachePlugin/file/TimeBench.hh
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,11 @@ private:
TimePoint start;
TimePoint end;
uint64_t totalBytes;
size_t nbins;
std::mutex mtx;

public:
TimeBench() : totalBytes(0) {}
TimeBench() : totalBytes(0), nbins(10) {}

void AddMeasurement(uint64_t bytes) {
std::lock_guard<std::mutex> guard(mtx);
Expand All @@ -55,21 +56,19 @@ public:
end = now;
}

std::vector<uint64_t> GetBins() {
std::vector<uint64_t> GetBins(size_t bin = 10) {
std::lock_guard<std::mutex> guard(mtx);
nbins = bin?bin:1;
Duration totalTime = std::chrono::duration_cast<Duration>(end - start);
Duration binSize = totalTime / 10;
Duration binSize = totalTime / nbins;
bins.clear();
bins.resize(10, 0);
bins.resize(nbins, 0);

TimePoint binStart = start;
size_t binIndex = 0;

for (auto i : measurements) {
TimePoint measurementTime = binStart + binSize;

binIndex = (i.first - start)/ binSize;
if (Clock::now() >= measurementTime) {
if (binIndex < nbins) {
bins[binIndex] += i.second;
} else {
break; // Don't process future measurements
Expand All @@ -78,10 +77,10 @@ public:

return bins;
}

Duration GetTimePerBin() {
Duration totalTime = std::chrono::duration_cast<Duration>(end - start);
Duration binSize = totalTime / 10;
Duration binSize = totalTime / nbins;
return binSize;
}
};
4 changes: 3 additions & 1 deletion src/XrdApps/XrdClJCachePlugin/file/XrdClJCacheFile.cc
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,10 @@
/*----------------------------------------------------------------------------*/

std::string XrdCl::JCacheFile::sCachePath="";
std::string XrdCl::JCacheFile::sJsonPath="./";
bool XrdCl::JCacheFile::sEnableJournalCache = true;
bool XrdCl::JCacheFile::sEnableVectorCache = true;
bool XrdCl::JCacheFile::sEnableVectorCache = false;
bool XrdCl::JCacheFile::sEnableSummary = true;
XrdCl::JCacheFile::CacheStats XrdCl::JCacheFile::sStats(true);

JournalManager XrdCl::JCacheFile::sJournalManager;
Expand Down
82 changes: 76 additions & 6 deletions src/XrdApps/XrdClJCachePlugin/file/XrdClJCacheFile.hh
Original file line number Diff line number Diff line change
Expand Up @@ -256,13 +256,17 @@ public:
static void SetCache(const std::string& path) { sCachePath = path; }
static void SetJournal(const bool& value) { sEnableJournalCache = value; }
static void SetVector(const bool& value) { sEnableVectorCache = value; }
static void SetJsonPath(const std::string& path) { sJsonPath = path; }
static void SetSummary(const bool& value) { sEnableSummary = value; }

//----------------------------------------------------------------------------
//! @brief static members pointing to cache settings
//----------------------------------------------------------------------------
static std::string sCachePath;
static std::string sJsonPath;
static bool sEnableVectorCache;
static bool sEnableJournalCache;
static bool sEnableSummary;
static JournalManager sJournalManager;

//----------------------------------------------------------------------------
Expand Down Expand Up @@ -294,7 +298,6 @@ public:
}

static std::string GlobalStats() {
sStats.GetTimes();
std::ostringstream oss;
oss << "# ----------------------------------------------------------- #" << std::endl;
oss << "# JCache : cache combined hit rate : " << std::fixed << std::setprecision(2) << sStats.CombinedHitRate() << " %" << std::endl;
Expand All @@ -319,7 +322,7 @@ public:
oss << "# JCache : app real time : " << std::fixed << std::setprecision(2) << sStats.realTime << " s" << std::endl;
oss << "# JCache : app sys time : " << std::fixed << std::setprecision(2) << sStats.sysTime << " s" << std::endl;
oss << "# JCache : app acceleration : " << std::fixed << std::setprecision(2) << sStats.userTime / sStats.realTime << "x" << std::endl;
oss << "# JCache : app readrate : " << std::fixed << std::setprecision(2) << sStats.bytesToHumanReadable((sStats.ReadBytes()/sStats.realTime)) << "/s" << std::endl;
oss << "# JCache : app readrate : " << std::fixed << std::setprecision(2) << sStats.bytesToHumanReadable((sStats.ReadBytes()/sStats.realTime)) << "/s" << " [ peak " << sStats.bytesToHumanReadable(sStats.peakrate) << "/s" << std::endl;
oss << "# ----------------------------------------------------------- #" << std::endl;

return oss.str();
Expand All @@ -335,7 +338,8 @@ public:
readVOps(0),
readVreadOps(0),
nreadfiles(0),
dumponexit(doe)
dumponexit(doe),
peakrate(0)
{
// Get the current real time
struct timeval now;
Expand All @@ -345,11 +349,26 @@ public:

~CacheStats() {
if (dumponexit.load()) {
std::cerr << GlobalStats();
using namespace std::chrono;
std::vector<uint64_t> bins = sStats.bench.GetBins();
std::string jsonpath = sJsonPath + "jcache.";
std::string name = getenv("XRD_APPNAME")?getenv("XRD_APPNAME"):"none"+std::string(".")+std::to_string(getpid());
jsonpath += name;
jsonpath += ".json";
sStats.GetTimes();

sStats.bytes_per_second = sStats.bench.GetBins((int)(realTime));
sStats.peakrate = *(std::max_element(sStats.bytes_per_second.begin(), sStats.bytes_per_second.end()));
if (sJsonPath.length()) {
sStats.persistToJson(jsonpath, name);
}
if (sEnableSummary) {
std::cerr << GlobalStats();
}
std::vector<uint64_t> bins = sStats.bench.GetBins(40);
Art art;
art.drawCurve(bins, sStats.bench.GetTimePerBin().count() / 1000000.0, realTime);
if (sEnableSummary) {
art.drawCurve(bins, sStats.bench.GetTimePerBin().count() / 1000000.0, realTime);
}
}
}

Expand Down Expand Up @@ -422,6 +441,55 @@ public:
sysTime = usage.ru_stime.tv_sec + usage.ru_stime.tv_usec / 1000000.0;
}

void persistToJson(const std::string& path, const std::string& name) {
std::ofstream outFile(path);
if (!outFile.is_open()) {
std::cerr << "error: failed to open JSON statistics file: " << path << std::endl;
return;
}

outFile << "{\n";
outFile << " \"appname\": \"" << name << "\",\n";
outFile << " \"bytesRead\": " << bytesRead.load() << ",\n";
outFile << " \"bytesReadV\": " << bytesReadV.load() << ",\n";
outFile << " \"bytesCached\": " << bytesCached.load() << ",\n";
outFile << " \"bytesCachedV\": " << bytesCachedV.load() << ",\n";
outFile << " \"readOps\": " << readOps.load() << ",\n";
outFile << " \"readVOps\": " << readVOps.load() << ",\n";
outFile << " \"readVreadOps\": " << readVreadOps.load() << ",\n";
outFile << " \"nreadfiles\": " << nreadfiles.load() << ",\n";
outFile << " \"totaldatasize\": " << totaldatasize.load() << ",\n";

std::lock_guard<std::mutex> lock(urlMutex);
outFile << " \"urls\": [";
for (auto it = urls.begin(); it != urls.end(); ++it) {
if (it != urls.begin()) {
outFile << ", ";
}
outFile << "\"" << *it << "\"";
}
outFile << "],\n";

outFile << " \"bytes_per_second\": [";
for (size_t i = 0; i < bytes_per_second.size(); ++i) {
if (i != 0) {
outFile << ", ";
}
outFile << bytes_per_second[i];
}
outFile << "],\n";

outFile << std::fixed << std::setprecision(6); // Set precision for double values

outFile << " \"userTime\": " << userTime.load() << ",\n";
outFile << " \"realTime\": " << realTime.load() << ",\n";
outFile << " \"sysTime\": " << sysTime.load() << ",\n";
outFile << " \"startTime\": " << startTime.load() << "\n";
outFile << "}\n";

outFile.close();
}

std::atomic<uint64_t> bytesRead;
std::atomic<uint64_t> bytesReadV;
std::atomic<uint64_t> bytesCached;
Expand All @@ -439,6 +507,8 @@ public:
std::atomic<double> sysTime;
std::atomic<double> startTime;
TimeBench bench;
std::vector<uint64_t> bytes_per_second;
std::atomic<double> peakrate;
};
private:

Expand Down
8 changes: 6 additions & 2 deletions src/XrdApps/XrdClJCachePlugin/plugin/XrdClJCachePlugin.hh
Original file line number Diff line number Diff line change
Expand Up @@ -51,12 +51,16 @@ class JCacheFactory : public PlugInFactory
{
if( config )
{
auto itc = config->find( "cache" );
JCacheFile::SetCache( itc != config->end() ? itc->second : "" );
auto itc = config->find( "cache" );
JCacheFile::SetCache( itc != config->end() ? itc->second : "" );
auto itv = config->find( "vector" );
JCacheFile::SetVector( itv != config->end() ? itv->second == "true": false );
auto itj = config->find( "journal" );
JCacheFile::SetJournal( itj != config->end() ? itj->second == "true": false );
auto itjson = config->find( "json" );
JCacheFile::SetJsonPath( itjson != config->end() ? itjson->second : "./" );
auto its = config->find( "summary" );
JCacheFile::SetSummary( its != config->end() ? its->second != "false": true );
Log* log = DefaultEnv::GetLog();
log->Info(1, "JCache : cache directory: %s", JCacheFile::sCachePath.c_str());
log->Info(1, "JCache : caching readv in vector cache : %s", JCacheFile::sEnableVectorCache ? "true" : "false");
Expand Down

0 comments on commit 0d14b81

Please sign in to comment.