Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/topic/neverlord/gh-254'
Browse files Browse the repository at this point in the history
* origin/topic/neverlord/gh-254:
  Get initial number of entries from the backend
  Add additional health and performance metrics
  • Loading branch information
timwoj committed Sep 20, 2022
2 parents 7352135 + 1d1ef4c commit 4f1667f
Show file tree
Hide file tree
Showing 11 changed files with 631 additions and 46 deletions.
6 changes: 6 additions & 0 deletions CHANGES
Original file line number Diff line number Diff line change
@@ -1,3 +1,9 @@
2.4.0-dev.72 | 2022-09-20 08:49:22 -0700

* Get initial number of entries from the backend (Dominik Charousset, Corelight)

* Add additional health and performance metrics (Dominik Charousset, Corelight)

2.4.0-dev.69 | 2022-09-20 08:20:48 -0700

* Enable performance-* checks and fix findings (Dominik Charousset, Corelight)
Expand Down
1 change: 1 addition & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -332,6 +332,7 @@ set(BROKER_SRC
src/internal/master_resolver.cc
src/internal/metric_collector.cc
src/internal/metric_exporter.cc
src/internal/metric_factory.cc
src/internal/metric_scraper.cc
src/internal/metric_view.cc
src/internal/pending_connection.cc
Expand Down
2 changes: 1 addition & 1 deletion VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
2.4.0-dev.69
2.4.0-dev.72
161 changes: 149 additions & 12 deletions include/broker/internal/channel.hh
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@

#include "broker/error.hh"
#include "broker/internal/logger.hh"
#include "broker/internal/metric_factory.hh"
#include "broker/lamport_timestamp.hh"
#include "broker/none.hh"

Expand Down Expand Up @@ -156,6 +157,56 @@ public:

using path_list = std::vector<path>;

/// Groups the telemetry handles that the producer updates. All handles start
/// out as null; every mutator below silently does nothing until `init` ran.
struct metrics_t {
  /// Gauge: number of output channels that currently exist.
  caf::telemetry::int_gauge* output_channels = nullptr;

  /// Gauge: number of messages that currently wait for an ACK.
  caf::telemetry::int_gauge* unacknowledged = nullptr;

  /// Counter: total number of messages that were sent (acknowledged).
  caf::telemetry::int_counter* processed = nullptr;

  /// Fetches all three metric instances for `name` from `reg`.
  void init(caf::telemetry::metric_registry& reg, std::string_view name) {
    metric_factory factory{reg};
    auto& store = factory.store;
    output_channels = store.output_channels_instance(name);
    unacknowledged = store.unacknowledged_updates_instance(name);
    processed = store.processed_updates_instance(name);
  }

  /// Convenience overload that uses the metric registry of `sys`.
  void init(caf::actor_system& sys, std::string_view name) {
    auto& registry = sys.metrics();
    init(registry, name);
  }

  /// Returns whether `init` has assigned the metric pointers.
  bool initialized() const noexcept {
    // `init` assigns all pointers together, so checking one is sufficient.
    return static_cast<bool>(output_channels);
  }

  /// Adds one to the output channel gauge (no-op if uninitialized).
  void inc_output_channels() {
    if (output_channels == nullptr)
      return;
    output_channels->inc();
  }

  /// Subtracts one from the output channel gauge (no-op if uninitialized).
  void dec_output_channels() {
    if (output_channels == nullptr)
      return;
    output_channels->dec();
  }

  /// Adds one to the gauge of messages awaiting an ACK (no-op if
  /// uninitialized).
  void inc_unacknowledged() {
    if (unacknowledged == nullptr)
      return;
    unacknowledged->inc();
  }

  /// Moves `num` messages from "awaiting ACK" to "processed" (no-op if
  /// uninitialized).
  void shipped(int64_t num) {
    if (unacknowledged == nullptr)
      return;
    // `processed` is assigned together with `unacknowledged` in `init`.
    unacknowledged->dec(num);
    processed->inc(num);
  }
};

// -- constructors, destructors, and assignment operators ------------------

explicit producer(Backend* backend) : backend_(backend) {
Expand All @@ -167,6 +218,7 @@ public:
void produce(Payload content) {
if (paths_.empty())
return;
metrics_.inc_unacknowledged();
++seq_;
buf_.emplace_back(event{seq_, std::move(content)});
last_broadcast_ = tick_;
Expand All @@ -177,6 +229,7 @@ public:
if (find_path(hdl) != paths_.end())
return ec::consumer_exists;
BROKER_DEBUG("add" << hdl << "to the channel");
metrics_.inc_output_channels();
paths_.emplace_back(path{hdl, seq_, 0, tick_});
backend_->send(this, hdl, handshake{seq_, heartbeat_interval_});
return {};
Expand Down Expand Up @@ -214,8 +267,11 @@ public:
}
// Drop events from the buffer if possible.
auto not_acked = [acked](const event& x) { return x.seq > acked; };
buf_.erase(buf_.begin(),
std::find_if(buf_.begin(), buf_.end(), not_acked));
auto new_begin = std::find_if(buf_.begin(), buf_.end(), not_acked);
if (auto n = std::distance(buf_.begin(), new_begin); n > 0) {
metrics_.shipped(n);
buf_.erase(buf_.begin(), new_begin);
}
}

void handle_nack(const Handle& hdl,
Expand Down Expand Up @@ -272,6 +328,7 @@ public:
for (auto i = paths_.begin(); i != paths_.end();) {
if (tick_.value - i->last_seen.value >= timeout) {
BROKER_DEBUG("remove" << i->hdl << "from channel: consumer timeout");
metrics_.dec_output_channels();
backend_->drop(this, i->hdl, ec::connection_timeout);
i = paths_.erase(i);
++erased_paths;
Expand All @@ -289,8 +346,11 @@ public:
if (i->acked < acked)
acked = i->acked;
auto not_acked = [acked](const event& x) { return x.seq > acked; };
buf_.erase(buf_.begin(),
std::find_if(buf_.begin(), buf_.end(), not_acked));
auto new_begin = std::find_if(buf_.begin(), buf_.end(), not_acked);
if (auto n = std::distance(buf_.begin(), new_begin); n > 0) {
metrics_.shipped(n);
buf_.erase(buf_.begin(), new_begin);
}
}
}

Expand All @@ -304,6 +364,10 @@ public:
return *backend_;
}

/// Grants mutable access to the cached metric instances (`metrics_`), e.g.,
/// for calling `init` on them.
metrics_t& metrics() noexcept {
return metrics_;
}

/// Returns the current value of `seq_`, the sequence number that `produce`
/// increments for each event before appending it to the buffer.
auto seq() const noexcept {
return seq_;
}
Expand Down Expand Up @@ -374,6 +438,9 @@ public:
/// Transmits messages to the consumers.
Backend* backend_;

/// Caches pointers to the metric instances.
metrics_t metrics_;

/// Monotonically increasing counter (starting at 1) to establish ordering
/// of messages on this channel. Since we start at 1, the first message we
/// send is going to have a sequence number of *2*. This enables us to
Expand Down Expand Up @@ -452,6 +519,50 @@ public:

using buf_type = std::deque<optional_event>;

/// Bundles metrics for the consumer. All pointers start out as null; every
/// mutator below silently does nothing until `init` ran.
struct metrics_t {
  /// Keeps track of how many input channels exist.
  caf::telemetry::int_gauge* input_channels = nullptr;

  /// Keeps track of how many messages are currently buffered because they
  /// arrived out-of-order.
  caf::telemetry::int_gauge* out_of_order_updates = nullptr;

  /// Fetches the metric instances for `name` from `reg`.
  void init(caf::telemetry::metric_registry& reg, std::string_view name) {
    metric_factory factory{reg};
    input_channels = factory.store.input_channels_instance(name);
    // Without this lookup the gauge pointer stays null and all
    // inc/dec_out_of_order_updates calls would be silently dropped.
    out_of_order_updates = factory.store.out_of_order_updates_instance(name);
  }

  /// Convenience overload that uses the metric registry of `sys`.
  void init(caf::actor_system& sys, std::string_view name) {
    init(sys.metrics(), name);
  }

  /// Returns whether `init` has assigned the metric pointers.
  bool initialized() const noexcept {
    // Either all pointers are null or none.
    return input_channels != nullptr;
  }

  /// Increments the input channel gauge (no-op if uninitialized).
  void inc_input_channels() {
    if (input_channels)
      input_channels->inc();
  }

  /// Decrements the input channel gauge (no-op if uninitialized).
  void dec_input_channels() {
    if (input_channels)
      input_channels->dec();
  }

  /// Increments the gauge for buffered out-of-order messages (no-op if
  /// uninitialized).
  void inc_out_of_order_updates() {
    if (out_of_order_updates)
      out_of_order_updates->inc();
  }

  /// Decrements the gauge for buffered out-of-order messages by `n` (no-op if
  /// uninitialized).
  void dec_out_of_order_updates(int64_t n = 1) {
    if (out_of_order_updates)
      out_of_order_updates->dec(n);
  }
};

// -- constructors, destructors, and assignment operators ------------------

explicit consumer(Backend* backend) : backend_(backend) {
Expand Down Expand Up @@ -493,12 +604,17 @@ public:
// message before that point.
if (!buf_.empty()) {
auto pred = [=](const optional_event& x) { return x.seq > offset; };
auto i = std::find_if(buf_.begin(), buf_.end(), pred);
buf_.erase(buf_.begin(), i);
auto new_begin = std::find_if(buf_.begin(), buf_.end(), pred);
if (auto n = std::distance(buf_.begin(), new_begin); n > 0) {
metrics_.dec_out_of_order_updates(n);
buf_.erase(buf_.begin(), new_begin);
}
}
// Consume buffered messages if possible and send initial ACK.
try_consume_buffer();
send_ack();
// Update our metric. This counter can only oscillate between 0 and 1.
metrics_.inc_input_channels();
return true;
}

Expand All @@ -524,12 +640,15 @@ public:
// Insert event into buf_: sort by the sequence number, drop duplicates.
auto pred = [seq](const optional_event& x) { return x.seq >= seq; };
auto i = std::find_if(buf_.begin(), buf_.end(), pred);
if (i == buf_.end())
if (i == buf_.end()) {
buf_.emplace_back(seq, std::move(payload));
else if (i->seq != seq)
metrics_.inc_out_of_order_updates();
} else if (i->seq != seq) {
metrics_.inc_out_of_order_updates();
buf_.emplace(i, seq, std::move(payload));
else if (!i->content)
} else if (!i->content) {
i->content = std::move(payload);
}
}
}

