MINIFICPP-2082 Move RocksDB stats to RepositoryMetrics #1540

Closed
wants to merge 10 commits into from
40 changes: 22 additions & 18 deletions METRICS.md
@@ -143,13 +143,15 @@ QueueMetrics is a system level metric that reports queue metrics for every conne

 RepositoryMetrics is a system level metric that reports metrics for the registered repositories (by default flowfile, content, and provenance repositories)

-| Metric name | Labels | Description |
-|---------------------------|-----------------|-------------------------------------------------|
-| is_running | repository_name | Is the repository running (1 or 0) |
-| is_full | repository_name | Is the repository full (1 or 0) |
-| repository_size_bytes | repository_name | Current size of the repository |
-| max_repository_size_bytes | repository_name | Maximum size of the repository (0 if unlimited) |
-| repository_entry_count | repository_name | Current number of entries in the repository |
+| Metric name | Labels | Description |
+|--------------------------------------|-----------------|------------------------------------------------------------------------------------------------------------------|
+| is_running | repository_name | Is the repository running (1 or 0) |
+| is_full | repository_name | Is the repository full (1 or 0) |
+| repository_size_bytes | repository_name | Current size of the repository |
+| max_repository_size_bytes | repository_name | Maximum size of the repository (0 if unlimited) |
+| repository_entry_count | repository_name | Current number of entries in the repository |
+| rocksdb_table_readers_size_bytes | repository_name | RocksDB's estimated memory used for reading SST tables (only present if repository uses RocksDB) |
+| rocksdb_all_memory_tables_size_bytes | repository_name | RocksDB's approximate size of active and unflushed immutable memtables (only present if repository uses RocksDB) |

 | Label | Description |
 |--------------------------|---------------------------------------------------------------------------------------------------------------------------------------|
@@ -188,17 +190,19 @@ FlowInformation is a system level metric that reports component and queue relate

 AgentStatus is a system level metric that defines current agent status including repository, component and resource usage information.

-| Metric name | Labels | Description |
-|---------------------------|--------------------------------|------------------------------------------------------------------------------------------------------------|
-| is_running | repository_name | Is the repository running (1 or 0) |
-| is_full | repository_name | Is the repository full (1 or 0) |
-| repository_size_bytes | repository_name | Current size of the repository |
-| max_repository_size_bytes | repository_name | Maximum size of the repository (0 if unlimited) |
-| repository_entry_count | repository_name | Current number of entries in the repository |
-| uptime_milliseconds | - | Agent uptime in milliseconds |
-| is_running | component_uuid, component_name | Check if the component is running (1 or 0) |
-| agent_memory_usage_bytes | - | Memory used by the agent process in bytes |
-| agent_cpu_utilization | - | CPU utilization of the agent process (between 0 and 1). In case of a query error the returned value is -1. |
+| Metric name | Labels | Description |
+|--------------------------------------|--------------------------------|------------------------------------------------------------------------------------------------------------------|
+| is_running | repository_name | Is the repository running (1 or 0) |
+| is_full | repository_name | Is the repository full (1 or 0) |
+| repository_size_bytes | repository_name | Current size of the repository |
+| max_repository_size_bytes | repository_name | Maximum size of the repository (0 if unlimited) |
+| repository_entry_count | repository_name | Current number of entries in the repository |
+| rocksdb_table_readers_size_bytes | repository_name | RocksDB's estimated memory used for reading SST tables (only present if repository uses RocksDB) |
+| rocksdb_all_memory_tables_size_bytes | repository_name | RocksDB's approximate size of active and unflushed immutable memtables (only present if repository uses RocksDB) |
+| uptime_milliseconds | - | Agent uptime in milliseconds |
+| is_running | component_uuid, component_name | Check if the component is running (1 or 0) |
+| agent_memory_usage_bytes | - | Memory used by the agent process in bytes |
+| agent_cpu_utilization | - | CPU utilization of the agent process (between 0 and 1). In case of a query error the returned value is -1. |

 | Label | Description |
 |-----------------|----------------------------------------------------------|
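
Note on naming: the Prometheus integration test in this PR looks the documented metrics up with a `minifi_` prefix (for example `minifi_rocksdb_table_readers_size_bytes`) and a `repository_name` label. The sketch below shows roughly what one such sample line could look like in the Prometheus text exposition format. `escapeLabelValue` and `renderRepositorySample` are hypothetical helpers written for illustration, not MiNiFi code, and the real publisher may well attach additional labels that are omitted here.

```cpp
#include <cstdint>
#include <string>
#include <string_view>

// Escapes a label value for the Prometheus text exposition format:
// backslash, double quote, and line feed are backslash-escaped.
std::string escapeLabelValue(std::string_view value) {
  std::string escaped;
  for (const char c : value) {
    switch (c) {
      case '\\': escaped += "\\\\"; break;
      case '"': escaped += "\\\""; break;
      case '\n': escaped += "\\n"; break;
      default: escaped += c;
    }
  }
  return escaped;
}

// Hypothetical helper: renders a single sample line using the "minifi_" prefix
// seen in the integration test. Only the repository_name label is included;
// this is an assumption, the real output may carry more labels.
std::string renderRepositorySample(std::string_view metric_name,
                                   std::string_view repository_name,
                                   uint64_t value) {
  std::string line = "minifi_";
  line += metric_name;
  line += "{repository_name=\"";
  line += escapeLabelValue(repository_name);
  line += "\"} ";
  line += std::to_string(value);
  return line;
}
```

Calling `renderRepositorySample("rocksdb_table_readers_size_bytes", "flowfile", 1024)` would yield a line of the form `minifi_rocksdb_table_readers_size_bytes{repository_name="flowfile"} 1024`.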
9 changes: 7 additions & 2 deletions docker/test/integration/cluster/checkers/PrometheusChecker.py
@@ -48,8 +48,10 @@ def verify_metric_class(self, metric_class):

     def verify_repository_metrics(self):
         label_list = [{'repository_name': 'provenance'}, {'repository_name': 'flowfile'}, {'repository_name': 'content'}]
+        # Only flowfile and content repositories are using rocksdb by default, so rocksdb specific metrics are only present there
         return all((self.verify_metrics_exist(['minifi_is_running', 'minifi_is_full', 'minifi_repository_size_bytes', 'minifi_max_repository_size_bytes', 'minifi_repository_entry_count'], 'RepositoryMetrics', labels) for labels in label_list)) and \
-            all((self.verify_metric_larger_than_zero('minifi_repository_size_bytes', 'RepositoryMetrics', labels) for labels in label_list[1:3]))
+            all((self.verify_metric_larger_than_zero('minifi_repository_size_bytes', 'RepositoryMetrics', labels) for labels in label_list[1:3])) and \
+            all((self.verify_metrics_exist(['minifi_rocksdb_table_readers_size_bytes', 'minifi_rocksdb_all_memory_tables_size_bytes'], 'RepositoryMetrics', labels) for labels in label_list[1:3]))

     def verify_queue_metrics(self):
         return self.verify_metrics_exist(['minifi_queue_data_size', 'minifi_queue_data_size_max', 'minifi_queue_size', 'minifi_queue_size_max'], 'QueueMetrics')
@@ -74,12 +76,15 @@ def verify_device_info_node_metrics(self):

     def verify_agent_status_metrics(self):
         label_list = [{'repository_name': 'flowfile'}, {'repository_name': 'content'}]
+        # Only flowfile and content repositories are using rocksdb by default, so rocksdb specific metrics are only present there
         for labels in label_list:
             if not (self.verify_metric_exists('minifi_is_running', 'AgentStatus', labels)
                     and self.verify_metric_exists('minifi_is_full', 'AgentStatus', labels)
                     and self.verify_metric_exists('minifi_max_repository_size_bytes', 'AgentStatus', labels)
                     and self.verify_metric_larger_than_zero('minifi_repository_size_bytes', 'AgentStatus', labels)
-                    and self.verify_metric_exists('minifi_repository_entry_count', 'AgentStatus', labels)):
+                    and self.verify_metric_exists('minifi_repository_entry_count', 'AgentStatus', labels)
+                    and self.verify_metric_exists('minifi_rocksdb_table_readers_size_bytes', 'AgentStatus', labels)
+                    and self.verify_metric_exists('minifi_rocksdb_all_memory_tables_size_bytes', 'AgentStatus', labels)):
                 return False

         # provenance repository is NoOpRepository by default which has zero size
9 changes: 9 additions & 0 deletions extensions/rocksdb-repos/DatabaseContentRepository.cpp
@@ -286,6 +286,15 @@ uint64_t DatabaseContentRepository::getRepositoryEntryCount() const {
   })).value_or(0);
 }

+std::optional<RepositoryMetricsSource::RocksDbStats> DatabaseContentRepository::getRocksDbStats() const {
+  auto opendb = db_->open();
+  if (!opendb) {
+    return RocksDbStats{};
+  }

+  return opendb->getStats();
+}

 REGISTER_RESOURCE_AS(DatabaseContentRepository, InternalResource, ("DatabaseContentRepository", "databasecontentrepository"));

 } // namespace org::apache::nifi::minifi::core::repository
1 change: 1 addition & 0 deletions extensions/rocksdb-repos/DatabaseContentRepository.h
@@ -76,6 +76,7 @@ class DatabaseContentRepository : public core::ContentRepository {

   uint64_t getRepositorySize() const override;
   uint64_t getRepositoryEntryCount() const override;
+  std::optional<RepositoryMetricsSource::RocksDbStats> getRocksDbStats() const override;

  protected:
   bool removeKey(const std::string& content_path) override;
6 changes: 0 additions & 6 deletions extensions/rocksdb-repos/FlowFileRepository.cpp
@@ -112,15 +112,9 @@ void FlowFileRepository::deserializeFlowFilesWithNoContentClaim(minifi::internal
 }

 void FlowFileRepository::run() {
-  auto last = std::chrono::steady_clock::now();
   while (isRunning()) {
     std::this_thread::sleep_for(purge_period_);
     flush();
-    auto now = std::chrono::steady_clock::now();
-    if ((now-last) > std::chrono::seconds(30)) {
-      printStats();
-      last = now;
-    }
   }
   flush();
 }
13 changes: 0 additions & 13 deletions extensions/rocksdb-repos/ProvenanceRepository.cpp
@@ -23,19 +23,6 @@

 namespace org::apache::nifi::minifi::provenance {

-void ProvenanceRepository::run() {
-  size_t count = 0;
-  while (isRunning()) {
-    std::this_thread::sleep_for(std::chrono::seconds(1));
-    count++;
-    // Hack, to be removed in scope of https://issues.apache.org/jira/browse/MINIFICPP-1145
-    count = count % 30;
-    if (count == 0) {
-      printStats();
-    }
-  }
-}

 bool ProvenanceRepository::initialize(const std::shared_ptr<org::apache::nifi::minifi::Configure> &config) {
   std::string value;
   if (config->get(Configure::nifi_provenance_repository_directory_default, value) && !value.empty()) {
2 changes: 1 addition & 1 deletion extensions/rocksdb-repos/ProvenanceRepository.h
@@ -83,7 +83,7 @@ class ProvenanceRepository : public core::repository::RocksDbRepository {

  private:
   // Run function for the thread
-  void run() override;
+  void run() override {};
 };

 } // namespace org::apache::nifi::minifi::provenance
15 changes: 3 additions & 12 deletions extensions/rocksdb-repos/RocksDbRepository.cpp
@@ -20,22 +20,13 @@ using namespace std::literals::chrono_literals;

 namespace org::apache::nifi::minifi::core::repository {

-void RocksDbRepository::printStats() {
+std::optional<RepositoryMetricsSource::RocksDbStats> RocksDbRepository::getRocksDbStats() const {
   auto opendb = db_->open();
   if (!opendb) {
-    return;
+    return RocksDbStats{};
   }
-  std::string key_count;
-  opendb->GetProperty("rocksdb.estimate-num-keys", &key_count);

-  std::string table_readers;
-  opendb->GetProperty("rocksdb.estimate-table-readers-mem", &table_readers);

-  std::string all_memtables;
-  opendb->GetProperty("rocksdb.cur-size-all-mem-tables", &all_memtables);

-  logger_->log_info("Repository stats: key count: %s, table readers size: %s, all memory tables size: %s",
-      key_count, table_readers, all_memtables);
+  return opendb->getStats();
 }

 bool RocksDbRepository::ExecuteWithRetry(const std::function<rocksdb::Status()>& operation) {
2 changes: 1 addition & 1 deletion extensions/rocksdb-repos/RocksDbRepository.h
@@ -46,7 +46,7 @@ class RocksDbRepository : public ThreadedRepository {

   uint64_t getRepositorySize() const override;
   uint64_t getRepositoryEntryCount() const override;
-  void printStats();
+  std::optional<RocksDbStats> getRocksDbStats() const override;

  protected:
   bool ExecuteWithRetry(const std::function<rocksdb::Status()>& operation);
21 changes: 21 additions & 0 deletions extensions/rocksdb-repos/database/OpenRocksDb.cpp
@@ -135,4 +135,25 @@ std::optional<uint64_t> OpenRocksDb::getApproximateSizes() const {
   return std::nullopt;
 }

+minifi::core::RepositoryMetricsSource::RocksDbStats OpenRocksDb::getStats() {
+  minifi::core::RepositoryMetricsSource::RocksDbStats stats;
+  std::string table_readers;
+  GetProperty("rocksdb.estimate-table-readers-mem", &table_readers);
+  try {
+    stats.table_readers_size = std::stoull(table_readers);
+  } catch (const std::exception&) {
+    logger_->log_warn("Could not retrieve valid 'rocksdb.estimate-table-readers-mem' property value from rocksdb content repository!");
+  }

+  std::string all_memtables;
+  GetProperty("rocksdb.cur-size-all-mem-tables", &all_memtables);
+  try {
+    stats.all_memory_tables_size = std::stoull(all_memtables);
+  } catch (const std::exception&) {
+    logger_->log_warn("Could not retrieve valid 'rocksdb.cur-size-all-mem-tables' property value from rocksdb content repository!");
+  }

+  return stats;
+}

 } // namespace org::apache::nifi::minifi::internal
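
Possible follow-up for `getStats()`: the two `std::stoull` plus `try`/`catch` blocks repeat the same parsing step. A minimal sketch of a non-throwing alternative based on `std::from_chars` is shown below. `parseUint64Property` is a hypothetical helper name, not something in this PR, and it is deliberately stricter than `std::stoull`: it rejects leading whitespace, a sign, and trailing characters, which may or may not be what the maintainers want.

```cpp
#include <charconv>
#include <cstdint>
#include <optional>
#include <string_view>
#include <system_error>

// Hypothetical helper (not part of the PR): converts a numeric property
// string such as "4096" to uint64_t without throwing.
std::optional<uint64_t> parseUint64Property(std::string_view text) {
  uint64_t value = 0;
  const char* const begin = text.data();
  const char* const end = begin + text.size();
  const auto [ptr, ec] = std::from_chars(begin, end, value);
  // Reject empty input, out-of-range values, and trailing garbage like "12abc".
  if (ec != std::errc{} || ptr != end) {
    return std::nullopt;
  }
  return value;
}
```

With a helper like this, each property could be handled as "parse, assign on success, log a warning on `std::nullopt`", keeping the warning messages from the diff unchanged.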
4 changes: 4 additions & 0 deletions extensions/rocksdb-repos/database/OpenRocksDb.h
@@ -27,6 +27,8 @@
 #include "rocksdb/db.h"
 #include "rocksdb/utilities/checkpoint.h"
 #include "WriteBatch.h"
+#include "core/RepositoryMetricsSource.h"
+#include "core/logging/LoggerConfiguration.h"

 namespace org::apache::nifi::minifi::internal {

@@ -73,6 +75,7 @@ class OpenRocksDb {
   rocksdb::DB* get();

   std::optional<uint64_t> getApproximateSizes() const;
+  minifi::core::RepositoryMetricsSource::RocksDbStats getStats();

  private:
   void handleResult(const rocksdb::Status& result);
@@ -81,6 +84,7 @@ class OpenRocksDb {
   gsl::not_null<RocksDbInstance*> db_;
   gsl::not_null<std::shared_ptr<rocksdb::DB>> impl_;
   gsl::not_null<std::shared_ptr<ColumnHandle>> column_;
+  std::shared_ptr<minifi::core::logging::Logger> logger_{minifi::core::logging::LoggerFactory<OpenRocksDb>::getLogger()};
 };

 } // namespace org::apache::nifi::minifi::internal
10 changes: 10 additions & 0 deletions libminifi/include/core/RepositoryMetricsSource.h
@@ -19,11 +19,17 @@
 #pragma once

 #include <string>
+#include <optional>

 namespace org::apache::nifi::minifi::core {

 class RepositoryMetricsSource {
  public:
+  struct RocksDbStats {
+    uint64_t table_readers_size{};
+    uint64_t all_memory_tables_size{};
+  };

   virtual ~RepositoryMetricsSource() = default;
   virtual uint64_t getRepositorySize() const = 0;
   virtual uint64_t getRepositoryEntryCount() const = 0;
@@ -40,6 +46,10 @@ class RepositoryMetricsSource {
   virtual bool isRunning() const {
     return true;
   }

+  virtual std::optional<RocksDbStats> getRocksDbStats() const {
+    return std::nullopt;
+  }
 };

 } // namespace org::apache::nifi::minifi::core
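
For readers following the interface change: the `std::nullopt` default is what makes the RocksDB metrics "only present if repository uses RocksDB". The sketch below is a rough illustration of how a metrics consumer might rely on that. The trimmed `RepositoryMetricsSource`, the two repository classes, and `collectRocksDbMetrics` are stand-ins written for this example under that assumption; they are not the code in `RepositoryMetrics` or elsewhere in MiNiFi.

```cpp
#include <cstdint>
#include <optional>
#include <string>
#include <utility>
#include <vector>

// Trimmed stand-in for the interface in the diff; the real class has more members.
class RepositoryMetricsSource {
 public:
  struct RocksDbStats {
    uint64_t table_readers_size{};
    uint64_t all_memory_tables_size{};
  };
  virtual ~RepositoryMetricsSource() = default;
  virtual std::optional<RocksDbStats> getRocksDbStats() const { return std::nullopt; }
};

// Hypothetical repository without RocksDB: inherits the std::nullopt default.
class NoOpLikeRepository : public RepositoryMetricsSource {};

// Hypothetical RocksDB-backed repository returning fixed numbers for the example.
class RocksDbLikeRepository : public RepositoryMetricsSource {
 public:
  std::optional<RocksDbStats> getRocksDbStats() const override {
    return RocksDbStats{1024, 2048};
  }
};

// Hypothetical consumer: emits the two RocksDB metrics only when stats are present.
std::vector<std::pair<std::string, uint64_t>> collectRocksDbMetrics(const RepositoryMetricsSource& repo) {
  std::vector<std::pair<std::string, uint64_t>> metrics;
  if (const auto stats = repo.getRocksDbStats()) {
    metrics.emplace_back("rocksdb_table_readers_size_bytes", stats->table_readers_size);
    metrics.emplace_back("rocksdb_all_memory_tables_size_bytes", stats->all_memory_tables_size);
  }
  return metrics;
}
```

One detail worth keeping in mind: in this PR both `DatabaseContentRepository::getRocksDbStats()` and `RocksDbRepository::getRocksDbStats()` return a zero-initialized `RocksDbStats{}` rather than `std::nullopt` when the database cannot be opened, so a consumer written like the sketch would still report both metrics, with value 0, in that case.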