Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Hwm and linger defines #29

Merged
merged 5 commits into from
Sep 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion cpp/data/common/include/EigerDefinitions.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,10 @@ namespace Eiger {

// EigerFan related constants
const int MORE_MESSAGES = 1;
const int RECEIVE_HWM = 100000;
const int RECEIVE_HWM = 100000; // High water marks for the main receiver thread
const int SEND_HWM = 100000;
const int WORKER_HWM = 10000; // A lower high water mark for the worker threads
const int LINGER_TIMEOUT = 100; // Socket linger timeout in milliseconds

const std::string CONTROL_CMD_KEY = "msg_val";
const std::string CONTROL_ID_KEY = "id";
Expand Down
16 changes: 6 additions & 10 deletions cpp/data/eigerfan/src/EigerFan.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -132,14 +132,13 @@ EigerFan::~EigerFan() {
void EigerFan::run() {
LOG4CXX_INFO(log, "EigerFan::run()");
LOG4CXX_INFO(log, "Starting EigerFan");
int linger = 100; // Socket linger timeout in milliseconds

// Setup Control socket
std::string controlAddress("tcp://*:");
controlAddress.append(config.ctrl_channel_port);
LOG4CXX_INFO(log, std::string("Binding control address to ").append(controlAddress));
controlSocket.bind (controlAddress.c_str());
controlSocket.setsockopt (ZMQ_LINGER, &linger, sizeof (linger));
controlSocket.setsockopt (ZMQ_LINGER, &LINGER_TIMEOUT, sizeof (LINGER_TIMEOUT));

// Setup Fan Send Sockets
for (int i = 0; i < config.num_consumers; i++) {
Expand All @@ -152,7 +151,7 @@ void EigerFan::run() {
boost::shared_ptr<zmq::socket_t> sendSocket(new zmq::socket_t(ctx_, ZMQ_PUSH));
sendSocket->setsockopt(ZMQ_SNDHWM, &SEND_HWM, sizeof (SEND_HWM));
sendSocket->bind(fanAddress.str().c_str());
sendSocket->setsockopt (ZMQ_LINGER, &linger, sizeof (linger));
sendSocket->setsockopt (ZMQ_LINGER, &LINGER_TIMEOUT, sizeof (LINGER_TIMEOUT));
EigerConsumer consumer;
consumer.connected = false;
consumer.sendSocket = sendSocket;
Expand All @@ -177,7 +176,7 @@ void EigerFan::run() {
}
boost::shared_ptr<zmq::socket_t> monitorSocket(new zmq::socket_t(ctx_, ZMQ_PAIR));
monitorSocket->connect(monitorAddress.str().c_str());
monitorSocket->setsockopt (ZMQ_LINGER, &linger, sizeof (linger));
monitorSocket->setsockopt (ZMQ_LINGER, &LINGER_TIMEOUT, sizeof (LINGER_TIMEOUT));
monitorSockets.push_back(monitorSocket);
}

Expand All @@ -187,7 +186,7 @@ void EigerFan::run() {
LOG4CXX_INFO(log, std::string("Binding forwarding address to ").append(forwardAddress));
forwardSocket.setsockopt(ZMQ_SNDHWM, &SEND_HWM, sizeof (SEND_HWM));
forwardSocket.bind(forwardAddress.c_str());
forwardSocket.setsockopt (ZMQ_LINGER, &linger, sizeof (linger));
forwardSocket.setsockopt (ZMQ_LINGER, &LINGER_TIMEOUT, sizeof (LINGER_TIMEOUT));

// Setup Forwarding Socket monitor
std::ostringstream forwardMonitorAddress;
Expand All @@ -200,7 +199,7 @@ void EigerFan::run() {
}
zmq::socket_t forwardMonitorSocket(ctx_, ZMQ_PAIR);
forwardMonitorSocket.connect(forwardMonitorAddress.str().c_str());
forwardMonitorSocket.setsockopt (ZMQ_LINGER, &linger, sizeof (linger));
forwardMonitorSocket.setsockopt (ZMQ_LINGER, &LINGER_TIMEOUT, sizeof (LINGER_TIMEOUT));

// Wait for configured number of consumers to connect
LOG4CXX_INFO(log, "Waiting for Consumers");
Expand Down Expand Up @@ -363,10 +362,7 @@ void EigerFan::HandleRxSocket(std::string& endpoint, int num_zmq_context_threads
zmq::socket_t rx_socket(inproc_context, ZMQ_PULL);
rx_socket.setsockopt(ZMQ_RCVHWM, &RECEIVE_HWM, sizeof(RECEIVE_HWM));
rx_socket.bind(BROKER_INPROC_ENDPOINT.c_str());
int hwm = 100000;
rx_socket.setsockopt(ZMQ_RCVHWM, &hwm, sizeof(hwm));
int linger = 100;
rx_socket.setsockopt(ZMQ_LINGER, &linger, sizeof(linger));
rx_socket.setsockopt(ZMQ_LINGER, &LINGER_TIMEOUT, sizeof(LINGER_TIMEOUT));
this->broker.connect(endpoint, &inproc_context);

state = WAITING_STREAM;
Expand Down
10 changes: 4 additions & 6 deletions cpp/data/eigerfan/src/MultiPullBroker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,25 +47,23 @@ void MultiPullBroker::connect(std::string& endpoint, void* inproc_context) {
* \param[in] endpoint Endpoint of socket to pull data from
*/
void MultiPullBroker::worker_loop(std::string& endpoint) {
int hwm = 10000;
int linger = 100;

// Create source in new isolated context
// It is important to create a new context in each worker thread, as there are
// throughput limitations to a context shared between threads. Increasing ZMQ IO
// threads on the context is not sufficient.
zmq::context_t source_context(1);
zmq::socket_t source_socket(source_context, ZMQ_PULL);
source_socket.setsockopt(ZMQ_SNDHWM, &hwm, sizeof(hwm));
source_socket.setsockopt(ZMQ_LINGER, &linger, sizeof(linger));
source_socket.setsockopt(ZMQ_SNDHWM, &WORKER_HWM, sizeof(WORKER_HWM));
source_socket.setsockopt(ZMQ_LINGER, &LINGER_TIMEOUT, sizeof(LINGER_TIMEOUT));
source_socket.connect(endpoint.c_str());

// Create sink socket in context of main thread
// The sink sockets must use the context from the main thread to use the inproc://
// protocol. If it uses a different context the client will not see the messages.
zmq::socket_t sink_socket(*this->inproc_context_, ZMQ_PUSH);
sink_socket.setsockopt(ZMQ_SNDHWM, &hwm, sizeof(hwm));
sink_socket.setsockopt(ZMQ_LINGER, &linger, sizeof(linger));
sink_socket.setsockopt(ZMQ_SNDHWM, &WORKER_HWM, sizeof(WORKER_HWM));
sink_socket.setsockopt(ZMQ_LINGER, &LINGER_TIMEOUT, sizeof(LINGER_TIMEOUT));
sink_socket.connect(this->sink_endpoint_.c_str());

// Initialise recv variables
Expand Down
Loading