Expand All @@ -538,6 +657,7 @@ public:
// Process immediately.
if (auto err = backend_->consume_nil(this)) {
backend_->close(this, std::move(err));
reset();
return;
}
bump_seq();
Expand All @@ -546,10 +666,13 @@ public:
// Insert event into buf_: sort by the sequence number, drop duplicates.
auto pred = [seq](const optional_event& x) { return x.seq >= seq; };
auto i = std::find_if(buf_.begin(), buf_.end(), pred);
if (i == buf_.end())
if (i == buf_.end()) {
buf_.emplace_back(seq);
else if (i->seq != seq)
metrics_.inc_out_of_order_updates();
} else if (i->seq != seq) {
buf_.emplace(i, seq);
metrics_.inc_out_of_order_updates();
}
}
}

Expand Down Expand Up @@ -615,6 +738,10 @@ public:
return *backend_;
}

/// Grants mutable access to the cached metric instances (`metrics_`), e.g.,
/// for calling `init` on them.
metrics_t& metrics() noexcept {
return metrics_;
}

/// Returns the stored handle of the producer (`producer_`). `reset` sets this
/// member back to a default-constructed `Handle`.
const auto& producer() const {
return producer_;
}
Expand Down Expand Up @@ -681,6 +808,9 @@ public:
}

