Skip to content

Commit

Permalink
Make fan pull from detector in multiple threads
Browse files Browse the repository at this point in the history
  • Loading branch information
GDYendell committed Jul 11, 2024
1 parent c594177 commit 772e0e6
Show file tree
Hide file tree
Showing 4 changed files with 267 additions and 45 deletions.
23 changes: 16 additions & 7 deletions cpp/data/eigerfan/include/EigerFan.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,20 @@
#define RAPIDJSON_HAS_STDSTRING 1

#include <vector>
#include "EigerFanConfig.h"
#include "EigerDefinitions.h"
#include "zmq/zmq.hpp"

#include <log4cxx/logger.h>
#include "rapidjson/document.h"
#include "rapidjson/writer.h"
#include "rapidjson/stringbuffer.h"
#include <log4cxx/logger.h>
#include "rapidjson/writer.h"
#include "zmq/zmq.hpp"

#include <boost/shared_ptr.hpp>
#include <boost/thread.hpp>

#include "EigerFanConfig.h"
#include "EigerDefinitions.h"
#include "MultiPullBroker.h"


class EigerFan {

Expand All @@ -26,13 +32,14 @@ class EigerFan {
} EigerConsumer;

public:

EigerFan();
EigerFan(EigerFanConfig config_);
virtual ~EigerFan();
void run();
void HandleRxSocket(std::string& endpoint);
void Stop();
void SetNumberOfConsumers(int number);

protected:
void HandleStreamMessage(zmq::message_t &message, boost::shared_ptr<zmq::socket_t> socket);
void HandleGlobalHeaderMessage(boost::shared_ptr<zmq::socket_t> socket);
Expand All @@ -42,6 +49,7 @@ class EigerFan {
void HandleMonitorMessage(zmq::message_t &message, boost::shared_ptr<zmq::socket_t> socket, int rank);
void HandleForwardMonitorMessage(zmq::message_t &message, zmq::socket_t &socket);
void HandleControlMessage(zmq::message_t &message, zmq::message_t &idMessage);

void SendMessageToAllConsumers(zmq::message_t &message, int flags = 0);
void SendMessagesToAllConsumers(std::vector<zmq::message_t*> &messageLista);
void SendMessageToSingleConsumer(zmq::message_t &message, int flags = 0);
Expand All @@ -57,6 +65,8 @@ class EigerFan {
zmq::context_t ctx_;
zmq::socket_t controlSocket;
zmq::socket_t forwardSocket;
MultiPullBroker broker;
boost::shared_ptr<boost::thread> rx_thread_;
std::vector<EigerConsumer> consumers;

bool killRequested;
Expand All @@ -75,5 +85,4 @@ class EigerFan {
bool devShmCache;
};


#endif //EIGERDAQ_EIGERFAN_H
50 changes: 50 additions & 0 deletions cpp/data/eigerfan/include/MultiPullBroker.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* Created on: 18/08/2023
* Author: Gary Yendell
*/

#ifndef MULTIPULLBROKER_H
#define MULTIPULLBROKER_H


#include <atomic>
#include <vector>

#include <boost/shared_ptr.hpp>
#include <boost/thread.hpp>

#include <log4cxx/logger.h>
#include "zmq/zmq.hpp"

class MultiPullBroker {

public:
MultiPullBroker(
std::string& sink_endpoint,
int thread_count
);
~MultiPullBroker();

void connect(std::string& endpoint, void* inproc_context);
void start_message_counter();
uint64_t messages_received();
void shutdown();

protected:

private:
log4cxx::LoggerPtr logger_;

std::vector<boost::shared_ptr<boost::thread> > worker_threads_;
std::string source_endpoint_;
std::string sink_endpoint_;
zmq::context_t* inproc_context_;
int thread_count_;
std::atomic<std::uint64_t> messages_received_;
bool shutdown_requested_;

void worker_loop(std::string& endpoint);

};

#endif // MULTIPULLBROKER_H
104 changes: 66 additions & 38 deletions cpp/data/eigerfan/src/EigerFan.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@

#include "EigerFan.h"

#include "EigerFan.h"

// Utility variables
int more;
size_t more_size = sizeof (more);
Expand Down Expand Up @@ -52,6 +54,8 @@ std::string GetStateString(EigerFanState state) {
return "UNKNOWN STATE";
}

static std::string BROKER_INPROC_ENDPOINT = "inproc://broker";

/**
* Create a string of value padded with zeroes.
*
Expand All @@ -71,7 +75,9 @@ std::string PadInt(int value) {
EigerFan::EigerFan()
: ctx_(EigerFanDefaults::DEFAULT_NUM_THREADS),
controlSocket(ctx_, ZMQ_ROUTER),
forwardSocket(ctx_, ZMQ_PUSH) {
forwardSocket(ctx_, ZMQ_PUSH),
broker(BROKER_INPROC_ENDPOINT, 1)
{
this->log = log4cxx::Logger::getLogger("ED.EigerFan");
LOG4CXX_INFO(log, "Creating EigerFan object from default options");
killRequested = false;
Expand All @@ -95,7 +101,9 @@ EigerFan::EigerFan()
EigerFan::EigerFan(EigerFanConfig config_)
: ctx_(config_.num_zmq_threads),
controlSocket(ctx_, ZMQ_ROUTER),
forwardSocket(ctx_, ZMQ_PUSH) {
forwardSocket(ctx_, ZMQ_PUSH),
broker(BROKER_INPROC_ENDPOINT, 8)
{
this->log = log4cxx::Logger::getLogger("ED.EigerFan");
config = config_;
LOG4CXX_INFO(log, "Creating EigerFan object from config options");
Expand Down Expand Up @@ -272,7 +280,7 @@ void EigerFan::run() {
LOG4CXX_INFO(log, std::string("Connecting to stream address at ").append(streamConnectionAddress));

// Initialise run poll set
zmq::pollitem_t runPollItems [config.num_consumers + 1 + 1 + config.num_zmq_sockets];
zmq::pollitem_t runPollItems [config.num_consumers + 1 + 1]; // consumers + control + forward
zmq_pollitem_t controlRunPollItem;
controlRunPollItem.socket = controlSocket;
controlRunPollItem.fd = 0;
Expand All @@ -297,32 +305,21 @@ void EigerFan::run() {
forwardinMonitorPollItem.revents = 0;
runPollItems[config.num_consumers + 1] = forwardinMonitorPollItem;

std::vector<boost::shared_ptr<zmq::socket_t> > streamSockets;
int streamSocketPollItemStartIndex = config.num_consumers + 1 + 1;
for (int i = 0; i < config.num_zmq_sockets; i++) {

boost::shared_ptr<zmq::socket_t> recvSocket(new zmq::socket_t(ctx_, ZMQ_PULL));
recvSocket->setsockopt (ZMQ_RCVHWM, &RECEIVE_HWM, sizeof (RECEIVE_HWM));
recvSocket->connect(streamConnectionAddress.c_str());
recvSocket->setsockopt (ZMQ_LINGER, &linger, sizeof (linger));

streamSockets.push_back(recvSocket);
// Spawn rx thread
LOG4CXX_INFO(log, "Spawning rx thread");
this->rx_thread_ = boost::shared_ptr<boost::thread>(
new boost::thread(boost::bind(&EigerFan::HandleRxSocket, this, streamConnectionAddress))
);

zmq_pollitem_t recvRunPollItem;
zmq::socket_t* streamSocket = recvSocket.get();
recvRunPollItem.socket = *streamSocket;
recvRunPollItem.fd = 0;
recvRunPollItem.events = ZMQ_POLLIN;
recvRunPollItem.revents = 0;
runPollItems[streamSocketPollItemStartIndex + i] = recvRunPollItem;
while (state != WAITING_STREAM) {
sleep(1);
}

state = WAITING_STREAM;

// Process tasks forever or until kill is requested
LOG4CXX_INFO(log, "Processing control tasks");
while (killRequested != true) {
zmq::message_t message;
zmq::poll (&runPollItems [0], config.num_consumers + 1 + 1 + config.num_zmq_sockets, -1);
zmq::poll(&runPollItems [0], config.num_consumers + 1 + 1, -1); // Monitor per consumer + control + forward

// Control socket events
if (runPollItems [0].revents & ZMQ_POLLIN) {
Expand All @@ -348,14 +345,6 @@ void EigerFan::run() {
forwardMonitorSocket.recv(&message);
HandleForwardMonitorMessage(message, forwardMonitorSocket);
}

// Stream socket events
for (int i = 0; i < config.num_zmq_sockets; i++) {
if (runPollItems [streamSocketPollItemStartIndex + i].revents & ZMQ_POLLIN) {
streamSockets[i]->recv(&message);
HandleStreamMessage(message, streamSockets[i]);
}
}
}

LOG4CXX_INFO(log, "Shutting down EigerFan sockets");
Expand All @@ -364,13 +353,43 @@ void EigerFan::run() {
consumers[i].sendSocket->close();
}

for (int i = 0; i < config.num_zmq_sockets; i++) {
streamSockets[i]->close();
forwardSocket.close();
controlSocket.close();
}

/**
* Connect broker to detector and handle the messages it produces
*/
void EigerFan::HandleRxSocket(std::string& endpoint) {
zmq::context_t inproc_context(8);
zmq::socket_t rx_socket(inproc_context, ZMQ_PULL);
rx_socket.setsockopt(ZMQ_RCVHWM, &RECEIVE_HWM, sizeof(RECEIVE_HWM));
rx_socket.bind(BROKER_INPROC_ENDPOINT.c_str());
int hwm = 100000;
rx_socket.setsockopt(ZMQ_RCVHWM, &hwm, sizeof(hwm));
int linger = 100;
rx_socket.setsockopt(ZMQ_LINGER, &linger, sizeof(linger));
this->broker.connect(endpoint, &inproc_context);

state = WAITING_STREAM;
LOG4CXX_INFO(log, "Processing rx socket");

zmq::message_t message;
zmq::pollitem_t pollItems [] = {{rx_socket, 0, ZMQ_POLLIN, 0}};
boost::shared_ptr<zmq::socket_t> socket_ptr(&rx_socket);
while (!killRequested) {
// Stream socket events
zmq::poll(&pollItems[0], 1, -1);
if (pollItems[0].revents & ZMQ_POLLIN) {
rx_socket.recv(&message);
HandleStreamMessage(message, socket_ptr);
}
}

forwardSocket.close();
rx_socket.close();
broker.shutdown();

controlSocket.close();
LOG4CXX_INFO(log, "RX thread done");
}

/**
Expand Down Expand Up @@ -405,6 +424,7 @@ void EigerFan::HandleStreamMessage(zmq::message_t &message, boost::shared_ptr<zm
currentOffset = configuredOffset;
configuredOffset = 0;
lastFrameSent = 0;
broker.start_message_counter();
num_frames_sent = 0;
for(int j=0; j<num_frames_consumed.size(); j++) {
num_frames_consumed[j] = 0;
Expand All @@ -428,8 +448,11 @@ void EigerFan::HandleStreamMessage(zmq::message_t &message, boost::shared_ptr<zm
LOG4CXX_WARN(log, "Error counting consumer frames for logging");
}
} else if (htype.compare(END_HEADER_TYPE) == 0) {
LOG4CXX_INFO(log, "End of series message received after " + boost::lexical_cast<std::string>(num_frames_sent) \
+ " frames sent");
LOG4CXX_INFO(
log,
"End of series message received after " + boost::lexical_cast<std::string>(broker.messages_received()) + \
" messages received and " + boost::lexical_cast<std::string>(num_frames_sent) + " frames sent."
);
std::string consumer_frames;
for(int j=0; j<num_frames_consumed.size(); j++) {
consumer_frames +=
Expand Down Expand Up @@ -948,6 +971,11 @@ void EigerFan::HandleControlMessage(zmq::message_t &message, zmq::message_t &idM
rapidjson::Value valueFrame(lastFrameSent);
document.AddMember(keyFrame, valueFrame, document.GetAllocator());

// Add Number of messages received
rapidjson::Value keyMessagesReceived("messages_received", document.GetAllocator());
rapidjson::Value valueMessagesReceived(broker.messages_received());
document.AddMember(keyMessagesReceived, valueMessagesReceived, document.GetAllocator());

// Add Number of Frames sent
rapidjson::Value keyFramesSent("frames_sent", document.GetAllocator());
rapidjson::Value valueFramesSent(num_frames_sent);
Expand Down Expand Up @@ -1304,7 +1332,7 @@ void EigerFan::SendMessagesToAllConsumers(std::vector<zmq::message_t*> &messageL
* \param[in] flags Any flags to apply to the message (e.g. more messages to come)
*/
void EigerFan::SendMessageToSingleConsumer(zmq::message_t& message, int flags) {
LOG4CXX_DEBUG(log, "Sending message to single consumers at index:" << currentConsumerIndexToSendTo);
LOG4CXX_DEBUG(log, "Sending message to single consumer at index:" << currentConsumerIndexToSendTo);

//Send the message to the forwarding stream
if (forwardStream && numConnectedForwardingSockets > 0)
Expand Down
Loading

0 comments on commit 772e0e6

Please sign in to comment.