SO 5.8 InDepth Dispatchers
- Table of Contents
- Introduction
- An Overview Of Existing Dispatchers
- one_thread Dispatcher
- nef_one_thread Dispatcher
- active_obj Dispatcher
- active_group Dispatcher
- thread_pool Dispatcher
- nef_thread_pool Dispatcher
- adv_thread_pool Dispatcher
- prio_one_thread::strictly_ordered Dispatcher
- prio_one_thread::quoted_round_robin Dispatcher
- prio_dedicated_threads::one_per_prio Dispatcher
- Queue Locks
Dispatchers are one of the cornerstones of SObjectizer. Dispatchers play a significant role in the behavior of any SObjectizer-based application.
This is because SObjectizer does not hide multithreading from the user. Instead, it simplifies working with threads in application domains where threads are important. Application domains differ, and even inside one domain different event scheduling policies might be needed. This is why SObjectizer provides several dispatcher types.
Because of that, understanding the dispatcher concept is necessary for using the SObjectizer framework.
Long story short: dispatchers manage event queues and provide worker threads on which agents handle their events.
The only form of agent interaction in SObjectizer is asynchronous message passing. An agent sends a message to some mbox. If there is a subscriber for that message type on that mbox then an event is created. An event is a record of a specific message that must be delivered to a specific agent. The event is stored in an event queue.
An event must be extracted from the event queue, and the message from that event must be passed to the receiving agent for processing. The receiving agent processes the message with the event handler that is subscribed to this message type. An event handler is either a method of the agent or just a lambda function.
An event handler is called in the context of some thread of execution. This thread is provided by a dispatcher.
Every worker thread on which an event handler is invoked belongs to some dispatcher. So a dispatcher can be seen as the manager of the worker threads used for the invocation of event handlers. Creation and deletion of worker threads is the responsibility of the dispatcher.
There can be many dispatchers in a SObjectizer Environment. Each of them has its own set of working threads.
Every agent should be bound to some dispatcher. Binding is done during the registration of the agent's coop. A programmer can select a dispatcher for a specific agent. Once the agent is bound to a dispatcher, all of the agent's events are handled on worker threads of that dispatcher.
A dispatcher is not only a manager of working threads. It also manages event queues for the agents bound to it. This is a significant difference between SObjectizer and other actor frameworks: an event queue doesn't belong to an agent. A queue is created and managed by a dispatcher.
Another difference between SObjectizer and other actor frameworks is the number of event queues. In the "classical" actor-based approach every actor has its own queue with messages to be processed. In SObjectizer a dispatcher decides how many event queues are necessary for serving the agents bound to it.
The simplest dispatcher type in SObjectizer, the one_thread dispatcher, uses just one event queue for all agents bound to it. Much more complex dispatchers like thread_pool and adv_thread_pool can create as many event queues as the programmer requires. Because of that, every dispatcher has its own logic for handling event queues and dispatching events for processing by agents.
So now we can say that a dispatcher is responsible for:
- creation and deletion of working threads;
- creation and deletion of event queues;
- extraction of events from event queues and invocation of event-handlers on the context of dispatcher's working threads.
Different dispatchers do these actions differently. So a user can choose the most appropriate dispatcher for a particular task.
Why does SObjectizer support different kinds of dispatchers and allow several dispatchers to be created in an application?
Why isn't a simple approach with one thread pool for all agents used?
Why should a user care about dispatchers and bind agents to a particular dispatcher?
It is a direct consequence of SObjectizer's primary goal: simplifying the development of multithreaded applications. SObjectizer doesn't hide threads from the user. Instead, it simplifies their usage and provides a way for safe interaction between threads via asynchronous message passing.
A user may require dedicated threads for performing long and CPU-intensive calculations in the background. Or dedicated threads might be necessary for interaction with some hardware attached to your computer. Or dedicated threads might be needed to do blocking calls to some 3rd party library. Or thread pool can be necessary for efficient processing of some event stream.
So there can be different reasons to use threads for solving some real-world problems.
What can we do about that?
We can create and manage working threads manually. We can pass information between threads via some ad-hoc implementations of event queues or synchronize access to shared data via low-level mechanisms like semaphores, mutexes, condition variables and so on...
Or we can get ready-to-use tools for management of working threads and inter-thread message passing. SObjectizer provides such tools:
- working thread management and events scheduling is done via dispatchers;
- inter-thread message passing is implemented via general message delivery mechanism (mboxes, event subscriptions and so on).
And because different users require different thread management policies, there are different dispatchers in SObjectizer. For the same reason, any number of dispatchers can be created in an application. All of that lets a user solve a domain-specific task in the most appropriate way.
But it doesn't mean that a user must take care of a dispatcher for every agent. A user can do so if they want to. If not, there are the default dispatcher and the coop's main dispatcher binder, which significantly simplify working with SObjectizer.
Every SObjectizer Environment has the default dispatcher. It is created and started automatically.
The default dispatcher is an instance of one_thread dispatcher. It means that all agents bound to that dispatcher will work on the same worker thread.
If a user doesn't specify a dispatcher for agent or agent's coop then the agent will be bound to the default dispatcher.
Dispatchers in SObjectizer-5.8 are accessible only via direct references obtained when the dispatchers are created. Dispatchers are destroyed automatically when no one uses them.
An instance of a dispatcher is created by the appropriate make_dispatcher() function, which is implemented for every standard dispatcher type in SObjectizer.
make_dispatcher() returns a kind of smart pointer. This handle can be used for binding agents to that dispatcher instance.
The instance is destroyed automatically when no one uses this handle anymore.
An example of the creation of dispatchers:
// A new instance of one_thread dispatcher.
auto one_thread_disp = so_5::disp::one_thread::make_dispatcher(env);
// Another instance of one_thread dispatcher.
// Note that one_thread_disp and another_one_thread_disp point to different instances.
auto another_one_thread_disp = so_5::disp::one_thread::make_dispatcher(env);
// A new instance of active_obj dispatcher.
auto active_obj_disp = so_5::disp::active_obj::make_dispatcher(env);
// A new instance of thread_pool dispatcher.
auto tp_disp = so_5::disp::thread_pool::make_dispatcher(env, 16);
A binding of an agent to a dispatcher is done via a separate object called dispatcher binder.
The dispatcher binder knows how to bind an agent to a dispatcher instance during agent registration, and how to unbind the agent during deregistration.
Every dispatcher implements its own dispatcher binder. It means that if a user wants to bind an agent to an active_obj dispatcher instance, the user must create an active_obj dispatcher binder.
Binder can be specified for an agent during agent creation:
auto coop = env.make_coop();
// An agent will be bound to a new one_thread dispatcher.
// Creation of a dispatcher.
auto one_thread_disp = so_5::disp::one_thread::make_dispatcher(env);
coop->make_agent_with_binder< some_agent_type >(
// Creation of dispatcher binder.
one_thread_disp.binder(),
... /* Args for some_agent_type constructor */ );
// An agent will be bound to a new active_obj dispatcher.
// Creation of a dispatcher.
auto active_obj_disp = so_5::disp::active_obj::make_dispatcher(env);
coop->make_agent_with_binder< some_agent_type >(
// Creation of dispatcher binder.
active_obj_disp.binder(),
... );
A binder can also be specified for the whole coop. Such a binder becomes the main binder for that coop:
auto coop = env.make_coop(
// A main binder for coop will be a binder to a new instance of active_obj.
so_5::disp::active_obj::make_dispatcher(env).binder() );
// This agent will be bound via coop's main binder to active_obj dispatcher.
coop->make_agent< some_agent_type >( ... );
// This agent will also be bound to the same active_obj dispatcher.
coop->make_agent< another_agent_type >( ... );
// But this agent will be bound to a new one_thread dispatcher. It is because
// a separate dispatcher binder is set for that agent.
coop->make_agent_with_binder< some_agent_type >(
so_5::disp::one_thread::make_dispatcher(env).binder(), ... );
If a dispatcher binder is not set for a coop then a binder for the default dispatcher will be used as the main binder for that coop.
All event handlers are treated as not thread safe by default. It means that if there are events E1 and E2 in the event queue for some agent A, these events will be delivered to A one after another.
It is impossible for the event handler for E1 to run at the same time as the event handler for E2 on a different thread.
SObjectizer guarantees that one event handler completes its execution before the next event handler is called.
Thread safety matters for dispatchers that use pools of threads. Such dispatchers can migrate an agent from one working thread to another. Because such migration can take place, SObjectizer's thread safety guarantee simplifies the development of agents: there is no need to synchronize access to the agent's internals with locks or similar mechanisms.
User can mark an event handler as thread safe. For example, if an event handler doesn't modify the state of the agent. Or if such modification is implemented via the usage of appropriate mechanisms like mutex or spinlock.
Most of standard SObjectizer's dispatchers will ignore that mark and will treat every event handler as thread unsafe. But there is adv_thread_pool dispatcher which can use that mark appropriately.
When an agent is bound to adv_thread_pool dispatcher and event handler H is marked as thread-safe then the dispatcher can schedule calls of H for different events on several worker threads at the same time. Moreover, if an agent has thread-safe event handler H and not thread safe event handler G then adv_thread_pool will guarantee that all event handlers H will finish their work before a single event handler G will be invoked.
As of v.5.8.0 there is only one dispatcher that can distinguish between not thread safe and thread-safe event handlers. But more dispatchers with such abilities may be introduced in future versions of SObjectizer.
There are two special events that SObjectizer sends to each agent:
- evt_start. This event is sent when an agent is registered inside a SObjectizer Environment and has to start its work. The so_5::agent_t::so_evt_start method is called for the agent as the result of the evt_start event;
- evt_finish. This event is sent when an agent is being deregistered from SObjectizer, and it is the last event the agent processes inside the SObjectizer Environment. The so_5::agent_t::so_evt_finish method is called for the agent as the result of the evt_finish event.
Both of these events are critically important. But the evt_finish event has a peculiarity: if it can't be stored in the agent's event queue, then the agent (and the agent's coop) can't be deregistered.
The main reason why evt_finish can't be scheduled for an agent (stored in the agent's event queue) is a std::bad_alloc exception thrown when there is not enough dynamic memory. All standard dispatchers in SObjectizer use dynamic memory allocation in event queues, so std::bad_alloc can potentially be thrown during evt_finish scheduling.
If std::bad_alloc (or another exception) occurs during evt_finish scheduling, the normal work of SObjectizer can't continue. That is why the scheduling of evt_finish is done inside noexcept methods in SObjectizer (for example, the so_5::event_queue_t::push_evt_finish() method is noexcept). However, most of the standard SObjectizer dispatchers allocate memory in push_evt_finish and don't handle std::bad_alloc. If the exception is thrown in push_evt_finish, the whole application will be terminated.
For many SObjectizer use cases, this isn't a problem. But if a user wants to use SObjectizer in scenarios where recovery from std::bad_alloc is strictly necessary, then only two dispatchers can be used safely: nef_one_thread and nef_thread_pool. These two dispatchers also use dynamic memory allocation for their event queues, but they preallocate a block of memory to hold evt_finish for each agent bound to them. This block is automatically used when evt_finish is being sent to an agent, so std::bad_alloc is impossible when scheduling evt_finish.
NOTE. nef_ part in the dispatchers names means noexcept for evt_finish.
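The preallocation idea can be illustrated with a small standalone sketch. It is not SObjectizer's implementation: toy_nef_queue and demand_node are hypothetical names, and the queue is a plain singly linked list without any locking. The only point is that the node for evt_finish is allocated when the queue is created, so pushing evt_finish later performs no allocation and can honestly be declared noexcept.

```cpp
#include <cassert>
#include <memory>
#include <string>
#include <utility>

// Hypothetical node of a toy event queue (not an SObjectizer type).
struct demand_node {
    std::string what;
    std::unique_ptr<demand_node> next;
};

// Toy single-threaded queue that reserves the evt_finish node up front.
class toy_nef_queue {
    std::unique_ptr<demand_node> head_;
    demand_node * tail_ = nullptr;
    // Allocated once in the constructor, consumed by push_evt_finish().
    std::unique_ptr<demand_node> reserved_finish_;

    // Appends an already allocated node; never allocates, never throws.
    void link(std::unique_ptr<demand_node> n) noexcept {
        demand_node * raw = n.get();
        if (tail_) tail_->next = std::move(n);
        else head_ = std::move(n);
        tail_ = raw;
    }

public:
    // May throw std::bad_alloc: a failure here happens at "bind" time,
    // when it can still be reported to the caller.
    toy_nef_queue() : reserved_finish_(std::make_unique<demand_node>()) {
        reserved_finish_->what = "evt_finish";
    }

    // Ordinary events allocate a node and therefore may throw.
    void push(std::string what) {
        auto n = std::make_unique<demand_node>();
        n->what = std::move(what);
        link(std::move(n));
    }

    // Uses the reserved node, so no allocation is needed here.
    void push_evt_finish() noexcept {
        assert(reserved_finish_ && "evt_finish can be pushed only once");
        link(std::move(reserved_finish_));
    }

    bool empty() const noexcept { return !head_; }

    std::string pop() {
        assert(head_ && "queue must not be empty");
        auto n = std::move(head_);
        head_ = std::move(n->next);
        if (!head_) tail_ = nullptr;
        return std::move(n->what);
    }
};
```

In this sketch a shortage of memory can only surface when the queue is constructed or when an ordinary event is pushed, which mirrors the trade-off described above: the cost of the guarantee is one reserved block per agent.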
There are ten standard dispatchers in SObjectizer-5.8.
Seven of them do not support agent priorities. These dispatchers treat all agents as having the same priority:
- one_thread, nef_one_thread,
- active_obj,
- active_group,
- thread_pool, nef_thread_pool,
- adv_thread_pool.
Three dispatchers support agent's priorities. They schedule agents with respect to agent's priority:
- prio_one_thread::strictly_ordered,
- prio_one_thread::quoted_round_robin,
- prio_dedicated_threads::one_per_prio.
Everything related to a particular dispatcher is defined in a specific namespace inside so_5::disp. For example:
- so_5::disp::active_obj
- so_5::disp::thread_pool
- so_5::disp::prio_one_thread::strictly_ordered
Every dispatcher-specific namespace has a similar set of functions and classes, such as:
- make_dispatcher
- queue_params_t, disp_params_t, bind_params_t
This makes usage of different dispatchers very similar: if you know how to create a one_thread dispatcher, you can easily create an active_obj or thread_pool dispatcher.
one_thread dispatcher is the oldest, simplest and, sometimes, most useful dispatcher. It creates just one worker thread and just one event queue. Events for all agents bound to that dispatcher are stored in a single event queue.
The single worker thread gets the next event from the queue and runs the event handler of the receiving agent. Then it gets the next event, and so on.
It is possible to say that one_thread implements cooperative multitasking: if some agent hangs then all other agents bound to that dispatcher will hang too.
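The single-queue behaviour can be pictured with a minimal standalone sketch. It is a simplification under stated assumptions, not SObjectizer code: toy_one_thread_disp and toy_event are hypothetical names, and the loop of the worker thread is modelled by an ordinary function call instead of a real thread.

```cpp
#include <deque>
#include <functional>
#include <string>
#include <utility>
#include <vector>

// Hypothetical event: the receiving agent's name plus the handler to run.
struct toy_event {
    std::string receiver;
    std::function<void()> handler;
};

// Toy model of a dispatcher with one queue shared by all bound agents.
class toy_one_thread_disp {
    std::deque<toy_event> queue_;

public:
    void push(std::string receiver, std::function<void()> handler) {
        queue_.push_back(toy_event{std::move(receiver), std::move(handler)});
    }

    // Models the worker thread's loop: take the next event, run its
    // handler to completion, repeat. Returns the receivers in the order
    // in which their handlers were run.
    std::vector<std::string> drain() {
        std::vector<std::string> served;
        while (!queue_.empty()) {
            toy_event ev = std::move(queue_.front());
            queue_.pop_front();
            ev.handler();  // a handler that never returns blocks everyone
            served.push_back(std::move(ev.receiver));
        }
        return served;
    }
};
```

Because every handler runs to completion before the next event is taken, a handler that blocks inside drain() delays all agents that share the queue, which is the cooperative behaviour described above.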
The nef_one_thread dispatcher is available since v.5.8.0. It is very similar to the one_thread dispatcher, but it preallocates memory blocks for holding evt_finish events for all agents bound to it. This makes it possible to provide a noexcept guarantee for pushing an evt_finish event to the dispatcher's event queue.
Currently, the one_thread and nef_one_thread dispatchers use different implementations of the event queue, so they have slightly different performance (one_thread is usually a bit faster).
An instance of the nef_one_thread dispatcher can be used as the default dispatcher for a SObjectizer Environment (when the standard multi-threaded environment infrastructure is used). To do that, instruct the SObjectizer Environment to create a nef_one_thread dispatcher by setting the default dispatcher params with a so_5::disp::nef_one_thread::disp_params_t object:
so_5::launch( [](so_5::environment_t & env) {...},
[](so_5::environment_params_t & params) {
params.default_disp_params(
so_5::disp::nef_one_thread::disp_params_t{});
});
active_obj is another very old and simple dispatcher. active_obj dispatcher creates a separate worker thread and separate event queue for every agent bound to that dispatcher. It means that every agent will have its own worker thread and its own event queue.
active_obj dispatcher creates a new event queue and launches a new working thread for every agent bound to it. When an agent is deregistered the dispatcher stops the worker thread of this agent and deletes the corresponding event queue. It means that if there are no agents bound to active_obj dispatcher there are no working threads nor event queues.
active_obj dispatcher is useful if it is necessary to provide a separate working thread for some agent. For example, if an agent needs a dedicated context for long-running activities like interaction with hardware devices, performing blocking calls to 3rd party libraries or external systems and so on.
A similar effect can be achieved by using a private one_thread dispatcher. But sometimes using the active_obj dispatcher requires less effort:
auto coop = env.make_coop(
// A main binder for coop will be binder to an active_obj dispatcher.
// It means that all agents of the coop without explicitly specified
// binder will be bound to that active_obj dispatcher and become
// active objects with own working thread.
so_5::disp::active_obj::make_dispatcher(env).binder() );
for( const auto & d : devices )
// Create a separate agent for every device.
coop->make_agent< device_manager >( d );
env.register_coop( std::move(coop) );
active_group is another very old and simple dispatcher. It creates a separate worker thread and a separate event queue for each named group of agents. It means that all agents from the same group work on a common worker thread.
The active_group dispatcher creates a new worker thread and a new event queue when the first agent of a new group is registered. It stops the group's worker thread and destroys the group's event queue when the last agent from the group is deregistered. It means that all resources for a group are deallocated when the group loses its last member.
If a new agent is added to a group that disappeared earlier, a new group with the same name is created. For example:
- agents A and B are being registered as part of group G1;
- new group G1 is created, new working thread is started for G1;
- agents A and B are deregistered;
- working thread for G1 is stopped, group G1 is deleted;
- agents C and D are being registered as part of group G1;
- new group G1 is created, new working thread is started for G1.
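The lifecycle above amounts to reference counting per group name. The following standalone sketch replays it under simplifying assumptions: toy_active_groups is a hypothetical class, and starting a worker thread is modelled by incrementing a counter rather than by creating a real thread.

```cpp
#include <cassert>
#include <cstddef>
#include <map>
#include <string>

// Toy registry of active groups: group name -> number of bound agents.
class toy_active_groups {
    std::map<std::string, std::size_t> members_;
    std::size_t threads_started_ = 0;

public:
    // The first agent of a group creates the group (and its thread).
    void bind_agent(const std::string & group) {
        auto it = members_.find(group);
        if (it == members_.end()) {
            members_.emplace(group, 1);
            ++threads_started_;
        } else {
            ++it->second;
        }
    }

    // The last agent leaving a group destroys the group.
    void unbind_agent(const std::string & group) {
        auto it = members_.find(group);
        assert(it != members_.end() && "agent must belong to a known group");
        if (--it->second == 0) members_.erase(it);
    }

    bool group_exists(const std::string & group) const {
        return members_.count(group) != 0;
    }

    std::size_t threads_started() const { return threads_started_; }
};
```

Binding A and B to G1, unbinding both, and then binding C and D to G1 makes threads_started() return 2 in this model: the second G1 is a new group that merely reuses the name.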
Every instance of active_group dispatcher has its own set of active groups. A group name must be unique only inside one dispatcher instance. It means that dispatcher instance ag1 can have group G1 and dispatcher instance ag2 can have group G1 too. These groups will be different. The lifetime of ag1.G1 and ag2.G1 will be independent of each other.
active_group dispatcher can be useful if it is necessary to bind a group of agents to the same working thread. For example, if these agents perform different stages of complex requests processing.
Let's imagine a processing pipeline of three stages: validation of request params, request processing and generation of results. Each of these stages can take a long time, so it is better to provide a separate context for every request. It can be done by using the active_group dispatcher:
// Somewhere at the start of the application.
auto request_processor_disp = so_5::disp::active_group::make_dispatcher(env);
...
// Somewhere in the handling of a new request.
std::string request_id = generate_request_id( request );
// Create a new coop.
auto coop = env.make_coop(
// All agents of this coop will be part of active group in
// active_group dispatcher.
// request_id will be used as group name.
request_processor_disp.binder( request_id ) );
// Filling coop with agents.
coop->make_agent< request_checker >( request );
coop->make_agent< request_handler >( request );
coop->make_agent< response_generator >( request );
env.register_coop( std::move(coop) );
thread_pool dispatcher creates a pool with several worker threads. An agent bound to thread_pool dispatcher can handle its events on any of worker threads from that pool.
The thread_pool dispatcher guarantees that at any given moment only a single event handler of an agent can run, thus using only one of the working threads. It is impossible to run event handlers e1 and e2 of agent A on different threads at the same time.
Attention. This dispatcher doesn't guarantee that so_evt_start and so_evt_finish will be called on the same worker thread. It is possible that so_evt_start is called on one thread, but so_evt_finish on another. This fact should be taken into account if it's necessary to run so_evt_start/so_evt_finish on the same thread.
The most complicated part of the thread_pool dispatcher is its event queues. There are two important things which need to be mentioned for the understanding of thread_pool dispatcher logic:
- type of FIFO for agent/coop;
- max_demands_at_once parameter.
Type of FIFO determines the relationship between event handlers of agents from one coop. Suppose there is a coop with agents A, B and C. Some agent D sends the following messages in exactly that order:
- M1 to A
- M2 to B
- M3 to A
- M4 to C
- M5 to B
If agents A, B and C use cooperation FIFO then all these agents will use one common event queue. A sequence of event handler calls will look like:
- A's event handler for M1
- B's event handler for M2
- A's event handler for M3
- C's event handler for M4
- B's event handler for M5
Exactly in that order. B's event handler for M2 will be called only when A's event handler for M1 finished its work.
Cooperation FIFO also means that agents with that FIFO type cannot work on different threads at the same time.
For example, it is possible that agent A will handle M1 on thread T1. Then agent B will handle M2 on thread T2. Then agent A will handle M3 on T3. But it is impossible that A will handle M1 on T1 and B will handle M2 on T2 at the same time.
If agents A, B and C use individual FIFO then all these agents will use different and independent event queues. It is hard to predict what the sequence of event handler calls will look like. For example:
- A's event handler for M1 on thread T1
- B's event handler for M2 on thread T2
- C's event handler for M4 on thread T3
- B's event handler for M5 on thread T2
- A's event handler for M3 on thread T1
Individual FIFO also means that agents with that FIFO type can work on different threads in parallel.
For example, it is possible that agent A will handle M1 on thread T1. At the same time, agent B will handle M2 on thread T2. At the same time, agent C will handle M4 on thread T3.
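The difference between the two FIFO types comes down to how events are grouped into queues. The sketch below is a standalone illustration under that assumption, not SObjectizer code: toy_fifo, toy_msg and split_into_queues are hypothetical names, and the key "coop" simply stands for the single queue shared by the whole coop.

```cpp
#include <map>
#include <string>
#include <vector>

enum class toy_fifo { cooperation, individual };

// Hypothetical record of one sent message.
struct toy_msg {
    std::string name;      // e.g. "M1"
    std::string receiver;  // e.g. "A"
};

// Groups messages into event queues according to the FIFO type.
// Cooperation FIFO: one queue for the whole coop (key "coop").
// Individual FIFO: one queue per receiving agent (key = agent name).
std::map<std::string, std::vector<std::string>>
split_into_queues(const std::vector<toy_msg> & sent, toy_fifo fifo) {
    std::map<std::string, std::vector<std::string>> queues;
    for (const auto & m : sent) {
        const std::string key =
            (fifo == toy_fifo::cooperation) ? std::string("coop") : m.receiver;
        queues[key].push_back(m.name);
    }
    return queues;
}
```

For the messages M1..M5 from the example, cooperation FIFO yields one queue holding M1, M2, M3, M4, M5 in send order, while individual FIFO yields three queues: M1 and M3 for A, M2 and M5 for B, and M4 for C. Order is preserved only inside a queue, which is why the relative order of handlers of different agents becomes unpredictable with individual FIFO.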
There is another side of FIFO type: thread safety. If agents from a coop use cooperation FIFO then they do not need to synchronize access to shared data.
Suppose several agents from one coop use a common std::map object. Because these agents cannot work on different threads at the same time, they can read and modify that object without any mutexes or similar mechanisms. But if agents use individual FIFO, all shared data must be protected or avoided completely.
The max_demands_at_once parameter tells how many events from an event queue can be processed by a working thread before it switches to another event queue.
This parameter is important because every working thread in thread_pool dispatcher works this way:
- gets the next non-empty event queue;
- handles at most max_demands_at_once events from it;
- gets the next non-empty event queue;
- ...
Suppose there are agents A, B and C from one coop with cooperation FIFO and events for messages M1, M2, ..., M5 in their event queue. If max_demands_at_once is 4 then the following scenario might happen:
- some working thread T1 gets this event queue and calls event handlers for messages M1, ..., M4;
- working thread T1 switches to handling a different event queue. The event queue for agents A, B and C still holds M5;
- some working thread Tn gets this event queue and calls event handler for M5;
- event queue for A, B, C agents becomes empty and Tn switches to another event queue.
If max_demands_at_once is 1 then a working thread will handle just one event from A, B, C agent queue. Then this queue can be handled by another working thread which will handle just one event. Then this queue can be handled by another working thread. And so on.
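The effect of max_demands_at_once on a single event queue can be replayed with a short standalone sketch. It is only a model: serve_in_batches is a hypothetical function, each returned batch stands for one visit of some working thread to the queue, and the real dispatcher's choice of thread and queue is not modelled.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <string>
#include <utility>
#include <vector>

// Splits the content of one event queue into the batches that would be
// handled per visit of a working thread. The queue is taken by value,
// so the caller's copy stays intact.
std::vector<std::vector<std::string>>
serve_in_batches(std::deque<std::string> queue,
                 std::size_t max_demands_at_once) {
    assert(max_demands_at_once > 0);
    std::vector<std::vector<std::string>> batches;
    while (!queue.empty()) {
        std::vector<std::string> batch;
        // One visit: handle at most max_demands_at_once events.
        while (!queue.empty() && batch.size() < max_demands_at_once) {
            batch.push_back(std::move(queue.front()));
            queue.pop_front();
        }
        batches.push_back(std::move(batch));
    }
    return batches;
}
```

With the queue M1..M5 and a limit of 4 the model produces two visits (M1..M4, then M5); with a limit of 1 it produces five visits of one event each, matching the two scenarios described above.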
A more interesting situation can be if agents A, B and C use individual FIFO. In that case, there will be independent queues for these agents:
- a queue for A with M1 and M3;
- a queue for B with M2 and M5;
- a queue for C with M4.
If max_demands_at_once is greater than 1 then there could be the following scenario:
- thread T1 handles M1 and M3 for A;
- thread T2 handles M4 for C;
- thread T3 handles M2 and M5 for B.
But if there are just two working threads in the pool:
- thread T1 handles M1 and M3 for A;
- thread T2 handles M4 for C;
- thread T1 handles M2 and M5 for B.
The value of max_demands_at_once determines how often a working thread switches from one event queue to another.
It can have a huge impact on application performance: small values of max_demands_at_once lead to frequent queue switching, which slows down event processing.
So large values of max_demands_at_once can speed up event processing if there is a dense flow of events.
The implementation of the thread_pool dispatcher makes things even more complex and interesting: every agent of a coop can have its own parameters for the thread_pool dispatcher.
It means that agent A can have individual FIFO and max_demands_at_once=100, but agents B and C from the same coop will have cooperation FIFO and max_demands_at_once=10. In this case, two different event queues will be created: one for A and another for B and C.
This complex logic of thread_pool allows precise performance tuning for complex use cases which can be found in real-life problems. But in the simple cases the default parameters can be used (cooperation FIFO and max_demands_at_once=4). This significantly simplifies the usage of thread_pool dispatcher. Especially when several coops are bound to the same thread_pool dispatcher instance.
An example for thread_pool dispatcher:
void init( so_5::environment_t & env )
{
using namespace so_5::disp::thread_pool;
env.introduce_coop(
// Create an instance of thread_pool dispatcher with 3 working threads.
make_dispatcher( env, 3 ).binder(
// All agents will use individual FIFO.
// Parameter max_demands_at_once will have the default value.
bind_params_t{}.fifo( fifo_t::individual ) ),
[]( so_5::coop_t & c ) {
auto collector = c.make_agent< a_collector_t >();
auto performer = c.make_agent< a_performer_t >( collector->so_direct_mbox() );
collector->set_performer_mbox( performer->so_direct_mbox() );
c.make_agent< a_generator_t >( collector->so_direct_mbox() );
});
}
Until v.5.5.16, the thread-pool dispatchers (thread_pool and adv_thread_pool) used a very simple working scheme: when a new message was stored in the demand queue and there was a sleeping worker thread, that thread was awakened. This scheme works well if message processing is a long action. In that case the cost of waking a sleeping thread is much lower than the cost of message processing.
But there can be scenarios where message processing is very quick and cheap. In such cases it can be more efficient not to wake up a sleeping working thread but to let a message stay in the demand queue for some time. Let's look at a small example:
using namespace so_5;
// The first agent in message handling chain. Does parameters checking.
class params_checker final : public agent_t
{
const mbox_t m_next;
public :
params_checker(context_t ctx, mbox_t next) : agent_t{std::move(ctx)}, m_next{std::move(next)}
{
so_subscribe_self().event([this](const initial_request & r) {
if(valid_request(r))
// Send request to the next stage.
send<checked_request>(m_next, r);
});
...
}
...
};
// Next agent in message handling chain. Does some parameters transformation.
class params_transformer final : public agent_t
{
const mbox_t m_next;
public :
params_transformer(context_t ctx, mbox_t next) : agent_t{std::move(ctx)}, m_next{std::move(next)}
{
so_subscribe_self().event([this](const checked_request & r) {
send<transformed_request>(m_next, transform(r));
});
...
}
...
};
Suppose that params_checker and params_transformer are part of one coop and are bound to a thread_pool dispatcher. When params_checker sends a checked_request message to params_transformer, it might be better to keep this message in the demand queue for some time and handle it on the same working thread after the return from params_checker's event handler.
Since v.5.5.16 there is a parameter called next_thread_wakeup_threshold for SObjectizer's thread pool dispatchers.
This parameter holds a demand queue size. When the actual size of the dispatcher's demand queue becomes greater than next_thread_wakeup_threshold, a sleeping working thread is woken up (if the dispatcher has any sleeping thread).
By default next_thread_wakeup_threshold is 0, but this value can be changed. For example:
using namespace so_5;
using namespace so_5::disp::thread_pool;
environment_t & env = ...;
auto disp = make_dispatcher(
env,
"my-thread-pool",
disp_params_t{}
.thread_count( 16 )
.tune_queue_params(
[]( queue_traits::queue_params_t & qp ) {
// A sleeping thread will be woken up if there are 3 or more
// demands in the dispatcher's demands queue.
qp.next_thread_wakeup_threshold( 2 );
} )
);
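The wakeup rule described above reduces to a single comparison. The following standalone sketch states it as a predicate; should_wake_worker is a hypothetical helper written only for illustration, not a function of SObjectizer.

```cpp
#include <cstddef>

// Decides whether a sleeping working thread should be woken up after a
// new demand has been stored. demands_in_queue is the queue size after
// the push. Hypothetical helper that only restates the rule from the text.
bool should_wake_worker(std::size_t demands_in_queue,
                        std::size_t next_thread_wakeup_threshold,
                        std::size_t sleeping_workers) noexcept {
    return sleeping_workers > 0
        && demands_in_queue > next_thread_wakeup_threshold;
}
```

With the default threshold of 0 the very first stored demand satisfies the condition, which corresponds to the scheme used before v.5.5.16. With a threshold of 2 the condition becomes true only when the queue holds 3 or more demands, as the comment in the example states.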
The nef_thread_pool dispatcher is available since v.5.8.0. It is similar to the old thread_pool dispatcher, but has several important differences:
- the nef_thread_pool dispatcher preallocates memory blocks for evt_finish events and provides a noexcept guarantee for sending evt_finish to an agent;
- nef_thread_pool creates a separate event queue for each agent. It means that all agents bound to a nef_thread_pool dispatcher work with individual FIFO.
There is also a max_demands_at_once parameter that plays the same role as in the thread_pool dispatcher. By default max_demands_at_once is 1, but it can be changed via bind_params_t:
void init( so_5::environment_t & env )
{
using namespace so_5::disp::nef_thread_pool;
// Create an instance of nef_thread_pool dispatcher with 3 working threads.
auto disp = make_dispatcher( env, 3 );
env.introduce_coop(
disp.binder(
// Use a custom value for max_demands_at_once.
bind_params_t{}.max_demands_at_once( 4 ) ),
[&disp]( so_5::coop_t & c ) {
// These agents will use max_demands_at_once==4.
auto collector = c.make_agent< a_collector_t >();
auto performer = c.make_agent< a_performer_t >( collector->so_direct_mbox() );
...
// This agent will use max_demands_at_once==1.
auto supervisor = c.make_agent_with_binder< a_supervisor_t >(
disp.binder( bind_params_t{}.max_demands_at_once( 1 ) ),
... );
...
});
}
Attention. This dispatcher doesn't guarantee that so_evt_start and so_evt_finish will be called on the same worker thread. It is possible that so_evt_start is called on one thread, but so_evt_finish on another. This fact should be taken into account if it's necessary to run so_evt_start/so_evt_finish on the same thread.
NOTE. This dispatcher also supports next_thread_wakeup_threshold, just like the thread_pool dispatcher described above.
adv_thread_pool dispatcher is similar to thread_pool dispatcher but has two important differences:
- It allows invoking of several thread-safe event handlers for an agent at the same time on different threads.
- As a consequence of the previous point, there is no max_demands_at_once parameter.
Attention. This dispatcher doesn't guarantee that so_evt_start and so_evt_finish will be called on the same worker thread. It is possible that so_evt_start is called on one thread, but so_evt_finish on another. This fact should be taken into account if it's necessary to run so_evt_start/so_evt_finish on the same thread.
Every working thread in adv_thread_pool works this way:
- gets some non-empty event queue;
- gets the first event from that queue;
- checks thread safety for event handler for that event;
- checks a possibility of invocation of this event handler;
- if event handler can be invoked it is called;
- if event handler cannot be invoked then the event is returned to the event queue and thread switches to another non-empty event queue.
An event handler for an agent can be called if:
- it is not thread-safe and no other event handler of that agent is currently running on any thread;
- or it is thread-safe and no non-thread-safe event handler of that agent is currently running on any thread.
It means that only one non-thread-safe event handler can run at a time, and the next event handler can be called only after the previous one has completed. Several thread-safe handlers can work at the same time.
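The two rules above behave much like a reader-writer gate. The following is only an illustrative model of that check for a single agent, not SObjectizer's internal code: the `handler_gate` class and its member names are hypothetical, and a real dispatcher would perform this bookkeeping under its own queue lock.

```cpp
#include <cstddef>

// Illustrative model only (hypothetical names, not SObjectizer internals).
// Tracks which event handlers of ONE agent are running right now.
// The class itself is not synchronized: the caller is assumed to hold a lock.
class handler_gate {
    std::size_t running_thread_safe_ = 0;  // thread-safe handlers in progress
    bool running_exclusive_ = false;       // a non-thread-safe handler in progress
public:
    // Returns true and registers the handler if it may start now.
    bool try_start(bool thread_safe) {
        if (running_exclusive_)
            return false;  // nothing may run next to a non-thread-safe handler
        if (thread_safe) {
            ++running_thread_safe_;
            return true;
        }
        if (running_thread_safe_ != 0)
            return false;  // must wait until all thread-safe handlers finish
        running_exclusive_ = true;
        return true;
    }

    // Must be called with the same flag that was passed to try_start().
    void finish(bool thread_safe) {
        if (thread_safe)
            --running_thread_safe_;
        else
            running_exclusive_ = false;
    }
};
```

When `try_start` returns false, the worker thread in this model would return the event to its queue and switch to another non-empty queue, as described in the list of steps above.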
There are two types of FIFO for adv_thread_pool: cooperation and individual, just like in the thread_pool dispatcher. But in adv_thread_pool the FIFO type influences how thread safety of an event handler is checked.
Suppose there are agents A and B from one coop and they use cooperation FIFO. Suppose also that event handlers A.e1 and B.e2 are thread-safe, but event handler A.e3 is not.
If there are events e1, e2, e3, e2, e1, e3 in the event queue for A and B agents then event handlers will be called in the following sequence: A.e1 on thread T1, B.e2 on thread T2. Only after completion of event handlers A.e1 and B.e2 event handler A.e3 will be called.
If agents A and B use individual FIFO and there are events sequence {e1, e3, e1} and {e2, e2} in two event queues then there could be the following scenario:
- A.e1 on T1, then A.e3 on T1, then A.e1 on T1;
- B.e2 on T2;
- B.e2 on T3.
adv_thread_pool can be useful for spreading thread-safe event processing on several working threads.
For example, there could be a cryptographer agent which performs make_signature, check_signature, encrypt_block and decrypt_block operations. These operations are thread-safe because they don't change the state of the cryptographer agent. Event handlers for these operations can be marked as thread-safe. This allows several cryptographic operations to be handled in parallel.
Subscription of thread-safe event handler:
class cryptographer : public so_5::agent_t
{
  void on_make_signature( const make_signature & ) {...}
  void on_check_signature( const check_signature & ) {...}
  void on_encrypt_block( const encrypt_block & ) {...}
  void on_decrypt_block( const decrypt_block & ) {...}
  ...
  void so_define_agent() override {
    // Every handler below is marked as thread-safe.
    so_subscribe_self()
      .event( &cryptographer::on_make_signature, so_5::thread_safe )
      .event( &cryptographer::on_check_signature, so_5::thread_safe )
      .event( &cryptographer::on_encrypt_block, so_5::thread_safe )
      .event( &cryptographer::on_decrypt_block, so_5::thread_safe )
      ...
  }
};
An agent can have thread-safe and non-thread-safe event handlers.
For example, the cryptographer agent can have a non-thread-safe event handler for the reconfigure message. In this case, the adv_thread_pool dispatcher guarantees that all thread-safe event handlers finish their work before the event handler for the reconfigure message is started.
An example of binding agent to adv_thread_pool dispatcher:
using namespace so_5::disp::adv_thread_pool;
env.introduce_coop(
  // All agents of new coop will work on adv_thread_pool dispatcher.
  // Every agent will use individual FIFO.
  make_dispatcher( env, 4 ).binder( bind_params_t{}.fifo( fifo_t::individual ) ),
  []( so_5::coop_t & coop ) {
    coop.make_agent< cryptographer >();
    ...
  } );
NOTE. This dispatcher also supports next_thread_wakeup_threshold, just like the thread_pool dispatcher described above.
NOTE. This is just a brief overview of that dispatcher type. For more information see Agent's priorities section.
prio_one_thread::strictly_ordered dispatcher allows for events of high priority agents to block events of low priority agents. It means that events queue is always strictly ordered: events for agents with high priority are placed before events for agents with lower priority.
For example, if the event queue is {e1(a7), e2(a6), e3(a4), e4(a4)}, where a7 means an agent with priority p7, then events will be handled in exactly that order. After handling e1 the queue will be {e2(a6), e3(a4), e4(a4)}. If e5 for a5 then arrives, the queue will become {e2(a6), e5(a5), e3(a4), e4(a4)}.
It means that the chronological order of events will be preserved only for events of agents with the same priority.
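The ordering rule can be modeled with a plain container. The sketch below is only an illustration of the rule and says nothing about how the dispatcher stores its demands internally: the `demand` struct and the `push_ordered` and `dump` helpers are hypothetical names. A new demand is placed behind every queued demand of the same or higher priority, which keeps chronological order within one priority.

```cpp
#include <deque>
#include <string>
#include <utility>

// Illustrative model only (hypothetical names, not SObjectizer internals).
struct demand {
    std::string name;  // event label, e.g. "e5"
    int priority;      // 0..7, a larger value means a more important agent
};

// Inserts the demand behind all demands with the same or a higher priority.
inline void push_ordered(std::deque<demand>& q, demand d) {
    auto pos = q.begin();
    while (pos != q.end() && pos->priority >= d.priority)
        ++pos;
    q.insert(pos, std::move(d));
}

// Renders the queue in the notation used in the text: "e2(a6) e5(a5) ...".
inline std::string dump(const std::deque<demand>& q) {
    std::string out;
    for (const auto& d : q) {
        if (!out.empty())
            out += ' ';
        out += d.name + "(a" + std::to_string(d.priority) + ")";
    }
    return out;
}
```

Pushing e1(a7), e2(a6), e3(a4), e4(a4), removing the front demand and then pushing e5 with priority 5 leaves the queue as e2(a6) e5(a5) e3(a4) e4(a4), matching the example above.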
This dispatcher could be useful if there is a necessity of handling some messages before other messages.
For example, there could be a stream of tasks represented by take_job messages. There also could be a special message for the task processor’s reconfiguration: the new_config message. It could make sense to handle new_config as soon as possible. This can be done by two agents which are bound to a single prio_one_thread::strictly_ordered dispatcher.
One agent will have priority p1 and will handle new_config message. The second agent will have priority p0 and will handle take_job.
Both agents will have common shared data (at least configuration parameters, maybe something else). Dispatcher prio_one_thread::strictly_ordered guarantees that new_config will be handled as soon as the processing of the previous message finished.
An example of binding agents to prio_one_thread::strictly_ordered dispatcher:
namespace prio_disp = so_5::disp::prio_one_thread::strictly_ordered;
env.introduce_coop(
  // Dispatcher instance and binder for it.
  prio_disp::make_dispatcher( env ).binder(),
  []( so_5::coop_t & coop ) {
    // An agent with higher priority.
    coop.make_agent< config_manager >( so_5::prio::p1 );
    // An agent with lower priority.
    coop.make_agent< job_performer >( so_5::prio::p0 );
  } );
NOTE. This is just a brief overview of that dispatcher type. For more information see Agent's priorities section.
prio_one_thread::quoted_round_robin dispatcher works on a round-robin principle. It allows specifying the maximum count of events to be processed consecutively for the specified priority. After processing that count of events the dispatcher switches to processing events of lower priority even if there are more events of higher priority to be processed.
The dispatcher handles no more than Q7 events of priority p7, then no more than Q6 events of priority p6, ..., then no more than Q0 events of priority p0. If an event of higher priority arrives while a quote for a lower priority is being handled, no switching is performed.
For example, if the dispatcher handles events of priority p5 and an event of priority p7 arrives, the dispatcher will continue to handle events of p5, then events of p4 (if any), ..., then events of p0 (if any). And only then events of p7.
This working scheme means that agents' priorities are treated as agents' weights. A programmer can set bigger quotes for more prioritized (more heavyweight) agents and those agents will receive more resources than less prioritized (less weighted) agents.
A very important detail: events of the same priority are handled in chronological order.
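The quote-based scheme can be illustrated with a small standalone model. This is only a sketch under simplifying assumptions, not the dispatcher's real implementation: `rr_model` and its members are hypothetical names, all events are assumed to be queued before processing starts, and every quote is assumed to be non-zero.

```cpp
#include <array>
#include <cstddef>
#include <deque>
#include <string>
#include <vector>

// Illustrative model only (hypothetical names, not SObjectizer internals).
constexpr std::size_t prio_count = 8;  // priorities p0..p7

struct rr_model {
    std::array<std::deque<std::string>, prio_count> queues;  // index == priority
    std::array<std::size_t, prio_count> quotes;  // max events per visit, must be > 0

    explicit rr_model(std::size_t default_quote) { quotes.fill(default_quote); }

    bool empty() const {
        for (const auto& q : queues)
            if (!q.empty())
                return false;
        return true;
    }

    // Drains all queues and returns the order in which events were handled.
    std::vector<std::string> run() {
        std::vector<std::string> order;
        while (!empty()) {
            // One full round: p7 first, then p6, ..., then p0.
            for (std::size_t p = prio_count; p-- > 0;) {
                auto& q = queues[p];
                // Handle at most quotes[p] events, in chronological order.
                for (std::size_t n = 0; n < quotes[p] && !q.empty(); ++n) {
                    order.push_back(q.front());
                    q.pop_front();
                }
            }
        }
        return order;
    }
};
```

With a quote of 2 for p7 and 1 for all other priorities, three p7 events v1, v2, v3 and two p0 events f1, f2 are handled in the order v1, v2, f1, v3, f2: the lower priority still gets its turn after each p7 quote is exhausted.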
A dispatcher of that type can be useful, for example, if there are agents which handle clients of different types. Some clients are VIP clients and they should receive first-class quality of service, while other clients have lower demands for service quality.
Agents that handle VIP-client requests can be given a high priority, and a large quote can be set for that priority. All other agents will have lower priority and a smaller quote. As a result, more requests from VIP clients will be handled, but requests from other clients will be processed too.
An example of binding agents to prio_one_thread::quoted_round_robin dispatcher:
namespace prio_disp = so_5::disp::prio_one_thread::quoted_round_robin;
env.introduce_coop(
  // Create dispatcher and define quotes for several priorities.
  prio_disp::make_dispatcher( env,
    // By default every priority will have quote for 20 events.
    prio_disp::quotes_t{ 20 }
      // Priority p7 will have different quote.
      .set( so_5::prio::p7, 45 )
      // Priority p6 will have different quote too.
      .set( so_5::prio::p6, 35 ) ).binder(),
  []( so_5::coop_t & coop ) {
    coop.make_agent< vip_client_processor >( so_5::prio::p7 );
    coop.make_agent< ordinal_client_processor >( so_5::prio::p6 );
    coop.make_agent< free_client_processor >( so_5::prio::p0 );
  } );
NOTE. This is just a brief overview of that dispatcher type. For more information see Agent's priorities section.
prio_dedicated_threads::one_per_prio dispatcher creates a single dedicated thread for every priority. It means that events for agents with priority p7 will be handled on a different thread than events for agents with, for example, priority p6.
Events of the same priority are handled in chronological order.
Because of the fact that priority is assigned to an agent at its creation and cannot be changed later an agent inside prio_dedicated_threads::one_per_prio is bound to a particular working thread and won't be moved from that thread to any other thread.
This property of the dispatcher can be used for binding processing of different message types to different working threads: messages of type M1 can be processed by agent A1 with priority p1, messages of type M2 -- by agent A2 with priority p2 and so on...
Assigning different priorities to agents which handle different message types may make sense if an OS-specific API for changing thread priority is used. For example, an agent with priority p7 can set a higher priority for its working thread in its so_evt_start() method than an agent with priority p6.
An example of binding agents to prio_dedicated_threads::one_per_prio dispatcher:
namespace prio_disp = so_5::disp::prio_dedicated_threads::one_per_prio;
env.introduce_coop(
  // An instance of dispatcher and a binder for it.
  prio_disp::make_dispatcher( env ).binder(),
  []( so_5::coop_t & coop ) {
    coop.make_agent< m1_handler >( so_5::prio::p1 );
    coop.make_agent< m2_handler >( so_5::prio::p2 );
    ...
  } );
Event queues in a dispatcher require synchronization to protect the queues' data from concurrent access by different threads. Synchronization objects are necessary not only for data protection but also for notifying consumers about the appearance of new events (a consumer sleeps when the event queue is empty and must be awakened on arrival of a new event). Dispatchers use a special queue lock object for such synchronization.
There are two types of queue locks:
- combined_lock which uses a combination of busy waiting with spinlocks and switching to mutex and condition_variable if waiting takes too long. For more information about combined_lock's working principles please see so-5.5.10 Factories for MPSC event queue locks, so-5.5.11 Factories for event queue locks and so-5.5.18 Global lock_factories sections;
- simple_lock which uses just mutex and condition_variable.
Locking with combined_lock is efficient if an application is working under heavy load, but it is not efficient on some specific load profiles.
Let's imagine an active agent which initiates several periodic messages. All actions are performed by that agent only on arrival of periodic messages. All other time the agent does nothing and its worker thread is sleeping on the empty event queue.
If an application uses just one such active agent the overhead cost of busy waiting is relatively small and could be ignored. But if there are several dozens of such agents the overhead cost could be relatively high: 3-4% of CPU usage even if the application does nothing and all worker threads just do busy waiting periodically. The situation could be more dramatic if there are several such applications on the same server.
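To make the trade-off more concrete, here is a minimal sketch of the "spin first, then block" idea for a single consumer. It is not the code of SObjectizer's combined_lock: the class name `spin_then_block_signal` and its interface are hypothetical, and the model uses `std::this_thread::yield()` in the busy-waiting phase where a real implementation may do something different.

```cpp
#include <atomic>
#include <chrono>
#include <condition_variable>
#include <mutex>
#include <thread>

// Illustrative model only (hypothetical names, not SObjectizer's combined_lock).
// Supports one waiting consumer and any number of notifying producers.
class spin_then_block_signal {
    std::atomic<bool> signaled_{false};
    std::mutex mtx_;
    std::condition_variable cv_;
    std::chrono::steady_clock::duration spin_time_;
public:
    explicit spin_then_block_signal(std::chrono::steady_clock::duration spin_time)
        : spin_time_(spin_time) {}

    // Producer side: publish the signal and wake a blocked consumer, if any.
    void notify() {
        {
            std::lock_guard<std::mutex> lock(mtx_);
            signaled_.store(true, std::memory_order_release);
        }
        cv_.notify_one();
    }

    // Consumer side. Returns true if the signal was caught during busy waiting,
    // false if the consumer had to fall back to mutex + condition_variable.
    bool wait() {
        const auto deadline = std::chrono::steady_clock::now() + spin_time_;
        do {
            // Cheap check without taking the mutex; this is the part that
            // burns CPU while the queue stays empty.
            if (signaled_.exchange(false, std::memory_order_acquire))
                return true;
            std::this_thread::yield();
        } while (std::chrono::steady_clock::now() < deadline);

        // Busy waiting took too long: sleep until a producer notifies.
        std::unique_lock<std::mutex> lock(mtx_);
        cv_.wait(lock, [this] { return signaled_.load(std::memory_order_acquire); });
        signaled_.store(false, std::memory_order_relaxed);
        return false;
    }
};
```

In this model a consumer that wakes up periodically spends `spin_time` busy waiting on every cycle before it goes to sleep, which is where the idle CPU usage described above comes from. A lock that uses only the mutex and condition_variable path avoids that cost at the price of slower wake-ups under heavy load.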
A dispatcher uses a lock factory to create queue locks. There are two lock factories:
- combined_lock_factory which creates combined_lock;
- simple_lock_factory which creates simple_lock.
New factories can be added in the future versions of SObjectizer.
combined_lock_factory is used by default. If this locking scheme is not appropriate for your application it is possible to specify a different lock factory (or to specify combined_lock_factory with a different busy waiting time).
To specify a lock factory it is necessary to use a disp_params_t object and the corresponding make_dispatcher functions for dispatcher creation.
There are also queue_traits namespaces with definitions of lock factory functions and other queue-related stuff in dispatchers' namespaces (like so_5::disp::one_thread::queue_traits or so_5::disp::prio_one_thread::strictly_ordered::queue_traits). Technically speaking, those queue_traits namespaces are just aliases for the so_5::disp::mpsc_queue_traits or so_5::disp::mpmc_queue_traits namespace.
Because of that the preparation of disp_params_t for a dispatcher looks similar for different dispatcher types. For example, this is for the one_thread dispatcher:
using namespace so_5::disp::one_thread;
auto disp = make_dispatcher( env, "handler", disp_params_t{}.tune_queue_params(
  []( queue_traits::queue_params_t & queue_params ) {
    queue_params.lock_factory( queue_traits::simple_lock_factory() );
  } ) );
And this is for active_obj dispatcher:
using namespace so_5::disp::active_obj;
auto disp = make_dispatcher( env, "handlers", disp_params_t{}.tune_queue_params(
  []( queue_traits::queue_params_t & queue_params ) {
    queue_params.lock_factory( queue_traits::simple_lock_factory() );
  } ) );
And this is for thread_pool dispatcher:
using namespace so_5::disp::thread_pool;
auto disp = make_dispatcher( env, "db_operations",
  disp_params_t{}
    .thread_count(16)
    .set_queue_params( queue_traits::queue_params_t{}
      .lock_factory( queue_traits::simple_lock_factory() ) ) );
Please note that lock_factory can be specified only at the moment of the creation of a dispatcher. The lock_factory cannot be changed after the creation of a dispatcher.
A lock factory can be specified for the default dispatcher too. That dispatcher is created automatically by the SObjectizer Environment. To specify a lock factory for the default dispatcher it is necessary to use environment_params_t:
so_5::launch( []( so_5::environment_t & env ) {...},
  []( so_5::environment_params_t & env_params ) {
    using namespace so_5::disp::one_thread;
    env_params.default_disp_params( disp_params_t{}.tune_queue_params(
      []( queue_traits::queue_params_t & queue_params ) {
        queue_params.lock_factory( queue_traits::simple_lock_factory() );
      } ) );
  } );
As mentioned above, combined_lock_factory is used by default. The default waiting time for busy waiting is specified by the default_combined_lock_waiting_time() function. Since v.5.5.10 it is one millisecond. It is possible to set a different waiting time by using the combined_lock_factory(duration) function:
using namespace so_5::disp::active_group;
auto disp = make_dispatcher( env, "handlers", disp_params_t{}.tune_queue_params(
  []( queue_traits::queue_params_t & queue_params ) {
    // Set up combined_lock with 0.5 second busy waiting time.
    queue_params.lock_factory( queue_traits::combined_lock_factory(
      std::chrono::milliseconds(500) ) );
  } ) );