SO 5.8 ByExample Producer Consumer MChain
Note. This is not a simple example. It shows a more complex use of SObjectizer's features. Understanding it requires a good knowledge of SO-5.8 Basics, SO-5.5 InDepth - Message Limits, and SO-5.8 InDepth - Message Chains.
This example shows one possible solution to the Producer-Consumer problem.
The solution shown below uses a feature introduced in v.5.5.14 -- mchain (for more information about mchain see SO-5.8 InDepth - Message Chains):
- the consumer agent creates an mchain and receives requests from it. The mchain has limited capacity and suspends a producer on overflow. If there is still no free room in the mchain after a timeout then an exception is thrown. This exception indicates that the request was not stored in the mchain;
- producers send requests as ordinary messages to that mchain. If the mchain is full then the producer is blocked for some time. This reduces the producer's request generation rate.
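The overflow behaviour described above can be modelled with the standard library alone. The following is a minimal sketch, not SObjectizer code: `bounded_queue`, `push` and `try_pop` are hypothetical names introduced here for illustration, and the real mchain implementation may work differently.

```cpp
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <cstddef>
#include <deque>
#include <mutex>
#include <stdexcept>
#include <string>
#include <utility>

// Hypothetical stand-in for a size-limited mchain with waiting on overflow.
class bounded_queue
{
public :
    bounded_queue( std::size_t capacity, std::chrono::milliseconds wait_time )
        : m_capacity{ capacity }
        , m_wait_time{ wait_time }
    {
        assert( capacity > 0 );
    }

    // Suspends the caller while the queue is full.
    // Throws if there is still no free room after the timeout.
    void push( std::string item )
    {
        std::unique_lock< std::mutex > lock{ m_lock };
        const bool has_room = m_not_full.wait_for( lock, m_wait_time,
            [this] { return m_items.size() < m_capacity; } );
        if( !has_room )
            throw std::runtime_error{ "queue is still full after timeout" };
        m_items.push_back( std::move(item) );
    }

    // Non-blocking extraction. Returns false if the queue is empty.
    bool try_pop( std::string & out )
    {
        std::lock_guard< std::mutex > lock{ m_lock };
        if( m_items.empty() )
            return false;
        out = std::move( m_items.front() );
        m_items.pop_front();
        // Wake up one producer that may be waiting for free room.
        m_not_full.notify_one();
        return true;
    }

private :
    const std::size_t m_capacity;
    const std::chrono::milliseconds m_wait_time;
    std::mutex m_lock;
    std::condition_variable m_not_full;
    std::deque< std::string > m_items;
};
```

With a capacity of 2, a third `push` that finds no free room within the timeout throws, and a later `push` succeeds once `try_pop` has released a slot.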
There is one coop with several agents:
- just one consumer agent. This agent owns the mchain and receives requests from it. The consumer takes messages from that mchain and imitates some processing (by suspending the working thread for a randomly selected amount of time). This agent works on a private one_thread dispatcher;
- 40 producer agents. These agents work on a thread_pool dispatcher (every agent has an independent event queue). Every agent sends a request to the mchain, then receives a reply message, then sends the next request and so on. Every producer must send 10 requests;
- a logger agent for printing trace messages to std::cout. This agent works on a private one_thread dispatcher;
- a shutdowner agent which receives final notifications from producers and finishes the example. This agent works on the default dispatcher.
The full source code of the example can be found in the repository. Only the most important aspects of the code are discussed here.
Producer agent is implemented this way:
class producer final : public so_5::agent_t
{
    // This signal initiates sending of the next request to the consumer.
    struct send_next final : public so_5::signal_t {};
public :
    producer( context_t ctx,
        std::string name,
        so_5::mbox_t logger_mbox,
        so_5::mbox_t consumer_mbox,
        unsigned int requests )
        : so_5::agent_t{ ctx }
        , m_name( std::move(name) )
        , m_logger_mbox{ std::move(logger_mbox) }
        , m_consumer_mbox{ std::move(consumer_mbox) }
        , m_requests_left{ requests }
    {
        so_subscribe_self()
            .event( &producer::evt_reply )
            .event( &producer::evt_send_next );
    }
    void so_evt_start() override
    {
        // Initiate request sending loop.
        so_5::send< send_next >( *this );
    }
private :
    const std::string m_name;
    const so_5::mbox_t m_logger_mbox;
    const so_5::mbox_t m_consumer_mbox;
    unsigned int m_requests_left;
    // An event for the next attempt to send another request.
    void evt_send_next(mhood_t< send_next >)
    {
        if( m_requests_left )
        {
            // Send can wait on full mchain. Mark the start time to
            // calculate send call duration later.
            const auto started_at = steady_clock::now();
            try
            {
                // Send another request.
                // Note: this call can wait on full mchain.
                so_5::send< request >( m_consumer_mbox,
                    so_direct_mbox(),
                    m_name + "_request_" + std::to_string( m_requests_left ) );
                // How much time did the send take?
                const auto ms = std::chrono::duration_cast<
                    std::chrono::milliseconds >( steady_clock::now()
                        - started_at ).count();
                m_logger_mbox <<= msg_maker() << m_name << ": request sent in "
                    << ms << "ms";
            }
            catch( const so_5::exception_t & ex )
            {
                // Log the reason of send request failure.
                m_logger_mbox <<= msg_maker() << m_name << ": request NOT SENT, "
                    << ex.what();
                // Initiate next send attempt.
                so_5::send< send_next >( *this );
            }
        }
        else
            // No more requests to send. Shutdowner must know about it.
            shutdowner::producer_finished( *this );
    }
    void evt_reply( const reply & msg )
    {
        m_logger_mbox <<= msg_maker() << m_name << ": reply received, "
            << msg.m_payload;
        --m_requests_left;
        so_5::send< send_next >( *this );
    }
};
It is easy to see that the producer agent doesn't use any mchains directly. It receives two mboxes in the constructor. One of them is the mbox of the logger agent. The other mbox is used for sending requests to the consumer.
This is one of mchain's features: an mchain can be seen as an ordinary mbox, and all the well-known send functions can be used with such an mbox. If an mchain is represented by an mbox then a sender doesn't know that it operates with an mchain.
Because of that the producer uses the so_5::send function for sending a request to the consumer. But this call to so_5::send has some peculiarities:
- because the underlying mchain has limited capacity, a sender can be suspended for some time. To show that effect a producer logs the time spent on send;
- if the mchain is full and there is no free room even after a timeout then an exception is thrown. This exception is handled and the producer will try to send the request next time.
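The two peculiarities above (a send that may block, and a send that may fail with an exception) can be isolated into one small helper. This is a sketch using only the standard library: `timed_attempt`, `attempt_result` and `queue_full_error` are hypothetical names that are not part of SObjectizer, and the helper catches `std::exception` rather than the library's own exception type.

```cpp
#include <cassert>
#include <chrono>
#include <exception>
#include <stdexcept>
#include <string>
#include <utility>

// Hypothetical stand-in for the exception thrown on mchain overflow.
struct queue_full_error : public std::runtime_error
{
    using std::runtime_error::runtime_error;
};

// Outcome of one send attempt.
struct attempt_result
{
    bool m_sent;
    std::chrono::milliseconds m_elapsed;
    std::string m_error;
};

// Runs send_action once, measures how long it took and converts
// an exception into an error description instead of propagating it.
template< typename Send_Action >
attempt_result timed_attempt( Send_Action && send_action )
{
    using clock_type = std::chrono::steady_clock;
    attempt_result result{ true, std::chrono::milliseconds{ 0 }, std::string{} };
    const auto started_at = clock_type::now();
    try
    {
        std::forward< Send_Action >( send_action )();
    }
    catch( const std::exception & ex )
    {
        result.m_sent = false;
        result.m_error = ex.what();
    }
    result.m_elapsed = std::chrono::duration_cast< std::chrono::milliseconds >(
        clock_type::now() - started_at );
    // steady_clock is monotonic, so the duration is never negative.
    assert( result.m_elapsed.count() >= 0 );
    return result;
}
```

A caller would log `m_elapsed` when `m_sent` is true, and log `m_error` and schedule another attempt otherwise, which mirrors what the producer does in evt_send_next.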
Consumer agent is implemented this way:
class consumer final : public so_5::agent_t
{
    // This signal will be sent by not_empty_notificator when
    // the first message is stored to the empty mchain.
    struct chain_has_requests final : public so_5::signal_t {};
public :
    consumer( context_t ctx, so_5::mbox_t logger_mbox )
        : so_5::agent_t{ ctx + limit_then_drop< chain_has_requests >(1) }
        , m_logger_mbox{ std::move(logger_mbox) }
    {
        // Appropriate mchain must be created.
        m_chain = so_environment().create_mchain(
            so_5::make_limited_with_waiting_mchain_params(
                // No more than 10 requests in the chain.
                10,
                // Preallocated storage for the chain.
                so_5::mchain_props::memory_usage_t::preallocated,
                // Throw an exception on overflow.
                so_5::mchain_props::overflow_reaction_t::throw_exception,
                // Wait no more than 0.15s on overflow.
                std::chrono::milliseconds(150) )
            // A custom notificator must be used for chain.
            .not_empty_notificator( [this] {
                so_5::send< chain_has_requests >( *this );
            } ) );
        so_subscribe_self().event(
            &consumer::process_requests );
    }
    // A mchain will look like an ordinary mbox from outside of consumer.
    so_5::mbox_t consumer_mbox() const
    {
        return m_chain->as_mbox();
    }
private :
    const so_5::mbox_t m_logger_mbox;
    so_5::mchain_t m_chain;
    void process_requests(mhood_t< chain_has_requests >)
    {
        auto r = receive(
            // Handle no more than 5 requests at once.
            // No wait if queue is empty.
            from( m_chain ).handle_n( 5 ).no_wait_on_empty(),
            []( const request & req ) {
                // Imitation of some hard work before sending a reply back.
                std::this_thread::sleep_for( random_pause() );
                so_5::send< reply >( req.m_who, req.m_payload + "#handled" );
            } );
        m_logger_mbox <<= msg_maker()
            << "=== " << r.handled() << " request(s) handled";
        if( !m_chain->empty() )
            // Not all messages from chain have been processed.
            // Initiate new processing by sending the signal to itself.
            so_5::send< chain_has_requests >( *this );
    }
    static std::chrono::milliseconds
    random_pause()
    {
        std::random_device rd;
        std::mt19937 gen{ rd() };
        return std::chrono::milliseconds(
            std::uniform_int_distribution< unsigned int >{2u, 25u}(gen) );
    }
};
There are several interesting points that must be discussed separately.
The special chain_has_requests signal lets the consumer agent detect the moments when the mchain is not empty. The usage of chain_has_requests is very simple:
- the consumer agent creates a notificator for the mchain which is called automatically when the first message is stored into the empty mchain;
- this notificator sends the chain_has_requests signal to the consumer;
- the consumer receives this signal and handles the mchain's content in the event handler for chain_has_requests.
There is just one trick. It is possible that several instances of chain_has_requests will be sent to the consumer agent, but only the first of them is needed. To drop all redundant signals a message limit is defined for chain_has_requests (for more information about message limits see SO-5.5 InDepth - Message Limits).
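The effect of a limit of one pending signal with a "drop" reaction can be modelled as a simple counter. The sketch below is only an illustration under the assumption that the limit counts signals waiting in the agent's queue; `signal_limit` and its methods are hypothetical names, not SObjectizer's API.

```cpp
#include <cassert>
#include <cstddef>

// Hypothetical model of a "limit then drop" rule for one signal type.
class signal_limit
{
public :
    explicit signal_limit( std::size_t limit )
        : m_limit{ limit }
    {
        assert( limit > 0 );
    }

    // Called when a signal is sent to the agent.
    // Returns true if the signal is queued, false if it is dropped.
    bool try_enqueue()
    {
        if( m_pending >= m_limit )
            return false;
        ++m_pending;
        return true;
    }

    // Called when one queued signal is taken for processing.
    void on_dequeued()
    {
        assert( m_pending > 0 );
        --m_pending;
    }

    std::size_t pending() const { return m_pending; }

private :
    const std::size_t m_limit;
    std::size_t m_pending{ 0 };
};
```

With a limit of 1, a second signal sent while the first one is still waiting is silently discarded, so the handler runs once for a burst of notifications.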
An instance of a size-limited mchain with waiting on overflow is created in the constructor of the consumer agent. This mchain has a preallocated queue for 10 messages. The mchain will throw an exception if there is no free room even after a timeout of 0.15s:
m_chain = so_environment().create_mchain(
    so_5::make_limited_with_waiting_mchain_params(
        // No more than 10 requests in the chain.
        10,
        // Preallocated storage for the chain.
        so_5::mchain_props::memory_usage_t::preallocated,
        // Throw an exception on overflow.
        so_5::mchain_props::overflow_reaction_t::throw_exception,
        // Wait no more than 0.15s on overflow.
        std::chrono::milliseconds(150) )
    // A custom notificator must be used for chain.
    .not_empty_notificator( [this] {
        so_5::send< chain_has_requests >( *this );
    } ) );
Another important part of this code fragment is the setup of the notificator which is called when the empty mchain becomes non-empty. This notificator sends the chain_has_requests signal to the consumer as described above.
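The idea of a notification that fires only on the transition from empty to non-empty can be sketched without SObjectizer. `notifying_queue` below is a hypothetical, single-threaded illustration and says nothing about how the mchain implements its notificator internally.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <functional>
#include <string>
#include <utility>

// Hypothetical queue that calls a callback when it stops being empty.
class notifying_queue
{
public :
    explicit notifying_queue( std::function< void() > on_not_empty )
        : m_on_not_empty{ std::move(on_not_empty) }
    {}

    void push( std::string item )
    {
        const bool was_empty = m_items.empty();
        m_items.push_back( std::move(item) );
        // Only the first message stored into an empty queue notifies.
        if( was_empty && m_on_not_empty )
            m_on_not_empty();
    }

    std::string pop()
    {
        assert( !m_items.empty() );
        std::string item = std::move( m_items.front() );
        m_items.pop_front();
        return item;
    }

    bool empty() const { return m_items.empty(); }
    std::size_t size() const { return m_items.size(); }

private :
    std::function< void() > m_on_not_empty;
    std::deque< std::string > m_items;
};
```

Three pushes into an empty queue produce one notification. The next notification happens only after the queue has been drained and a new item arrives.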
Extraction of requests from the mchain and their handling is done inside a call to the so_5::receive function with several constraints:
- no more than 5 requests can be extracted and handled during one call of so_5::receive;
- if the mchain is empty no waiting must be performed.
In the code it looks like:
auto r = receive(
    // Handle no more than 5 requests at once.
    // No wait if queue is empty.
    from( m_chain ).handle_n( 5 ).no_wait_on_empty(),
    []( const request & req ) {
        // Imitation of some hard work before sending a reply back.
        std::this_thread::sleep_for( random_pause() );
        so_5::send< reply >( req.m_who, req.m_payload + "#handled" );
    } );
Then the emptiness of the mchain is checked. If the mchain is not empty after the return from so_5::receive then the next chain_has_requests signal is sent immediately. If the mchain is empty then the consumer agent will wait for the chain_has_requests signal from the mchain's not-empty notificator.
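The "handle at most N, never wait, then check for leftovers" logic can be expressed as a small standalone function. This is a sketch over a plain `std::deque`: `process_batch` and `batch_result` are hypothetical names, and in the real example the same role is played by so_5::receive together with the emptiness check.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <string>
#include <utility>

// Result of one processing pass.
struct batch_result
{
    std::size_t m_handled;
    // true if requests are left and another pass must be scheduled.
    bool m_more_pending;
};

// Handles no more than max_batch requests and never waits for new ones.
template< typename Handler >
batch_result process_batch(
    std::deque< std::string > & pending,
    std::size_t max_batch,
    Handler && handler )
{
    assert( max_batch > 0 );
    std::size_t handled = 0;
    while( handled < max_batch && !pending.empty() )
    {
        std::string req = std::move( pending.front() );
        pending.pop_front();
        handler( req );
        ++handled;
    }
    return batch_result{ handled, !pending.empty() };
}
```

With 7 queued requests and a batch size of 5, the first pass handles 5 requests and reports that more are pending, and the second pass handles the remaining 2. Limiting the batch size keeps a single event handler from running for an unbounded time.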
Here is a fragment of example output:
producer-5: request sent in 0ms
producer-6: request sent in 0ms
producer-2: request sent in 0ms
producer-8: request sent in 0ms
producer-3: request sent in 0ms
producer-4: request sent in 0ms
producer-11: request sent in 0ms
producer-13: request sent in 0ms
producer-12: request sent in 0ms
producer-1: request sent in 0ms
producer-10: request sent in 0ms
producer-16: request sent in 21ms
producer-9: request sent in 40ms
producer-21: request sent in 11ms
producer-19: request sent in 53ms
=== 5 request(s) handled
producer-14: request sent in 83ms
producer-18: request sent in 4ms
producer-17: request sent in 16ms
producer-15: request sent in 25ms
producer-7: request sent in 131ms
=== 5 request(s) handled
producer-29: request sent in 27ms
producer-23: request sent in 99ms
producer-36: request sent in 31ms
producer-20: request sent in 88ms
producer-38: request sent in 22ms
=== 5 request(s) handled
producer-35: request sent in 48ms
producer-30: request sent in 94ms
producer-27: request sent in 40ms
producer-25: request NOT SENT, (.\so_5/rt/impl/h/mchain_details.hpp:589): error(164)
an attempt to push message to full mchain with overflow_reaction_t::throw_exception
policy
producer-22: request sent in 1ms
producer-37: request sent in 34ms
=== 5 request(s) handled
producer-28: request sent in 18ms
producer-32: request sent in 78ms
producer-34: request sent in 40ms
producer-26: request sent in 18ms
producer-40: request NOT SENT, (.\so_5/rt/impl/h/mchain_details.hpp:589): error(164)
an attempt to push message to full mchain with overflow_reaction_t::throw_exception
policy
producer-39: request sent in 0ms
producer-1: reply received, producer-1_request_10#handled
producer-5: reply received, producer-5_request_10#handled
producer-6: reply received, producer-6_request_10#handled
producer-2: reply received, producer-2_request_10#handled
producer-8: reply received, producer-8_request_10#handled
producer-3: reply received, producer-3_request_10#handled
producer-4: reply received, producer-4_request_10#handled
producer-10: reply received, producer-10_request_10#handled
producer-11: reply received, producer-11_request_10#handled
It can be seen that some requests were sent very quickly. Then producers encountered difficulties with sending requests to the mchain and spent some time sleeping while the consumer processed older requests. Two producers received exceptions because there was no free room in the mchain even after waiting for 150ms.