Skip to content

Commit

Permalink
Introduce LINGER_TIMEOUT constant, replace local linger variables
Browse files Browse the repository at this point in the history
  • Loading branch information
JamesOHeaDLS committed Sep 9, 2024
1 parent 6eb42d2 commit 1f1b749
Show file tree
Hide file tree
Showing 3 changed files with 9 additions and 11 deletions.
1 change: 1 addition & 0 deletions cpp/data/common/include/EigerDefinitions.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ namespace Eiger {
const int MORE_MESSAGES = 1;
const int RECEIVE_HWM = 100000;
const int SEND_HWM = 100000;
const int LINGER_TIMEOUT = 100; // Socket linger timeout in milliseconds

const std::string CONTROL_CMD_KEY = "msg_val";
const std::string CONTROL_ID_KEY = "id";
Expand Down
14 changes: 6 additions & 8 deletions cpp/data/eigerfan/src/EigerFan.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -132,14 +132,13 @@ EigerFan::~EigerFan() {
void EigerFan::run() {
LOG4CXX_INFO(log, "EigerFan::run()");
LOG4CXX_INFO(log, "Starting EigerFan");
int linger = 100; // Socket linger timeout in milliseconds

// Setup Control socket
std::string controlAddress("tcp://*:");
controlAddress.append(config.ctrl_channel_port);
LOG4CXX_INFO(log, std::string("Binding control address to ").append(controlAddress));
controlSocket.bind (controlAddress.c_str());
controlSocket.setsockopt (ZMQ_LINGER, &linger, sizeof (linger));
controlSocket.setsockopt (ZMQ_LINGER, &LINGER_TIMEOUT, sizeof (LINGER_TIMEOUT));

// Setup Fan Send Sockets
for (int i = 0; i < config.num_consumers; i++) {
Expand All @@ -152,7 +151,7 @@ void EigerFan::run() {
boost::shared_ptr<zmq::socket_t> sendSocket(new zmq::socket_t(ctx_, ZMQ_PUSH));
sendSocket->setsockopt(ZMQ_SNDHWM, &SEND_HWM, sizeof (SEND_HWM));
sendSocket->bind(fanAddress.str().c_str());
sendSocket->setsockopt (ZMQ_LINGER, &linger, sizeof (linger));
sendSocket->setsockopt (ZMQ_LINGER, &LINGER_TIMEOUT, sizeof (LINGER_TIMEOUT));
EigerConsumer consumer;
consumer.connected = false;
consumer.sendSocket = sendSocket;
Expand All @@ -177,7 +176,7 @@ void EigerFan::run() {
}
boost::shared_ptr<zmq::socket_t> monitorSocket(new zmq::socket_t(ctx_, ZMQ_PAIR));
monitorSocket->connect(monitorAddress.str().c_str());
monitorSocket->setsockopt (ZMQ_LINGER, &linger, sizeof (linger));
monitorSocket->setsockopt (ZMQ_LINGER, &LINGER_TIMEOUT, sizeof (LINGER_TIMEOUT));
monitorSockets.push_back(monitorSocket);
}

Expand All @@ -187,7 +186,7 @@ void EigerFan::run() {
LOG4CXX_INFO(log, std::string("Binding forwarding address to ").append(forwardAddress));
forwardSocket.setsockopt(ZMQ_SNDHWM, &SEND_HWM, sizeof (SEND_HWM));
forwardSocket.bind(forwardAddress.c_str());
forwardSocket.setsockopt (ZMQ_LINGER, &linger, sizeof (linger));
forwardSocket.setsockopt (ZMQ_LINGER, &LINGER_TIMEOUT, sizeof (LINGER_TIMEOUT));

// Setup Forwarding Socket monitor
std::ostringstream forwardMonitorAddress;
Expand All @@ -200,7 +199,7 @@ void EigerFan::run() {
}
zmq::socket_t forwardMonitorSocket(ctx_, ZMQ_PAIR);
forwardMonitorSocket.connect(forwardMonitorAddress.str().c_str());
forwardMonitorSocket.setsockopt (ZMQ_LINGER, &linger, sizeof (linger));
forwardMonitorSocket.setsockopt (ZMQ_LINGER, &LINGER_TIMEOUT, sizeof (LINGER_TIMEOUT));

// Wait for configured number of consumers to connect
LOG4CXX_INFO(log, "Waiting for Consumers");
Expand Down Expand Up @@ -363,8 +362,7 @@ void EigerFan::HandleRxSocket(std::string& endpoint, int num_zmq_context_threads
zmq::socket_t rx_socket(inproc_context, ZMQ_PULL);
rx_socket.setsockopt(ZMQ_RCVHWM, &RECEIVE_HWM, sizeof(RECEIVE_HWM));
rx_socket.bind(BROKER_INPROC_ENDPOINT.c_str());
int linger = 100;
rx_socket.setsockopt(ZMQ_LINGER, &linger, sizeof(linger));
rx_socket.setsockopt(ZMQ_LINGER, &LINGER_TIMEOUT, sizeof(LINGER_TIMEOUT));
this->broker.connect(endpoint, &inproc_context);

state = WAITING_STREAM;
Expand Down
5 changes: 2 additions & 3 deletions cpp/data/eigerfan/src/MultiPullBroker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@ void MultiPullBroker::connect(std::string& endpoint, void* inproc_context) {
* \param[in] endpoint Endpoint of socket to pull data from
*/
void MultiPullBroker::worker_loop(std::string& endpoint) {
int linger = 100;

// Create source in new isolated context
// It is important to create a new context in each worker thread, as there are
Expand All @@ -56,15 +55,15 @@ void MultiPullBroker::worker_loop(std::string& endpoint) {
zmq::context_t source_context(1);
zmq::socket_t source_socket(source_context, ZMQ_PULL);
source_socket.setsockopt(ZMQ_SNDHWM, &SEND_HWM, sizeof(SEND_HWM));
source_socket.setsockopt(ZMQ_LINGER, &linger, sizeof(linger));
source_socket.setsockopt(ZMQ_LINGER, &LINGER_TIMEOUT, sizeof(LINGER_TIMEOUT));
source_socket.connect(endpoint.c_str());

// Create sink socket in context of main thread
// The sink sockets must use the context from the main thread to use the inproc://
// protocol. If it uses a different context the client will not see the messages.
zmq::socket_t sink_socket(*this->inproc_context_, ZMQ_PUSH);
sink_socket.setsockopt(ZMQ_SNDHWM, &SEND_HWM, sizeof(SEND_HWM));
sink_socket.setsockopt(ZMQ_LINGER, &linger, sizeof(linger));
sink_socket.setsockopt(ZMQ_LINGER, &LINGER_TIMEOUT, sizeof(LINGER_TIMEOUT));
sink_socket.connect(this->sink_endpoint_.c_str());

// Initialise recv variables
Expand Down

0 comments on commit 1f1b749

Please sign in to comment.