MINIFICPP-2082 Move RocksDB stats to RepositoryMetrics
Signed-off-by: Ferenc Gerlits <[email protected]>
This closes apache#1540
lordgamez authored and fgerlits committed Jul 11, 2023
1 parent 5f7e06f commit 59d48f5
Showing 19 changed files with 283 additions and 161 deletions.
40 changes: 22 additions & 18 deletions METRICS.md
@@ -143,13 +143,15 @@ QueueMetrics is a system level metric that reports queue metrics for every conne

RepositoryMetrics is a system level metric that reports metrics for the registered repositories (by default flowfile, content, and provenance repositories)

| Metric name | Labels | Description |
|---------------------------|-----------------|-------------------------------------------------|
| is_running | repository_name | Is the repository running (1 or 0) |
| is_full | repository_name | Is the repository full (1 or 0) |
| repository_size_bytes | repository_name | Current size of the repository |
| max_repository_size_bytes | repository_name | Maximum size of the repository (0 if unlimited) |
| repository_entry_count | repository_name | Current number of entries in the repository |
| Metric name | Labels | Description |
|--------------------------------------|-----------------|------------------------------------------------------------------------------------------------------------------|
| is_running | repository_name | Is the repository running (1 or 0) |
| is_full | repository_name | Is the repository full (1 or 0) |
| repository_size_bytes | repository_name | Current size of the repository |
| max_repository_size_bytes | repository_name | Maximum size of the repository (0 if unlimited) |
| repository_entry_count | repository_name | Current number of entries in the repository |
| rocksdb_table_readers_size_bytes | repository_name | RocksDB's estimated memory used for reading SST tables (only present if repository uses RocksDB) |
| rocksdb_all_memory_tables_size_bytes | repository_name | RocksDB's approximate size of active and unflushed immutable memtables (only present if repository uses RocksDB) |

| Label | Description |
|--------------------------|---------------------------------------------------------------------------------------------------------------------------------------|
@@ -188,17 +190,19 @@ FlowInformation is a system level metric that reports component and queue relate

AgentStatus is a system level metric that defines current agent status including repository, component and resource usage information.

| Metric name | Labels | Description |
|---------------------------|--------------------------------|------------------------------------------------------------------------------------------------------------|
| is_running | repository_name | Is the repository running (1 or 0) |
| is_full | repository_name | Is the repository full (1 or 0) |
| repository_size_bytes | repository_name | Current size of the repository |
| max_repository_size_bytes | repository_name | Maximum size of the repository (0 if unlimited) |
| repository_entry_count | repository_name | Current number of entries in the repository |
| uptime_milliseconds | - | Agent uptime in milliseconds |
| is_running | component_uuid, component_name | Check if the component is running (1 or 0) |
| agent_memory_usage_bytes | - | Memory used by the agent process in bytes |
| agent_cpu_utilization | - | CPU utilization of the agent process (between 0 and 1). In case of a query error the returned value is -1. |
| Metric name | Labels | Description |
|--------------------------------------|--------------------------------|------------------------------------------------------------------------------------------------------------------|
| is_running | repository_name | Is the repository running (1 or 0) |
| is_full | repository_name | Is the repository full (1 or 0) |
| repository_size_bytes | repository_name | Current size of the repository |
| max_repository_size_bytes | repository_name | Maximum size of the repository (0 if unlimited) |
| repository_entry_count | repository_name | Current number of entries in the repository |
| rocksdb_table_readers_size_bytes | repository_name | RocksDB's estimated memory used for reading SST tables (only present if repository uses RocksDB) |
| rocksdb_all_memory_tables_size_bytes | repository_name | RocksDB's approximate size of active and unflushed immutable memtables (only present if repository uses RocksDB) |
| uptime_milliseconds | - | Agent uptime in milliseconds |
| is_running | component_uuid, component_name | Check if the component is running (1 or 0) |
| agent_memory_usage_bytes | - | Memory used by the agent process in bytes |
| agent_cpu_utilization | - | CPU utilization of the agent process (between 0 and 1). In case of a query error the returned value is -1. |

| Label | Description |
|-----------------|----------------------------------------------------------|
9 changes: 7 additions & 2 deletions docker/test/integration/cluster/checkers/PrometheusChecker.py
@@ -48,8 +48,10 @@ def verify_metric_class(self, metric_class):

def verify_repository_metrics(self):
label_list = [{'repository_name': 'provenance'}, {'repository_name': 'flowfile'}, {'repository_name': 'content'}]
# Only flowfile and content repositories are using rocksdb by default, so rocksdb specific metrics are only present there
return all((self.verify_metrics_exist(['minifi_is_running', 'minifi_is_full', 'minifi_repository_size_bytes', 'minifi_max_repository_size_bytes', 'minifi_repository_entry_count'], 'RepositoryMetrics', labels) for labels in label_list)) and \
all((self.verify_metric_larger_than_zero('minifi_repository_size_bytes', 'RepositoryMetrics', labels) for labels in label_list[1:3]))
all((self.verify_metric_larger_than_zero('minifi_repository_size_bytes', 'RepositoryMetrics', labels) for labels in label_list[1:3])) and \
all((self.verify_metrics_exist(['minifi_rocksdb_table_readers_size_bytes', 'minifi_rocksdb_all_memory_tables_size_bytes'], 'RepositoryMetrics', labels) for labels in label_list[1:3]))

def verify_queue_metrics(self):
return self.verify_metrics_exist(['minifi_queue_data_size', 'minifi_queue_data_size_max', 'minifi_queue_size', 'minifi_queue_size_max'], 'QueueMetrics')
@@ -74,12 +76,15 @@ def verify_device_info_node_metrics(self):

def verify_agent_status_metrics(self):
label_list = [{'repository_name': 'flowfile'}, {'repository_name': 'content'}]
# Only flowfile and content repositories are using rocksdb by default, so rocksdb specific metrics are only present there
for labels in label_list:
if not (self.verify_metric_exists('minifi_is_running', 'AgentStatus', labels)
and self.verify_metric_exists('minifi_is_full', 'AgentStatus', labels)
and self.verify_metric_exists('minifi_max_repository_size_bytes', 'AgentStatus', labels)
and self.verify_metric_larger_than_zero('minifi_repository_size_bytes', 'AgentStatus', labels)
and self.verify_metric_exists('minifi_repository_entry_count', 'AgentStatus', labels)):
and self.verify_metric_exists('minifi_repository_entry_count', 'AgentStatus', labels)
and self.verify_metric_exists('minifi_rocksdb_table_readers_size_bytes', 'AgentStatus', labels)
and self.verify_metric_exists('minifi_rocksdb_all_memory_tables_size_bytes', 'AgentStatus', labels)):
return False

# provenance repository is NoOpRepository by default which has zero size
9 changes: 9 additions & 0 deletions extensions/rocksdb-repos/DatabaseContentRepository.cpp
@@ -286,6 +286,15 @@ uint64_t DatabaseContentRepository::getRepositoryEntryCount() const {
})).value_or(0);
}

std::optional<RepositoryMetricsSource::RocksDbStats> DatabaseContentRepository::getRocksDbStats() const {
auto opendb = db_->open();
if (!opendb) {
return RocksDbStats{};
}

return opendb->getStats();
}

REGISTER_RESOURCE_AS(DatabaseContentRepository, InternalResource, ("DatabaseContentRepository", "databasecontentrepository"));

} // namespace org::apache::nifi::minifi::core::repository
1 change: 1 addition & 0 deletions extensions/rocksdb-repos/DatabaseContentRepository.h
@@ -76,6 +76,7 @@ class DatabaseContentRepository : public core::ContentRepository {

uint64_t getRepositorySize() const override;
uint64_t getRepositoryEntryCount() const override;
std::optional<RepositoryMetricsSource::RocksDbStats> getRocksDbStats() const override;

protected:
bool removeKey(const std::string& content_path) override;
6 changes: 0 additions & 6 deletions extensions/rocksdb-repos/FlowFileRepository.cpp
@@ -112,15 +112,9 @@ void FlowFileRepository::deserializeFlowFilesWithNoContentClaim(minifi::internal
}

void FlowFileRepository::run() {
auto last = std::chrono::steady_clock::now();
while (isRunning()) {
std::this_thread::sleep_for(purge_period_);
flush();
auto now = std::chrono::steady_clock::now();
if ((now-last) > std::chrono::seconds(30)) {
printStats();
last = now;
}
}
flush();
}
13 changes: 0 additions & 13 deletions extensions/rocksdb-repos/ProvenanceRepository.cpp
@@ -23,19 +23,6 @@

namespace org::apache::nifi::minifi::provenance {

void ProvenanceRepository::run() {
size_t count = 0;
while (isRunning()) {
std::this_thread::sleep_for(std::chrono::seconds(1));
count++;
// Hack, to be removed in scope of https://issues.apache.org/jira/browse/MINIFICPP-1145
count = count % 30;
if (count == 0) {
printStats();
}
}
}

bool ProvenanceRepository::initialize(const std::shared_ptr<org::apache::nifi::minifi::Configure> &config) {
std::string value;
if (config->get(Configure::nifi_provenance_repository_directory_default, value) && !value.empty()) {
2 changes: 1 addition & 1 deletion extensions/rocksdb-repos/ProvenanceRepository.h
@@ -83,7 +83,7 @@ class ProvenanceRepository : public core::repository::RocksDbRepository {

private:
// Run function for the thread
void run() override;
void run() override {};
};

} // namespace org::apache::nifi::minifi::provenance
15 changes: 3 additions & 12 deletions extensions/rocksdb-repos/RocksDbRepository.cpp
@@ -20,22 +20,13 @@ using namespace std::literals::chrono_literals;

namespace org::apache::nifi::minifi::core::repository {

void RocksDbRepository::printStats() {
std::optional<RepositoryMetricsSource::RocksDbStats> RocksDbRepository::getRocksDbStats() const {
auto opendb = db_->open();
if (!opendb) {
return;
return RocksDbStats{};
}
std::string key_count;
opendb->GetProperty("rocksdb.estimate-num-keys", &key_count);

std::string table_readers;
opendb->GetProperty("rocksdb.estimate-table-readers-mem", &table_readers);

std::string all_memtables;
opendb->GetProperty("rocksdb.cur-size-all-mem-tables", &all_memtables);

logger_->log_info("Repository stats: key count: %s, table readers size: %s, all memory tables size: %s",
key_count, table_readers, all_memtables);
return opendb->getStats();
}

bool RocksDbRepository::ExecuteWithRetry(const std::function<rocksdb::Status()>& operation) {
2 changes: 1 addition & 1 deletion extensions/rocksdb-repos/RocksDbRepository.h
@@ -46,7 +46,7 @@ class RocksDbRepository : public ThreadedRepository {

uint64_t getRepositorySize() const override;
uint64_t getRepositoryEntryCount() const override;
void printStats();
std::optional<RocksDbStats> getRocksDbStats() const override;

protected:
bool ExecuteWithRetry(const std::function<rocksdb::Status()>& operation);
21 changes: 21 additions & 0 deletions extensions/rocksdb-repos/database/OpenRocksDb.cpp
@@ -135,4 +135,25 @@ std::optional<uint64_t> OpenRocksDb::getApproximateSizes() const {
return std::nullopt;
}

minifi::core::RepositoryMetricsSource::RocksDbStats OpenRocksDb::getStats() {
minifi::core::RepositoryMetricsSource::RocksDbStats stats;
std::string table_readers;
GetProperty("rocksdb.estimate-table-readers-mem", &table_readers);
try {
stats.table_readers_size = std::stoull(table_readers);
} catch (const std::exception&) {
logger_->log_warn("Could not retrieve valid 'rocksdb.estimate-table-readers-mem' property value from rocksdb content repository!");
}

std::string all_memtables;
GetProperty("rocksdb.cur-size-all-mem-tables", &all_memtables);
try {
stats.all_memory_tables_size = std::stoull(all_memtables);
} catch (const std::exception&) {
logger_->log_warn("Could not retrieve valid 'rocksdb.cur-size-all-mem-tables' property value from rocksdb content repository!");
}

return stats;
}

} // namespace org::apache::nifi::minifi::internal
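
`getStats()` above converts the string-valued RocksDB properties with `std::stoull` and leaves the zero-initialized field in place when the conversion throws. The same conversion could be sketched as a standalone helper. This sketch swaps in `std::from_chars`, which is not what the commit uses, so that failure is reported through `std::optional` rather than an exception. The name `parseUnsignedProperty` is hypothetical.

```cpp
#include <charconv>
#include <cstdint>
#include <optional>
#include <string_view>
#include <system_error>

// Hypothetical helper (not part of the commit): parses a decimal property
// value such as "1024". Returns std::nullopt for empty, non-numeric,
// partially numeric, or out-of-range input instead of throwing.
std::optional<uint64_t> parseUnsignedProperty(std::string_view text) {
  uint64_t value = 0;
  const char* const first = text.data();
  const char* const last = first + text.size();
  const auto [ptr, ec] = std::from_chars(first, last, value);
  // Reject conversion errors and trailing characters that were not consumed.
  if (ec != std::errc{} || ptr != last) {
    return std::nullopt;
  }
  return value;
}
```

One behavioral difference to keep in mind: `std::stoull` skips leading whitespace, accepts a sign, and stops at the first non-digit, so an input like `12abc` would yield 12, whereas this helper rejects anything that is not entirely a decimal number.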
4 changes: 4 additions & 0 deletions extensions/rocksdb-repos/database/OpenRocksDb.h
@@ -27,6 +27,8 @@
#include "rocksdb/db.h"
#include "rocksdb/utilities/checkpoint.h"
#include "WriteBatch.h"
#include "core/RepositoryMetricsSource.h"
#include "core/logging/LoggerConfiguration.h"

namespace org::apache::nifi::minifi::internal {

@@ -73,6 +75,7 @@ class OpenRocksDb {
rocksdb::DB* get();

std::optional<uint64_t> getApproximateSizes() const;
minifi::core::RepositoryMetricsSource::RocksDbStats getStats();

private:
void handleResult(const rocksdb::Status& result);
@@ -81,6 +84,7 @@ class OpenRocksDb {
gsl::not_null<RocksDbInstance*> db_;
gsl::not_null<std::shared_ptr<rocksdb::DB>> impl_;
gsl::not_null<std::shared_ptr<ColumnHandle>> column_;
std::shared_ptr<minifi::core::logging::Logger> logger_{minifi::core::logging::LoggerFactory<OpenRocksDb>::getLogger()};
};

} // namespace org::apache::nifi::minifi::internal
10 changes: 10 additions & 0 deletions libminifi/include/core/RepositoryMetricsSource.h
@@ -19,11 +19,17 @@
#pragma once

#include <string>
#include <optional>

namespace org::apache::nifi::minifi::core {

class RepositoryMetricsSource {
public:
struct RocksDbStats {
uint64_t table_readers_size{};
uint64_t all_memory_tables_size{};
};

virtual ~RepositoryMetricsSource() = default;
virtual uint64_t getRepositorySize() const = 0;
virtual uint64_t getRepositoryEntryCount() const = 0;
@@ -40,6 +46,10 @@ class RepositoryMetricsSource {
virtual bool isRunning() const {
return true;
}

virtual std::optional<RocksDbStats> getRocksDbStats() const {
return std::nullopt;
}
};

} // namespace org::apache::nifi::minifi::core
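
The interface change above has the base class return `std::nullopt` by default, with only RocksDB-backed repositories overriding the getter. A minimal sketch of how a metrics publisher might consume that is given here. It assumes a simplified stand-in for the interface and hypothetical names (`MetricsSource`, `InMemoryRepo`, `RocksBackedRepo`, `collectMetrics`), none of which appear in the commit.

```cpp
#include <cstdint>
#include <optional>
#include <string>
#include <utility>
#include <vector>

// Simplified stand-in for RepositoryMetricsSource; only the relevant parts.
class MetricsSource {
 public:
  struct RocksDbStats {
    uint64_t table_readers_size{};
    uint64_t all_memory_tables_size{};
  };

  virtual ~MetricsSource() = default;
  virtual uint64_t getRepositorySize() const = 0;

  // Default: repositories without RocksDB expose no RocksDB statistics.
  virtual std::optional<RocksDbStats> getRocksDbStats() const {
    return std::nullopt;
  }
};

// Hypothetical repository that does not use RocksDB.
class InMemoryRepo : public MetricsSource {
 public:
  uint64_t getRepositorySize() const override { return 0; }
};

// Hypothetical RocksDB-backed repository returning fixed example values.
class RocksBackedRepo : public MetricsSource {
 public:
  uint64_t getRepositorySize() const override { return 4096; }
  std::optional<RocksDbStats> getRocksDbStats() const override {
    return RocksDbStats{1024, 2048};
  }
};

using Metric = std::pair<std::string, uint64_t>;

// Hypothetical publisher: RocksDB metrics are appended only when present.
std::vector<Metric> collectMetrics(const MetricsSource& repo) {
  std::vector<Metric> metrics;
  metrics.emplace_back("repository_size_bytes", repo.getRepositorySize());
  if (const auto stats = repo.getRocksDbStats()) {
    metrics.emplace_back("rocksdb_table_readers_size_bytes", stats->table_readers_size);
    metrics.emplace_back("rocksdb_all_memory_tables_size_bytes", stats->all_memory_tables_size);
  }
  return metrics;
}
```

Note that in the commit both `RocksDbRepository::getRocksDbStats()` and `DatabaseContentRepository::getRocksDbStats()` return a default-constructed `RocksDbStats{}` when the database cannot be opened. A consumer written like this one would therefore report zeros in that case, rather than omitting the metrics.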