From ec295bdb4be46da2fa2550f79a61ba055f765a3b Mon Sep 17 00:00:00 2001 From: Dominik Charousset Date: Sat, 6 Aug 2022 13:37:04 +0200 Subject: [PATCH 1/2] Add additional health and performance metrics --- CMakeLists.txt | 1 + caf | 2 +- include/broker/internal/channel.hh | 161 +++++++++++++++++-- include/broker/internal/core_actor.hh | 44 ++++++ include/broker/internal/master_actor.hh | 17 ++- include/broker/internal/metric_factory.hh | 178 ++++++++++++++++++++++ include/broker/internal/store_actor.hh | 2 + src/internal/core_actor.cc | 77 +++++++--- src/internal/master_actor.cc | 32 +++- src/internal/metric_factory.cc | 149 ++++++++++++++++++ 10 files changed, 620 insertions(+), 43 deletions(-) create mode 100644 include/broker/internal/metric_factory.hh create mode 100644 src/internal/metric_factory.cc diff --git a/CMakeLists.txt b/CMakeLists.txt index 3f8f1741..e41a849b 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -332,6 +332,7 @@ set(BROKER_SRC src/internal/master_resolver.cc src/internal/metric_collector.cc src/internal/metric_exporter.cc + src/internal/metric_factory.cc src/internal/metric_scraper.cc src/internal/metric_view.cc src/internal/pending_connection.cc diff --git a/caf b/caf index 19a141fc..5498fe5e 160000 --- a/caf +++ b/caf @@ -1 +1 @@ -Subproject commit 19a141fc148c63edfb5fbcd18dad7a9b64411e04 +Subproject commit 5498fe5ee4ef3739862705077d9f809230d4d3c4 diff --git a/include/broker/internal/channel.hh b/include/broker/internal/channel.hh index 2ef00630..4fa36158 100644 --- a/include/broker/internal/channel.hh +++ b/include/broker/internal/channel.hh @@ -12,6 +12,7 @@ #include "broker/error.hh" #include "broker/internal/logger.hh" +#include "broker/internal/metric_factory.hh" #include "broker/lamport_timestamp.hh" #include "broker/none.hh" @@ -156,6 +157,56 @@ public: using path_list = std::vector; + /// Bundles metrics for the producer. + struct metrics_t { + /// Keeps track of how many output channels exist. 
+ caf::telemetry::int_gauge* output_channels = nullptr; + + /// Keeps track of how many messages currently wait for an ACK. + caf::telemetry::int_gauge* unacknowledged = nullptr; + + /// Keeps track of how many messages were sent (acknowledged) in total. + caf::telemetry::int_counter* processed = nullptr; + + void init(caf::telemetry::metric_registry& reg, std::string_view name) { + metric_factory factory{reg}; + output_channels = factory.store.output_channels_instance(name); + unacknowledged = factory.store.unacknowledged_updates_instance(name); + processed = factory.store.processed_updates_instance(name); + } + + void init(caf::actor_system& sys, std::string_view name) { + init(sys.metrics(), name); + } + + bool initialized() const noexcept { + // Either all pointers are null or none. + return output_channels != nullptr; + } + + void inc_output_channels() { + if (output_channels) + output_channels->inc(); + } + + void dec_output_channels() { + if (output_channels) + output_channels->dec(); + } + + void inc_unacknowledged() { + if (unacknowledged) + unacknowledged->inc(); + } + + void shipped(int64_t num) { + if (unacknowledged) { + unacknowledged->dec(num); + processed->inc(num); + } + } + }; + // -- constructors, destructors, and assignment operators ------------------ explicit producer(Backend* backend) : backend_(backend) { @@ -167,6 +218,7 @@ public: void produce(Payload content) { if (paths_.empty()) return; + metrics_.inc_unacknowledged(); ++seq_; buf_.emplace_back(event{seq_, std::move(content)}); last_broadcast_ = tick_; @@ -177,6 +229,7 @@ public: if (find_path(hdl) != paths_.end()) return ec::consumer_exists; BROKER_DEBUG("add" << hdl << "to the channel"); + metrics_.inc_output_channels(); paths_.emplace_back(path{hdl, seq_, 0, tick_}); backend_->send(this, hdl, handshake{seq_, heartbeat_interval_}); return {}; @@ -214,8 +267,11 @@ public: } // Drop events from the buffer if possible. 
auto not_acked = [acked](const event& x) { return x.seq > acked; }; - buf_.erase(buf_.begin(), - std::find_if(buf_.begin(), buf_.end(), not_acked)); + auto new_begin = std::find_if(buf_.begin(), buf_.end(), not_acked); + if (auto n = std::distance(buf_.begin(), new_begin); n > 0) { + metrics_.shipped(n); + buf_.erase(buf_.begin(), new_begin); + } } void handle_nack(const Handle& hdl, @@ -272,6 +328,7 @@ public: for (auto i = paths_.begin(); i != paths_.end();) { if (tick_.value - i->last_seen.value >= timeout) { BROKER_DEBUG("remove" << i->hdl << "from channel: consumer timeout"); + metrics_.dec_output_channels(); backend_->drop(this, i->hdl, ec::connection_timeout); i = paths_.erase(i); ++erased_paths; @@ -289,8 +346,11 @@ public: if (i->acked < acked) acked = i->acked; auto not_acked = [acked](const event& x) { return x.seq > acked; }; - buf_.erase(buf_.begin(), - std::find_if(buf_.begin(), buf_.end(), not_acked)); + auto new_begin = std::find_if(buf_.begin(), buf_.end(), not_acked); + if (auto n = std::distance(buf_.begin(), new_begin); n > 0) { + metrics_.shipped(n); + buf_.erase(buf_.begin(), new_begin); + } } } @@ -304,6 +364,10 @@ public: return *backend_; } + metrics_t& metrics() noexcept { + return metrics_; + } + auto seq() const noexcept { return seq_; } @@ -374,6 +438,9 @@ public: /// Transmits messages to the consumers. Backend* backend_; + /// Caches pointers to the metric instances. + metrics_t metrics_; + /// Monotonically increasing counter (starting at 1) to establish ordering /// of messages on this channel. Since we start at 1, the first message we /// send is going to have a sequence number of *2*. This enables us to @@ -452,6 +519,50 @@ public: using buf_type = std::deque; + /// Bundles metrics for the consumer. + struct metrics_t { + /// Keeps track of how many input channels exist. + caf::telemetry::int_gauge* input_channels = nullptr; + + /// Keeps track of how many messages are currently buffered because they + /// arrived out-of-order. 
+ caf::telemetry::int_gauge* out_of_order_updates = nullptr; + + void init(caf::telemetry::metric_registry& reg, std::string_view name) { + metric_factory factory{reg}; + input_channels = factory.store.input_channels_instance(name); + } + + void init(caf::actor_system& sys, std::string_view name) { + init(sys.metrics(), name); + } + + bool initialized() const noexcept { + // Checks input_channels only; init() does not assign out_of_order_updates. + return input_channels != nullptr; + } + + void inc_input_channels() { + if (input_channels) + input_channels->inc(); + } + + void dec_input_channels() { + if (input_channels) + input_channels->dec(); + } + + void inc_out_of_order_updates() { + if (out_of_order_updates) + out_of_order_updates->inc(); + } + + void dec_out_of_order_updates(int64_t n = 1) { + if (out_of_order_updates) + out_of_order_updates->dec(n); + } + }; + // -- constructors, destructors, and assignment operators ------------------ explicit consumer(Backend* backend) : backend_(backend) { @@ -493,12 +604,17 @@ public: // message before that point. if (!buf_.empty()) { auto pred = [=](const optional_event& x) { return x.seq > offset; }; - auto i = std::find_if(buf_.begin(), buf_.end(), pred); - buf_.erase(buf_.begin(), i); + auto new_begin = std::find_if(buf_.begin(), buf_.end(), pred); + if (auto n = std::distance(buf_.begin(), new_begin); n > 0) { + metrics_.dec_out_of_order_updates(n); + buf_.erase(buf_.begin(), new_begin); + } } // Consume buffered messages if possible and send initial ACK. try_consume_buffer(); send_ack(); + // Update our metric. This counter can only oscillate between 0 and 1. + metrics_.inc_input_channels(); return true; } @@ -524,12 +640,15 @@ public: // Insert event into buf_: sort by the sequence number, drop duplicates. 
auto pred = [seq](const optional_event& x) { return x.seq >= seq; }; auto i = std::find_if(buf_.begin(), buf_.end(), pred); - if (i == buf_.end()) + if (i == buf_.end()) { buf_.emplace_back(seq, std::move(payload)); - else if (i->seq != seq) + metrics_.inc_out_of_order_updates(); + } else if (i->seq != seq) { + metrics_.inc_out_of_order_updates(); buf_.emplace(i, seq, std::move(payload)); - else if (!i->content) + } else if (!i->content) { i->content = std::move(payload); + } } } @@ -538,6 +657,7 @@ public: // Process immediately. if (auto err = backend_->consume_nil(this)) { backend_->close(this, std::move(err)); + reset(); return; } bump_seq(); @@ -546,10 +666,13 @@ public: // Insert event into buf_: sort by the sequence number, drop duplicates. auto pred = [seq](const optional_event& x) { return x.seq >= seq; }; auto i = std::find_if(buf_.begin(), buf_.end(), pred); - if (i == buf_.end()) + if (i == buf_.end()) { buf_.emplace_back(seq); - else if (i->seq != seq) + metrics_.inc_out_of_order_updates(); + } else if (i->seq != seq) { buf_.emplace(i, seq); + metrics_.inc_out_of_order_updates(); + } } } @@ -615,6 +738,10 @@ public: return *backend_; } + metrics_t& metrics() noexcept { + return metrics_; + } + const auto& producer() const { return producer_; } @@ -681,6 +808,9 @@ public: } void reset() { + if (initialized()) { + metrics_.dec_input_channels(); + } producer_ = Handle{}; next_seq_ = 0; last_seq_ = 0; @@ -712,12 +842,16 @@ public: if (auto err = backend_->consume_nil(this)) { buf_.erase(buf_.begin(), i); backend_->close(this, std::move(err)); + reset(); return; } } bump_seq(); } - buf_.erase(buf_.begin(), i); + if (auto n = std::distance(buf_.begin(), i); n > 0) { + buf_.erase(buf_.begin(), i); + metrics_.dec_out_of_order_updates(n); + } } void send_ack() { @@ -729,6 +863,9 @@ public: /// Handles incoming events. Backend* backend_; + /// Caches pointers to the metric instances. + metrics_t metrics_; + /// Stores the handle of the producer. 
Handle producer_; diff --git a/include/broker/internal/core_actor.hh b/include/broker/internal/core_actor.hh index 507117da..9087184c 100644 --- a/include/broker/internal/core_actor.hh +++ b/include/broker/internal/core_actor.hh @@ -9,8 +9,11 @@ #include #include #include +#include +#include #include +#include #include namespace broker::internal { @@ -53,6 +56,39 @@ public: /// Convenience alias for a map of @ref peer_state objects. using peer_state_map = std::unordered_map; + /// Bundles message-related metrics that have a label dimension for the type. + struct message_metrics_t { + /// Counts how many messages were processed since starting the core. + caf::telemetry::int_counter* processed = nullptr; + + /// Keeps track of how many messages are currently buffered at the core. + caf::telemetry::int_gauge* buffered = nullptr; + + void assign(caf::telemetry::int_counter* processed_instance, + caf::telemetry::int_gauge* buffered_instance) noexcept { + processed = processed_instance; + buffered = buffered_instance; + } + }; + + /// Bundles metrics for the core. + struct metrics_t { + explicit metrics_t(caf::actor_system& sys); + + /// Keeps track of how many native peers are currently connected. + caf::telemetry::int_gauge* native_connections = nullptr; + + /// Keeps track of how many WebSocket clients are currently connected. + caf::telemetry::int_gauge* web_socket_connections = nullptr; + + /// Stores the metrics for all message types. + std::array message_metric_sets; + + message_metrics_t& metrics_for(packed_message_type msg_type) { + return message_metric_sets[static_cast(msg_type)]; + } + }; + // -- constants -------------------------------------------------------------- static inline const char* name = "broker.core"; @@ -274,6 +310,9 @@ public: /// Enables manual time management by the user. endpoint::clock* clock; + /// Caches pointers to the Broker metrics. + metrics_t metrics; + /// Stores all master actors created by this endpoint. 
std::unordered_map masters; @@ -319,6 +358,11 @@ public: /// Returns whether `shutdown` was called. bool shutting_down(); + + /// Returns the metrics set for a given message type. + message_metrics_t& metrics_for(packed_message_type msg_type) { + return metrics.metrics_for(msg_type); + } }; using core_actor = caf::stateful_actor; diff --git a/include/broker/internal/master_actor.hh b/include/broker/internal/master_actor.hh index 3b3904b0..521e5acd 100644 --- a/include/broker/internal/master_actor.hh +++ b/include/broker/internal/master_actor.hh @@ -6,6 +6,7 @@ #include #include #include +#include #include "broker/data.hh" #include "broker/detail/abstract_backend.hh" @@ -22,15 +23,26 @@ class master_state : public store_actor_state { public: // -- member types ----------------------------------------------------------- + /// Base type. using super = store_actor_state; + /// Channel type for producing messages for a clone. using producer_type = channel_type::producer; + /// Channel type for consuming messages from a clone. using consumer_type = channel_type::consumer; - /// Owning smart pointer to a backend. + /// Owning smart pointer type to a backend. using backend_pointer = std::unique_ptr; + /// Bundles metrics for the master. + struct metrics_t { + metrics_t(caf::actor_system& sys, const std::string& name) noexcept; + + /// Keeps track of how many entries the store currently has. + caf::telemetry::int_gauge* entries = nullptr; + }; + template void broadcast(T&& cmd) { BROKER_TRACE(BROKER_ARG(cmd)); @@ -133,6 +145,9 @@ public: /// Maps senders to manager objects for incoming commands. std::unordered_map expirations; + /// Caches pointers to the metric instances. + metrics_t metrics; + /// Gives this actor a recognizable name in log files. 
static inline constexpr const char* name = "broker.master"; }; diff --git a/include/broker/internal/metric_factory.hh b/include/broker/internal/metric_factory.hh new file mode 100644 index 00000000..5f961090 --- /dev/null +++ b/include/broker/internal/metric_factory.hh @@ -0,0 +1,178 @@ +#pragma once + +#include +#include +#include +#include +#include + +namespace broker::internal { + +/// Provides a single access point for all Broker metric families and instances. +class metric_factory { +public: + // -- convenience type aliases ----------------------------------------------- + + template + using family_t = caf::telemetry::metric_family_impl; + + using dbl_gauge = caf::telemetry::dbl_gauge; + + using dbl_gauge_family = family_t; + + using int_gauge = caf::telemetry::int_gauge; + + using int_gauge_family = family_t; + + using dbl_counter = caf::telemetry::dbl_counter; + + using dbl_counter_family = family_t; + + using int_counter = caf::telemetry::int_counter; + + using int_counter_family = family_t; + + using dbl_histogram = caf::telemetry::dbl_histogram; + + using dbl_histogram_family = family_t; + + using int_histogram = caf::telemetry::int_histogram; + + using int_histogram_family = family_t; + + /// Bundles all Broker metrics for the core system. + class core_t { + public: + explicit core_t(caf::telemetry::metric_registry* reg) : reg_(reg) {} + + core_t(const core_t&) noexcept = default; + + core_t& operator=(const core_t&) noexcept = default; + + /// Keeps track of the active connections. + /// + /// Label dimensions: `type` ('native' or 'web-socket'). + int_gauge_family* connections_family(); + + struct connections_t { + int_gauge* native; + int_gauge* web_socket; + }; + + /// Returns all instances of `broker.connections`. + connections_t connections_instances(); + + /// Counts how many messages Broker has processed in total per message type. + /// + /// Label dimensions: `type` ('data', 'command', 'routing-update', 'ping', + /// or 'pong'). 
+ int_counter_family* processed_messages_family(); + + struct processed_messages_t { + int_counter* data; + int_counter* command; + int_counter* routing_update; + int_counter* ping; + int_counter* pong; + }; + + /// Returns all instances of `broker.processed-messages`. + processed_messages_t processed_messages_instances(); + + /// Counts how many messages Broker has buffered in total per message type. + /// + /// Label dimensions: `type` ('data', 'command', 'routing-update', 'ping', + /// or 'pong'). + int_gauge_family* buffered_messages_family(); + + struct buffered_messages_t { + int_gauge* data; + int_gauge* command; + int_gauge* routing_update; + int_gauge* ping; + int_gauge* pong; + }; + + /// Returns all instances of `broker.buffered-messages`. + buffered_messages_t buffered_messages_instances(); + + private: + caf::telemetry::metric_registry* reg_; + }; + + /// Bundles all Broker metrics for the data stores. + class store_t { + public: + explicit store_t(caf::telemetry::metric_registry* reg) : reg_(reg) {} + + store_t(const store_t&) noexcept = default; + + store_t& operator=(const store_t&) noexcept = default; + + /// Counts how many input channels a data store currently has. + int_gauge_family* input_channels_family(); + + /// Returns an instance of `broker.store-input-channels` for the given + /// data store. + int_gauge* input_channels_instance(std::string_view name); + + /// Counts how many inputs are currently buffered because they arrived + /// out-of-order. + int_gauge_family* out_of_order_updates_family(); + + /// Returns an instance of `broker.store-out-of-order-updates` for the given + /// data store. + int_gauge* out_of_order_updates_instance(std::string_view name); + + /// Counts how many output channels a data store currently has. + int_gauge_family* output_channels_family(); + + /// Returns an instance of `broker.store-output-channels` for the given + /// data store. 
+ int_gauge* output_channels_instance(std::string_view name); + + /// Counts how many entries a data store currently has. + int_gauge_family* entries_family(); + + /// Returns an instance of `broker.store-entries` for the given data store. + int_gauge* entries_instance(std::string_view name); + + /// Counts how many updates were processed in total. + int_counter_family* processed_updates_family(); + + /// Returns an instance of `broker.store-processed-updates` for the + /// given data store. + int_counter* processed_updates_instance(std::string_view name); + + /// Counts how many updates are currently unacknowledged. + int_gauge_family* unacknowledged_updates_family(); + + /// Returns an instance of `broker.store-unacknowledged-updates` for the + /// given data store. + int_gauge* unacknowledged_updates_instance(std::string_view name); + + private: + caf::telemetry::metric_registry* reg_; + }; + + // -- properties ------------------------------------------------------------- + + core_t core; + + store_t store; + + // --- constructors ---------------------------------------------------------- + + explicit metric_factory(caf::actor_system& sys) noexcept; + + explicit metric_factory(caf::telemetry::metric_registry& reg) noexcept + : core(®), store(®) { + // nop + } + + metric_factory(const metric_factory&) noexcept = default; + + metric_factory& operator=(const metric_factory&) noexcept = default; +}; + +} // namespace broker::internal diff --git a/include/broker/internal/store_actor.hh b/include/broker/internal/store_actor.hh index e683ea53..f6ed96f8 100644 --- a/include/broker/internal/store_actor.hh +++ b/include/broker/internal/store_actor.hh @@ -76,6 +76,7 @@ public: defaults::store::heartbeat_interval)); out.connection_timeout_factor(get_or(cfg, "broker.store.connection-timeout", defaults::store::connection_timeout)); + out.metrics().init(self->system(), store_name); } template @@ -93,6 +94,7 @@ public: in.heartbeat_interval(heartbeat_interval); 
in.connection_timeout_factor(connection_timeout); in.nack_timeout(nack_timeout); + in.metrics().init(self->system(), store_name); } template diff --git a/src/internal/core_actor.cc b/src/internal/core_actor.cc index 1a54cea7..f3ce58b9 100644 --- a/src/internal/core_actor.cc +++ b/src/internal/core_actor.cc @@ -35,6 +35,22 @@ namespace broker::internal { // --- constructors and destructors -------------------------------------------- +core_actor_state::metrics_t::metrics_t(caf::actor_system& sys) { + metric_factory factory{sys}; + // Initialize connection metrics. + auto [native, ws] = factory.core.connections_instances(); + native_connections = native; + web_socket_connections = ws; + // Initialize message metrics, indexes are according to packed_message_type. + auto proc = factory.core.processed_messages_instances(); + auto buf = factory.core.buffered_messages_instances(); + message_metric_sets[1].assign(proc.data, buf.data); + message_metric_sets[2].assign(proc.command, buf.command); + message_metric_sets[3].assign(proc.routing_update, buf.routing_update); + message_metric_sets[4].assign(proc.ping, buf.ping); + message_metric_sets[5].assign(proc.pong, buf.pong); +} + core_actor_state::core_actor_state(caf::event_based_actor* self, endpoint_id this_peer, filter_type initial_filter, @@ -44,8 +60,10 @@ core_actor_state::core_actor_state(caf::event_based_actor* self, : self(self), id(this_peer), filter(std::make_shared(std::move(initial_filter))), - clock(clock) { - // Check for extra configuration parameters. + clock(clock), + metrics(self->system()) { + // Read config and check for extra configuration parameters. 
+ ttl = caf::get_or(self->config(), "broker.ttl", defaults::ttl); if (adaptation && adaptation->disable_forwarding) { BROKER_INFO("disable forwarding on this peer"); disable_forwarding = true; @@ -66,7 +84,6 @@ core_actor_state::core_actor_state(caf::event_based_actor* self, on_peer_unavailable, filter, peer_statuses)); } - ttl = caf::get_or(self->config(), "broker.ttl", defaults::ttl); } core_actor_state::~core_actor_state() { @@ -76,20 +93,6 @@ core_actor_state::~core_actor_state() { // -- initialization and tear down --------------------------------------------- caf::behavior core_actor_state::make_behavior() { - // Our metrics for keeping track of how many messages pass through this peer. - // Indexes into the array are values of packed_message_type, they start at 1. - auto proc_fam = self->system().metrics().counter_family( - "broker", "processed-elements", {"type"}, - "Number of processed stream elements."); - using counter_ptr = caf::telemetry::int_counter*; - std::array counters{{ - nullptr, - proc_fam->get_or_add({{"type", "data"}}), - proc_fam->get_or_add({{"type", "command"}}), - proc_fam->get_or_add({{"type", "routing-update"}}), - proc_fam->get_or_add({{"type", "ping"}}), - proc_fam->get_or_add({{"type", "pong"}}), - }}; // Create the mergers. auto init_merger = [this](auto& ptr) { ptr.emplace(self); // Allocate the object. @@ -111,10 +114,12 @@ caf::behavior core_actor_state::make_behavior() { // Process filter updates from our peers and add instrumentation for metrics. central_merge // ->as_observable() - .for_each([this, counters](const node_message& msg) { + .for_each([this](const node_message& msg) { auto sender = get_sender(msg); // Update metrics. - counters[static_cast(get_type(msg))]->inc(); + auto& metrics = metrics_for(get_type(msg)); + metrics.processed->inc(); + metrics.buffered->dec(); // We only care about incoming filter updates messages here. 
if (sender == id || get_type(msg) != packed_message_type::routing_update) return; @@ -134,7 +139,7 @@ caf::behavior core_actor_state::make_behavior() { // Respond to PING messages. central_merge // ->as_observable() - .for_each([this, counters](const node_message& msg) { + .for_each([this](const node_message& msg) { auto sender = get_sender(msg); if (sender == id || get_type(msg) != packed_message_type::ping) return; @@ -282,7 +287,13 @@ caf::behavior core_actor_state::make_behavior() { }, // -- interface for publishers --------------------------------------------- [this](data_consumer_res src) { - auto sub = data_inputs->add(self->make_observable().from_resource(src)); + auto in = self + ->make_observable() // + .from_resource(src) + .do_on_next([this](const data_message&) { + metrics_for(packed_message_type::data).buffered->inc(); + }); + auto sub = data_inputs->add(std::move(in)); subscriptions.emplace_back(std::move(sub)); }, // -- data store management ------------------------------------------------ @@ -758,6 +769,8 @@ caf::error core_actor_state::init_new_peer(endpoint_id peer_id, << BROKER_ARG(status)); return caf::make_error(ec::invalid_status, to_string(status)); } + // All sanity checks have passed, update our state. + metrics.native_connections->inc(); // Store the filter for is_subscribed_to. peer_filters[peer_id] = filter; // Hook into the central merge point for forwarding the data to the peer. @@ -800,9 +813,13 @@ caf::error core_actor_state::init_new_peer(endpoint_id peer_id, // Read messages from the peer. auto in = self->make_observable() .from_resource(in_res) + .do_on_next([this](const node_message& msg) { + metrics_for(get_type(msg)).buffered->inc(); + }) // If the peer closes this buffer, we assume a disconnect. 
.do_finally([this, peer_id, ts] { // BROKER_DEBUG("close input flow from" << peer_id); + metrics.native_connections->dec(); caf::error reason; handle_peer_close_event(peer_id, ts, reason); }) @@ -872,6 +889,8 @@ caf::error core_actor_state::init_new_client(const network_info& addr, return caf::make_error(caf::sec::invalid_argument, "cannot add client without valid input buffer"); } + // All sanity checks have passed, update our state. + metrics.web_socket_connections->inc(); // We cannot simply treat a client like we treat a local publisher or // subscriber, because events from the client must be visible locally. Hence, // we assign a UUID to each client and treat it almost like a peer. @@ -889,7 +908,7 @@ caf::error core_actor_state::init_new_client(const network_info& addr, detail::prefix_matcher f; return f(filter, get_topic(msg)); }) - // Deserialize payload and wrap it into an data message. + // Deserialize payload and wrap it into a data message. .flat_map_optional([this](const node_message& msg) { // TODO: repeats deserialization in the core! Ideally, this // would only happen exactly once per message. 
@@ -906,8 +925,10 @@ caf::error core_actor_state::init_new_client(const network_info& addr, .do_finally([this, client_id, addr, type] { BROKER_DEBUG("client" << addr << "disconnected"); client_removed(client_id, addr, type); + metrics.web_socket_connections->dec(); }) .map([this, client_id](const data_message& msg) { + metrics_for(packed_message_type::data).buffered->inc(); return make_node_message(client_id, endpoint_id::nil(), pack(msg)); }) @@ -981,7 +1002,10 @@ caf::result core_actor_state::attach_master(const std::string& name, return f(xs, item); }) .subscribe(prod1); - command_inputs->add(self->make_observable().from_resource(con2)); + command_inputs->add(self->make_observable().from_resource(con2).do_on_next( + [this](const command_message&) { + metrics_for(packed_message_type::command).buffered->inc(); + })); // Save the handle and monitor the new actor. masters.emplace(name, hdl); self->link_to(hdl); @@ -1020,7 +1044,10 @@ core_actor_state::attach_clone(const std::string& name, double resync_interval, return f(xs, item); }) .subscribe(prod1); - command_inputs->add(self->make_observable().from_resource(con2)); + command_inputs->add(self->make_observable().from_resource(con2).do_on_next( + [this](const command_message&) { + metrics_for(packed_message_type::command).buffered->inc(); + })); // Save the handle for later. 
clones.emplace(name, hdl); return hdl; @@ -1041,6 +1068,7 @@ void core_actor_state::shutdown_stores() { // -- dispatching of messages to peers regardless of subscriptions ------------ void core_actor_state::dispatch(endpoint_id receiver, packed_message msg) { + metrics_for(get_type(msg)).buffered->inc(); central_merge->append_to_buf(make_node_message(id, receiver, msg)); central_merge->try_push(); } @@ -1058,6 +1086,7 @@ void core_actor_state::broadcast_subscriptions() { auto packed = packed_message{packed_message_type::routing_update, ttl, topic{std::string{topic::reserved}}, std::vector{first, last}}; + metrics_for(packed_message_type::routing_update).buffered->inc(static_cast<int64_t>(peers.size())); for (auto& kvp : peers) central_merge->append_to_buf(node_message(id, kvp.first, packed)); central_merge->try_push(); diff --git a/src/internal/master_actor.cc b/src/internal/master_actor.cc index 1d125d59..22f0b499 100644 --- a/src/internal/master_actor.cc +++ b/src/internal/master_actor.cc @@ -18,6 +18,7 @@ #include "broker/detail/assert.hh" #include "broker/detail/die.hh" #include "broker/internal/master_actor.hh" +#include "broker/internal/metric_factory.hh" #include "broker/store.hh" #include "broker/time.hh" #include "broker/topic.hh" @@ -41,6 +42,14 @@ auto to_caf_res(expected&& x) { } // namespace +// -- metrics ------------------------------------------------------------------ + +master_state::metrics_t::metrics_t(caf::actor_system& sys, + const std::string& name) noexcept { + metric_factory factory{sys}; + entries = factory.store.entries_instance(name); +} + // -- initialization ----------------------------------------------------------- master_state::master_state( @@ -48,7 +57,7 @@ master_state::master_state( backend_pointer bp, caf::actor parent, endpoint::clock* ep_clock, caf::async::consumer_resource in_res, caf::async::producer_resource out_res) - : output(this) { + : output(this), metrics(ptr->system(), nm) { super::init(ptr, std::move(this_endpoint), ep_clock, std::move(nm), 
std::move(parent), std::move(in_res), std::move(out_res)); super::init(output); @@ -161,6 +170,7 @@ void master_state::tick() { expire_command cmd{std::move(key), id}; emit_expire_event(cmd); broadcast(std::move(cmd)); + metrics.entries->dec(); } i = expirations.erase(i); } else { @@ -196,10 +206,12 @@ void master_state::consume(put_command& x) { return; // TODO: propagate failure? to all clones? as status msg? } set_expire_time(x.key, x.expiry); - if (old_value) + if (old_value) { emit_update_event(x, *old_value); - else + } else { emit_insert_event(x); + metrics.entries->inc(); + } broadcast(std::move(x)); } @@ -230,6 +242,7 @@ void master_state::consume(put_unique_command& x) { } set_expire_time(x.key, x.expiry); emit_insert_event(x); + metrics.entries->inc(); // Broadcast a regular "put" command (clones don't have to do their own // existence check) followed by the (positive) result message. broadcast(put_command{std::move(x.key), std::move(x.value), x.expiry, @@ -240,11 +253,16 @@ void master_state::consume(put_unique_command& x) { void master_state::consume(erase_command& x) { BROKER_TRACE(BROKER_ARG(x)); BROKER_INFO("ERASE" << x.key); + if (!exists(x.key)) { + BROKER_DEBUG("failed to erase" << x.key << "-> no such key"); + return; + } if (auto res = backend->erase(x.key); !res) { BROKER_WARNING("failed to erase" << x.key << "->" << res.error()); return; // TODO: propagate failure? to all clones? as status msg? } emit_erase_event(x.key, x.publisher); + metrics.entries->dec(); broadcast(std::move(x)); } @@ -268,10 +286,12 @@ void master_state::consume(add_command& x) { // processing again. 
put_command cmd{std::move(x.key), std::move(*val), std::nullopt, std::move(x.publisher)}; - if (old_value) + if (old_value) { emit_update_event(cmd, *old_value); - else + } else { emit_insert_event(cmd); + metrics.entries->inc(); + } broadcast(std::move(cmd)); } } @@ -316,9 +336,11 @@ void master_state::consume(clear_command& x) { if (auto keys = get_if(*keys_res)) { for (auto& key : *keys) emit_erase_event(key, x.publisher); + metrics.entries->value(0); } else if (auto keys = get_if(*keys_res)) { for (auto& key : *keys) emit_erase_event(key, x.publisher); + metrics.entries->value(0); } else if (!is(*keys_res)) { BROKER_ERROR("backend->keys() returned an unexpected result type"); } diff --git a/src/internal/metric_factory.cc b/src/internal/metric_factory.cc new file mode 100644 index 00000000..97c6287f --- /dev/null +++ b/src/internal/metric_factory.cc @@ -0,0 +1,149 @@ +#include "broker/internal/metric_factory.hh" + +#include + +namespace broker::internal { + +// -- 'imports' to save ourselves some typing ---------------------------------- + +using dbl_gauge = metric_factory::dbl_gauge; + +using dbl_gauge_family = metric_factory::dbl_gauge_family; + +using int_gauge = metric_factory::int_gauge; + +using int_gauge_family = metric_factory::int_gauge_family; + +using dbl_counter = metric_factory::dbl_counter; + +using dbl_counter_family = metric_factory::dbl_counter_family; + +using int_counter = metric_factory::int_counter; + +using int_counter_family = metric_factory::int_counter_family; + +using dbl_histogram = metric_factory::dbl_histogram; + +using dbl_histogram_family = metric_factory::dbl_histogram_family; + +using int_histogram = metric_factory::int_histogram; + +using int_histogram_family = metric_factory::int_histogram_family; + +// -- core metrics ------------------------------------------------------------- + +using core_t = metric_factory::core_t; + +int_gauge_family* core_t::connections_family() { + return reg_->gauge_family("broker", "connections", 
{"type"}, + "Number of active network connections."); +} + +core_t::connections_t core_t::connections_instances() { + auto fm = connections_family(); + return { + fm->get_or_add({{"type", "native"}}), + fm->get_or_add({{"type", "web-socket"}}), + }; +} + +int_counter_family* core_t::processed_messages_family() { + return reg_->counter_family("broker", "processed-messages", {"type"}, + "Total number of processed messages.", "1", true); +} + +core_t::processed_messages_t core_t::processed_messages_instances() { + auto fm = processed_messages_family(); + return { + fm->get_or_add({{"type", "data"}}), + fm->get_or_add({{"type", "command"}}), + fm->get_or_add({{"type", "routing-update"}}), + fm->get_or_add({{"type", "ping"}}), + fm->get_or_add({{"type", "pong"}}), + }; +} + +int_gauge_family* core_t::buffered_messages_family() { + return reg_->gauge_family("broker", "buffered-messages", {"type"}, + "Number of currently buffered messages."); +} + +core_t::buffered_messages_t core_t::buffered_messages_instances() { + auto fm = buffered_messages_family(); + return { + fm->get_or_add({{"type", "data"}}), + fm->get_or_add({{"type", "command"}}), + fm->get_or_add({{"type", "routing-update"}}), + fm->get_or_add({{"type", "ping"}}), + fm->get_or_add({{"type", "pong"}}), + }; +} + +// -- store metrics ------------------------------------------------------------ + +using store_t = metric_factory::store_t; + +int_gauge_family* store_t::input_channels_family() { + return reg_->gauge_family("broker", "store-input-channels", {"name"}, + "Number of active input channels in a data store."); +} + +int_gauge* store_t::input_channels_instance(std::string_view name) { + return input_channels_family()->get_or_add({{"name", name}}); +} + +int_gauge_family* store_t::out_of_order_updates_family() { + return reg_->gauge_family("broker", "store-out-of-order-updates", {"name"}, + "Number of out-of-order data store updates."); +} + +int_gauge* 
store_t::out_of_order_updates_instance(std::string_view name) { + return out_of_order_updates_family()->get_or_add({{"name", name}}); +} + +int_gauge_family* store_t::output_channels_family() { + return reg_->gauge_family( + "broker", "store-output-channels", {"name"}, + "Number of active output channels in a data store."); +} + +int_gauge* store_t::output_channels_instance(std::string_view name) { + return output_channels_family()->get_or_add({{"name", name}}); +} + +int_gauge_family* store_t::entries_family() { + return reg_->gauge_family("broker", "store-entries", {"name"}, + "Number of entries in the data store."); +} + +int_gauge* store_t::entries_instance(std::string_view name) { + return entries_family()->get_or_add({{"name", name}}); +} + +int_counter_family* store_t::processed_updates_family() { + return reg_->counter_family("broker", "store-processed-updates", {"name"}, + "Number of processed data store updates.", "1", + true); +} + +int_counter* store_t::processed_updates_instance(std::string_view name) { + return processed_updates_family()->get_or_add({{"name", name}}); +} + +int_gauge_family* store_t::unacknowledged_updates_family() { + return reg_->gauge_family("broker", "store-unacknowledged-updates", {"name"}, + "Number of unacknowledged data store updates."); +} + +int_gauge* store_t::unacknowledged_updates_instance(std::string_view name) { + return unacknowledged_updates_family()->get_or_add({{"name", name}}); +} + +// --- constructors ------------------------------------------------------------ + +metric_factory::metric_factory(caf::actor_system& sys) noexcept + : metric_factory(sys.metrics()) { + // nop +} + +} // namespace broker::internal From 1d1ef4c21963e59c6ee9d797e13f35fae676c776 Mon Sep 17 00:00:00 2001 From: Dominik Charousset Date: Tue, 16 Aug 2022 17:35:02 +0200 Subject: [PATCH 2/2] Get initial number of entries from the backend --- src/internal/master_actor.cc | 3 +++ 1 file changed, 3 insertions(+) diff --git 
a/src/internal/master_actor.cc b/src/internal/master_actor.cc index 22f0b499..a8cd3db7 100644 --- a/src/internal/master_actor.cc +++ b/src/internal/master_actor.cc @@ -69,6 +69,9 @@ master_state::master_state( } else { detail::die("failed to get master expiries while initializing"); } + if (auto entries = backend->size(); entries && *entries > 0) { + metrics.entries->value(static_cast<int64_t>(*entries)); + } BROKER_INFO("attached master" << id << "to" << store_name); }