void reset() {
if (initialized()) {
metrics_.dec_input_channels();
}
producer_ = Handle{};
next_seq_ = 0;
last_seq_ = 0;
Expand Down Expand Up @@ -712,12 +842,16 @@ public:
if (auto err = backend_->consume_nil(this)) {
buf_.erase(buf_.begin(), i);
backend_->close(this, std::move(err));
reset();
return;
}
}
bump_seq();
}
buf_.erase(buf_.begin(), i);
if (auto n = std::distance(buf_.begin(), i); n > 0) {
buf_.erase(buf_.begin(), i);
metrics_.dec_out_of_order_updates(n);
}
}

void send_ack() {
Expand All @@ -729,6 +863,9 @@ public:
/// Handles incoming events.
Backend* backend_;

/// Caches pointers to the metric instances.
metrics_t metrics_;

/// Stores the handle of the producer.
Handle producer_;

Expand Down
44 changes: 44 additions & 0 deletions include/broker/internal/core_actor.hh
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,11 @@
#include <caf/disposable.hpp>
#include <caf/flow/observable.hpp>
#include <caf/make_counted.hpp>
#include <caf/telemetry/counter.hpp>
#include <caf/telemetry/gauge.hpp>

#include <optional>
#include <string_view>
#include <unordered_map>

namespace broker::internal {
Expand Down Expand Up @@ -53,6 +56,39 @@ public:
/// Convenience alias for a map of @ref peer_state objects.
using peer_state_map = std::unordered_map<endpoint_id, peer_state>;

/// Bundles message-related metrics that have a label dimension for the type.
struct message_metrics_t {
/// Counts how many messages were processed since starting the core.
caf::telemetry::int_counter* processed = nullptr;

/// Keeps track of how many messages are currently buffered at the core.
caf::telemetry::int_gauge* buffered = nullptr;

void assign(caf::telemetry::int_counter* processed_instance,
caf::telemetry::int_gauge* buffered_instance) noexcept {
processed = processed_instance;
buffered = buffered_instance;
}
};

/// Collects the telemetry handles that the core keeps cached.
struct metrics_t {
  /// Constructs the bundle from an actor system; defined out of line.
  explicit metrics_t(caf::actor_system& sys);

  /// Gauge: number of native peers that are currently connected.
  caf::telemetry::int_gauge* native_connections = nullptr;

  /// Gauge: number of WebSocket clients that are currently connected.
  caf::telemetry::int_gauge* web_socket_connections = nullptr;

  /// One metric set per message type, indexed by the numeric value of
  /// `packed_message_type`.
  // NOTE(review): the size 6 presumably equals the number of
  // `packed_message_type` values; confirm against the enum definition.
  std::array<message_metrics_t, 6> message_metric_sets;

  /// Returns the metric set that belongs to `msg_type`. The index is not
  /// range-checked.
  message_metrics_t& metrics_for(packed_message_type msg_type) {
    const auto index = static_cast<size_t>(msg_type);
    return message_metric_sets[index];
  }
};

// -- constants --------------------------------------------------------------

static inline const char* name = "broker.core";
Expand Down Expand Up @@ -274,6 +310,9 @@ public:
/// Enables manual time management by the user.
endpoint::clock* clock;

/// Caches pointers to the Broker metrics.
metrics_t metrics;

/// Stores all master actors created by this endpoint.
std::unordered_map<std::string, caf::actor> masters;

Expand Down Expand Up @@ -328,6 +367,11 @@ public:

/// Returns whether `shutdown` was called.
bool shutting_down();

/// Returns the metrics set for a given message type. Convenience shortcut
/// that forwards to `metrics.metrics_for(msg_type)`.
message_metrics_t& metrics_for(packed_message_type msg_type) {
return metrics.metrics_for(msg_type);
}
};

using core_actor = caf::stateful_actor<core_actor_state>;
Expand Down
Loading

0 comments on commit 4f1667f

Please sign in to comment.