SO 5.8 InDepth Message Sinks
- Table of Contents
- The problem and the proposed solution
- message_sinks and delivery filters
- Ready-to-use messages sinks in SObjectizer
- Another possible application
- Some technical details
In SObjectizer prior to v.5.8.0 only agents could subscribe to messages from message boxes. This meant that only agents could be a destination for message delivery.
But there were cases where having only one type of destination was a limitation.
For example, we could have an agent that receives and handles messages of type MsgA and MsgB from its own direct mbox:
class first_handler final : public so_5::agent_t {
public:
...
void so_define_agent() override {
so_subscribe_self()
.event([this](mhood_t<MsgA> cmd) {...})
.event([this](mhood_t<MsgB> cmd) {...})
...
;
}
};
And we could have an agent that receives and handles messages of type MsgC from its own direct mbox:
class second_handler final : public so_5::agent_t {
public:
...
void so_define_agent() override {
so_subscribe_self()
.event([this](mhood_t<MsgC> cmd) {...})
...
;
}
};
And, finally, we could have an agent that creates an MPMC mbox and uses it for sending messages of types MsgA, MsgB, and MsgC:
class data_provider final : public so_5::agent_t {
const so_5::mbox_t distribution_mbox_;
...
public:
data_provider(context_t ctx)
: so_5::agent_t{std::move(ctx)}
// NOTE: distribution_mbox_ is created in the constructor.
, distribution_mbox_{so_environment().create_mbox()}
{}
...
// Distribution mbox can be obtained.
[[nodiscard]] so_5::mbox_t distribution_mbox() const {
return distribution_mbox_;
}
...
void evt_data_detected(mhood_t<msg_data_detected> cmd) {
... // Data acquisition and preprocessing.
// Data distribution in the form of several messages.
so_5::send<MsgA>(distribution_mbox_, ...);
so_5::send<MsgB>(distribution_mbox_, ...);
so_5::send<MsgC>(distribution_mbox_, ...);
...
}
...
};
Now suppose that we can't change any of those agents but want to redirect messages MsgA and MsgB from the distribution mbox to the direct mbox of the first_handler, and message MsgC to the direct mbox of the second_handler.
Until v.5.8.0 the only way to do that was to write a proxy-agent that subscribes to the distribution mbox and resends messages to the direct mboxes:
class proxy_agent final : public so_5::agent_t {
const so_5::mbox_t distribution_mbox_;
const so_5::mbox_t first_handler_mbox_;
const so_5::mbox_t second_handler_mbox_;
public:
// Constructor receives all mboxes.
...
void so_define_agent() override {
// Implement resending.
so_subscribe(distribution_mbox_)
.event([this](mhood_t<MsgA> cmd) { so_5::send(first_handler_mbox_, cmd); })
.event([this](mhood_t<MsgB> cmd) { so_5::send(first_handler_mbox_, cmd); })
.event([this](mhood_t<MsgC> cmd) { so_5::send(second_handler_mbox_, cmd); })
;
}
};
This approach works, but it has at least two serious flaws:
- writing such proxies is a boring and error-prone task. It may not look difficult, but it gets harder if such redirects have to be created/removed on the fly;
- it's not efficient, because for every redirected message an event handler of the proxy-agent has to run in the context of some dispatcher, and only then is the message sent to the actual target.
To simplify handling scenarios like that a new abstraction, message_sink, has been introduced in v.5.8.0.
A message_sink is the destination for messages sent to a mbox. This means that since v.5.8.0 mboxes hold references to message sinks, not to agents, in the subscription info. Starting from v.5.8.0 agents are just one type of message_sink. There can be other types: custom (written by a user) or standard (implemented by the SObjectizer-5 core).
Message boxes in v.5.8.0 receive references to message_sinks during subscription/unsubscription and setting/dropping delivery filters, and that is reflected in the new signatures of abstract_message_box_t methods:
virtual void
subscribe_event_handler(
const std::type_index & type_index,
abstract_message_sink_t & subscriber ) = 0;
virtual void
unsubscribe_event_handler(
const std::type_index & type_index,
abstract_message_sink_t & subscriber ) noexcept = 0;
virtual void
set_delivery_filter(
const std::type_index & msg_type,
const delivery_filter_t & filter,
abstract_message_sink_t & subscriber ) = 0;
virtual void
drop_delivery_filter(
const std::type_index & msg_type,
abstract_message_sink_t & subscriber ) noexcept = 0;
SObjectizer-5.8.0 provides two standard implementations of the message_sink abstraction; both are hidden from the user:
- the first one is a representation of an agent as a message_sink. A sink of that type is created automatically when an agent makes a subscription. This is performed inside so_subscribe()/so_subscribe_self() and is invisible to the user;
- the second one is a wrapper around any valid mbox and is created by a new helper function so_5::wrap_to_msink(). The returned value can be used with the standard classes so_5::single_sink_binding_t and so_5::multi_sink_binding_t.
As of v.5.8.0 the problem described above can be solved this way:
const so_5::mbox_t distribution_mbox = data_distributor->distribution_mbox();
const so_5::mbox_t first_dest = first_handler->so_direct_mbox();
const so_5::mbox_t second_dest = second_handler->so_direct_mbox();
auto binding = std::make_unique<so_5::multi_sink_binding_t<>>();
binding->bind<MsgA>(distribution_mbox, so_5::wrap_to_msink(first_dest));
binding->bind<MsgB>(distribution_mbox, so_5::wrap_to_msink(first_dest));
binding->bind<MsgC>(distribution_mbox, so_5::wrap_to_msink(second_dest));
// `binding` has to be stored somewhere to outlive data_distributor, first_dest and second_dest.
With this approach a message is sent to the appropriate handler almost directly (inside the so_5::send call, but with nested calls to so_5::abstract_message_box_t::do_deliver_message). This approach also allows easy modification of existing subscriptions.
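The idea that a mailbox only knows about sinks, and that a redirecting sink hands the message to another mailbox inside the same call, can be illustrated with a small standalone sketch. It is a toy model, not SObjectizer code: `toy_sink`, `toy_mbox`, `toy_receiver`, `toy_redirect`, and `demo_redirect` are hypothetical names invented for this illustration, and dispatchers, message types, and thread safety are deliberately left out.

```cpp
#include <cassert>
#include <string>
#include <vector>

// Toy model only: none of these types are SObjectizer classes.
struct toy_sink {
    virtual ~toy_sink() = default;
    virtual void push(const std::string & msg) = 0;
};

// A mailbox that knows only about sinks, not about concrete receivers.
class toy_mbox {
    std::vector<toy_sink *> sinks_;
public:
    void subscribe(toy_sink & sink) { sinks_.push_back(&sink); }
    void deliver(const std::string & msg) {
        for (toy_sink * sink : sinks_) sink->push(msg);
    }
};

// Sink kind 1: a final receiver (plays the role of an agent).
struct toy_receiver final : toy_sink {
    std::vector<std::string> inbox;
    void push(const std::string & msg) override { inbox.push_back(msg); }
};

// Sink kind 2: forwards to another mailbox (plays the role of a wrapped mbox).
struct toy_redirect final : toy_sink {
    toy_mbox & target;
    explicit toy_redirect(toy_mbox & t) : target(t) {}
    void push(const std::string & msg) override { target.deliver(msg); }
};

std::vector<std::string> demo_redirect() {
    toy_mbox distribution;
    toy_mbox direct;
    toy_receiver handler;
    toy_redirect bridge(direct);

    direct.subscribe(handler);
    distribution.subscribe(bridge);

    // The whole chain distribution -> bridge -> direct -> handler
    // runs as nested calls inside this single deliver().
    distribution.deliver("MsgA");
    assert(handler.inbox.size() == 1);
    return handler.inbox;
}
```

No intermediate receiver has to be scheduled between the two mailboxes in this model, which mirrors the efficiency argument made against the proxy-agent approach.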
The message_sink abstraction allows implementation of very flexible message delivery schemes, but working with message_sinks directly requires a deep understanding of how message_sinks work and very careful use of message_sink objects. To simplify life in common cases a couple of standard helper classes have been added in v.5.8.0: so_5::single_sink_binding_t and so_5::multi_sink_binding_t. They allow you to create and hold subscriptions to a message from a mbox to an arbitrary message_sink.
The simplest one is the so_5::single_sink_binding_t class. It holds just one subscription regardless of the message type and/or message source. For example:
so_5::single_sink_binding_t binding;
// Create a subscription.
binding.bind<MsgA>(src, dest_one);
// Now an instance of MsgA will be sent to dest_one via src.
so_5::send<MsgA>(src, ...);
// Modify existing subscription to MsgA from src.
// The existing subscription for dest_one will be erased.
binding.bind<MsgA>(src, dest_two);
// Now an instance of MsgA will be sent to dest_two via src.
so_5::send<MsgA>(src, ...);
Please note that an instance of so_5::single_sink_binding_t can't hold multiple bindings for different (msg_type, src_mbox) pairs:
so_5::single_sink_binding_t binding;
binding.bind<MsgA>(src, dest_one);
binding.bind<MsgB>(src, dest_one); // Replaces subscription to MsgA from src.
binding.bind<MsgC>(src, dest_two); // Replaces subscription to MsgB from src.
binding.bind<MsgD>(src, dest_three); // Replaces subscription to MsgC from src.
binding.bind<MsgA>(another_src, dest_one); // Replaces subscription to MsgD from src.
binding.bind<MsgB>(another_src, dest_two); // Replaces subscription to MsgA from another_src.
...
The so_5::single_sink_binding_t class isn't thread safe and must not be used from different threads at the same time.
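The "one subscription at a time" behaviour of a single binding, together with dropping the subscription when the binding object is destroyed, can be sketched with a standalone toy model. Nothing here is SObjectizer API: `toy_dest`, `toy_mbox`, `toy_single_binding`, `demo_counts`, and `demo_single_binding` are hypothetical names, message types are ignored, and raw pointers are used only to keep the sketch short (the mailbox must outlive the binding).

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <string>
#include <vector>

// Toy model only: none of these types belong to SObjectizer.
struct toy_dest { std::vector<std::string> inbox; };

class toy_mbox {
    std::vector<toy_dest *> subscribers_;
public:
    void subscribe(toy_dest & d) { subscribers_.push_back(&d); }
    void unsubscribe(toy_dest & d) {
        subscribers_.erase(
            std::remove(subscribers_.begin(), subscribers_.end(), &d),
            subscribers_.end());
    }
    void deliver(const std::string & msg) {
        for (toy_dest * d : subscribers_) d->inbox.push_back(msg);
    }
};

// Holds at most one subscription: a new bind() drops the previous one,
// and the destructor drops whatever is left.
class toy_single_binding {
    toy_mbox * src_ = nullptr;
    toy_dest * dest_ = nullptr;
public:
    toy_single_binding() = default;
    toy_single_binding(const toy_single_binding &) = delete;
    toy_single_binding & operator=(const toy_single_binding &) = delete;
    ~toy_single_binding() { clear(); }

    void clear() {
        if (src_) src_->unsubscribe(*dest_);
        src_ = nullptr;
        dest_ = nullptr;
    }
    void bind(toy_mbox & src, toy_dest & dest) {
        clear();
        src.subscribe(dest);
        src_ = &src;
        dest_ = &dest;
    }
};

struct demo_counts { std::size_t one; std::size_t two; };

demo_counts demo_single_binding() {
    toy_mbox src;
    toy_dest one, two;
    {
        toy_single_binding binding;
        binding.bind(src, one);
        src.deliver("first");    // reaches `one`
        binding.bind(src, two);  // replaces the subscription for `one`
        src.deliver("second");   // reaches `two` only
    }                            // binding destroyed: subscription dropped
    src.deliver("third");        // nobody is subscribed any more
    assert(one.inbox.size() == 1 && two.inbox.size() == 1);
    return {one.inbox.size(), two.inbox.size()};
}
```

The binding is made non-copyable in this sketch so that exactly one object is responsible for removing the subscription.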
The more complex and heavyweight class is so_5::multi_sink_binding_t. It can hold several subscriptions for a (msg_type, src_mbox) pair:
so_5::multi_sink_binding_t<> binding;
// Create a subscription.
binding.bind<MsgA>(src, dest_one);
// Now an instance of MsgA will be sent to dest_one via src.
so_5::send<MsgA>(src, ...);
// Add another subscription for (MsgA, src) pair.
binding.bind<MsgA>(src, dest_two);
// Now an instance of MsgA will be sent to dest_two and dest_one.
so_5::send<MsgA>(src, ...);
The so_5::multi_sink_binding_t class can be parametrized by a lock type for thread safety. For example:
// Thread-safe instance because std::mutex is used by default.
so_5::multi_sink_binding_t<> binding_one;
...
// Thread-safe instance because std::mutex is specified explicitly.
so_5::multi_sink_binding_t<std::mutex> binding_two;
...
// Not a thread-safe instance.
so_5::multi_sink_binding_t<so_5::null_mutex_t> binding_three;
...
The classes so_5::single_sink_binding_t and so_5::multi_sink_binding_t drop all current subscriptions in the destructor. So it's important to control the lifetime of objects of these classes to keep the subscriptions as long as we need them, but no longer.
There are three approaches that could be appropriate for different situations.
The first and simplest is to make the binding object a part of some agent:
class my_agent : public so_5::agent_t {
// Binding as part of the agent.
// Binding will be destroyed with the agent.
so_5::single_sink_binding_t binding_;
...
};
Sometimes the binding object has to be shared between several agents; std::shared_ptr can be used in that case:
class first_agent : public so_5::agent_t {
std::shared_ptr<so_5::multi_sink_binding_t<>> binding_;
... // Assume that binding will be passed to the constructor.
};
class second_agent : public so_5::agent_t {
std::shared_ptr<so_5::multi_sink_binding_t<>> binding_;
... // Assume that binding will be passed to the constructor.
};
...
auto binding = std::make_shared<so_5::multi_sink_binding_t<>>();
// Pass the binding to the constructor of new agents.
coop->make_agent<first_agent>(binding, ...);
coop->make_agent<second_agent>(binding, ...);
Another way is to put an instance of the binding object under the control of a coop:
env.introduce_coop([&](so_5::coop_t & coop) {
auto * distributor = coop.make_agent<data_provider>(...);
auto * first = coop.make_agent<first_handler>(...);
auto * second = coop.make_agent<second_handler>(...);
auto * binding = coop.take_under_control(
std::make_unique<so_5::multi_sink_binding_t<>>());
binding->bind<MsgA>(distributor->distribution_mbox(),
so_5::wrap_to_msink(first->so_direct_mbox()));
binding->bind<MsgB>(distributor->distribution_mbox(),
so_5::wrap_to_msink(first->so_direct_mbox()));
binding->bind<MsgC>(distributor->distribution_mbox(),
so_5::wrap_to_msink(second->so_direct_mbox()));
});
The delivery filters can be used with message_sinks in the same way as they were used with agents in the previous versions of SObjectizer. This is because the mboxes apply delivery filters for an incoming message before delivering it to the destinations. The fact that the destinations are now message_sinks rather than agents doesn't change this way of working.
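The ordering described above (the mailbox consults the filter first, and the destination sees the message only if the filter accepts it) can be illustrated with a standalone toy model. It is not SObjectizer code: `toy_sink`, `toy_mbox`, and `demo_filter` are hypothetical names, and the message is reduced to a plain `int` for brevity.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <utility>
#include <vector>

// Toy model only: not SObjectizer types.
struct toy_sink { std::vector<int> received; };

class toy_mbox {
    struct subscription {
        toy_sink * sink;
        std::function<bool(int)> filter; // empty means "accept everything"
    };
    std::vector<subscription> subs_;
public:
    void subscribe(toy_sink & s, std::function<bool(int)> filter = {}) {
        subs_.push_back({&s, std::move(filter)});
    }
    void deliver(int value) {
        for (auto & sub : subs_) {
            // The filter runs inside the mailbox, before the sink
            // sees anything; the kind of sink does not matter here.
            if (sub.filter && !sub.filter(value)) continue;
            sub.sink->received.push_back(value);
        }
    }
};

// Returns {messages seen by the filtered sink, messages seen by the other}.
std::pair<std::size_t, std::size_t> demo_filter() {
    toy_mbox mbox;
    toy_sink filtered, unfiltered;
    mbox.subscribe(filtered, [](int v) { return v > 0 && v < 127; });
    mbox.subscribe(unfiltered);
    for (int v : {-5, 1, 126, 127}) mbox.deliver(v);
    assert(filtered.received.size() == 2);
    return {filtered.received.size(), unfiltered.received.size()};
}
```

Because the check happens on the mailbox side, the same predicate works regardless of what stands behind the sink.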
If a delivery filter is set for an agent, then there are no user-visible changes in the SObjectizer-5 API:
struct msg_demo final : public so_5::message_t {
int value_;
msg_demo(int v) : value_{v} {}
};
class dr_demo final : public so_5::agent_t {
...
void so_define_agent() override {
// Setting a delivery filter.
so_set_delivery_filter(so_direct_mbox(),
[](const msg_demo & msg) -> bool {
return msg.value_ > 0 && msg.value_ < 127;
});
}
...
};
If an instance of so_5::single_sink_binding_t or so_5::multi_sink_binding_t is used for message redirection, then a delivery filter can be set via the bind() method:
const so_5::mbox_t & src = ...;
const so_5::mbox_t & dest = ...;
so_5::single_sink_binding_t & binding = ...;
binding.bind<msg_demo>(src, so_5::wrap_to_msink(dest),
[](const msg_demo & msg) -> bool {
return msg.value_ > 0 && msg.value_ < 127;
});
Since v.5.8.1 there is an implementation of a message sink that transforms and redirects a message/signal.
The easiest way to use this functionality is to use the so_5::bind_transformer helper function:
so_5::single_sink_binding_t first_binding; // Or so_5::multi_sink_binding_t.
so_5::bind_transformer(first_binding, source_mbox,
// Type of the incoming message will be deduced from a lambda parameter.
[dest_mbox](const source_msg_type & msg) {
return so_5::make_transformed<result_msg_type>(dest_mbox,
... /* some values from msg */ );
});
so_5::single_sink_binding_t second_binding; // Or so_5::multi_sink_binding_t.
// Type of the source message is specified explicitly.
// This is required for mutable messages and signals.
so_5::bind_transformer< so_5::mutable_msg<source_msg_type> >(second_binding, source_mbox,
[dest_mbox](source_msg_type & msg) // Note the use of non-const reference, it is
// allowed here because the message is mutable.
{
return so_5::make_transformed<result_msg_type>(dest_mbox,
... /* some values from msg */ );
});
so_5::single_sink_binding_t third_binding; // Or so_5::multi_sink_binding_t.
so_5::bind_transformer< source_signal_type >(third_binding, source_mbox,
[dest_mbox]() // Note that transformer lambda without a parameter has to
// be used for signals.
{
return so_5::make_transformed<result_signal_type>(dest_mbox);
});
A transformer functor (a lambda or free-standing function) should return either so_5::transformed_message_t<Msg> or std::optional< so_5::transformed_message_t<Msg> >. If an empty std::optional is returned then nothing will be redirected:
so_5::bind_transformer(binding, source_mbox,
[dest_mbox](const source_msg_type & msg) -> std::optional<so_5::transformed_message_t<result_msg_type>>
{
// Check the possibility of transformation. If it is not possible then
// the message will be discarded by this message sink.
if(transformation_enabled(msg)) {
// Do the transformation.
return { so_5::make_transformed<result_msg_type>(dest_mbox, ...) };
}
else {
// Empty std::optional tells to discard the message.
return std::nullopt;
}
});
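The "transform, and redirect only if the transformer produced a value" pattern shown above can also be reproduced in a standalone toy model, which may help when reasoning about it without the library at hand. This is not the implementation behind so_5::bind_transformer: `source_msg`, `result_msg`, `toy_dest`, `toy_transformer`, `toy_transform_sink`, and `demo_transform` are hypothetical names used only for this sketch.

```cpp
#include <cassert>
#include <functional>
#include <optional>
#include <string>
#include <utility>
#include <vector>

// Toy model only: not SObjectizer types.
struct source_msg { int value; };
struct result_msg { std::string text; };
struct toy_dest { std::vector<result_msg> inbox; };

using toy_transformer =
    std::function<std::optional<result_msg>(const source_msg &)>;

// A sink that runs the transformer first and forwards the result
// only when the transformer returned a non-empty optional.
class toy_transform_sink {
    toy_dest & dest_;
    toy_transformer transformer_;
public:
    toy_transform_sink(toy_dest & dest, toy_transformer transformer)
        : dest_(dest), transformer_(std::move(transformer)) {}

    void push(const source_msg & msg) {
        std::optional<result_msg> transformed = transformer_(msg);
        if (transformed) dest_.inbox.push_back(std::move(*transformed));
    }
};

std::vector<std::string> demo_transform() {
    toy_dest dest;
    toy_transform_sink sink(dest,
        [](const source_msg & msg) -> std::optional<result_msg> {
            // Negative values are treated as "not transformable".
            if (msg.value < 0) return std::nullopt;
            return result_msg{"value=" + std::to_string(msg.value)};
        });
    sink.push(source_msg{-1}); // discarded: transformer returned nullopt
    sink.push(source_msg{42}); // transformed and forwarded
    assert(dest.inbox.size() == 1);

    std::vector<std::string> texts;
    for (const result_msg & r : dest.inbox) texts.push_back(r.text);
    return texts;
}
```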
The so_5::bind_transformer helper also accepts an optional delivery filter (note that a delivery filter can only be specified for messages, not signals):
so_5::bind_transformer< so_5::mutable_msg<source_msg_type> >(binding, source_mbox,
// The transformer.
[dest_mbox](source_msg_type & msg) // Note the use of non-const reference, it is
// allowed here because the message is mutable.
{
return so_5::make_transformed<result_msg_type>(dest_mbox,
... /* some values from msg */ );
},
// The delivery filter. The delivery filter will be called first and only
// if the delivery filter returns `true` will the transformer be invoked.
[](const auto & msg) // Note the use of const-reference.
// Delivery filters always receive const-references,
// even for mutable messages.
{
return ... /* some condition that uses the content of the `msg` */;
});
Under the hood so_5::bind_transformer uses the so_5::msinks::transform_then_redirect factory function(s). The so_5::msinks::transform_then_redirect is also a new feature of v.5.8.1. It can be used directly if the simple so_5::bind_transformer is not enough. For example:
struct part { ... };
struct compound {
part m_first;
part m_second;
};
...
so_5::mbox_t src_mbox = ...;
so_5::mbox_t dest_mbox = ...;
so_5::msink_t my_transformation_sink =
so_5::msinks::transform_then_redirect< so_5::mutable_msg<compound> >(
src_mbox->environment(),
[dest_mbox](compound & msg) { // or [dest_mbox](auto & msg)
return so_5::make_transformed<part>(
dest_mbox, // Destination for the transformed message.
std::move(msg.m_first) // Initializer for the new `part` instance.
);
});
// The my_transformation_sink can now be passed to bind() method of
// so_5::single_sink_binding_t/multi_sink_binding_t, or for implementation
// of a more complex message sink.
Redirecting a message from one mbox to another is just one application of the new message_sink abstraction. We believe there may be other applications for it.
For example, let's suppose that there is existing code that uses a custom event-driven framework with its own event-processing facilities (such as thread pools, message queues, event handlers, and so on), and it's necessary to create a bridge that receives SObjectizer's messages and transforms them into framework-related events. Such a bridge can be implemented as a message_sink. In this case the custom sink can be subscribed to a mbox using so_5::single_sink_binding_t (or so_5::multi_sink_binding_t). The SObjectizer part of the application will send messages to the usual mboxes the usual way, and these messages will be automatically converted and transparently passed to the non-SObjectizer part because of this custom sink.
The message_sink abstraction opens up new forms of message distribution: message_sinks make it possible to deliver messages not only to agents but also to other forms of message processors that a user may need.
However, implementing such new forms may require a deep understanding of the message_sinks' role in SObjectizer. If you need more information on this topic, feel free to contact the SObjectizer developers or open an issue with your questions.
All message_sinks have to implement (be derived from) the so_5::abstract_message_sink_t interface. The main part of this interface in v.5.8.0 looks like this:
class SO_5_TYPE abstract_message_sink_t
{
public:
abstract_message_sink_t() = default;
virtual ~abstract_message_sink_t() noexcept = default;
...
[[nodiscard]]
virtual so_5::environment_t &
environment() const noexcept = 0;
[[nodiscard]]
virtual so_5::priority_t
sink_priority() const noexcept = 0;
virtual void
push_event(
so_5::mbox_id_t mbox_id,
so_5::message_delivery_mode_t delivery_mode,
const std::type_index & msg_type,
const so_5::message_ref_t & message,
unsigned int redirection_deep,
const so_5::message_limit::impl::action_msg_tracer_t * tracer ) = 0;
};
The environment() method plays the same role as the environment() method of so_5::abstract_message_box_t: it should return a reference to the SObjectizer Environment for which the message_sink has been created.
The sink_priority() method should return the priority associated with the sink. This value has the same meaning as the priority of an agent. In previous versions of SObjectizer message boxes held subscriptions ordered by priority: if two agents High (with priority 1) and Low (with priority 0) created subscriptions to MsgA from the same mbox M, then M held an ordered list of subscribers for MsgA in the form (High, Low) because High had a higher priority than Low, and an instance of MsgA was delivered to High first, and only then to Low.
Now mboxes hold references to message_sinks, but ordering of subscribers is still necessary: if agent High with higher priority is behind SinkH, and agent Low with lower priority is behind SinkL, then a mbox has to have the subscribers list for MsgA in the form (SinkH, SinkL). To maintain this order, mboxes use the sink_priority() method.
If a message_sink is just a bridge to an agent, then sink_priority() must return the priority of the agent behind the message_sink. If a message_sink plays a different role (for example, redirects messages from one mbox to another), then the value returned by sink_priority() is up to the user, but the lowest value so_5::prio::p0 is preferred unless there is a real need to raise the priority.
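The (High, Low) ordering described above can be sketched with a standalone toy comparator. This is not SObjectizer's own comparison code: `toy_sink`, `by_priority_then_address`, and `demo_order` are hypothetical names, and breaking ties by pointer address is an assumption made here so that distinct sinks with equal priority can coexist in one ordered container.

```cpp
#include <cassert>
#include <functional>
#include <set>
#include <string>
#include <vector>

// Toy model only: not SObjectizer types.
struct toy_sink {
    std::string name;
    int priority;
};

// Higher priority first. Ties are broken by address (an assumption of
// this sketch) so that two different sinks never compare as equal.
struct by_priority_then_address {
    bool operator()(const toy_sink * a, const toy_sink * b) const noexcept {
        if (a->priority != b->priority) return a->priority > b->priority;
        return std::less<const toy_sink *>{}(a, b);
    }
};

std::vector<std::string> demo_order() {
    toy_sink low{"Low", 0};
    toy_sink high{"High", 1};
    toy_sink other{"Other", 0};

    std::set<const toy_sink *, by_priority_then_address> sinks;
    sinks.insert(&low);
    sinks.insert(&high);
    sinks.insert(&other);
    assert(sinks.size() == 3); // equal priorities do not collapse entries

    // Iteration order is the delivery order in this model.
    std::vector<std::string> names;
    for (const toy_sink * s : sinks) names.push_back(s->name);
    return names;
}
```

Only the position of `High` is deterministic in this sketch; the relative order of the two priority-0 sinks depends on their addresses.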
The push_event method must implement the sink's main logic by delivering a message (or signal) to the appropriate receiver.
For example, the standard implementation of message_sink for an agent does the following in the push_event method:
- checks the redirection_deep value. If redirection_deep is too large, then the delivery is canceled and the message/signal is discarded;
- checks the message limit if such a limit is defined for the agent. If the limit is defined and an overflow is detected then the overflow reaction is performed;
- if tracer isn't nullptr (it means that message delivery tracing is ON) then all performed actions (like execution of overflow reaction or discarding of a message) are logged via the tracer object;
- if the message is neither discarded nor redirected (by overflow reaction) then it is pushed to the agent's event_queue.
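The order of these checks can be sketched with a standalone toy sink. It is a heavily simplified model, not the agent sink from SObjectizer: `push_result` and `toy_agent_sink` are hypothetical names, the limit is a single queue-size cap rather than a per-message-type limit, the only overflow reaction modelled is "drop", and tracing is omitted.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <string>

// Toy model only: not SObjectizer types.
enum class push_result { queued, dropped_by_depth, dropped_by_limit };

class toy_agent_sink {
    std::deque<std::string> queue_;
    std::size_t limit_;
    unsigned max_deep_;
public:
    toy_agent_sink(std::size_t limit, unsigned max_deep)
        : limit_(limit), max_deep_(max_deep) {}

    push_result push_event(const std::string & msg, unsigned redirection_deep) {
        // Step 1: refuse deliveries that were redirected too many times.
        if (redirection_deep >= max_deep_) return push_result::dropped_by_depth;
        // Step 2: apply the limit; the overflow reaction here is "drop".
        if (queue_.size() >= limit_) return push_result::dropped_by_limit;
        // Step 3: nothing objected, so the message goes to the queue.
        queue_.push_back(msg);
        return push_result::queued;
    }

    std::size_t queued() const noexcept { return queue_.size(); }
};
```

For instance, with `toy_agent_sink sink(1, 4);` the first `push_event("a", 0)` is queued, a second push at depth 0 is dropped by the limit, and any push with depth 4 or more is dropped by the depth check.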
The standard implementation of message_sink returned by so_5::wrap_to_msink() only checks the redirection_deep value and, if it's OK, calls do_deliver_message for the wrapped mbox.
Because messages from one mbox can now be redirected to another mbox, it is easy to create circular redirections, for example:
so_5::mbox_t first = env.create_mbox();
so_5::mbox_t second = env.create_mbox();
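// A multi-sink binding is used because a single_sink_binding_t
// would keep only the last of the two subscriptions.
so_5::multi_sink_binding_t<> binding;
binding.bind<MsgA>(first, so_5::wrap_to_msink(second));
binding.bind<MsgA>(second, so_5::wrap_to_msink(first));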
An attempt to send a MsgA to first will lead to a loop of redirections from first to second and back.
SObjectizer uses a very simple scheme for detection of such loops: there is a redirection_deep value for every message delivery attempt. This value is 0 when so_5::send is called and it is increased every time a message_sink redirects the message to another destination (to another mbox by calling do_deliver_message or to another message_sink by calling push_event).
All implementations of mboxes and message_sinks must check the redirection_deep value. If it's equal to or greater than so_5::max_redirection_deep then the delivery of the message/signal must be canceled. Otherwise an infinite loop of redirection could occur.
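The depth-counter scheme can be demonstrated with a standalone toy model of two mailboxes that redirect to each other. It is not SObjectizer code: `toy_mbox`, `toy_max_redirection_deep`, and `demo_loop` are hypothetical names, the limit value of 32 is an arbitrary choice for this sketch, and mailbox and sink are merged into one class for brevity.

```cpp
#include <cassert>
#include <vector>

// Toy model only: not SObjectizer types. The value is arbitrary.
constexpr unsigned toy_max_redirection_deep = 32;

class toy_mbox {
    std::vector<toy_mbox *> redirects_;
public:
    unsigned deliveries = 0; // deliveries accepted by this mailbox
    unsigned dropped = 0;    // deliveries canceled by the depth check

    void redirect_to(toy_mbox & other) { redirects_.push_back(&other); }

    void deliver(unsigned redirection_deep = 0) {
        // Cancel the delivery once the counter reaches the limit.
        if (redirection_deep >= toy_max_redirection_deep) {
            ++dropped;
            return;
        }
        ++deliveries;
        // Every redirection passes an increased counter further.
        for (toy_mbox * next : redirects_) next->deliver(redirection_deep + 1);
    }
};

// Returns the total number of accepted deliveries in a two-mailbox loop.
unsigned demo_loop() {
    toy_mbox first, second;
    first.redirect_to(second);
    second.redirect_to(first);

    first.deliver(); // starts with redirection_deep == 0
    assert(first.dropped + second.dropped == 1);
    return first.deliveries + second.deliveries;
}
```

Without the check at the top of `deliver()` the call in `demo_loop()` would recurse until the stack is exhausted; with it, the loop stops after a bounded number of hops.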
NOTE. The same scheme is used for controlling redirection of a message when the message limit is exceeded.
If a user writes his/her own message_sink to deliver messages directly to an agent then message limits for that agent must be taken into account. This is critical because as of v.5.8.0 message limits are no longer handled by message boxes (as in previous versions); handling them is now the responsibility of message_sinks.
Discussion of handling message limits in message_sinks is out of scope of this document. If you need more information about this topic please contact SObjectizer's developers.
Classes like so_5::single_sink_binding_t and so_5::multi_sink_binding_t need some kind of control over a message_sink's lifetime, especially if the message_sink is created in dynamic memory (as the so_5::wrap_to_msink() function does). But different message_sinks may have different allocation schemes: some sinks may be dynamically allocated, some sinks may be members of other objects (for example, they may be fields of agents).
To handle all those cases SObjectizer v.5.8.0 introduced the so_5::abstract_sink_owner_t interface, which looks like this:
class SO_5_TYPE abstract_sink_owner_t : protected atomic_refcounted_t
{
friend class intrusive_ptr_t< abstract_sink_owner_t >;
public:
abstract_sink_owner_t() = default;
virtual ~abstract_sink_owner_t() noexcept = default;
[[nodiscard]]
virtual abstract_message_sink_t &
sink() noexcept = 0;
[[nodiscard]]
virtual const abstract_message_sink_t &
sink() const noexcept = 0;
};
Instances of so_5::abstract_sink_owner_t have to be created dynamically and their lifetime is controlled by intrusive reference counting. An instance of so_5::abstract_sink_owner_t can hold an actual message_sink inside or may hold a (smart) reference to a message_sink instance located elsewhere.
If a user implements his/her own message_sink and plans to use it with so_5::single_sink_binding_t/so_5::multi_sink_binding_t, then he/she has to wrap the message_sink in some kind of so_5::abstract_sink_owner_t (for example, the so_5::simple_sink_owner_t template can be used).
The so_5::msink_t is a kind of smart pointer to so_5::abstract_sink_owner_t:
using msink_t = intrusive_ptr_t< abstract_sink_owner_t >;
SObjectizer's so_5::intrusive_ptr_t is similar to std::shared_ptr, but doesn't support weak references. The so_5::msink_t is very similar to so_5::mbox_t and so_5::mchain_t: they are all just shorthands for so_5::intrusive_ptr_t.
The helper template class so_5::simple_sink_owner_t provides an implementation of so_5::abstract_sink_owner_t for cases when an instance of the user's message_sink can be embedded inside an instance of sink_owner. For example:
class my_special_sink final : public so_5::abstract_message_sink_t {
... // Implementation of inherited methods.
};
auto my_sink_owner = std::make_unique< so_5::simple_sink_owner_t<my_special_sink> >(...);
so_5::single_sink_binding_t & binding = ...;
binding.bind<MsgA>(src, so_5::msink_t{std::move(my_sink_owner)});
...
If a user wants to implement a custom type of mbox, then he/she needs to sort pointers to message_sinks with respect to their priorities. Because of that it isn't enough to write something like this:
std::set<so_5::abstract_message_sink_t *> sinks_;
There should be a special comparator that takes care of comparing pointers and comparing sinks' priorities.
To simplify working with such comparisons there is a static method special_sink_ptr_compare in so_5::abstract_message_sink_t. It can be used in writing custom comparators:
struct my_mbox_sink_comparator {
[[nodiscard]] bool operator()(
const so_5::abstract_message_sink_t * a,
const so_5::abstract_message_sink_t * b) const noexcept
{
return so_5::abstract_message_sink_t::special_sink_ptr_compare(a, b);
}
};
...
std::set<so_5::abstract_message_sink_t *, my_mbox_sink_comparator> sinks_;
If a user wants to work with so_5::msink_t (not with raw pointers/references to so_5::abstract_message_sink_t), then there is a helper class so_5::impl::msink_less_comparator_t that can be used for this purpose:
std::set<so_5::msink_t, so_5::impl::msink_less_comparator_t> sinks_;
Note, however, that so_5::impl::msink_less_comparator_t isn't a part of the stable public API and can be changed/removed in some future version of SObjectizer.