From f40d946ddf5afc910d2c79f4fa9489a27670eb3d Mon Sep 17 00:00:00 2001 From: Gabor Gyimesi Date: Wed, 22 Mar 2023 12:43:04 +0100 Subject: [PATCH 01/10] MINIFICPP-2082 Move RocksDB stats to RepositoryMetrics --- .../cluster/checkers/PrometheusChecker.py | 3 +- .../DatabaseContentRepository.cpp | 18 +++++++++++ .../rocksdb-repos/DatabaseContentRepository.h | 1 + .../rocksdb-repos/FlowFileRepository.cpp | 6 ---- .../rocksdb-repos/ProvenanceRepository.cpp | 13 -------- .../rocksdb-repos/ProvenanceRepository.h | 2 +- .../rocksdb-repos/RocksDbRepository.cpp | 12 +++---- extensions/rocksdb-repos/RocksDbRepository.h | 2 +- .../include/core/RepositoryMetricsSource.h | 9 ++++++ .../core/state/nodes/RepositoryMetrics.h | 21 ++++++++++++ libminifi/test/rocksdb-tests/RepoTests.cpp | 16 ++++++++++ libminifi/test/unit/MetricsTests.cpp | 32 ++++++++++++++++--- libminifi/test/unit/ProvenanceTestHelper.h | 10 ++++++ 13 files changed, 113 insertions(+), 32 deletions(-) diff --git a/docker/test/integration/cluster/checkers/PrometheusChecker.py b/docker/test/integration/cluster/checkers/PrometheusChecker.py index bd7daff8b0..d6f4554933 100644 --- a/docker/test/integration/cluster/checkers/PrometheusChecker.py +++ b/docker/test/integration/cluster/checkers/PrometheusChecker.py @@ -49,7 +49,8 @@ def verify_metric_class(self, metric_class): def verify_repository_metrics(self): label_list = [{'repository_name': 'provenance'}, {'repository_name': 'flowfile'}, {'repository_name': 'content'}] return all((self.verify_metrics_exist(['minifi_is_running', 'minifi_is_full', 'minifi_repository_size_bytes', 'minifi_max_repository_size_bytes', 'minifi_repository_entry_count'], 'RepositoryMetrics', labels) for labels in label_list)) and \ - all((self.verify_metric_larger_than_zero('minifi_repository_size_bytes', 'RepositoryMetrics', labels) for labels in label_list[1:3])) + all((self.verify_metric_larger_than_zero('minifi_repository_size_bytes', 'RepositoryMetrics', labels) for labels in label_list[1:3])) and \ + all((self.verify_metrics_exist(['minifi_rocksdb_table_readers_size_bytes', 'minifi_rocksdb_all_memory_tables_size_bytes'], 'RepositoryMetrics', labels) for labels in label_list[1:3])) def verify_queue_metrics(self): return self.verify_metrics_exist(['minifi_queue_data_size', 'minifi_queue_data_size_max', 'minifi_queue_size', 'minifi_queue_size_max'], 'QueueMetrics') diff --git a/extensions/rocksdb-repos/DatabaseContentRepository.cpp b/extensions/rocksdb-repos/DatabaseContentRepository.cpp index 56c0d2c83c..8e3da5ccaf 100644 --- a/extensions/rocksdb-repos/DatabaseContentRepository.cpp +++ b/extensions/rocksdb-repos/DatabaseContentRepository.cpp @@ -286,6 +286,24 @@ uint64_t DatabaseContentRepository::getRepositoryEntryCount() const { })).value_or(0); } +std::optional DatabaseContentRepository::getRocksDbStats() { + RocksDbStats stats; + auto opendb = db_->open(); + if (!opendb) { + return stats; + } + + std::string table_readers; + opendb->GetProperty("rocksdb.estimate-table-readers-mem", &table_readers); + stats.table_readers_size = std::stoull(table_readers); + + std::string all_memtables; + opendb->GetProperty("rocksdb.cur-size-all-mem-tables", &all_memtables); + stats.all_memory_tables_size = std::stoull(all_memtables); + + return stats; +} + REGISTER_RESOURCE_AS(DatabaseContentRepository, InternalResource, ("DatabaseContentRepository", "databasecontentrepository")); } // namespace org::apache::nifi::minifi::core::repository diff --git a/extensions/rocksdb-repos/DatabaseContentRepository.h b/extensions/rocksdb-repos/DatabaseContentRepository.h index e084a65159..fa2c986246 100644 --- a/extensions/rocksdb-repos/DatabaseContentRepository.h +++ b/extensions/rocksdb-repos/DatabaseContentRepository.h @@ -76,6 +76,7 @@ class DatabaseContentRepository : public core::ContentRepository { uint64_t getRepositorySize() const override; uint64_t getRepositoryEntryCount() const override; + std::optional getRocksDbStats() override; protected: bool removeKey(const std::string& content_path) override; diff --git a/extensions/rocksdb-repos/FlowFileRepository.cpp b/extensions/rocksdb-repos/FlowFileRepository.cpp index b38a7fdcec..7144dbc635 100644 --- a/extensions/rocksdb-repos/FlowFileRepository.cpp +++ b/extensions/rocksdb-repos/FlowFileRepository.cpp @@ -112,15 +112,9 @@ void FlowFileRepository::deserializeFlowFilesWithNoContentClaim(minifi::internal } void FlowFileRepository::run() { - auto last = std::chrono::steady_clock::now(); while (isRunning()) { std::this_thread::sleep_for(purge_period_); flush(); - auto now = std::chrono::steady_clock::now(); - if ((now-last) > std::chrono::seconds(30)) { - printStats(); - last = now; - } } flush(); } diff --git a/extensions/rocksdb-repos/ProvenanceRepository.cpp b/extensions/rocksdb-repos/ProvenanceRepository.cpp index 15073835e9..d46b3d048f 100644 --- a/extensions/rocksdb-repos/ProvenanceRepository.cpp +++ b/extensions/rocksdb-repos/ProvenanceRepository.cpp @@ -23,19 +23,6 @@ namespace org::apache::nifi::minifi::provenance { -void ProvenanceRepository::run() { - size_t count = 0; - while (isRunning()) { - std::this_thread::sleep_for(std::chrono::seconds(1)); - count++; - // Hack, to be removed in scope of https://issues.apache.org/jira/browse/MINIFICPP-1145 - count = count % 30; - if (count == 0) { - printStats(); - } - } -} - bool ProvenanceRepository::initialize(const std::shared_ptr &config) { std::string value; if (config->get(Configure::nifi_provenance_repository_directory_default, value) && !value.empty()) { diff --git a/extensions/rocksdb-repos/ProvenanceRepository.h b/extensions/rocksdb-repos/ProvenanceRepository.h index ca0b6a8ad4..58fc7fa646 100644 --- a/extensions/rocksdb-repos/ProvenanceRepository.h +++ b/extensions/rocksdb-repos/ProvenanceRepository.h @@ -83,7 +83,7 @@ class ProvenanceRepository : public core::repository::RocksDbRepository { private: // Run function for the thread - void run() override; + void run() override {}; }; } // namespace org::apache::nifi::minifi::provenance diff --git a/extensions/rocksdb-repos/RocksDbRepository.cpp b/extensions/rocksdb-repos/RocksDbRepository.cpp index 02a7353958..5fca5ddf2a 100644 --- a/extensions/rocksdb-repos/RocksDbRepository.cpp +++ b/extensions/rocksdb-repos/RocksDbRepository.cpp @@ -20,22 +20,22 @@ using namespace std::literals::chrono_literals; namespace org::apache::nifi::minifi::core::repository { -void RocksDbRepository::printStats() { +std::optional RocksDbRepository::getRocksDbStats() { + RocksDbStats stats; auto opendb = db_->open(); if (!opendb) { - return; + return stats; } - std::string key_count; - opendb->GetProperty("rocksdb.estimate-num-keys", &key_count); std::string table_readers; opendb->GetProperty("rocksdb.estimate-table-readers-mem", &table_readers); + stats.table_readers_size = std::stoull(table_readers); std::string all_memtables; opendb->GetProperty("rocksdb.cur-size-all-mem-tables", &all_memtables); + stats.all_memory_tables_size = std::stoull(all_memtables); - logger_->log_info("Repository stats: key count: %s, table readers size: %s, all memory tables size: %s", - key_count, table_readers, all_memtables); + return stats; } bool RocksDbRepository::ExecuteWithRetry(const std::function& operation) { diff --git a/extensions/rocksdb-repos/RocksDbRepository.h b/extensions/rocksdb-repos/RocksDbRepository.h index 6fd220d8d9..703b77adfc 100644 --- a/extensions/rocksdb-repos/RocksDbRepository.h +++ b/extensions/rocksdb-repos/RocksDbRepository.h @@ -46,7 +46,7 @@ class RocksDbRepository : public ThreadedRepository { uint64_t getRepositorySize() const override; uint64_t getRepositoryEntryCount() const override; - void printStats(); + std::optional getRocksDbStats() override; protected: bool ExecuteWithRetry(const std::function& operation); diff --git a/libminifi/include/core/RepositoryMetricsSource.h b/libminifi/include/core/RepositoryMetricsSource.h index cfaeb4ae69..e46fe27f39 100644 --- a/libminifi/include/core/RepositoryMetricsSource.h +++ b/libminifi/include/core/RepositoryMetricsSource.h @@ -24,6 +24,11 @@ namespace org::apache::nifi::minifi::core { class RepositoryMetricsSource { public: + struct RocksDbStats { + uint64_t table_readers_size{}; + uint64_t all_memory_tables_size{}; + }; + virtual ~RepositoryMetricsSource() = default; virtual uint64_t getRepositorySize() const = 0; virtual uint64_t getRepositoryEntryCount() const = 0; @@ -40,6 +45,10 @@ class RepositoryMetricsSource { virtual bool isRunning() const { return true; } + + virtual std::optional getRocksDbStats() { + return std::nullopt; + } }; } // namespace org::apache::nifi::minifi::core diff --git a/libminifi/include/core/state/nodes/RepositoryMetrics.h b/libminifi/include/core/state/nodes/RepositoryMetrics.h index bc8dada721..fa95c48737 100644 --- a/libminifi/include/core/state/nodes/RepositoryMetrics.h +++ b/libminifi/include/core/state/nodes/RepositoryMetrics.h @@ -92,6 +92,20 @@ class RepositoryMetrics : public ResponseNode { parent.children.push_back(max_repo_size); parent.children.push_back(repo_entry_count); + auto rocksdb_stats = repo->getRocksDbStats(); + if (rocksdb_stats) { + SerializedResponseNode rocksdb_table_readers_size; + rocksdb_table_readers_size.name = "rocksDbTableReadersSize"; + rocksdb_table_readers_size.value = rocksdb_stats->table_readers_size; + + SerializedResponseNode rocksdb_all_memory_tables_size; + rocksdb_all_memory_tables_size.name = "rocksDbAllMemoryTablesSize"; + rocksdb_all_memory_tables_size.value = rocksdb_stats->all_memory_tables_size; + + parent.children.push_back(rocksdb_table_readers_size); + parent.children.push_back(rocksdb_all_memory_tables_size); + } + serialized.push_back(parent); } return serialized; @@ -105,6 +119,13 @@ class RepositoryMetrics : public ResponseNode { metrics.push_back({"repository_size_bytes", static_cast(repo->getRepositorySize()), {{"metric_class", getName()}, {"repository_name", repo->getRepositoryName()}}}); metrics.push_back({"max_repository_size_bytes", static_cast(repo->getMaxRepositorySize()), {{"metric_class", getName()}, {"repository_name", repo->getRepositoryName()}}}); metrics.push_back({"repository_entry_count", static_cast(repo->getRepositoryEntryCount()), {{"metric_class", getName()}, {"repository_name", repo->getRepositoryName()}}}); + auto rocksdb_stats = repo->getRocksDbStats(); + if (rocksdb_stats) { + metrics.push_back({"rocksdb_table_readers_size_bytes", static_cast(rocksdb_stats->table_readers_size), + {{"metric_class", getName()}, {"repository_name", repo->getRepositoryName()}}}); + metrics.push_back({"rocksdb_all_memory_tables_size_bytes", static_cast(rocksdb_stats->all_memory_tables_size), + {{"metric_class", getName()}, {"repository_name", repo->getRepositoryName()}}}); + } } return metrics; } diff --git a/libminifi/test/rocksdb-tests/RepoTests.cpp b/libminifi/test/rocksdb-tests/RepoTests.cpp index 4a187cca60..6a5558831c 100644 --- a/libminifi/test/rocksdb-tests/RepoTests.cpp +++ b/libminifi/test/rocksdb-tests/RepoTests.cpp @@ -21,6 +21,7 @@ #include #include #include +#include #include "core/Core.h" #include "core/repository/AtomicRepoEntries.h" @@ -530,12 +531,15 @@ TEST_CASE("Test getting flow file repository size properties", "[TestGettingRepo std::shared_ptr repository; auto expected_is_full = false; uint64_t expected_max_repo_size = 0; + bool expected_rocksdb_stats = false; SECTION("FlowFileRepository") { repository = std::make_shared("ff", dir.string(), 0ms, 0, 1ms); + expected_rocksdb_stats = true; } SECTION("ProvenanceRepository") { repository = std::make_shared("ff", dir.string(), 0ms, 0, 1ms); + expected_rocksdb_stats = true; } SECTION("VolatileFlowFileRepository") { @@ -584,6 +588,11 @@ TEST_CASE("Test getting flow file repository size properties", "[TestGettingRepo REQUIRE(expected_is_full == repository->isFull()); REQUIRE(expected_max_repo_size == repository->getMaxRepositorySize()); REQUIRE(2 == repository->getRepositoryEntryCount()); + auto rocksdb_stats = repository->getRocksDbStats(); + REQUIRE(expected_rocksdb_stats == (rocksdb_stats != std::nullopt)); + if (rocksdb_stats) { + REQUIRE(rocksdb_stats->all_memory_tables_size > 0); + } } TEST_CASE("Test getting noop repository size properties", "[TestGettingRepositorySize]") { @@ -626,6 +635,7 @@ TEST_CASE("Test getting content repository size properties", "[TestGettingReposi std::shared_ptr content_repo; auto expected_is_full = false; uint64_t expected_max_repo_size = 0; + bool expected_rocksdb_stats = false; SECTION("FileSystemRepository") { content_repo = std::make_shared(); } @@ -638,6 +648,7 @@ TEST_CASE("Test getting content repository size properties", "[TestGettingReposi SECTION("DatabaseContentRepository") { content_repo = std::make_shared(); + expected_rocksdb_stats = true; } content_repo->initialize(configuration); @@ -672,6 +683,11 @@ TEST_CASE("Test getting content repository size properties", "[TestGettingReposi REQUIRE(expected_is_full == content_repo->isFull()); REQUIRE(expected_max_repo_size == content_repo->getMaxRepositorySize()); REQUIRE(1 == content_repo->getRepositoryEntryCount()); + auto rocksdb_stats = content_repo->getRocksDbStats(); + REQUIRE(expected_rocksdb_stats == (rocksdb_stats != std::nullopt)); + if (rocksdb_stats) { + REQUIRE(rocksdb_stats->all_memory_tables_size > 0); + } } TEST_CASE("Flow file repositories can be stopped", "[TestRepoIsRunning]") { diff --git a/libminifi/test/unit/MetricsTests.cpp b/libminifi/test/unit/MetricsTests.cpp index af6d9e963d..5be43fb4fd 100644 --- a/libminifi/test/unit/MetricsTests.cpp +++ b/libminifi/test/unit/MetricsTests.cpp @@ -90,7 +90,19 @@ TEST_CASE("RepositorymetricsHaveRepo", "[c2m4]") { REQUIRE("RepositoryMetrics" == metrics.getName()); - auto repo = std::make_shared(); + std::shared_ptr repo; + size_t expected_metric_count{}; + + SECTION("Non-RocksDB repository") { + repo = std::make_shared(); + expected_metric_count = 5; + } + + SECTION("RocksDB repository") { + repo = std::make_shared(); + expected_metric_count = 7; + } + metrics.addRepository(repo); { @@ -99,13 +111,17 @@ TEST_CASE("RepositorymetricsHaveRepo", "[c2m4]") { minifi::state::response::SerializedResponseNode resp = metrics.serialize().at(0); REQUIRE("repo_name" == resp.name); - REQUIRE(5 == resp.children.size()); + REQUIRE(expected_metric_count == resp.children.size()); checkSerializedValue(resp.children, "running", "false"); checkSerializedValue(resp.children, "full", "false"); checkSerializedValue(resp.children, "size", "0"); checkSerializedValue(resp.children, "maxSize", "0"); checkSerializedValue(resp.children, "entryCount", "0"); + if (expected_metric_count > 5) { + checkSerializedValue(resp.children, "rocksDbTableReadersSize", "100"); + checkSerializedValue(resp.children, "rocksDbAllMemoryTablesSize", "200"); + } } repo->start(); @@ -115,13 +131,17 @@ TEST_CASE("RepositorymetricsHaveRepo", "[c2m4]") { minifi::state::response::SerializedResponseNode resp = metrics.serialize().at(0); REQUIRE("repo_name" == resp.name); - REQUIRE(5 == resp.children.size()); + REQUIRE(expected_metric_count == resp.children.size()); checkSerializedValue(resp.children, "running", "true"); checkSerializedValue(resp.children, "full", "false"); checkSerializedValue(resp.children, "size", "0"); checkSerializedValue(resp.children, "maxSize", "0"); checkSerializedValue(resp.children, "entryCount", "0"); + if (expected_metric_count > 5) { + checkSerializedValue(resp.children, "rocksDbTableReadersSize", "100"); + checkSerializedValue(resp.children, "rocksDbAllMemoryTablesSize", "200"); + } } repo->stop(); @@ -132,13 +152,17 @@ TEST_CASE("RepositorymetricsHaveRepo", "[c2m4]") { minifi::state::response::SerializedResponseNode resp = metrics.serialize().at(0); REQUIRE("repo_name" == resp.name); - REQUIRE(5 == resp.children.size()); + REQUIRE(expected_metric_count == resp.children.size()); checkSerializedValue(resp.children, "running", "false"); checkSerializedValue(resp.children, "full", "false"); checkSerializedValue(resp.children, "size", "0"); checkSerializedValue(resp.children, "maxSize", "0"); checkSerializedValue(resp.children, "entryCount", "0"); + if (expected_metric_count > 5) { + checkSerializedValue(resp.children, "rocksDbTableReadersSize", "100"); + checkSerializedValue(resp.children, "rocksDbAllMemoryTablesSize", "200"); + } } } diff --git a/libminifi/test/unit/ProvenanceTestHelper.h b/libminifi/test/unit/ProvenanceTestHelper.h index 3f5a16c727..1d7c704312 100644 --- a/libminifi/test/unit/ProvenanceTestHelper.h +++ b/libminifi/test/unit/ProvenanceTestHelper.h @@ -176,6 +176,16 @@ class TestThreadedRepository : public TestRepositoryBase getRocksDbStats() override { + return RocksDbStats { + .table_readers_size = 100, + .all_memory_tables_size = 200 + }; + } +}; + class TestFlowRepository : public org::apache::nifi::minifi::core::ThreadedRepository { public: TestFlowRepository() From 4f3b4c329e9cc9e42ae40b84027f8e7f05a2229e Mon Sep 17 00:00:00 2001 From: Gabor Gyimesi Date: Wed, 22 Mar 2023 16:18:21 +0100 Subject: [PATCH 02/10] Extract RepositoryMetricsSourceStore --- .../include/core/RepositoryMetricsSource.h | 1 + .../core/state/nodes/RepositoryMetrics.h | 82 +++------------ .../nodes/RepositoryMetricsSourceStore.h | 43 ++++++++ .../nodes/RepositoryMetricsSourceStore.cpp | 99 +++++++++++++++++++ 4 files changed, 154 insertions(+), 71 deletions(-) create mode 100644 libminifi/include/core/state/nodes/RepositoryMetricsSourceStore.h create mode 100644 libminifi/src/core/state/nodes/RepositoryMetricsSourceStore.cpp diff --git a/libminifi/include/core/RepositoryMetricsSource.h b/libminifi/include/core/RepositoryMetricsSource.h index e46fe27f39..ed35f2be6c 100644 --- a/libminifi/include/core/RepositoryMetricsSource.h +++ b/libminifi/include/core/RepositoryMetricsSource.h @@ -19,6 +19,7 @@ #pragma once #include +#include namespace org::apache::nifi::minifi::core { diff --git a/libminifi/include/core/state/nodes/RepositoryMetrics.h b/libminifi/include/core/state/nodes/RepositoryMetrics.h index fa95c48737..788beb0062 100644 --- a/libminifi/include/core/state/nodes/RepositoryMetrics.h +++ b/libminifi/include/core/state/nodes/RepositoryMetrics.h @@ -26,7 +26,7 @@ #include "../nodes/MetricsBase.h" #include "Connection.h" -#include "core/RepositoryMetricsSource.h" +#include "RepositoryMetricsSourceStore.h" namespace org::apache::nifi::minifi::state::response { @@ -38,15 +38,18 @@ namespace org::apache::nifi::minifi::state::response { class RepositoryMetrics : public ResponseNode { public: RepositoryMetrics(std::string name, const utils::Identifier &uuid) - : ResponseNode(std::move(name), uuid) { + : ResponseNode(std::move(name), uuid), + repository_metrics_source_store_(getName()) { } explicit RepositoryMetrics(std::string name) - : ResponseNode(std::move(name)) { + : ResponseNode(std::move(name)), + repository_metrics_source_store_(getName()) { } RepositoryMetrics() - : ResponseNode("RepositoryMetrics") { + : ResponseNode("RepositoryMetrics"), + repository_metrics_source_store_(getName()) { } MINIFIAPI static constexpr const char* Description = "Metric node that defines repository metric information"; @@ -56,82 +59,19 @@ class RepositoryMetrics : public ResponseNode { } void addRepository(const std::shared_ptr &repo) { - if (nullptr != repo) { - repositories_.push_back(repo); - } + return repository_metrics_source_store_.addRepository(repo); } std::vector serialize() override { - std::vector serialized; - for (const auto& repo : repositories_) { - SerializedResponseNode parent; - parent.name = repo->getRepositoryName(); - SerializedResponseNode is_running; - is_running.name = "running"; - is_running.value = repo->isRunning(); - - SerializedResponseNode is_full; - is_full.name = "full"; - is_full.value = repo->isFull(); - - SerializedResponseNode repo_size; - repo_size.name = "size"; - repo_size.value = std::to_string(repo->getRepositorySize()); - - SerializedResponseNode max_repo_size; - max_repo_size.name = "maxSize"; - max_repo_size.value = std::to_string(repo->getMaxRepositorySize()); - - SerializedResponseNode repo_entry_count; - repo_entry_count.name = "entryCount"; - repo_entry_count.value = repo->getRepositoryEntryCount(); - - parent.children.push_back(is_running); - parent.children.push_back(is_full); - parent.children.push_back(repo_size); - parent.children.push_back(max_repo_size); - parent.children.push_back(repo_entry_count); - - auto rocksdb_stats = repo->getRocksDbStats(); - if (rocksdb_stats) { - SerializedResponseNode rocksdb_table_readers_size; - rocksdb_table_readers_size.name = "rocksDbTableReadersSize"; - rocksdb_table_readers_size.value = rocksdb_stats->table_readers_size; - - SerializedResponseNode rocksdb_all_memory_tables_size; - rocksdb_all_memory_tables_size.name = "rocksDbAllMemoryTablesSize"; - rocksdb_all_memory_tables_size.value = rocksdb_stats->all_memory_tables_size; - - parent.children.push_back(rocksdb_table_readers_size); - parent.children.push_back(rocksdb_all_memory_tables_size); - } - - serialized.push_back(parent); - } - return serialized; + return repository_metrics_source_store_.serialize(); } std::vector calculateMetrics() override { - std::vector metrics; - for (const auto& repo : repositories_) { - metrics.push_back({"is_running", (repo->isRunning() ? 1.0 : 0.0), {{"metric_class", getName()}, {"repository_name", repo->getRepositoryName()}}}); - metrics.push_back({"is_full", (repo->isFull() ? 1.0 : 0.0), {{"metric_class", getName()}, {"repository_name", repo->getRepositoryName()}}}); - metrics.push_back({"repository_size_bytes", static_cast(repo->getRepositorySize()), {{"metric_class", getName()}, {"repository_name", repo->getRepositoryName()}}}); - metrics.push_back({"max_repository_size_bytes", static_cast(repo->getMaxRepositorySize()), {{"metric_class", getName()}, {"repository_name", repo->getRepositoryName()}}}); - metrics.push_back({"repository_entry_count", static_cast(repo->getRepositoryEntryCount()), {{"metric_class", getName()}, {"repository_name", repo->getRepositoryName()}}}); - auto rocksdb_stats = repo->getRocksDbStats(); - if (rocksdb_stats) { - metrics.push_back({"rocksdb_table_readers_size_bytes", static_cast(rocksdb_stats->table_readers_size), - {{"metric_class", getName()}, {"repository_name", repo->getRepositoryName()}}}); - metrics.push_back({"rocksdb_all_memory_tables_size_bytes", static_cast(rocksdb_stats->all_memory_tables_size), - {{"metric_class", getName()}, {"repository_name", repo->getRepositoryName()}}}); - } - } - return metrics; + return repository_metrics_source_store_.calculateMetrics(); } protected: - std::vector> repositories_; + RepositoryMetricsSourceStore repository_metrics_source_store_; }; } // namespace org::apache::nifi::minifi::state::response diff --git a/libminifi/include/core/state/nodes/RepositoryMetricsSourceStore.h b/libminifi/include/core/state/nodes/RepositoryMetricsSourceStore.h new file mode 100644 index 0000000000..8ef101f967 --- /dev/null +++ b/libminifi/include/core/state/nodes/RepositoryMetricsSourceStore.h @@ -0,0 +1,43 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#pragma once + +#include +#include +#include +#include + +#include "core/RepositoryMetricsSource.h" +#include "core/state/Value.h" +#include "core/state/PublishedMetricProvider.h" + +namespace org::apache::nifi::minifi::state::response { + +class RepositoryMetricsSourceStore { + public: + RepositoryMetricsSourceStore(std::string name); + void addRepository(const std::shared_ptr &repo); + std::vector serialize(); + std::vector calculateMetrics(); + + private: + std::string name_; + std::vector> repositories_; +}; + +} // namespace org::apache::nifi::minifi::state::response diff --git a/libminifi/src/core/state/nodes/RepositoryMetricsSourceStore.cpp b/libminifi/src/core/state/nodes/RepositoryMetricsSourceStore.cpp new file mode 100644 index 0000000000..f59d277c7e --- /dev/null +++ b/libminifi/src/core/state/nodes/RepositoryMetricsSourceStore.cpp @@ -0,0 +1,99 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include "core/state/nodes/RepositoryMetricsSourceStore.h" + +namespace org::apache::nifi::minifi::state::response { + +RepositoryMetricsSourceStore::RepositoryMetricsSourceStore(std::string name) : name_(std::move(name)) {} + +void RepositoryMetricsSourceStore::addRepository(const std::shared_ptr &repo) { + if (nullptr != repo) { + repositories_.push_back(repo); + } +} + +std::vector RepositoryMetricsSourceStore::serialize() { + std::vector serialized; + for (const auto& repo : repositories_) { + SerializedResponseNode parent; + parent.name = repo->getRepositoryName(); + SerializedResponseNode is_running; + is_running.name = "running"; + is_running.value = repo->isRunning(); + + SerializedResponseNode is_full; + is_full.name = "full"; + is_full.value = repo->isFull(); + + SerializedResponseNode repo_size; + repo_size.name = "size"; + repo_size.value = std::to_string(repo->getRepositorySize()); + + SerializedResponseNode max_repo_size; + max_repo_size.name = "maxSize"; + max_repo_size.value = std::to_string(repo->getMaxRepositorySize()); + + SerializedResponseNode repo_entry_count; + repo_entry_count.name = "entryCount"; + repo_entry_count.value = repo->getRepositoryEntryCount(); + + parent.children.push_back(is_running); + parent.children.push_back(is_full); + parent.children.push_back(repo_size); + parent.children.push_back(max_repo_size); + parent.children.push_back(repo_entry_count); + + auto rocksdb_stats = repo->getRocksDbStats(); + if (rocksdb_stats) { + SerializedResponseNode rocksdb_table_readers_size; + rocksdb_table_readers_size.name = "rocksDbTableReadersSize"; + rocksdb_table_readers_size.value = rocksdb_stats->table_readers_size; + + SerializedResponseNode rocksdb_all_memory_tables_size; + rocksdb_all_memory_tables_size.name = "rocksDbAllMemoryTablesSize"; + rocksdb_all_memory_tables_size.value = rocksdb_stats->all_memory_tables_size; + + parent.children.push_back(rocksdb_table_readers_size); + parent.children.push_back(rocksdb_all_memory_tables_size); + } + + serialized.push_back(parent); + } + return serialized; +} + +std::vector RepositoryMetricsSourceStore::calculateMetrics() { + std::vector metrics; + for (const auto& repo : repositories_) { + metrics.push_back({"is_running", (repo->isRunning() ? 1.0 : 0.0), {{"metric_class", name_}, {"repository_name", repo->getRepositoryName()}}}); + metrics.push_back({"is_full", (repo->isFull() ? 1.0 : 0.0), {{"metric_class", name_}, {"repository_name", repo->getRepositoryName()}}}); + metrics.push_back({"repository_size_bytes", static_cast(repo->getRepositorySize()), {{"metric_class", name_}, {"repository_name", repo->getRepositoryName()}}}); + metrics.push_back({"max_repository_size_bytes", static_cast(repo->getMaxRepositorySize()), {{"metric_class", name_}, {"repository_name", repo->getRepositoryName()}}}); + metrics.push_back({"repository_entry_count", static_cast(repo->getRepositoryEntryCount()), {{"metric_class", name_}, {"repository_name", repo->getRepositoryName()}}}); + auto rocksdb_stats = repo->getRocksDbStats(); + if (rocksdb_stats) { + metrics.push_back({"rocksdb_table_readers_size_bytes", static_cast(rocksdb_stats->table_readers_size), + {{"metric_class", name_}, {"repository_name", repo->getRepositoryName()}}}); + metrics.push_back({"rocksdb_all_memory_tables_size_bytes", static_cast(rocksdb_stats->all_memory_tables_size), + {{"metric_class", name_}, {"repository_name", repo->getRepositoryName()}}}); + } + } + return metrics; +} + +} // namespace org::apache::nifi::minifi::state::response From 5800b262ff255161d32f952afc8b84f97f41dcd5 Mon Sep 17 00:00:00 2001 From: Gabor Gyimesi Date: Wed, 22 Mar 2023 17:09:44 +0100 Subject: [PATCH 03/10] Add rocksdb metrics to AgentStatus --- .../cluster/checkers/PrometheusChecker.py | 6 +- .../DatabaseContentRepository.cpp | 2 +- .../rocksdb-repos/DatabaseContentRepository.h | 2 +- .../rocksdb-repos/RocksDbRepository.cpp | 2 +- extensions/rocksdb-repos/RocksDbRepository.h | 2 +- .../include/core/RepositoryMetricsSource.h | 2 +- .../core/state/nodes/AgentInformation.h | 73 +++++-------------- .../nodes/RepositoryMetricsSourceStore.h | 5 +- .../nodes/RepositoryMetricsSourceStore.cpp | 12 ++- libminifi/test/unit/ProvenanceTestHelper.h | 2 +- 10 files changed, 41 insertions(+), 67 deletions(-) diff --git a/docker/test/integration/cluster/checkers/PrometheusChecker.py b/docker/test/integration/cluster/checkers/PrometheusChecker.py index d6f4554933..2c1f1140c9 100644 --- a/docker/test/integration/cluster/checkers/PrometheusChecker.py +++ b/docker/test/integration/cluster/checkers/PrometheusChecker.py @@ -48,6 +48,7 @@ def verify_metric_class(self, metric_class): def verify_repository_metrics(self): label_list = [{'repository_name': 'provenance'}, {'repository_name': 'flowfile'}, {'repository_name': 'content'}] + # Only flowfile and content repositories are using rocksdb by default, so rocksdb specific metrics are only present there return all((self.verify_metrics_exist(['minifi_is_running', 'minifi_is_full', 'minifi_repository_size_bytes', 'minifi_max_repository_size_bytes', 'minifi_repository_entry_count'], 'RepositoryMetrics', labels) for labels in label_list)) and \ all((self.verify_metric_larger_than_zero('minifi_repository_size_bytes', 'RepositoryMetrics', labels) for labels in label_list[1:3])) and \ all((self.verify_metrics_exist(['minifi_rocksdb_table_readers_size_bytes', 'minifi_rocksdb_all_memory_tables_size_bytes'], 'RepositoryMetrics', labels) for labels in label_list[1:3])) @@ -75,12 +76,15 @@ def verify_device_info_node_metrics(self): def verify_agent_status_metrics(self): label_list = [{'repository_name': 'flowfile'}, {'repository_name': 'content'}] + # Only flowfile and content repositories are using rocksdb by default, so rocksdb specific metrics are only present there for labels in label_list: if not (self.verify_metric_exists('minifi_is_running', 'AgentStatus', labels) and self.verify_metric_exists('minifi_is_full', 'AgentStatus', labels) and self.verify_metric_exists('minifi_max_repository_size_bytes', 'AgentStatus', labels) and self.verify_metric_larger_than_zero('minifi_repository_size_bytes', 'AgentStatus', labels) - and self.verify_metric_exists('minifi_repository_entry_count', 'AgentStatus', labels)): + and self.verify_metric_exists('minifi_repository_entry_count', 'AgentStatus', labels) + and self.verify_metric_exists('minifi_rocksdb_table_readers_size_bytes', 'AgentStatus', labels) + and self.verify_metric_exists('minifi_rocksdb_all_memory_tables_size_bytes', 'AgentStatus', labels)): return False # provenance repository is NoOpRepository by default which has zero size diff --git a/extensions/rocksdb-repos/DatabaseContentRepository.cpp b/extensions/rocksdb-repos/DatabaseContentRepository.cpp index 8e3da5ccaf..3e5f216c75 100644 --- a/extensions/rocksdb-repos/DatabaseContentRepository.cpp +++ b/extensions/rocksdb-repos/DatabaseContentRepository.cpp @@ -286,7 +286,7 @@ uint64_t DatabaseContentRepository::getRepositoryEntryCount() const { })).value_or(0); } -std::optional DatabaseContentRepository::getRocksDbStats() { +std::optional DatabaseContentRepository::getRocksDbStats() const { RocksDbStats stats; auto opendb = db_->open(); if (!opendb) { diff --git a/extensions/rocksdb-repos/DatabaseContentRepository.h b/extensions/rocksdb-repos/DatabaseContentRepository.h index fa2c986246..6982427dfa 100644 --- a/extensions/rocksdb-repos/DatabaseContentRepository.h +++ b/extensions/rocksdb-repos/DatabaseContentRepository.h @@ -76,7 +76,7 @@ class DatabaseContentRepository : public core::ContentRepository { uint64_t getRepositorySize() const override; uint64_t getRepositoryEntryCount() const override; - std::optional getRocksDbStats() override; + std::optional getRocksDbStats() const override; protected: bool removeKey(const std::string& content_path) override; diff --git a/extensions/rocksdb-repos/RocksDbRepository.cpp b/extensions/rocksdb-repos/RocksDbRepository.cpp index 5fca5ddf2a..cc0a1d61f1 100644 --- a/extensions/rocksdb-repos/RocksDbRepository.cpp +++ b/extensions/rocksdb-repos/RocksDbRepository.cpp @@ -20,7 +20,7 @@ using namespace std::literals::chrono_literals; namespace org::apache::nifi::minifi::core::repository { -std::optional RocksDbRepository::getRocksDbStats() { +std::optional RocksDbRepository::getRocksDbStats() const { RocksDbStats stats; auto opendb = db_->open(); if (!opendb) { diff --git a/extensions/rocksdb-repos/RocksDbRepository.h b/extensions/rocksdb-repos/RocksDbRepository.h index 703b77adfc..88ab000ce2 100644 --- a/extensions/rocksdb-repos/RocksDbRepository.h +++ b/extensions/rocksdb-repos/RocksDbRepository.h @@ -46,7 +46,7 @@ class RocksDbRepository : public ThreadedRepository { uint64_t getRepositorySize() const override; uint64_t getRepositoryEntryCount() const override; - std::optional getRocksDbStats() override; + std::optional getRocksDbStats() const override; protected: bool ExecuteWithRetry(const std::function& operation); diff --git a/libminifi/include/core/RepositoryMetricsSource.h b/libminifi/include/core/RepositoryMetricsSource.h index ed35f2be6c..6c933b7eb8 100644 --- a/libminifi/include/core/RepositoryMetricsSource.h +++ b/libminifi/include/core/RepositoryMetricsSource.h @@ -47,7 +47,7 @@ class RepositoryMetricsSource { return true; } - virtual std::optional getRocksDbStats() { + virtual std::optional getRocksDbStats() const { return std::nullopt; } }; diff --git a/libminifi/include/core/state/nodes/AgentInformation.h b/libminifi/include/core/state/nodes/AgentInformation.h index 0d18c721eb..97b330d494 100644 --- a/libminifi/include/core/state/nodes/AgentInformation.h +++ b/libminifi/include/core/state/nodes/AgentInformation.h @@ -61,7 +61,7 @@ #include "core/AgentIdentificationProvider.h" #include "utils/Export.h" #include "SupportedOperations.h" -#include "core/RepositoryMetricsSource.h" +#include "RepositoryMetricsSourceStore.h" namespace org::apache::nifi::minifi::state::response { @@ -411,11 +411,18 @@ class Bundles : public DeviceInformation { class AgentStatus : public StateMonitorNode { public: AgentStatus(std::string name, const utils::Identifier& uuid) - : StateMonitorNode(std::move(name), uuid) { + : StateMonitorNode(std::move(name), uuid), + repository_metrics_source_store_(getName()) { } explicit AgentStatus(std::string name) - : StateMonitorNode(std::move(name)) { + : StateMonitorNode(std::move(name)), + repository_metrics_source_store_(getName()) { + } + + explicit AgentStatus(std::string name, std::string parent_metrics_name) + : StateMonitorNode(std::move(name)), + repository_metrics_source_store_(std::move(parent_metrics_name)) { } MINIFIAPI static constexpr const char* Description = "Metric node that defines current agent status including repository, component and resource usage information."; @@ -424,14 +431,12 @@ class AgentStatus : public StateMonitorNode { return "AgentStatus"; } - void setRepositories(const std::map> &repositories) { - repositories_ = repositories; + void setRepositories(const std::vector> &repositories) { + repository_metrics_source_store_.setRepositories(repositories); } void addRepository(const std::shared_ptr &repo) { - if (nullptr != repo) { - repositories_.insert(std::make_pair(repo->getRepositoryName(), repo)); - } + repository_metrics_source_store_.addRepository(repo); } std::vector serialize() override { @@ -453,14 +458,7 @@ class AgentStatus : public StateMonitorNode { } std::vector calculateMetrics() override { - std::vector metrics; - for (const auto& [_, repo] : repositories_) { - metrics.push_back({"is_running", (repo->isRunning() ? 1.0 : 0.0), {{"metric_class", getName()}, {"repository_name", repo->getRepositoryName()}}}); - metrics.push_back({"is_full", (repo->isFull() ? 1.0 : 0.0), {{"metric_class", getName()}, {"repository_name", repo->getRepositoryName()}}}); - metrics.push_back({"repository_size_bytes", static_cast(repo->getRepositorySize()), {{"metric_class", getName()}, {"repository_name", repo->getRepositoryName()}}}); - metrics.push_back({"max_repository_size_bytes", static_cast(repo->getMaxRepositorySize()), {{"metric_class", getName()}, {"repository_name", repo->getRepositoryName()}}}); - metrics.push_back({"repository_entry_count", static_cast(repo->getRepositoryEntryCount()), {{"metric_class", getName()}, {"repository_name", repo->getRepositoryName()}}}); - } + auto metrics = repository_metrics_source_store_.calculateMetrics(); if (nullptr != monitor_) { auto uptime = monitor_->getUptime(); metrics.push_back({"uptime_milliseconds", static_cast(uptime), {{"metric_class", getName()}}}); @@ -487,41 +485,8 @@ class AgentStatus : public StateMonitorNode { protected: SerializedResponseNode serializeRepositories() const { SerializedResponseNode repositories; - repositories.name = "repositories"; - - for (const auto& repo : repositories_) { - SerializedResponseNode repo_node; - repo_node.collapsible = false; - repo_node.name = repo.first; - - SerializedResponseNode repo_size; - repo_size.name = "size"; - repo_size.value = repo.second->getRepositorySize(); - - SerializedResponseNode max_repo_size; - max_repo_size.name = "maxSize"; - max_repo_size.value = repo.second->getMaxRepositorySize(); - - SerializedResponseNode repo_entry_count; - repo_entry_count.name = "entryCount"; - repo_entry_count.value = repo.second->getRepositoryEntryCount(); - - SerializedResponseNode is_running; - is_running.name = "running"; - is_running.value = repo.second->isRunning(); - - SerializedResponseNode is_full; - is_full.name = "full"; - is_full.value = repo.second->isFull(); - - repo_node.children.push_back(repo_size); - repo_node.children.push_back(max_repo_size); - repo_node.children.push_back(repo_entry_count); - repo_node.children.push_back(is_running); - repo_node.children.push_back(is_full); - repositories.children.push_back(repo_node); - } + repositories.children = repository_metrics_source_store_.serialize(); return repositories; } @@ -591,7 +556,7 @@ class AgentStatus : public StateMonitorNode { return resource_consumption; } - std::map> repositories_; + RepositoryMetricsSourceStore repository_metrics_source_store_; MINIFIAPI static utils::ProcessCpuUsageTracker cpu_load_tracker_; MINIFIAPI static std::mutex cpu_load_tracker_mutex_; @@ -623,7 +588,7 @@ class AgentMonitor { } void addRepository(const std::shared_ptr &repo) { if (nullptr != repo) { - repositories_.insert(std::make_pair(repo->getRepositoryName(), repo)); + repositories_.push_back(repo); } } @@ -632,7 +597,7 @@ class AgentMonitor { } protected: - std::map> repositories_; + std::vector> repositories_; state::StateMonitor* monitor_ = nullptr; }; @@ -762,7 +727,7 @@ class AgentNode : public DeviceInformation, public AgentMonitor, public AgentIde std::vector getAgentStatus() const { std::vector serialized; - AgentStatus status("status"); + AgentStatus status("status", getName()); status.setRepositories(repositories_); status.setStateMonitor(monitor_); diff --git a/libminifi/include/core/state/nodes/RepositoryMetricsSourceStore.h b/libminifi/include/core/state/nodes/RepositoryMetricsSourceStore.h index 8ef101f967..af4827c1f8 100644 --- a/libminifi/include/core/state/nodes/RepositoryMetricsSourceStore.h +++ b/libminifi/include/core/state/nodes/RepositoryMetricsSourceStore.h @@ -31,9 +31,10 @@ namespace org::apache::nifi::minifi::state::response { class RepositoryMetricsSourceStore { public: RepositoryMetricsSourceStore(std::string name); + void setRepositories(const std::vector> &repositories); void addRepository(const std::shared_ptr &repo); - std::vector serialize(); - std::vector calculateMetrics(); + std::vector serialize() const; + std::vector calculateMetrics() const; private: std::string name_; diff --git a/libminifi/src/core/state/nodes/RepositoryMetricsSourceStore.cpp b/libminifi/src/core/state/nodes/RepositoryMetricsSourceStore.cpp index f59d277c7e..2371b642b6 100644 --- a/libminifi/src/core/state/nodes/RepositoryMetricsSourceStore.cpp +++ b/libminifi/src/core/state/nodes/RepositoryMetricsSourceStore.cpp @@ -21,13 +21,17 @@ namespace org::apache::nifi::minifi::state::response { RepositoryMetricsSourceStore::RepositoryMetricsSourceStore(std::string name) : name_(std::move(name)) {} +void RepositoryMetricsSourceStore::setRepositories(const std::vector> &repositories) { + repositories_ = repositories; +} + void RepositoryMetricsSourceStore::addRepository(const std::shared_ptr &repo) { if (nullptr != repo) { repositories_.push_back(repo); } } -std::vector RepositoryMetricsSourceStore::serialize() { +std::vector RepositoryMetricsSourceStore::serialize() const { std::vector serialized; for (const auto& repo : repositories_) { SerializedResponseNode parent; @@ -42,11 +46,11 @@ std::vector RepositoryMetricsSourceStore::serialize() { SerializedResponseNode repo_size; repo_size.name = "size"; - repo_size.value = std::to_string(repo->getRepositorySize()); + repo_size.value = repo->getRepositorySize(); SerializedResponseNode max_repo_size; max_repo_size.name = "maxSize"; - max_repo_size.value = std::to_string(repo->getMaxRepositorySize()); + max_repo_size.value = repo->getMaxRepositorySize(); SerializedResponseNode repo_entry_count; repo_entry_count.name = "entryCount"; @@ -77,7 +81,7 @@ std::vector RepositoryMetricsSourceStore::serialize() { return serialized; } -std::vector RepositoryMetricsSourceStore::calculateMetrics() { +std::vector RepositoryMetricsSourceStore::calculateMetrics() const { std::vector metrics; for (const auto& repo : repositories_) { metrics.push_back({"is_running", (repo->isRunning() ? 1.0 : 0.0), {{"metric_class", name_}, {"repository_name", repo->getRepositoryName()}}}); diff --git a/libminifi/test/unit/ProvenanceTestHelper.h b/libminifi/test/unit/ProvenanceTestHelper.h index 1d7c704312..bd23e49aff 100644 --- a/libminifi/test/unit/ProvenanceTestHelper.h +++ b/libminifi/test/unit/ProvenanceTestHelper.h @@ -178,7 +178,7 @@ class TestThreadedRepository : public TestRepositoryBase getRocksDbStats() override { + std::optional getRocksDbStats() const override { return RocksDbStats { .table_readers_size = 100, .all_memory_tables_size = 200 From a76164a7118177e260a8c0cd72f119e6e31beb2c Mon Sep 17 00:00:00 2001 From: Gabor Gyimesi Date: Wed, 22 Mar 2023 17:37:15 +0100 Subject: [PATCH 04/10] Add documentation --- METRICS.md | 40 ++++++++++++++++++++++------------------ 1 file changed, 22 insertions(+), 18 deletions(-) diff --git a/METRICS.md b/METRICS.md index f384fe570d..ee73a00186 100644 --- a/METRICS.md +++ b/METRICS.md @@ -143,13 +143,15 @@ QueueMetrics is a system level metric that reports queue metrics for every conne RepositoryMetrics is a system level metric that reports metrics for the registered repositories (by default flowfile, content, and provenance repositories) -| Metric name | Labels | Description | -|---------------------------|-----------------|-------------------------------------------------| -| is_running | repository_name | Is the repository running (1 or 0) | -| is_full | repository_name | Is the repository full (1 or 0) | -| repository_size_bytes | repository_name | Current size of the repository | -| max_repository_size_bytes | repository_name | Maximum size of the repository (0 if unlimited) | -| repository_entry_count | repository_name | Current number of entries in the repository | +| Metric name | Labels | Description | +|--------------------------------------|-----------------|------------------------------------------------------------------------------------------------------------------| +| is_running | repository_name | Is the repository running (1 or 0) | +| is_full | repository_name | Is the repository full (1 or 0) | +| repository_size_bytes | repository_name | Current size of the repository | +| max_repository_size_bytes | repository_name | Maximum size of the repository (0 if unlimited) | +| repository_entry_count | repository_name | Current number of entries in the repository | +| rocksdb_table_readers_size_bytes | repository_name | RocksDB's estimated memory used for reading SST tables (only present if repository uses RocksDB) | +| rocksdb_all_memory_tables_size_bytes | repository_name | RocksDB's approximate size of active and unflushed immutable memtables (only present if repository uses RocksDB) | | Label | Description | |--------------------------|---------------------------------------------------------------------------------------------------------------------------------------| @@ -188,17 +190,19 @@ FlowInformation is a system level metric that reports component and queue relate AgentStatus is a system level metric that defines current agent status including repository, component and resource usage information. -| Metric name | Labels | Description | -|---------------------------|--------------------------------|------------------------------------------------------------------------------------------------------------| -| is_running | repository_name | Is the repository running (1 or 0) | -| is_full | repository_name | Is the repository full (1 or 0) | -| repository_size_bytes | repository_name | Current size of the repository | -| max_repository_size_bytes | repository_name | Maximum size of the repository (0 if unlimited) | -| repository_entry_count | repository_name | Current number of entries in the repository | -| uptime_milliseconds | - | Agent uptime in milliseconds | -| is_running | component_uuid, component_name | Check if the component is running (1 or 0) | -| agent_memory_usage_bytes | - | Memory used by the agent process in bytes | -| agent_cpu_utilization | - | CPU utilization of the agent process (between 0 and 1). In case of a query error the returned value is -1. | +| Metric name | Labels | Description | +|--------------------------------------|--------------------------------|------------------------------------------------------------------------------------------------------------------| +| is_running | repository_name | Is the repository running (1 or 0) | +| is_full | repository_name | Is the repository full (1 or 0) | +| repository_size_bytes | repository_name | Current size of the repository | +| max_repository_size_bytes | repository_name | Maximum size of the repository (0 if unlimited) | +| repository_entry_count | repository_name | Current number of entries in the repository | +| rocksdb_table_readers_size_bytes | repository_name | RocksDB's estimated memory used for reading SST tables (only present if repository uses RocksDB) | +| rocksdb_all_memory_tables_size_bytes | repository_name | RocksDB's approximate size of active and unflushed immutable memtables (only present if repository uses RocksDB) | +| uptime_milliseconds | - | Agent uptime in milliseconds | +| is_running | component_uuid, component_name | Check if the component is running (1 or 0) | +| agent_memory_usage_bytes | - | Memory used by the agent process in bytes | +| agent_cpu_utilization | - | CPU utilization of the agent process (between 0 and 1). In case of a query error the returned value is -1. | | Label | Description | |-----------------|----------------------------------------------------------| From c0f963b0502f2e2681ec9da7797bd0453b42922f Mon Sep 17 00:00:00 2001 From: Gabor Gyimesi Date: Thu, 23 Mar 2023 09:20:28 +0100 Subject: [PATCH 05/10] Fix linter --- .../include/core/state/nodes/RepositoryMetricsSourceStore.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/libminifi/include/core/state/nodes/RepositoryMetricsSourceStore.h b/libminifi/include/core/state/nodes/RepositoryMetricsSourceStore.h index af4827c1f8..b4fcc39f9e 100644 --- a/libminifi/include/core/state/nodes/RepositoryMetricsSourceStore.h +++ b/libminifi/include/core/state/nodes/RepositoryMetricsSourceStore.h @@ -30,7 +30,7 @@ namespace org::apache::nifi::minifi::state::response { class RepositoryMetricsSourceStore { public: - RepositoryMetricsSourceStore(std::string name); + explicit RepositoryMetricsSourceStore(std::string name); void setRepositories(const std::vector> &repositories); void addRepository(const std::shared_ptr &repo); std::vector serialize() const; From f87ddc1881b1c24030d52db335e2741ca5ab818a Mon Sep 17 00:00:00 2001 From: Gabor Gyimesi Date: Wed, 29 Mar 2023 17:07:28 +0200 Subject: [PATCH 06/10] Handle exceptions --- .../rocksdb-repos/DatabaseContentRepository.cpp | 12 ++++++++++-- extensions/rocksdb-repos/RocksDbRepository.cpp | 12 ++++++++++-- 2 files changed, 20 insertions(+), 4 deletions(-) diff --git a/extensions/rocksdb-repos/DatabaseContentRepository.cpp b/extensions/rocksdb-repos/DatabaseContentRepository.cpp index 3e5f216c75..61860ae159 100644 --- a/extensions/rocksdb-repos/DatabaseContentRepository.cpp +++ b/extensions/rocksdb-repos/DatabaseContentRepository.cpp @@ -295,11 +295,19 @@ std::optional DatabaseContentRepository:: std::string table_readers; opendb->GetProperty("rocksdb.estimate-table-readers-mem", &table_readers); - stats.table_readers_size = std::stoull(table_readers); + try { + stats.table_readers_size = std::stoull(table_readers); + } catch (const std::exception&) { + logger_->log_error("Could not retrieve valid 'rocksdb.estimate-table-readers-mem' property value from rocksdb content repository!"); + } std::string all_memtables; opendb->GetProperty("rocksdb.cur-size-all-mem-tables", &all_memtables); - stats.all_memory_tables_size = std::stoull(all_memtables); + try { + stats.all_memory_tables_size = std::stoull(all_memtables); + } catch (const std::exception&) { + logger_->log_error("Could not retrieve valid 'rocksdb.cur-size-all-mem-tables' property value from rocksdb content repository!"); + } return stats; } diff --git a/extensions/rocksdb-repos/RocksDbRepository.cpp b/extensions/rocksdb-repos/RocksDbRepository.cpp index cc0a1d61f1..d21e69988a 100644 --- a/extensions/rocksdb-repos/RocksDbRepository.cpp +++ b/extensions/rocksdb-repos/RocksDbRepository.cpp @@ -29,11 +29,19 @@ std::optional RocksDbRepository::getRocks std::string table_readers; opendb->GetProperty("rocksdb.estimate-table-readers-mem", &table_readers); - stats.table_readers_size = std::stoull(table_readers); + try { + stats.table_readers_size = std::stoull(table_readers); + } catch (const std::exception&) { + logger_->log_error("Could not retrieve valid 'rocksdb.estimate-table-readers-mem' property value from rocksdb content repository!"); + } std::string all_memtables; opendb->GetProperty("rocksdb.cur-size-all-mem-tables", &all_memtables); - stats.all_memory_tables_size = std::stoull(all_memtables); + try { + stats.all_memory_tables_size = std::stoull(all_memtables); + } catch (const std::exception&) { + logger_->log_error("Could not retrieve valid 'rocksdb.cur-size-all-mem-tables' property value from rocksdb content repository!"); + } return stats; } From 9138b5fd0cd0ff09a7d7c820441027a0197ab2c4 Mon Sep 17 00:00:00 2001 From: Gabor Gyimesi Date: Thu, 8 Jun 2023 11:09:09 +0200 Subject: [PATCH 07/10] Review update --- .../DatabaseContentRepository.cpp | 21 ++----------------- .../rocksdb-repos/RocksDbRepository.cpp | 21 ++----------------- .../rocksdb-repos/database/OpenRocksDb.cpp | 21 +++++++++++++++++++ .../rocksdb-repos/database/OpenRocksDb.h | 4 ++++ 4 files changed, 29 insertions(+), 38 deletions(-) diff --git a/extensions/rocksdb-repos/DatabaseContentRepository.cpp b/extensions/rocksdb-repos/DatabaseContentRepository.cpp index 61860ae159..6994a482a0 100644 --- a/extensions/rocksdb-repos/DatabaseContentRepository.cpp +++ b/extensions/rocksdb-repos/DatabaseContentRepository.cpp @@ -287,29 +287,12 @@ uint64_t DatabaseContentRepository::getRepositoryEntryCount() const { } std::optional DatabaseContentRepository::getRocksDbStats() const { - RocksDbStats stats; auto opendb = db_->open(); if (!opendb) { - return stats; + return RocksDbStats{}; } - std::string table_readers; - opendb->GetProperty("rocksdb.estimate-table-readers-mem", &table_readers); - try { - stats.table_readers_size = std::stoull(table_readers); - } catch (const std::exception&) { - logger_->log_error("Could not retrieve valid 'rocksdb.estimate-table-readers-mem' property value from rocksdb content repository!"); - } - - std::string all_memtables; - opendb->GetProperty("rocksdb.cur-size-all-mem-tables", &all_memtables); - try { - stats.all_memory_tables_size = std::stoull(all_memtables); - } catch (const std::exception&) { - logger_->log_error("Could not retrieve valid 'rocksdb.cur-size-all-mem-tables' property value from rocksdb content repository!"); - } - - return stats; + return opendb->getStats(); } REGISTER_RESOURCE_AS(DatabaseContentRepository, InternalResource, ("DatabaseContentRepository", "databasecontentrepository")); diff --git a/extensions/rocksdb-repos/RocksDbRepository.cpp b/extensions/rocksdb-repos/RocksDbRepository.cpp index d21e69988a..75044b159a 100644 --- a/extensions/rocksdb-repos/RocksDbRepository.cpp +++ b/extensions/rocksdb-repos/RocksDbRepository.cpp @@ -21,29 +21,12 @@ using namespace std::literals::chrono_literals; namespace org::apache::nifi::minifi::core::repository { std::optional RocksDbRepository::getRocksDbStats() const { - RocksDbStats stats; auto opendb = db_->open(); if (!opendb) { - return stats; + return RocksDbStats{}; } - std::string table_readers; - opendb->GetProperty("rocksdb.estimate-table-readers-mem", &table_readers); - try { - stats.table_readers_size = std::stoull(table_readers); - } catch (const std::exception&) { - logger_->log_error("Could not retrieve valid 'rocksdb.estimate-table-readers-mem' property value from rocksdb content repository!"); - } - - std::string all_memtables; - opendb->GetProperty("rocksdb.cur-size-all-mem-tables", &all_memtables); - try { - stats.all_memory_tables_size = std::stoull(all_memtables); - } catch (const std::exception&) { - logger_->log_error("Could not retrieve valid 'rocksdb.cur-size-all-mem-tables' property value from rocksdb content repository!"); - } - - return stats; + return opendb->getStats(); } bool RocksDbRepository::ExecuteWithRetry(const std::function& operation) { diff --git a/extensions/rocksdb-repos/database/OpenRocksDb.cpp b/extensions/rocksdb-repos/database/OpenRocksDb.cpp index 703c66a399..b2d4f521f7 100644 --- a/extensions/rocksdb-repos/database/OpenRocksDb.cpp +++ b/extensions/rocksdb-repos/database/OpenRocksDb.cpp @@ -135,4 +135,25 @@ std::optional OpenRocksDb::getApproximateSizes() const { return std::nullopt; } +minifi::core::RepositoryMetricsSource::RocksDbStats OpenRocksDb::getStats() { + minifi::core::RepositoryMetricsSource::RocksDbStats stats; + std::string table_readers; + GetProperty("rocksdb.estimate-table-readers-mem", &table_readers); + try { + stats.table_readers_size = std::stoull(table_readers); + } catch (const std::exception&) { + logger_->log_error("Could not retrieve valid 'rocksdb.estimate-table-readers-mem' property value from rocksdb content repository!"); + } + + std::string all_memtables; + GetProperty("rocksdb.cur-size-all-mem-tables", &all_memtables); + try { + stats.all_memory_tables_size = std::stoull(all_memtables); + } catch (const std::exception&) { + logger_->log_error("Could not retrieve valid 'rocksdb.cur-size-all-mem-tables' property value from rocksdb content repository!"); + } + + return stats; +} + } // namespace org::apache::nifi::minifi::internal diff --git a/extensions/rocksdb-repos/database/OpenRocksDb.h b/extensions/rocksdb-repos/database/OpenRocksDb.h index b4d9643815..d65aad6895 100644 --- a/extensions/rocksdb-repos/database/OpenRocksDb.h +++ b/extensions/rocksdb-repos/database/OpenRocksDb.h @@ -27,6 +27,8 @@ #include "rocksdb/db.h" #include "rocksdb/utilities/checkpoint.h" #include "WriteBatch.h" +#include "core/RepositoryMetricsSource.h" +#include "core/logging/LoggerConfiguration.h" namespace org::apache::nifi::minifi::internal { @@ -73,6 +75,7 @@ class OpenRocksDb { rocksdb::DB* get(); std::optional getApproximateSizes() const; + minifi::core::RepositoryMetricsSource::RocksDbStats getStats(); private: void handleResult(const rocksdb::Status& result); @@ -81,6 +84,7 @@ class OpenRocksDb { gsl::not_null db_; gsl::not_null> impl_; gsl::not_null> column_; + std::shared_ptr logger_{minifi::core::logging::LoggerFactory::getLogger()}; }; } // namespace org::apache::nifi::minifi::internal From bcef0ade5da8ea94e90657b314eda0eec1d6522f Mon Sep 17 00:00:00 2001 From: Gabor Gyimesi Date: Fri, 16 Jun 2023 11:11:13 +0200 Subject: [PATCH 08/10] Change error logs to warnings --- extensions/rocksdb-repos/database/OpenRocksDb.cpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/extensions/rocksdb-repos/database/OpenRocksDb.cpp b/extensions/rocksdb-repos/database/OpenRocksDb.cpp index b2d4f521f7..c48c4b62c6 100644 --- a/extensions/rocksdb-repos/database/OpenRocksDb.cpp +++ b/extensions/rocksdb-repos/database/OpenRocksDb.cpp @@ -142,7 +142,7 @@ minifi::core::RepositoryMetricsSource::RocksDbStats OpenRocksDb::getStats() { try { stats.table_readers_size = std::stoull(table_readers); } catch (const std::exception&) { - logger_->log_error("Could not retrieve valid 'rocksdb.estimate-table-readers-mem' property value from rocksdb content repository!"); + logger_->log_warn("Could not retrieve valid 'rocksdb.estimate-table-readers-mem' property value from rocksdb content repository!"); } std::string all_memtables; @@ -150,7 +150,7 @@ minifi::core::RepositoryMetricsSource::RocksDbStats OpenRocksDb::getStats() { try { stats.all_memory_tables_size = std::stoull(all_memtables); } catch (const std::exception&) { - logger_->log_error("Could not retrieve valid 'rocksdb.cur-size-all-mem-tables' property value from rocksdb content repository!"); + logger_->log_warn("Could not retrieve valid 'rocksdb.cur-size-all-mem-tables' property value from rocksdb content repository!"); } return stats; From af2179cda102146bb8d0191d09254764373eb8a5 Mon Sep 17 00:00:00 2001 From: Gabor Gyimesi Date: Fri, 30 Jun 2023 08:24:00 +0200 Subject: [PATCH 09/10] Review update --- .../nodes/RepositoryMetricsSourceStore.cpp | 57 +++++-------------- 1 file changed, 15 insertions(+), 42 deletions(-) diff --git a/libminifi/src/core/state/nodes/RepositoryMetricsSourceStore.cpp b/libminifi/src/core/state/nodes/RepositoryMetricsSourceStore.cpp index 2371b642b6..0d4789f8c5 100644 --- a/libminifi/src/core/state/nodes/RepositoryMetricsSourceStore.cpp +++ b/libminifi/src/core/state/nodes/RepositoryMetricsSourceStore.cpp @@ -34,46 +34,20 @@ void RepositoryMetricsSourceStore::addRepository(const std::shared_ptr RepositoryMetricsSourceStore::serialize() const { std::vector serialized; for (const auto& repo : repositories_) { - SerializedResponseNode parent; - parent.name = repo->getRepositoryName(); - SerializedResponseNode is_running; - is_running.name = "running"; - is_running.value = repo->isRunning(); - - SerializedResponseNode is_full; - is_full.name = "full"; - is_full.value = repo->isFull(); - - SerializedResponseNode repo_size; - repo_size.name = "size"; - repo_size.value = repo->getRepositorySize(); - - SerializedResponseNode max_repo_size; - max_repo_size.name = "maxSize"; - max_repo_size.value = repo->getMaxRepositorySize(); - - SerializedResponseNode repo_entry_count; - repo_entry_count.name = "entryCount"; - repo_entry_count.value = repo->getRepositoryEntryCount(); - - parent.children.push_back(is_running); - parent.children.push_back(is_full); - parent.children.push_back(repo_size); - parent.children.push_back(max_repo_size); - parent.children.push_back(repo_entry_count); - - auto rocksdb_stats = repo->getRocksDbStats(); - if (rocksdb_stats) { - SerializedResponseNode rocksdb_table_readers_size; - rocksdb_table_readers_size.name = "rocksDbTableReadersSize"; - rocksdb_table_readers_size.value = rocksdb_stats->table_readers_size; - - SerializedResponseNode rocksdb_all_memory_tables_size; - rocksdb_all_memory_tables_size.name = "rocksDbAllMemoryTablesSize"; - rocksdb_all_memory_tables_size.value = rocksdb_stats->all_memory_tables_size; - - parent.children.push_back(rocksdb_table_readers_size); - parent.children.push_back(rocksdb_all_memory_tables_size); + SerializedResponseNode parent = { + .name = repo->getRepositoryName(), + .children = { + {.name = "running", .value = repo->isRunning()}, + {.name = "full", .value = repo->isFull()}, + {.name = "size", .value = repo->getRepositorySize()}, + {.name = "maxSize", .value = repo->getMaxRepositorySize()}, + {.name = "entryCount", .value = repo->getRepositoryEntryCount()}, + } + }; + + if (auto rocksdb_stats = repo->getRocksDbStats()) { + parent.children.push_back({.name = "rocksDbTableReadersSize", .value = rocksdb_stats->table_readers_size}); + parent.children.push_back({.name = "rocksDbAllMemoryTablesSize", .value = rocksdb_stats->all_memory_tables_size}); } serialized.push_back(parent); @@ -89,8 +63,7 @@ std::vector RepositoryMetricsSourceStore::calculateMetrics() co metrics.push_back({"repository_size_bytes", static_cast(repo->getRepositorySize()), {{"metric_class", name_}, {"repository_name", repo->getRepositoryName()}}}); metrics.push_back({"max_repository_size_bytes", static_cast(repo->getMaxRepositorySize()), {{"metric_class", name_}, {"repository_name", repo->getRepositoryName()}}}); metrics.push_back({"repository_entry_count", static_cast(repo->getRepositoryEntryCount()), {{"metric_class", name_}, {"repository_name", repo->getRepositoryName()}}}); - auto rocksdb_stats = repo->getRocksDbStats(); - if (rocksdb_stats) { + if (auto rocksdb_stats = repo->getRocksDbStats()) { metrics.push_back({"rocksdb_table_readers_size_bytes", static_cast(rocksdb_stats->table_readers_size), {{"metric_class", name_}, {"repository_name", repo->getRepositoryName()}}}); metrics.push_back({"rocksdb_all_memory_tables_size_bytes", static_cast(rocksdb_stats->all_memory_tables_size), From 09d602981be237b3d562a6e2344d046378049fec Mon Sep 17 00:00:00 2001 From: Gabor Gyimesi Date: Tue, 11 Jul 2023 14:56:55 +0200 Subject: [PATCH 10/10] Fix after rebase --- .../test/unit/LogMetricsPublisherTests.cpp | 34 ++++++++++++++----- 1 file changed, 26 insertions(+), 8 deletions(-) diff --git a/libminifi/test/unit/LogMetricsPublisherTests.cpp b/libminifi/test/unit/LogMetricsPublisherTests.cpp index 35d4ba4f20..c7fafb867f 100644 --- a/libminifi/test/unit/LogMetricsPublisherTests.cpp +++ b/libminifi/test/unit/LogMetricsPublisherTests.cpp @@ -38,6 +38,8 @@ class LogPublisherTestFixture { response_node_loader_(std::make_shared(configuration_, std::vector>{provenance_repo_, flow_file_repo_}, nullptr)), publisher_("LogMetricsPublisher") { + provenance_repo_->initialize(configuration_); + flow_file_repo_->initialize(configuration_); } protected: @@ -89,14 +91,18 @@ TEST_CASE_METHOD(LogPublisherTestFixture, "Verify multiple metric nodes in logs" "full": "false", "size": "0", "maxSize": "0", - "entryCount": "0" + "entryCount": "0", + "rocksDbTableReadersSize": "0", + "rocksDbAllMemoryTablesSize": "2048" }, "flowfilerepository": { "running": "false", "full": "false", "size": "0", "maxSize": "0", - "entryCount": "0" + "entryCount": "0", + "rocksDbTableReadersSize": "0", + "rocksDbAllMemoryTablesSize": "2048" } }, "deviceInfo": { @@ -119,14 +125,18 @@ TEST_CASE_METHOD(LogPublisherTestFixture, "Verify reloading different metrics", "full": "false", "size": "0", "maxSize": "0", - "entryCount": "0" + "entryCount": "0", + "rocksDbTableReadersSize": "0", + "rocksDbAllMemoryTablesSize": "2048" }, "flowfilerepository": { "running": "false", "full": "false", "size": "0", "maxSize": "0", - "entryCount": "0" + "entryCount": "0", + "rocksDbTableReadersSize": "0", + "rocksDbAllMemoryTablesSize": "2048" } } } @@ -168,14 +178,18 @@ TEST_CASE_METHOD(LogPublisherTestFixture, "Verify generic and publisher specific "full": "false", "size": "0", "maxSize": "0", - "entryCount": "0" + "entryCount": "0", + "rocksDbTableReadersSize": "0", + "rocksDbAllMemoryTablesSize": "2048" }, "flowfilerepository": { "running": "false", "full": "false", "size": "0", "maxSize": "0", - "entryCount": "0" + "entryCount": "0", + "rocksDbTableReadersSize": "0", + "rocksDbAllMemoryTablesSize": "2048" } } } @@ -199,14 +213,18 @@ TEST_CASE_METHOD(LogPublisherTestFixture, "Verify changing log level property fo "full": "false", "size": "0", "maxSize": "0", - "entryCount": "0" + "entryCount": "0", + "rocksDbTableReadersSize": "0", + "rocksDbAllMemoryTablesSize": "2048" }, "flowfilerepository": { "running": "false", "full": "false", "size": "0", "maxSize": "0", - "entryCount": "0" + "entryCount": "0", + "rocksDbTableReadersSize": "0", + "rocksDbAllMemoryTablesSize": "2048" } } }