Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Disconnect slow peers and WS clients by default #429

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions libbroker/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ set(BROKER_SRC
broker/internal_command.cc
broker/mailbox.cc
broker/network_info.cc
broker/overflow_policy.cc
broker/p2p_message_type.cc
broker/peer_status.cc
broker/ping_envelope.cc
Expand Down
13 changes: 13 additions & 0 deletions libbroker/broker/configuration.cc
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include "broker/address.hh"
#include "broker/alm/multipath.hh"
#include "broker/config.hh"
#include "broker/convert.hh"
#include "broker/data.hh"
#include "broker/endpoint.hh"
#include "broker/internal/configuration_access.hh"
Expand Down Expand Up @@ -113,6 +114,10 @@ struct configuration::impl : public caf::actor_system_config {
.add(options.disable_forwarding, "disable-forwarding",
"disables forwarding of incoming data to peers")
.add(options.ttl, "ttl", "drop messages after traversing TTL hops")
.add(options.peer_buffer_size, "peer-buffer-size",
"maximum number of items we buffer per peer before dropping it")
.add(options.web_socket_buffer_size, "web_socket-buffer-size",
"maximum number of items we buffer per web_socket")
.add<string>("recording-directory",
"path for storing recorded meta information")
.add<size_t>(
Expand Down Expand Up @@ -174,6 +179,8 @@ struct configuration::impl : public caf::actor_system_config {
auto& grp = result["broker"].as_dictionary();
put_missing(grp, "disable-ssl", options.disable_ssl);
put_missing(grp, "ttl", options.ttl);
put_missing(grp, "peer-buffer-size", options.peer_buffer_size);
put_missing(grp, "web_socket-buffer-size", options.web_socket_buffer_size);
put_missing(grp, "disable-forwarding", options.disable_forwarding);
if (auto path = get_as<std::string>(content, "broker.recording-directory"))
put_missing(grp, "recording-directory", std::move(*path));
Expand All @@ -199,6 +206,12 @@ configuration::configuration(skip_init_t) {
configuration::configuration(broker_options opts) : configuration(skip_init) {
impl_->options = opts;
impl_->set("broker.ttl", opts.ttl);
impl_->set("broker.peer-buffer-size", opts.peer_buffer_size);
caf::put(impl_->content, "broker.peer-overflow-policy",
broker::to_string(opts.peer_overflow_policy));
impl_->set("broker.web_socket-buffer-size", opts.web_socket_buffer_size);
caf::put(impl_->content, "broker.web_socket-overflow-policy",
broker::to_string(opts.web_socket_overflow_policy));
caf::put(impl_->content, "disable-forwarding", opts.disable_forwarding);
init(0, nullptr);
impl_->config_file_path = "broker.conf";
Expand Down
17 changes: 17 additions & 0 deletions libbroker/broker/configuration.hh
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#pragma once

#include "broker/defaults.hh"
#include "broker/overflow_policy.hh"

#include <cstdint>
#include <memory>
Expand Down Expand Up @@ -46,6 +47,22 @@ struct broker_options {
/// How many hops we forward at the most before dropping a message.
uint16_t ttl = defaults::ttl;

/// Configures how many items we buffer at most per peer before considering
/// it unreseponsive and dropping the connection.
size_t peer_buffer_size = defaults::peer_buffer_size;

/// Configures how Broker responds to peers that cannot keep up with the
/// incoming message rate.
overflow_policy peer_overflow_policy = overflow_policy::disconnect;

/// Configures how many items we buffer at most per web_socket client before
/// considering it unreseponsive and dropping the connection.
size_t web_socket_buffer_size = defaults::web_socket_buffer_size;

/// Configures how Broker responds to web_sockets that cannot keep up with the
/// incoming message rate.
overflow_policy web_socket_overflow_policy = overflow_policy::disconnect;

broker_options() = default;

broker_options(const broker_options&) = default;
Expand Down
17 changes: 17 additions & 0 deletions libbroker/broker/defaults.hh
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#pragma once

#include "broker/overflow_policy.hh"
#include "broker/time.hh"

#include <chrono>
Expand All @@ -22,6 +23,22 @@ constexpr timespan await_peer_timeout = std::chrono::seconds{10};
/// Configures the default timeout for unpeering from another node.
constexpr timespan unpeer_timeout = std::chrono::seconds{3};

/// Configures how many items we buffer at most per peer before considering it
/// unreseponsive and dropping the connection.
constexpr size_t peer_buffer_size = 2048;

/// Configures how Broker responds to peers that cannot keep up with the
/// incoming message rate.
constexpr auto peer_overflow_policy = overflow_policy::disconnect;

/// Configures how many items we buffer at most per web_socket client before
/// considering it unreseponsive and dropping the connection.
constexpr size_t web_socket_buffer_size = 512;

/// Configures how Broker responds to web_sockets that cannot keep up with the
/// incoming message rate.
constexpr auto web_socket_overflow_policy = overflow_policy::disconnect;

} // namespace broker::defaults

namespace broker::defaults::subscriber {
Expand Down
150 changes: 122 additions & 28 deletions libbroker/broker/internal/core_actor.cc
Original file line number Diff line number Diff line change
Expand Up @@ -399,6 +399,7 @@ caf::behavior core_actor_state::make_behavior() {
},
// -- interface for publishers ---------------------------------------------
[this](data_consumer_res src) {
auto consumer_id = endpoint_id::random();
auto [in, sub] =
self
->make_observable() //
Expand All @@ -409,10 +410,16 @@ caf::behavior core_actor_state::make_behavior() {
.map([this](const data_message& msg) { return node_message{msg}; })
.compose(local_publisher_scope_adder())
.compose(add_killswitch_t{});
flow_inputs.push(in);
flow_inputs.push(in.do_finally([this, consumer_id] {
auto i = subscriptions.find(consumer_id);
if (i != subscriptions.end()) {
subscriptions.erase(i);
}
})
.as_observable());
// TODO: next lines seems to be a false positive, but maybe there's
// something we can do upstream to avoid the alert.
subscriptions.push_back(sub); // NOLINT
subscriptions[consumer_id].push_back(sub); // NOLINT
},
// -- data store management ------------------------------------------------
[this](atom::data_store, atom::clone, atom::attach, const std::string& name,
Expand Down Expand Up @@ -513,8 +520,11 @@ void core_actor_state::shutdown(shutdown_options options) {
// We no longer add new input flows.
flow_inputs.close();
// Cancel all subscriptions to local publishers.
for (auto& sub : subscriptions)
sub.dispose();
for (auto& [id, subs] : subscriptions) {
for (auto& sub : subs) {
sub.dispose();
}
}
subscriptions.clear();
// Inform our clients that we no longer wait for any peer.
BROKER_DEBUG("cancel" << awaited_peers.size()
Expand Down Expand Up @@ -720,10 +730,28 @@ void core_actor_state::client_added(endpoint_id client_id,

void core_actor_state::client_removed(endpoint_id client_id,
const network_info& addr,
const std::string& type) {
const std::string& type,
const caf::error& reason, bool removed) {
BROKER_TRACE(BROKER_ARG(client_id) << BROKER_ARG(addr) << BROKER_ARG(type));
emit(endpoint_info{client_id, addr, type}, sc_constant<sc::peer_lost>(),
"lost connection to client");
auto i = subscriptions.find(client_id);
if (i == subscriptions.end()) {
return;
}
disposable_list subs;
i->second.swap(subs);
subscriptions.erase(i);
for (auto& sub : subs) {
sub.dispose();
}
metrics.web_socket_connections->Decrement();
if (removed) {
auto msg = "client removed: " + to_string(reason);
emit(endpoint_info{client_id, addr, type}, sc_constant<sc::peer_removed>(),
msg.c_str());
} else {
emit(endpoint_info{client_id, addr, type}, sc_constant<sc::peer_lost>(),
"lost connection to client");
}
emit(endpoint_info{client_id, std::nullopt, type},
sc_constant<sc::endpoint_unreachable>(), "lost the last path");
}
Expand Down Expand Up @@ -841,6 +869,12 @@ caf::error core_actor_state::init_new_peer(endpoint_id peer_id,
return msg;
return msg->with(id, msg->receiver());
})
// Disconnect unresponsive peers.
.on_backpressure_buffer(peer_buffer_size(), peer_overflow_policy())
.do_on_error([this, ptr, peer_id](const caf::error& what) {
BROKER_INFO("remove peer" << peer_id << "due to:" << what);
ptr->force_disconnect();
})
.as_observable());
// Push messages received from the peer into the central merge point.
flow_inputs.push( //
Expand Down Expand Up @@ -939,33 +973,44 @@ caf::error core_actor_state::init_new_client(const network_info& addr,
client_added(client_id, addr, type);
// Hook into the central merge point for forwarding the data to the client.
if (out_res) {
auto sub = central_merge
// Select by subscription.
.filter([this, filt = std::move(filter),
client_id](const node_message& msg) {
if (get_type(msg) != packed_message_type::data
|| get_sender(msg) == client_id)
return false;
detail::prefix_matcher f;
return f(filt, get_topic(msg));
})
// Deserialize payload and wrap it into a data message.
.map([this](const node_message& msg) { //
return msg->as_data();
})
// Emit values to the producer resource.
.subscribe(std::move(out_res));
subscriptions.emplace_back(sub);
auto sub =
central_merge
// Select by subscription.
.filter(
[this, filt = std::move(filter), client_id](const node_message& msg) {
if (get_type(msg) != packed_message_type::data
|| get_sender(msg) == client_id)
return false;
detail::prefix_matcher f;
return f(filt, get_topic(msg));
})
// Deserialize payload and wrap it into a data message.
.map([this](const node_message& msg) { //
return msg->as_data();
})
// Disconnect unresponsive clients.
.on_backpressure_buffer(web_socket_buffer_size(),
web_socket_overflow_policy())
.do_on_error([this, client_id, addr, type](const caf::error& reason) {
BROKER_DEBUG("client" << addr << "disconnected");
client_removed(client_id, addr, type, reason, true);
})
// Emit values to the producer resource.
.subscribe(std::move(out_res));
subscriptions[client_id].emplace_back(sub);
}
// Push messages received from the client into the central merge point.
auto [in, ks] =
self->make_observable()
.from_resource(std::move(in_res))
// If the client closes this buffer, we assume a disconnect.
.do_finally([this, client_id, addr, type] {
.do_on_complete([this, client_id, addr, type] {
BROKER_DEBUG("client" << addr << "disconnected");
client_removed(client_id, addr, type);
metrics.web_socket_connections->Decrement();
client_removed(client_id, addr, type, caf::error{}, false);
})
.do_on_error([this, client_id, addr, type](const caf::error& reason) {
BROKER_DEBUG("client" << addr << "disconnected");
client_removed(client_id, addr, type, reason, false);
})
.map([this, client_id](const data_message& msg) {
metrics_for(packed_message_type::data).buffered->Increment();
Expand All @@ -980,7 +1025,7 @@ caf::error core_actor_state::init_new_client(const network_info& addr,
.on_error_complete()
.compose(add_killswitch_t{});
flow_inputs.push(in);
subscriptions.emplace_back(ks);
subscriptions[client_id].emplace_back(ks);
return caf::none;
}

Expand Down Expand Up @@ -1167,4 +1212,53 @@ bool core_actor_state::shutting_down() {
return !self->has_behavior();
}

// -- properties ---------------------------------------------------------------

namespace {

caf::flow::backpressure_overflow_strategy
overflow_policy_from_string(const std::string* str, overflow_policy fallback) {
using caf::flow::backpressure_overflow_strategy;
if (str != nullptr) {
if (*str == "drop_newest") {
return backpressure_overflow_strategy::drop_newest;
}
if (*str == "drop_oldest") {
return backpressure_overflow_strategy::drop_oldest;
}
if (*str == "disconnect") {
return backpressure_overflow_strategy::fail;
}
}
// Note: overflow_policy and backpressure_overflow_strategy have the same
// values. Hence, casting one to the other is safe.
return static_cast<backpressure_overflow_strategy>(fallback);
}

} // namespace

size_t core_actor_state::peer_buffer_size() {
return caf::get_or(self->config(), "broker.peer-buffer-size",
defaults::peer_buffer_size);
}

caf::flow::backpressure_overflow_strategy
core_actor_state::peer_overflow_policy() {
auto* str = caf::get_if<std::string>(&self->config(),
"broker.peer-overflow-policy");
return overflow_policy_from_string(str, defaults::peer_overflow_policy);
}

size_t core_actor_state::web_socket_buffer_size() {
return caf::get_or(self->config(), "broker.web-socket-buffer-size",
defaults::web_socket_buffer_size);
}

caf::flow::backpressure_overflow_strategy
core_actor_state::web_socket_overflow_policy() {
auto* str = caf::get_if<std::string>(&self->config(),
"broker.web-socket-overflow-policy");
return overflow_policy_from_string(str, defaults::web_socket_overflow_policy);
}

} // namespace broker::internal
15 changes: 13 additions & 2 deletions libbroker/broker/internal/core_actor.hh
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,8 @@ public:

/// Called whenever a client disconnected.
void client_removed(endpoint_id client_id, const network_info& addr,
const std::string& type);
const std::string& type, const caf::error& reason,
bool removed);

// -- connection management --------------------------------------------------

Expand Down Expand Up @@ -213,6 +214,14 @@ public:

// -- properties -------------------------------------------------------------

size_t peer_buffer_size();

caf::flow::backpressure_overflow_strategy peer_overflow_policy();

size_t web_socket_buffer_size();

caf::flow::backpressure_overflow_strategy web_socket_overflow_policy();

/// Points to the actor itself.
caf::event_based_actor* self;

Expand Down Expand Up @@ -285,8 +294,10 @@ public:
/// memory regions over and over again.
caf::byte_buffer buf;

using disposable_list = std::vector<caf::disposable>;

/// Stores the subscriptions for our input sources to allow us to cancel them.
std::vector<caf::disposable> subscriptions;
std::map<endpoint_id, disposable_list> subscriptions;

/// Bundles state for a subscriber that does not integrate into the flows.
struct legacy_subscriber {
Expand Down
4 changes: 3 additions & 1 deletion libbroker/broker/internal/peering.cc
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,9 @@ void peering::on_bye_ack() {
}

void peering::force_disconnect() {
assert(removed_);
if (!removed_) {
removed_ = true;
}
on_bye_ack();
}

Expand Down
Loading
Loading