SO 5.6 Tutorials Basics
SObjectizer is a framework for organizing an application's work via message passing between agents. Agents send messages and/or signals to each other and receive them. Messages are delivered via mailboxes and are stored in message queues until they are processed. Some messages can be delayed (delivered after some time), and some can be periodic (delivered again and again).
Every agent works on some execution context, but the user does not need to manage its working thread manually. All working threads are controlled by dispatchers. The user is responsible only for selecting appropriate dispatchers and distributing agents among them.
SObjectizer Environment (SOEnv for short) is a container which comprises all the stuff:
SObjectizer Environment is represented to the user as an interface so_5::environment_t.
It is possible to create several instances of SObjectizer Environment inside one application, although the need for it arises very rarely:
In such a case, they will be totally independent instances of the environment, which means that agents in one instance will work independently from agents in the other.
The interface so_5::environment_t is an abstract class. A specific environment_t implementation is created inside the so_5::launch() family of functions. There are two ways to start SObjectizer Environment: calling the so_5::launch() function or using the so_5::wrapped_env_t class.
There are several overloads of the so_5::launch() function. Every so_5::launch() function performs the same sequence of steps:
- instantiates a specific implementation of so_5::environment_t;
- calls the user-supplied initialization function;
- starts SObjectizer Environment;
- waits for SObjectizer Environment shutdown.
It means that launch() blocks the caller thread until SObjectizer Environment finishes its work.
In a simple application launch() is called only once in the main() function, and the application finishes when launch() returns. If a user wants to do something else concurrently or to run another SObjectizer Environment instance, a dedicated thread should be created and launch() should be called on the context of that thread.
The so_5::launch() function has two forms. The first one receives just one argument -- a function object (or lambda) with initialization actions:
so_5::launch( []( so_5::environment_t & env ) {
... // Some application specific code which uses 'env'.
} );
The second one receives two arguments, both of which are function objects. The first, like in the previous case, performs the initialization actions. The second sets up SObjectizer Environment's parameters:
so_5::launch( []( so_5::environment_t & env ) {
... // Some application specific code which uses 'env'.
},
[]( so_5::environment_params_t & params ) {
... // Some environment tuning code which uses 'params'.
} );
There is another way of starting SObjectizer Environment. This way is useful if some form of classical GUI-messages processing loop must be present in an application:
int main()
{
so_5::wrapped_env_t sobj; // Empty SO Environment will be started here.
sobj.environment().introduce_coop( []( so_5::coop_t & coop ) {
... // Some SO Environment initialization code.
} );
... // Some application-specific initialization code.
// Classical message processing loop for Windows GUI application.
while( GetMessage(...) > 0 ) // GetMessage returns 0 on WM_QUIT and -1 on error.
{
TranslateMessage(...);
DispatchMessage(...);
}
sobj.stop_then_join(); // Stop SO Environment and wait until it finishes completely.
... // Some application-specific deinitialization code.
}
An instance of so_5::environment_t is automatically created inside wrapped_env_t. A reference to that instance can be obtained via the wrapped_env_t::environment() method.
An instance of SObjectizer Environment is automatically started in the constructor of wrapped_env_t. There is no need to shut down that SOEnv explicitly: it will be done in the destructor. But sometimes SOEnv should be shut down manually, as in the example above. This is useful when there is a need to guarantee that all agents have stopped their work and all resources allocated by the Environment are deallocated.
The constructor of wrapped_env_t can accept the same arguments as the so_5::launch() functions: SObjectizer's initialization lambda and a lambda for tuning SOEnv's parameters can be passed as constructor arguments:
int main()
{
so_5::wrapped_env_t env{
// Initialization lambda.
[]( so_5::environment_t & env ) {
... // Some application specific code which uses 'env'
},
// Parameters tuning lambda.
[]( so_5::environment_params_t & params ) {
... // Some application specific code which uses 'params'.
} };
// Classical message processing loop for Windows GUI application.
while( GetMessage(...) > 0 ) // GetMessage returns 0 on WM_QUIT and -1 on error.
{
TranslateMessage(...);
DispatchMessage(...);
}
// SObjectizer Environment will be automatically shut down here.
}
The interaction between agents is performed by sending messages. There are two types of messages:
- an ordinary message with some data inside;
- a signal: a special kind of message which carries no data except the fact of its existence.
Every message/signal must have a unique type -- a dedicated C++ class/struct.
For signals, the corresponding C++ classes must be derived from so_5::signal_t.
For ordinary messages there are two possibilities: an arbitrary type T can be used as a message type, or the message type can be derived from so_5::message_t.
// This is a message type.
struct msg_convert : public so_5::message_t { /* some fields here */ };
// This is a message type.
struct mouse_position { int x, y; };
// This type can also be used as a message type.
enum class engine_control { turn_on, speed_up, slow_down, turn_off };
// This is a signal type.
struct msg_get_status final : public so_5::signal_t {};
// This is a signal type.
struct ping final : public so_5::signal_t {};
All ordinary messages must be sent as dynamically allocated objects, i.e. instances of the corresponding C++ classes are created by the new operator and later deleted by the delete operator.
Messages are sent by the so_5::send() functions:
so_5::send< msg_convert >(dest, 42); /* Value 42 will be passed to
the constructor of msg_convert */
so_5::send< mouse_position >(dest, 0, 145);
so_5::send< engine_control >(dest, engine_control::turn_on); /* Value turn_on will be sent
as a message of type engine_control. */
so_5::send< msg_get_status >(dest); // Sending of a signal.
so_5::send< ping >(dest); // Sending of a signal.
All necessary dynamically allocated objects are created automatically inside the send() functions. See a separate section related to send functions for more details.
Note. SObjectizer distinguishes messages/signals by their C++ type. It is necessary to define a distinct type for each message even if the messages have the same structure. For example, these are two different message types:
struct msg_convert_to_long final : public so_5::message_t
{
int m_value;
};
struct msg_convert_to_string final : public so_5::message_t
{
int m_value;
};
But these are not:
struct msg_convert_int final : public so_5::message_t
{
int m_value;
};
// IT WON'T WORK IN SObjectizer-5!
typedef msg_convert_int msg_convert_to_long;
typedef msg_convert_int msg_convert_to_string;
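The reason the aliases do not help follows from the C++ type system itself: a typedef gives another name to an existing type and does not create a new one. The sketch below checks this with standard facilities only. The names distinct_to_long and distinct_to_string are illustrative, and using std::type_index as a lookup key is an assumption about how type-keyed dispatch could work, not a description of SObjectizer's internals.

```cpp
#include <type_traits>
#include <typeindex>
#include <typeinfo>

// One real type plus two aliases for it.
struct msg_convert_int { int m_value; };
typedef msg_convert_int msg_convert_to_long;
typedef msg_convert_int msg_convert_to_string;

// Two separately defined types with the same layout (illustrative names).
struct distinct_to_long { int m_value; };
struct distinct_to_string { int m_value; };

// Aliases name the same type; separately defined structs do not.
static_assert(
    std::is_same< msg_convert_to_long, msg_convert_to_string >::value,
    "typedefs are aliases of one type" );
static_assert(
    !std::is_same< distinct_to_long, distinct_to_string >::value,
    "separately defined structs are different types" );

// A type-keyed lookup would see a single key for both aliases...
inline bool aliases_share_key()
{
    return std::type_index( typeid(msg_convert_to_long) )
        == std::type_index( typeid(msg_convert_to_string) );
}

// ...and two different keys for the separately defined structs.
inline bool structs_share_key()
{
    return std::type_index( typeid(distinct_to_long) )
        == std::type_index( typeid(distinct_to_string) );
}
```

Any mechanism that keys subscriptions on the C++ type therefore cannot tell the two aliases apart, which is why each message needs its own struct or class.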
Mbox is a shorthand for the message box. Mbox is a term which is used in SObjectizer for two abstractions:
- point/address/channel, to which messages and signals are sent;
- point/address/channel, from which messages and signals are received.
There are two different types of mboxes in SObjectizer, and each of them is implemented in its own way. But there is an interface so_5::abstract_message_box_t which is implemented by every type of mbox. All interactions with mboxes are done via that interface.
Interface abstract_message_box_t is an abstract class. So it is impossible to store objects of abstract_message_box_t inside an agent or pass abstract_message_box_t as an argument to a function/method or to a message. Only references/pointers to abstract_message_box_t can be used. There is a special type so_5::mbox_t which is an analog of a smart pointer/reference to abstract_message_box_t. Objects of mbox_t are the only way to deal with mboxes in SObjectizer.
All mboxes are controlled by SObjectizer Environment. The interface environment_t contains methods for creating mboxes. A user holds references to mboxes in so_5::mbox_t objects. Every mbox is destroyed automatically as soon as all references to it are gone.
SObjectizer uses a lightweight publish-subscribe model which is implemented by multi-producer/multi-consumer (MPMC) mboxes. An MPMC mbox is like a bulletin board: a producer publishes a message/signal on it, and all consumers subscribed to that message/signal type on that board receive the message/signal instance. It means that the producer must know the mbox to which the messages will be sent (published), and the consumer must also know the mbox to make its subscription. It is the user's task to share that knowledge between the application's agents.
A mbox can deliver messages of different types: a user can send messages of various types to a single mbox, and it is not necessary to create different mboxes for different message types.
However, a subscription is made for some specific message type. For example, if agent A is subscribed to messages of type msg_ping from mbox M, and agent B is subscribed to messages of type msg_pong from the same mbox, then a user can send msg_ping or msg_pong messages to M, but A will receive only msg_ping and B will receive only msg_pong.
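To make the type-based subscription rule concrete, here is a minimal standard-library model of a bulletin-board mbox. It is only a sketch under strong assumptions: toy_mbox and its methods are hypothetical names, delivery is synchronous, and there are no agents, queues or dispatchers, so it does not reflect how SObjectizer is implemented.

```cpp
#include <functional>
#include <map>
#include <string>
#include <typeindex>
#include <typeinfo>
#include <vector>

// Hypothetical toy mbox: subscribers are grouped by message type.
class toy_mbox
{
    std::map< std::type_index,
        std::vector< std::function< void(const void *) > > > m_subscribers;

public:
    // Subscribe a handler for exactly one message type.
    template< typename Msg >
    void subscribe( std::function< void(const Msg &) > handler )
    {
        m_subscribers[ std::type_index( typeid(Msg) ) ].push_back(
            [handler]( const void * raw ) {
                handler( *static_cast< const Msg * >( raw ) );
            } );
    }

    // Deliver a message only to subscribers of its type.
    template< typename Msg >
    void send( const Msg & msg ) const
    {
        const auto it = m_subscribers.find( std::type_index( typeid(Msg) ) );
        if( it == m_subscribers.end() )
            return;
        for( const auto & handler : it->second )
            handler( &msg );
    }
};

struct msg_ping { int n; };
struct msg_pong { int n; };

// "A" subscribes to msg_ping and "B" to msg_pong on the same mbox.
inline std::string demo_type_based_delivery()
{
    toy_mbox m;
    std::string log;
    m.subscribe< msg_ping >( [&log]( const msg_ping & p ) {
        log += "A:ping" + std::to_string( p.n ) + ";";
    } );
    m.subscribe< msg_pong >( [&log]( const msg_pong & p ) {
        log += "B:pong" + std::to_string( p.n ) + ";";
    } );
    m.send( msg_ping{ 1 } );
    m.send( msg_pong{ 2 } );
    return log;
}
```

Calling demo_type_based_delivery() yields "A:ping1;B:pong2;": both messages went to the same mbox, yet each subscriber saw only the type it subscribed to.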
There are two kinds of MPMC mboxes:
- anonymous. These mboxes have no identification. They are known only by mbox-references to them. If a user loses the mbox-reference to an anonymous mbox, the mbox itself is lost too;
- named. These mboxes must have unique names. SObjectizer Environment holds a dictionary of named mboxes. If a user creates a mbox with an already known name, then the existing mbox is returned.
An anonymous mbox can be created by the environment_t::create_mbox() method without the name argument:
// Anonymous mbox will be created.
auto mbox = env.create_mbox();
A named mbox can be created (or a reference to an already created mbox can be obtained) by the environment_t::create_mbox() method with the name argument:
// Named mbox is created or obtained.
auto mbox = env.create_mbox( "notification_channel" );
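The difference between the two kinds can be modelled with a small dictionary. This is a rough sketch, assuming a shared pointer plays the role of so_5::mbox_t; toy_environment, toy_mbox and toy_mbox_ref are hypothetical names, and the real environment may manage the lifetime of named mboxes differently.

```cpp
#include <map>
#include <memory>
#include <string>

struct toy_mbox {};                                // stand-in for a real mbox
using toy_mbox_ref = std::shared_ptr< toy_mbox >;  // plays the role of so_5::mbox_t

// Hypothetical environment holding a dictionary of named mboxes.
class toy_environment
{
    std::map< std::string, toy_mbox_ref > m_named;

public:
    // Anonymous mbox: every call yields a new mbox known only by its reference.
    toy_mbox_ref create_mbox()
    {
        return std::make_shared< toy_mbox >();
    }

    // Named mbox: created on first use, later calls return the same mbox.
    toy_mbox_ref create_mbox( const std::string & name )
    {
        auto & slot = m_named[ name ];
        if( !slot )
            slot = std::make_shared< toy_mbox >();
        return slot;
    }
};

inline bool same_name_gives_same_mbox()
{
    toy_environment env;
    return env.create_mbox( "notification_channel" )
        == env.create_mbox( "notification_channel" );
}

inline bool anonymous_mboxes_differ()
{
    toy_environment env;
    return env.create_mbox() != env.create_mbox();
}
```

In this model two parts of an application that agree only on a name end up with references to the same mbox, while anonymous mboxes have to be passed around explicitly.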
The choice between anonymous and named mboxes mainly depends on whether an mbox-reference can be passed to the agent's constructor. If it can, anonymous mboxes are usually the better choice. For example, if two agents are constructed at the same time and should pass messages to each other, then anonymous mboxes are the simplest solution:
// Pinger agent. Should have its own mbox and mbox of ponger agent.
class a_pinger_t : public so_5::agent_t
{
const so_5::mbox_t m_self_mbox;
const so_5::mbox_t m_ponger_mbox;
public :
// Receives mbox-references in the constructor:
a_pinger_t( context_t ctx,
// Reference to own mbox.
so_5::mbox_t self_mbox,
// Reference to ponger's mbox.
so_5::mbox_t ponger_mbox )
: so_5::agent_t( ctx )
, m_self_mbox( std::move(self_mbox) )
, m_ponger_mbox( std::move(ponger_mbox) )
...
...
};
// Ponger agent. Should have its own mbox and mbox of pinger agent.
class a_ponger_t : public so_5::agent_t
{
const so_5::mbox_t m_self_mbox;
const so_5::mbox_t m_pinger_mbox;
public :
// Receives mbox-references in the constructor:
a_ponger_t( context_t ctx,
// Reference to own mbox.
so_5::mbox_t self_mbox,
// Reference to pinger's mbox.
so_5::mbox_t pinger_mbox )
: so_5::agent_t( ctx )
, m_self_mbox( std::move(self_mbox) )
, m_pinger_mbox( std::move(pinger_mbox) )
...
...
};
...
// Creation of mboxes.
auto pinger_mbox = env.create_mbox();
auto ponger_mbox = env.create_mbox();
// Creation of agents.
coop->make_agent< a_pinger_t >( pinger_mbox, ponger_mbox );
coop->make_agent< a_ponger_t >( ponger_mbox, pinger_mbox );
But there could be cases when agents are created in different parts of an application and direct exchange of mbox reference is difficult. In such cases named mboxes could help:
// Pinger agent. Should have its own mbox and mbox of ponger agent.
class a_pinger_t : public so_5::agent_t
{
const so_5::mbox_t m_self_mbox;
const so_5::mbox_t m_ponger_mbox;
public :
a_pinger_t( context_t ctx )
: so_5::agent_t( ctx )
, m_self_mbox( ctx.env().create_mbox( "pinger" ) )
, m_ponger_mbox( ctx.env().create_mbox( "ponger" ) )
...
...
};
// Ponger agent. Should have its own mbox and mbox of pinger agent.
class a_ponger_t : public so_5::agent_t
{
const so_5::mbox_t m_self_mbox;
const so_5::mbox_t m_pinger_mbox;
public :
a_ponger_t( context_t ctx )
: so_5::agent_t( ctx )
, m_self_mbox( ctx.env().create_mbox( "ponger" ) )
, m_pinger_mbox( ctx.env().create_mbox( "pinger" ) )
...
...
};
...
// Pinger is created in one place...
coop->make_agent< a_pinger_t >();
...
// Ponger is created somewhere else...
coop->make_agent< a_ponger_t >();
There could be more sophisticated cases when agents are created, destroyed and created again, and in this case, the named mboxes could be useful. However, this is out of the scope of this introduction to SObjectizer's basics.
The family of so_5::send() free functions is used for sending messages/signals to mboxes:
// Sending message.
so_5::send< msg_ping >( mbox,
// All the following arguments will be forwarded
// to the msg_ping's constructor.
... );
// Sending signal.
so_5::send< msg_pong >( mbox );
MPMC mboxes are good when 1-to-many or many-to-many interaction between agents is required. But they are too expensive for 1-to-1 interaction, because message passing via an MPMC mbox consists of two phases:
- in the first phase there is a lookup of the subscriber list for the specified message type;
- in the second phase there is a check for the possibility of handling this message type in the current subscriber's state.
For the 1-to-1 agents' interaction, the first phase is redundant. The 1-to-1 interaction will be more efficient if a message is directly passed to the subscriber’s queue without looking up the subscriber list for the message type.
Because of that, there are multi-producer/single-consumer (MPSC) mboxes with direct and efficient support for 1-to-1 agents interaction.
These mboxes are automatically created by SObjectizer for every agent. When the constructor of agent_t finishes its work, there is already an MPSC mbox for the agent. That mbox can be accessed via the agent_t::so_direct_mbox() method. The agent for which the mbox is created is the owner of the MPSC mbox.
An MPSC mbox implements the abstract_message_box_t interface just as an MPMC mbox does. References to MPSC mboxes can be stored in mbox_t objects, passed to other agents, or stored anywhere the user wants.
The example with ping-pong agents from above could be rewritten with MPSC mboxes in the following way:
// Pinger agent.
class a_pinger_t : public so_5::agent_t
{
public :
a_pinger_t( context_t ctx )
: so_5::agent_t( ctx )
{}
// Reference to ponger's mbox cannot be passed to constructor
// so it must be set by this method.
void set_ponger_mbox( const so_5::mbox_t & mbox )
{
m_ponger_mbox = mbox;
}
...
};
// Ponger agent.
class a_ponger_t : public so_5::agent_t
{
public :
a_ponger_t( context_t ctx )
: so_5::agent_t( ctx )
{}
// Reference to pinger's mbox cannot be passed to constructor
// so it must be set by this method.
void set_pinger_mbox( const so_5::mbox_t & mbox )
{
m_pinger_mbox = mbox;
}
...
};
...
// Creation of agents.
auto pinger = coop->make_agent< a_pinger_t >();
auto ponger = coop->make_agent< a_ponger_t >();
// Exchange of mboxes.
pinger->set_ponger_mbox( ponger->so_direct_mbox() );
ponger->set_pinger_mbox( pinger->so_direct_mbox() );
The main differences between MPSC and MPMC mboxes are:
- only the owner of an MPSC mbox can subscribe to messages from it;
- it is possible to send mutable messages into MPSC mboxes, but this is impossible for MPMC mboxes (more about mutability and immutability of messages below);
- delivery filters can be set only for MPMC mboxes.
Sending to an agent's direct MPSC mbox is performed via so_5::send():
struct mouse_position { int x, y; };
struct stop_engine : public so_5::signal_t {};
// Sending a message to agent.
// send() receives a reference to an agent and extracts the direct mbox by itself.
so_5::send< mouse_position >( *canvas, x_pos, y_pos );
// Sending a signal to agent.
so_5::send< stop_engine >( *engine_controller );
So, there are two types of mboxes with different abilities and with a noticeable difference in the efficiency of message delivery to subscribers. Which one must be used and when?
MPMC mboxes must be used when an agent is a producer of some information which could be processed by several different agents. Or, when there are several producers for information and several consumers for that information.
Let's consider an agent which reads one socket, transforms the data and writes it to another socket. From time to time that agent sends a message about its current state: state of connections, amount of processed data and so on. If this message is sent to an MPMC mbox it can be processed by different agents. One could transform that message into one or more SNMP data sources to be available for various monitoring tools like Zabbix or HP OpenView. Another receiver could use this status message for overload control. Another receiver could write the content of this status message to a log file. And so on.
MPSC mboxes must be used when an agent is a receiver of some agent-specific information and/or agent-specific commands. For example, the message about the need to reconfigure agent is agent-specific and it is better to send it to the agent’s MPSC mbox than to common MPMC mbox, which is shared between several agents.
Note. There is a significant trait of an MPSC mbox in comparison with an MPMC mbox: because the first phase (the lookup of the subscriber list by message type) is absent, a message can be pushed to the agent's queue even if the agent is not subscribed to it. Such a message will be rejected in the second phase, when the ability to process the message is checked.
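The two delivery phases and the trait described in the note can be illustrated with a standard-library model. This is a simplified sketch, not SObjectizer code: toy_agent, toy_mpmc_mbox and send_direct are hypothetical names, the queue stores only message types, and everything runs on one thread.

```cpp
#include <cstddef>
#include <deque>
#include <functional>
#include <map>
#include <typeindex>
#include <typeinfo>
#include <vector>

// Toy agent: a demand queue plus the handlers valid in the current state.
struct toy_agent
{
    std::map< std::type_index, std::function< void() > > m_handlers;
    std::deque< std::type_index > m_queue; // only the message type is stored
    int m_handled = 0;
    int m_rejected = 0;

    // Second phase: can the current state handle this message type?
    void process_all()
    {
        while( !m_queue.empty() )
        {
            const std::type_index type = m_queue.front();
            m_queue.pop_front();
            const auto it = m_handlers.find( type );
            if( it != m_handlers.end() ) { it->second(); ++m_handled; }
            else ++m_rejected;
        }
    }
};

// Toy MPMC mbox: the first phase is a lookup of subscribers by type.
struct toy_mpmc_mbox
{
    std::map< std::type_index, std::vector< toy_agent * > > m_subscribers;

    template< typename Msg >
    void subscribe( toy_agent & agent )
    {
        m_subscribers[ std::type_index( typeid(Msg) ) ].push_back( &agent );
    }

    template< typename Msg >
    void send()
    {
        const auto it = m_subscribers.find( std::type_index( typeid(Msg) ) );
        if( it == m_subscribers.end() )
            return; // no subscribers: nothing is queued
        for( toy_agent * agent : it->second )
            agent->m_queue.push_back( std::type_index( typeid(Msg) ) );
    }
};

// Toy MPSC delivery: no lookup, the demand goes straight to the owner's queue.
template< typename Msg >
void send_direct( toy_agent & owner )
{
    owner.m_queue.push_back( std::type_index( typeid(Msg) ) );
}

struct msg_known {};
struct msg_unknown {};

inline int rejected_after_direct_send()
{
    toy_agent agent;
    agent.m_handlers[ std::type_index( typeid(msg_known) ) ] = []{};
    send_direct< msg_unknown >( agent ); // queued although nobody handles it
    agent.process_all();                 // rejected in the second phase
    return agent.m_rejected;
}

inline std::size_t queued_after_mpmc_send()
{
    toy_agent agent;
    agent.m_handlers[ std::type_index( typeid(msg_known) ) ] = []{};
    toy_mpmc_mbox mbox;
    mbox.subscribe< msg_known >( agent );
    mbox.send< msg_unknown >();          // dropped in the first phase
    return agent.m_queue.size();
}
```

In this model the direct send puts the unhandled message into the queue and it is counted as rejected during processing, whereas the MPMC send never reaches the queue at all.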
A cooperation is a kind of container for introducing (registering) agents to SObjectizer Environment and removing (deregistering) them from it. With the help of a cooperation, several agents which depend on one another can be registered at once in a transactional manner: either all of those agents are registered successfully or none of them is.
Every cooperation creation consists of three stages:
- A cooperation object must be created by the environment_t::make_coop() method.
- The cooperation must be filled up with agents by the coop_t::make_agent(), coop_t::make_agent_with_binder() or coop_t::add_agent() methods.
- The cooperation must be registered by moving it to the environment_t::register_coop() method.
For example:
// Creation of cooperation.
auto coop = env.make_coop();
// Creation of agents.
coop->make_agent< a_pinger_t >( pinger_mbox, ponger_mbox );
coop->make_agent< a_ponger_t >( ponger_mbox, pinger_mbox );
// Registration of cooperation.
env.register_coop( std::move( coop ) );
There is also a more compact way to create and register a new coop:
env.introduce_coop( [&]( so_5::coop_t & coop ) {
coop.make_agent< a_pinger_t >( pinger_mbox, ponger_mbox );
coop.make_agent< a_ponger_t >( ponger_mbox, pinger_mbox );
} );
To deregister a cooperation, the environment_t::deregister_coop() method must be used. It receives the ID of the cooperation to be deregistered and the reason for the deregistration:
... // Creation of coop with pinger and ponger agents.
auto ping_pong_coop_id = env.register_coop( std::move(coop) );
...
env.deregister_coop( ping_pong_coop_id, so_5::dereg_reason::normal );
Please note that deregistration can take some time. The deregister_coop() method only starts the deregistration procedure; deregistration is completed only when all agents of the cooperation have finished their message processing. While the deregistration procedure is active, the cooperation's agents cannot receive any new messages. They only handle old messages which were dispatched before the deregister_coop() call.
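The behaviour described above (new messages are refused, already dispatched ones are still handled) can be pictured with a tiny queue model. It is only an illustration of the rule as stated in the text; toy_agent_queue and its methods are hypothetical and say nothing about the real implementation.

```cpp
#include <deque>
#include <string>
#include <utility>
#include <vector>

// Hypothetical queue of one agent whose cooperation may be deregistered.
class toy_agent_queue
{
    std::deque< std::string > m_pending;
    bool m_deregistering = false;

public:
    // Returns false when the message is refused.
    bool push( std::string msg )
    {
        if( m_deregistering )
            return false;
        m_pending.push_back( std::move( msg ) );
        return true;
    }

    // Models the start of deregistration: no new messages from now on.
    void start_deregistration() { m_deregistering = true; }

    // Handles everything that was queued earlier and returns what was handled.
    std::vector< std::string > drain()
    {
        std::vector< std::string > handled( m_pending.begin(), m_pending.end() );
        m_pending.clear();
        return handled;
    }
};

inline std::vector< std::string > demo_deregistration()
{
    toy_agent_queue queue;
    queue.push( "first" );
    queue.push( "second" );
    queue.start_deregistration(); // only starts the procedure
    queue.push( "late" );         // refused: deregistration is in progress
    return queue.drain();         // old messages are still handled
}
```

Here demo_deregistration() returns only "first" and "second"; the message sent after the start of deregistration never reaches the agent.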
In the case where a cooperation should contain only one agent, the register_agent_as_coop() method can be used:
// Create agent and register it as a cooperation.
env.register_agent_as_coop(
// The single agent of cooperation.
env.make_agent< a_demo_t >( ... ) );
There is also an optional parent-child relation between cooperations. It allows specifying an existing parent cooperation for a new cooperation before registration:
// A verbose version:
void a_connection_manager_t::evt_new_connection( const msg_new_connection & evt )
{
// New connection must be served by new cooperation.
auto coop = so_environment().make_coop(
// ID of the current coop is passed as ID of the parent.
// New cooperation will be a child of cooperation of the current agent.
so_coop() );
... // Filling new cooperation with agents.
so_environment().register_coop( std::move( coop ) );
}
// Or more compact one:
void a_connection_manager_t::evt_new_connection( const msg_new_connection & evt )
{
// New connection must be served by new cooperation.
// New cooperation will be a child of cooperation of the current agent.
auto coop = so_5::create_child_coop( *this );
... // Filling new cooperation with agents.
so_environment().register_coop( std::move( coop ) );
}
// Or yet more compact one:
void a_connection_manager_t::evt_new_connection( const msg_new_connection & evt )
{
// New connection must be served by new cooperation.
// New cooperation will be a child of cooperation of the current agent.
so_5::introduce_child_coop( *this,
[&]( so_5::coop_t & coop ) {
... // Filling new cooperation with agents.
} );
}
SObjectizer guarantees that a child cooperation will be destroyed before its parent cooperation. It is useful for several reasons:
- no need to control the lifetime of a child cooperation. It will be automatically deregistered when deregistration of the parent cooperation is started;
- it is safe to pass references to objects owned by the parent cooperation to the child cooperation. For example, the parent cooperation can create a connection to a database and hold the DB connection object as an attribute of some agent. A reference to that DB connection object can be safely passed to the child cooperation because the lifetime of the DB connection object will be longer than the lifetime of the child cooperation.
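The ordering guarantee corresponds to a post-order walk over the tree of cooperations: every child is finished before its parent. The sketch below models only that ordering; toy_coop, deregister and the cooperation names are hypothetical, and the real deregistration is asynchronous rather than a simple recursive call.

```cpp
#include <memory>
#include <string>
#include <utility>
#include <vector>

// Hypothetical cooperation node with owned children.
struct toy_coop
{
    std::string m_name;
    std::vector< std::unique_ptr< toy_coop > > m_children;

    explicit toy_coop( std::string name ) : m_name( std::move( name ) ) {}

    toy_coop & add_child( std::string name )
    {
        m_children.push_back(
            std::unique_ptr< toy_coop >( new toy_coop( std::move( name ) ) ) );
        return *m_children.back();
    }
};

// Post-order walk: all children are recorded before their parent.
inline void deregister( const toy_coop & coop, std::vector< std::string > & order )
{
    for( const auto & child : coop.m_children )
        deregister( *child, order );
    order.push_back( coop.m_name );
}

inline std::vector< std::string > demo_destruction_order()
{
    toy_coop manager( "connection_manager" );
    toy_coop & first = manager.add_child( "connection_1" );
    first.add_child( "connection_1_logger" );
    manager.add_child( "connection_2" );

    std::vector< std::string > order;
    deregister( manager, order );
    return order;
}
```

The resulting order is connection_1_logger, connection_1, connection_2, connection_manager, so anything owned by connection_manager outlives every cooperation below it.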
Agents are objects which receive and handle messages and signals.
Every agent is linked with some message queue. When someone sends a message/signal to which an agent is subscribed a reference to that message/signal is stored in the agent's message queue. Agents periodically get messages from their queues and process them. Processing of message/signal from the queue is called event handling.
To define an agent it is necessary to derive a new class from so_5::agent_t:
class a_pinger_t : public so_5::agent_t
{ /* class definition here */ };
Subsequently, instances of that class may be used for some cooperation.
There are several virtual methods of the so_5::agent_t class which can be used for agent tuning.
The first one is so_define_agent(). It is called inside register_coop() before an agent starts its work. The main purpose of so_define_agent() is to provide a place for making subscriptions. Usually agents make their subscriptions in so_define_agent():
void a_pinger_t::so_define_agent() // Declared with 'override' inside the class.
{
so_subscribe_self().event( &a_pinger_t::evt_pong );
...
}
The second one is so_evt_start(). It is called as the very first event of an agent after the successful registration of the agent's cooperation. It can be used for doing something at the start of the agent's work:
void a_pinger_t::so_evt_start() // Declared with 'override' inside the class.
{
so_5::send< msg_ping >( m_ponger_mbox, "Start!" );
}
The main difference between so_define_agent() and so_evt_start() is the following: so_define_agent() is called on the context of the thread where register_coop() is called, whereas so_evt_start() is called on the context of the agent's working thread (see the Dispatchers section below for more details).
The next important virtual method is so_evt_finish(). It is the opposite of so_evt_start(). This method is the last method called for an agent on the context of the agent's working thread.
The pair so_evt_start()/so_evt_finish() can be regarded as an analog of a constructor/destructor pair. The method so_evt_start() can be used for allocating some resource and so_evt_finish() for deallocating it:
void a_db_manager_t::so_evt_start() // Declared with 'override' inside the class.
{
// Create a connection to DB.
m_connection = create_database_connection( ... );
...
}
void a_db_manager_t::so_evt_finish() // Declared with 'override' inside the class.
{
// Close the connection gracefully.
m_connection.close();
}
The usual way for message handling for ordinary agents is to define a dedicated non-static method for each message type. Method for message handling can have one of the following formats:
// Long forms.
// These forms can be used for both messages and signals (a signal requires a long form).
void event_handler( so_5::mhood_t< MSG > evt );
void event_handler( const so_5::mhood_t< MSG > & evt );
// Short forms.
// These forms can be used only if MSG is a type of a message.
void event_handler( const MSG & evt );
// Short form with a copy of a message.
// Very useful for messages of lightweight types like int's or long's.
void event_handler( MSG evt );
There is no need to declare message type during event subscription -- it is deduced from the method's prototype:
void a_ponger_t::so_define_agent() // Declared with 'override' inside the class.
{
so_subscribe_self().event( &a_ponger_t::evt_ping );
}
void a_ponger_t::evt_ping( const msg_ping & msg )
{ ... }
The difference between the long and short forms is that the long form makes it possible to call the mhood_t::make_reference() method for resending the same message object. The long form can also be useful in templates where the type of message/signal to be processed is specified as a template parameter:
template<typename Msg>
class my_template_agent : public so_5::agent_t
{
void on_event(mhood_t<Msg>) { ... }
...
void so_define_agent()
{
so_subscribe_self().event( &my_template_agent::on_event );
...
}
};
It is possible to use C++11 lambda functions as event handlers. They should be passed as arguments to the event() method in a subscription chain:
void a_watch_dog_t::so_define_agent() // Declared with 'override' inside the class.
{
so_subscribe_self()
// Subscription to a signal.
.event( [this](mhood_t<shutdown>) { so_environment().stop(); } )
// Subscription to a message.
.event(
// Type of the message will be deduced.
[this]( const msg_failure_detected & msg ) {
m_everything_fine = false;
m_failure_description = msg.reason();
} );
}
Every agent has a priority. Priority is just an optional mark for dispatchers. Some dispatchers use this mark for priority-specific event scheduling, some don't.
Priority is represented by the enumeration so_5::priority_t. There are just 8 priorities: from so_5::priority_t::p0 (the lowest) to so_5::priority_t::p7 (the highest). There are also handy constants in namespace so_5::prio, for example so_5::prio::p1 as a synonym for so_5::priority_t::p1.
Priority must be specified at the time of agent construction and priority of agent cannot be changed later. Priority is passed to the constructor of base class:
class my_agent : public so_5::agent_t
{
public :
my_agent( context_t ctx ) : so_5::agent_t( ctx + so_5::prio::p1 ), ... {}
...
};
All agents have the lowest priority by default. It means that if a priority for an agent is not set explicitly during construction, the agent will have priority so_5::priority_t::p0.
By default, SObjectizer assumes that all event handlers are not thread safe. Standard dispatchers guarantee that a not_thread_safe event handler runs alone and only on the context of one working thread.
Since v.5.4.0 it is possible to mark an event handler as thread safe. It means that the event handler doesn't change the agent's state (or does so in a thread-safe manner). If a user marks an event handler as thread_safe, SObjectizer's dispatcher gets the right to run thread_safe handlers of the same agent in parallel on different working threads, subject to the following conditions:
- a not_thread_safe event handler can't be started while any other event handler of the agent is running. Its start is delayed until all already running event handlers finish their work;
- no event handler can be started while a not_thread_safe event handler is running;
- in particular, no thread_safe event handler can be started while a not_thread_safe event handler is running.
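Taken together, these conditions resemble a readers-writer discipline: thread_safe handlers behave like readers and a not_thread_safe handler behaves like a writer. The following sketch encodes the conditions as a pure predicate. The names handler_kind, agent_activity and can_start are hypothetical and are not part of SObjectizer; a real dispatcher makes this decision internally.

```cpp
// Kinds of event handlers, named after the flags used in the text.
enum class handler_kind { thread_safe, not_thread_safe };

// What is currently running for one agent (hypothetical bookkeeping).
struct agent_activity
{
    int  m_running_thread_safe = 0;         // count of running thread_safe handlers
    bool m_running_not_thread_safe = false; // such a handler always runs alone
};

// Decides whether a handler of the given kind may start right now.
inline bool can_start( handler_kind kind, const agent_activity & now )
{
    // Nothing may start while a not_thread_safe handler is running.
    if( now.m_running_not_thread_safe )
        return false;

    // A not_thread_safe handler needs the agent to be completely idle.
    if( kind == handler_kind::not_thread_safe )
        return now.m_running_thread_safe == 0;

    // thread_safe handlers may run alongside other thread_safe handlers.
    return true;
}
```

For example, with two thread_safe handlers already running, another thread_safe handler may start, but a not_thread_safe handler has to wait until both of them finish.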
To specify a thread_safe event handler it is necessary to pass an additional argument to the event() method in the subscription chain:
void so_define_agent() override
{
st_disconnected
// thread_safety flag is not specified.
// This is not_thread_safe event handler and it
// can't work in parallel with other event handlers
// because it changes agent's state.
.event( &my_agent::evt_reconnect )
// This is thread_safe event and it doesn't change agent's state.
.event(
&my_agent::evt_send_when_disconnected,
so_5::thread_safe )
// This is thread_safe event and it doesn't change agent's state.
.event(
[this](mhood_t<msg_get_status>) { ... /* some actions */ },
so_5::thread_safe );
...
}
As an example of thread_safe and not_thread_safe event handler, let's see an agent for performing cryptography operations: encrypting and decrypting. These operations are stateless and can be done in parallel. But the agent must be reconfigured from time to time. The reconfiguration operation is stateful and event handler for reconfiguring message is not_thread_safe.
A user must only specify the thread_safety flag for those event handlers. All of the other work will be done by SObjectizer. It means that evt_encrypt and evt_decrypt will run in parallel until msg_reconfigure arrives. SObjectizer will wait until all previously started evt_encrypt/evt_decrypt handlers finish their work and only then will start evt_reconfigure. While evt_reconfigure is working, all other event handlers are delayed and will be started only after evt_reconfigure returns:
class a_cryptographer_t : public so_5::agent_t
{
public :
void so_define_agent() override
{
// not_thread_safe event handler.
so_default_state()
.event( &a_cryptographer_t::evt_reconfigure );
// thread_safe event handlers.
so_default_state()
.event( &a_cryptographer_t::evt_encrypt, so_5::thread_safe )
.event( &a_cryptographer_t::evt_decrypt, so_5::thread_safe );
}
void evt_reconfigure( const msg_reconfigure & evt ) { ... }
void evt_encrypt( const msg_encrypt & evt ) { ... }
void evt_decrypt( const msg_decrypt & evt ) { ... }
};
Every agent should work on some execution context: some worker thread must be assigned to an agent (or, in SObjectizer's terms, an agent must be bound to some worker thread). A special object, the dispatcher, is responsible for that.
The dispatchers could be seen as managers for worker threads. SObjectizer's user is free from thread management. All threads are started and stopped by dispatchers under SObjectizer Environment control. All that the user should do is to specify which dispatcher should be used by SObjectizer Environment and bind agents to appropriate dispatchers.
There are several dispatcher types available to the user "out of the box". Five of them do not support agent priorities and ignore the priority value during event scheduling:
- the simplest one, which binds every agent to the same worker thread. It is called the one_thread dispatcher;
- the dispatcher which binds an agent to a dedicated worker thread. The agent becomes an active object -- it works on its own thread and doesn't share this thread with other agents. That dispatcher is called the active_obj dispatcher;
- the dispatcher which binds a group of agents to one dedicated working thread. Every agent within the group works on the same working thread, but other agents work on different threads. That dispatcher is called the active_group dispatcher;
- the dispatcher which uses a pool of working threads and distributes event handlers between them. It is called the thread_pool dispatcher. This dispatcher doesn't distinguish between not_thread_safe and thread_safe event handlers and assumes that all event handlers are not_thread_safe. It means that an agent works on only one working thread and handles no more than one event at a time;
- the dispatcher which uses a pool of working threads and distinguishes between not_thread_safe and thread_safe event handlers. It is called the adv_thread_pool dispatcher. This dispatcher allows running several thread_safe event handlers of one agent on different threads in parallel.
There are also several dispatchers which support agent priorities and do priority-aware event scheduling:
- the prio_one_thread::strictly_ordered dispatcher runs all events on the context of one worker thread. It allows events of high-priority agents to block events of low-priority agents. It means that the event queue is always strictly ordered: events for agents with higher priority are placed before events for agents with lower priority;
- the prio_one_thread::quoted_round_robin dispatcher also runs all events on the context of one worker thread, but works on the round-robin principle. It allows specifying the maximum count of events to be processed consecutively for each priority. After processing that count of events the dispatcher switches to events of lower priority, even if more events of higher priority are waiting;
- the prio_dedicated_threads::one_per_prio dispatcher creates a dedicated thread for every priority (so eight worker threads are created). It means that events for agents with priority p7 are handled on a different thread than events for agents with, for example, priority p6.
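The quota-based round-robin idea can be illustrated with a standalone model. This is a simplified sketch and not the dispatcher's real implementation: prio_queue, run_quoted_round_robin and demo_round_robin are hypothetical names, priorities are plain vector indices (a higher index means a higher priority), and the model assumes the scheduler returns to the highest priority after serving the lowest one.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <deque>
#include <string>
#include <vector>

// Hypothetical model; higher index in the vector = higher priority.
struct prio_queue {
    std::size_t quota;               // max events handled in a row
    std::deque<std::string> events;  // pending events of that priority
};

// Returns the order in which one worker thread would handle the events.
std::vector<std::string> run_quoted_round_robin(std::vector<prio_queue> queues) {
    std::vector<std::string> order;
    bool any_left = true;
    while (any_left) {
        any_left = false;
        // Walk from the highest priority down to the lowest.
        for (std::size_t i = queues.size(); i-- > 0;) {
            prio_queue& q = queues[i];
            // A zero quota is treated as 1 so the loop always makes progress.
            const std::size_t quota = std::max<std::size_t>(q.quota, 1);
            for (std::size_t n = 0; n < quota && !q.events.empty(); ++n) {
                order.push_back(q.events.front());
                q.events.pop_front();
            }
            if (!q.events.empty()) any_left = true;
        }
    }
    return order;
}

std::vector<std::string> demo_round_robin() {
    return run_quoted_round_robin({
        prio_queue{1, {"low-1"}},
        prio_queue{1, {"mid-1", "mid-2"}},
        prio_queue{2, {"high-1", "high-2", "high-3", "high-4", "high-5"}}});
}
```

With a quota of 2 for the highest priority, the model handles high-1 and high-2, then gives one slot each to mid-1 and low-1 before returning to the remaining high-priority events. A strictly ordered scheme would instead drain all five high-priority events first.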
The one_thread dispatcher is available to a user as the default dispatcher. Nothing has to be done to launch it. By default, all agents are bound to that default dispatcher.
The other types of dispatchers require some user actions. Every dispatcher type in SObjectizer provides make_dispatcher functions which create a dispatcher instance. A smart reference to the newly created dispatcher is returned. This reference should be used for binding agents to that dispatcher:
void create_processing_coops( so_5::environment_t & env )
{
std::size_t capacities[] = { 25, 35, 40, 15, 20 };
// Private dispatcher for receivers.
auto receiver_disp = so_5::disp::thread_pool::make_dispatcher( env, 2 );
// And private dispatcher for processors.
auto processor_disp = so_5::disp::active_obj::make_dispatcher( env );
int i = 0;
for( auto c : capacities )
{
auto coop = env.make_coop();
auto receiver = coop->make_agent_with_binder< a_receiver_t >(
// Getting binder for the dispatcher and using that binder
// for new agent.
receiver_disp.binder( so_5::disp::thread_pool::bind_params_t{} ),
// These parameters are going to the agent's constructor.
"r" + std::to_string(i), c );
const auto receiver_mbox = receiver->so_direct_mbox();
coop->make_agent_with_binder< a_processor_t >(
// Getting binder for the dispatcher and using that binder
// for new agent.
processor_disp.binder(),
// These parameters are going to the agent's constructor.
"p" + std::to_string(i), receiver_mbox );
env.register_coop( std::move( coop ) );
++i;
}
}
Often a message/signal must be delivered after some delay. This can be done with the so_5::send_delayed function. For example:
void a_transaction_handler_t::evt_initiate_request( const msg_request_data & msg )
{
// Store the request info for further actions...
save_request_info( msg );
// ...initiate a request to remote site...
send_request( msg );
// ...and wait for some time.
so_5::send_delayed< msg_request_timedout >(
m_self_mbox,
// Message delayed for 500 milliseconds.
std::chrono::milliseconds( 500 ),
// Will be passed to msg_request_timedout's constructor.
msg );
}
void a_transaction_handler_t::evt_timeout( const msg_request_timedout & msg )
{
// Some actions dealing with absence of response from remote site...
}
Sometimes it is necessary to resend some message/signal instance again and again. This can be done with the so_5::send_periodic function:
auto timer_id = so_5::send_periodic< watch_dog >(
m_target_mbox,
// Delay for the first occurrence.
std::chrono::milliseconds{ 250 },
// The repetition period.
std::chrono::milliseconds{ 750 },
... /* all other parameters are going to watch_dog's constructor */ );
Note that the return value of so_5::send_periodic should be stored. The returned value is a timer ID. This ID can be used to cancel the repetition of the periodic message:
auto timer_id = so_5::send_periodic< some_message >(...);
...
// Timer is no longer needed.
timer_id.release();
The repetition of a periodic message will be automatically cancelled when the corresponding timer ID is destroyed.
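How a delayed and a periodic timer behave, and what releasing the timer ID does, can be pictured with a small virtual-time model. This is a sketch only: timer_model, its nested timer_id, the tick-based clock and demo_timers are hypothetical names that merely mirror the shape of so_5::send_delayed and so_5::send_periodic. SObjectizer's own timers work in real time and deliver messages to mboxes.

```cpp
#include <cassert>
#include <cstdint>
#include <functional>
#include <memory>
#include <queue>
#include <utility>
#include <vector>

// Hypothetical virtual-time timer; NOT SObjectizer's timer machinery.
class timer_model {
public:
    using action_t = std::function<void()>;

    // Move-only handle: releasing or destroying it cancels the timer.
    class timer_id {
    public:
        timer_id() = default;
        explicit timer_id(std::shared_ptr<bool> active) : m_active{std::move(active)} {}
        timer_id(timer_id&&) = default;
        ~timer_id() { release(); }
        void release() {
            if (m_active) { *m_active = false; m_active.reset(); }
        }
    private:
        std::shared_ptr<bool> m_active;
    };

    // One-shot delivery after `delay` ticks.
    void send_delayed(std::uint64_t delay, action_t action) {
        push(m_now + delay, 0, std::make_shared<bool>(true), std::move(action));
    }

    // First delivery after `delay` ticks, then every `period` ticks.
    timer_id send_periodic(std::uint64_t delay, std::uint64_t period, action_t action) {
        auto active = std::make_shared<bool>(true);
        push(m_now + delay, period, active, std::move(action));
        return timer_id{std::move(active)};
    }

    // Advances virtual time and fires everything that became due.
    void advance_to(std::uint64_t new_now) {
        while (!m_heap.empty() && m_heap.top().when <= new_now) {
            entry e = m_heap.top();
            m_heap.pop();
            if (!*e.active) continue;  // the timer was cancelled
            m_now = e.when;
            e.action();
            if (e.period != 0)         // re-arm a periodic timer
                push(e.when + e.period, e.period, e.active, e.action);
        }
        m_now = new_now;
    }

private:
    struct entry {
        std::uint64_t when, period, seq;
        std::shared_ptr<bool> active;
        action_t action;
        bool operator>(const entry& o) const {
            return when != o.when ? when > o.when : seq > o.seq;
        }
    };
    void push(std::uint64_t when, std::uint64_t period,
              std::shared_ptr<bool> active, action_t action) {
        m_heap.push(entry{when, period, m_seq++, std::move(active), std::move(action)});
    }
    std::priority_queue<entry, std::vector<entry>, std::greater<entry>> m_heap;
    std::uint64_t m_now = 0, m_seq = 0;
};

struct timer_demo_result { int timeouts; int ticks_before_cancel; int ticks_after_cancel; };

timer_demo_result demo_timers() {
    timer_model timers;
    int timeouts = 0, ticks = 0;
    timers.send_delayed(500, [&] { ++timeouts; });
    auto id = timers.send_periodic(250, 750, [&] { ++ticks; });
    timers.advance_to(2000);   // periodic timer fires at 250, 1000 and 1750
    const int before = ticks;
    id.release();              // cancels the repetition
    timers.advance_to(4000);   // nothing more is delivered
    return {timeouts, before, ticks};
}
```

The model uses the same values as the snippets above: a 500-tick delay for the one-shot timeout, and a 250-tick initial delay with a 750-tick period for the repeated message.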
Agents are often used to implement some kinds of finite automata. Based on that, SObjectizer has a facility to define states of agents explicitly. A state restricts the set of messages which can be handled by an agent in that state.
To define a state for an agent, an object of type so_5::state_t must be declared and initialized as an attribute of the agent:
class a_transaction_handler_t : public so_5::agent_t
{
// Agent is waiting for an initial request.
so_5::state_t st_wait_for_request{ this, "waiting for request" };
// Agent is waiting for a response from remote site.
so_5::state_t st_wait_for_response{ this, "waiting for response" };
// Agent doesn't receive any response.
so_5::state_t st_response_timedout{ this, "no response received" };
// The response is received and agent is handling it.
so_5::state_t st_response_processing{ this, "processing" };
...
};
Every agent in SObjectizer has the default state. It is defined by SObjectizer and can be accessed through the agent_t::so_default_state() method.
Every agent starts its work in the default state. Because of that, if an agent needs to start in another state, the switch must be performed in the so_define_agent() method:
void a_transaction_handler_t::so_define_agent()
{
so_change_state( st_wait_for_request );
/* Other initialization actions here... */
}
The so_subscribe().event() method chain, an example of which is shown above, subscribes an event handler in the default state. If an agent has several states, it should specify a state in the so_subscribe().event() chain with the in() method:
void a_transaction_handler_t::so_define_agent()
{
so_change_state( st_wait_for_request );
// Message msg_request_data will be processed only in st_wait_for_request state.
so_subscribe( m_self_mbox ).in( st_wait_for_request )
.event( &a_transaction_handler_t::evt_request );
// Response will be processed only in st_wait_for_response state.
so_subscribe( m_self_mbox ).in( st_wait_for_response )
.event( &a_transaction_handler_t::evt_response );
// Timeout must be processed in two states.
so_subscribe( m_self_mbox )
.in( st_wait_for_request )
.in( st_wait_for_response )
.event( &a_transaction_handler_t::evt_timeout );
}
There also is a more concise way to change the agent's state and to subscribe agent's events with respect to the agent's states. The example above can be rewritten this way:
void a_transaction_handler_t::so_define_agent()
{
this >>= st_wait_for_request; // Or: st_wait_for_request.activate();
// Message msg_request_data will be processed only in st_wait_for_request state.
st_wait_for_request.event( m_self_mbox, &a_transaction_handler_t::evt_request );
// Response will be processed only in st_wait_for_response state.
st_wait_for_response.event( m_self_mbox, &a_transaction_handler_t::evt_response );
// But if an event handler is used in different states then
// old subscription chain must be used.
so_subscribe( m_self_mbox )
.in( st_wait_for_request )
.in( st_wait_for_response )
.event( &a_transaction_handler_t::evt_timeout );
}
When a message/signal is sent to an mbox it will be stored in the agent's message queue regardless of the current agent state. The state of the agent is checked when a message/signal is extracted from the queue, and it's ready to be processed. SObjectizer checks the current agent state, and if the message is subscribed in the current state the message will be passed to the agent's event handler. Otherwise, the message is discarded and the next message is extracted from the queue.
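The point that the state is checked at extraction time, not at send time, can be shown with a small standalone model. It is a sketch under simplifying assumptions, not SObjectizer code: agent_model and demo_states are hypothetical, states and message types are plain strings, and there is a single queue.

```cpp
#include <cassert>
#include <deque>
#include <functional>
#include <map>
#include <string>
#include <utility>
#include <vector>

// Hypothetical model of an agent with states; NOT SObjectizer API.
class agent_model {
public:
    using handler_t = std::function<void()>;

    void subscribe(const std::string& state, const std::string& msg, handler_t h) {
        m_handlers[std::make_pair(state, msg)] = std::move(h);
    }
    void change_state(const std::string& state) { m_state = state; }

    // Sending never looks at the state: the message is just queued.
    void send(const std::string& msg) { m_queue.push_back(msg); }

    // The state is checked only when a message is extracted.
    void process_all() {
        while (!m_queue.empty()) {
            const std::string msg = m_queue.front();
            m_queue.pop_front();
            const auto it = m_handlers.find(std::make_pair(m_state, msg));
            if (it == m_handlers.end()) {
                m_log.push_back("discard " + msg);  // no handler in the current state
            } else {
                m_log.push_back("handle " + msg);
                it->second();
            }
        }
    }
    const std::vector<std::string>& log() const { return m_log; }

private:
    std::string m_state = "default";
    std::deque<std::string> m_queue;
    std::map<std::pair<std::string, std::string>, handler_t> m_handlers;
    std::vector<std::string> m_log;
};

std::vector<std::string> demo_states() {
    agent_model a;
    a.change_state("wait_for_request");
    a.subscribe("wait_for_request", "request",
                [&a] { a.change_state("wait_for_response"); });
    a.subscribe("wait_for_response", "response", [] {});
    a.send("response");  // arrives while the agent still waits for a request
    a.send("request");
    a.send("response");
    a.process_all();
    return a.log();
}
```

In the demo the first response is queued successfully but discarded on extraction, because the agent is still in the wait_for_request state. The second one is handled, since the request handler has switched the state by then.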
To switch an agent from one state to another, the agent_t::so_change_state() method, operator>>= or the state_t::activate() method can be used:
void a_transaction_handler_t::evt_request( const msg_request_data & msg )
{
so_change_state( st_wait_for_response );
...
}
void a_transaction_handler_t::evt_response( const msg_response_data & msg )
{
this >>= st_response_processing;
...
}
void a_transaction_handler_t::evt_timeout( const msg_request_timedout & msg )
{
st_response_timedout.activate();
...
}
Those methods do the same thing: switch the agent to the new state. The choice between so_change_state, >>= or activate is just a matter of the user's taste.
The main purpose of the message chain (or just mchain) mechanism is to provide a way for the SObjectizer and non-SObjectizer parts of an application to interact. Sending from the non-SObjectizer part to agents is simple: the usual message passing via mboxes is used. But how can the non-SObjectizer part of the application receive messages back from an agent?
Message chain is the answer.
A message chain looks almost like an mbox to agents. An agent can send messages to an mchain exactly the same way as to an mbox. So an mchain can be passed to an agent, and the agent will use it as a destination for reply messages. On the other side of the mchain there is a non-SObjectizer message handler. This handler receives messages from the mchain by using special API functions and handles them appropriately.
In the following example, an agent of type data_reader reads data samples from some source and passes them to an mchain as instances of type data_sample. This mchain is read outside of SObjectizer Environment with the help of the so_5::receive function:
class data_reader : public so_5::agent_t {
public:
data_reader(context_t ctx, so_5::mchain_t data_ch)
: so_5::agent_t{ctx}, m_data_ch{std::move(data_ch)}
{}
...
private:
const so_5::mchain_t m_data_ch;
...
void read_data_event() {
auto data = read_next_data();
if(data_read())
// New data sample read successfully and can be delivered to the consumer.
so_5::send<data_sample>(m_data_ch, data);
else {
// Data source is closed. No more data can be received.
// Close the data chain and finish our work.
m_data_ch->close();
so_deregister_agent_coop_normally();
}
}
};
...
// Non-SObjectizer part of an application.
so_5::mchain_t ch = env.create_mchain(...);
env.introduce_coop([&](so_5::coop_t & coop) {
coop.make_agent<data_reader>(
// Passing the mchain to the data_reader agent.
ch);
...
} );
...
// Receive and handle all data samples until the chain is closed.
receive(from(ch).handle_all(), [](const data_sample & data) {...});
By default all messages are immutable. This makes 1-to-many delivery possible: one message can be delivered to several subscribers at the same time. For example, when dest is an MPMC mbox, send<Msg>(dest,...) delivers an instance of an immutable message of type Msg to all subscribers (several event handlers for that instance can run on parallel worker threads at the same time). Interaction using immutable messages is a simple and powerful approach for building concurrent applications on top of the publish-subscribe and Actor models: any message can be redirected to any number of receivers or stored for further processing.
However, interaction through immutable messages sometimes has drawbacks. For example, suppose agent A opens a file and wants to delegate file processing to agent B. Something like:
struct transfer_handle_file final : public so_5::message_t {
File m_file;
transfer_handle_file(File f) : m_file(std::move(f)) {}
};
class A : public so_5::agent_t {
void on_handle_file_cmd(const handle_file & cmd) {
File f(cmd.file_name());
if(f)
// Open operation succeeded. File must be sent to agent B.
so_5::send<transfer_handle_file>(dest, std::move(f));
}
...
};
class B : public so_5::agent_t {
void on_transfer_handle_file(const transfer_handle_file & cmd) {
File f(std::move(cmd.m_file)); // WON'T COMPILE!!!
...
}
...
};
It won't work because cmd is const in B::on_transfer_handle_file.
A straightforward but very dangerous solution is to use the mutable keyword:
struct transfer_handle_file final : public so_5::message_t {
mutable File m_file;
transfer_handle_file(File f) : m_file(std::move(f)) {}
};
It is dangerous because there is no guarantee that dest is an MPSC mbox. If dest is an MPMC mbox then the message with the mutable m_file can be received by several agents, and this can lead to various errors.
A safer option is to send a mutable message. The example above can be rewritten as such:
struct transfer_handle_file final : public so_5::message_t {
File m_file;
transfer_handle_file(File f) : m_file(std::move(f)) {}
};
class A : public so_5::agent_t {
void on_handle_file_cmd(const handle_file & cmd) {
File f(cmd.file_name());
if(f)
// Open operation succeeded. File must be sent to agent B.
// Message transfer_handle_file will be sent as a MUTABLE message.
// `dest` must be a MPSC mbox, an exception will be thrown if not.
so_5::send<so_5::mutable_msg<transfer_handle_file>>(dest, std::move(f));
}
...
};
class B : public so_5::agent_t {
// Message transfer_handle_file will be received as MUTABLE message.
void on_transfer_handle_file(mutable_mhood_t<transfer_handle_file> cmd) {
File f(std::move(cmd->m_file)); // It's OK now.
...
}
...
};
Mutable messages can be sent to MPSC mboxes or mchains. But they can't be sent to MPMC mboxes (an exception will be thrown on such an attempt).
Mutable messages can be sent as a delayed message. But they can't be sent as periodic messages.
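The reason for the MPSC-only rule can be seen in a small standalone model of ownership transfer. Everything in it is hypothetical (file_model, mbox_kind, mbox_model and the two demo functions) and it does not use SObjectizer at all. It only shows that a move-only payload can be handed to exactly one receiver, so a multi-consumer destination has to be rejected.

```cpp
#include <cassert>
#include <functional>
#include <memory>
#include <stdexcept>
#include <string>
#include <utility>
#include <vector>

// Hypothetical move-only resource standing in for File.
struct file_model {
    std::unique_ptr<std::string> handle;
};

enum class mbox_kind { mpsc, mpmc };

// Hypothetical mbox model; NOT SObjectizer API.
class mbox_model {
public:
    explicit mbox_model(mbox_kind kind) : m_kind{kind} {}

    void subscribe(std::function<void(file_model&)> receiver) {
        m_receivers.push_back(std::move(receiver));
    }

    // A mutable message may reach one receiver only,
    // so the model rejects multi-consumer destinations.
    void send_mutable(file_model msg) {
        if (m_kind == mbox_kind::mpmc)
            throw std::logic_error{"mutable message sent to an MPMC mbox"};
        if (!m_receivers.empty())
            m_receivers.front()(msg);  // the sole receiver may move from msg
    }

private:
    mbox_kind m_kind;
    std::vector<std::function<void(file_model&)>> m_receivers;
};

// The receiver takes ownership of the payload.
bool demo_transfer() {
    mbox_model dest{mbox_kind::mpsc};
    file_model received;
    dest.subscribe([&](file_model& m) { received = std::move(m); });
    dest.send_mutable(file_model{std::make_unique<std::string>("data.txt")});
    return received.handle && *received.handle == "data.txt";
}

// Sending a mutable message to a multi-consumer box is refused.
bool demo_mpmc_rejected() {
    mbox_model dest{mbox_kind::mpmc};
    try {
        dest.send_mutable(file_model{});
    } catch (const std::logic_error&) {
        return true;
    }
    return false;
}
```

If several receivers were allowed, the first one to move from the payload would leave the others with an empty handle. That is the kind of error the text above warns about for the mutable keyword workaround.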
Event-handlers for mutable messages should have one of the following formats:
void handler(so_5::mhood_t<so_5::mutable_msg<Msg>>);
void handler(so_5::mutable_mhood_t<Msg>);
Note: mutable_mhood_t<M> is just a shorthand for mhood_t<mutable_msg<M>>.
An immutable message Msg is considered to have a different type than the mutable message mutable_msg<Msg>. This allows having different message handlers for Msg and mutable_msg<Msg>:
class demo : public so_5::agent_t {
virtual void so_define_agent() override {
so_subscribe_self().event(&demo::one).event(&demo::two);
}
virtual void so_evt_start() override {
so_5::send<Msg>(*this, ...);
so_5::send<so_5::mutable_msg<Msg>>(*this, ...);
}
void one(mhood_t<Msg>) { std::cout << "One!" << std::endl; }
void two(mutable_mhood_t<Msg>) { std::cout << "Two!" << std::endl; }
...
};