Skip to content
This repository was archived by the owner on May 9, 2018. It is now read-only.

Commit e7dbbab

Browse files
committed
Fixes to heartbeats and NEB socket status logging
Switch to a 32-bit sequence number in heartbeats. Future versions of ZeroMQ get rid of zmq_event_t, and we're already doing the right thing, just wrapping it up in zmq_event_t. Get rid of this weird opaque structure and future proof the socket status logging code.
1 parent 46751f7 commit e7dbbab

File tree

5 files changed

+24
-21
lines changed

5 files changed

+24
-21
lines changed

.gitignore

+2
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,10 @@
33
*.a
44
*.i
55
*.s
6+
*.tar.gz
67
.libs
78
.deps
9+
m4/*
810
Makefile
911
Makefile.in
1012
aclocal.m4

dnxmq/heartbeats.c

+6-5
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
#include <time.h>
99
#include "mqexec.h"
1010

11-
int64_t last_sent_seq = -1, last_recv_seq = -1;
11+
int32_t last_sent_seq = -1, last_recv_seq = -1;
1212
int32_t heartbeat_interval = -1, heartbeat_curr_interval = -1;
1313
time_t last_heartbeat = 0;
1414
char heartbeat_reply_string[255];
@@ -55,7 +55,7 @@ void subscribe_heartbeat(void * sock) {
5555

5656
void process_heartbeat(json_t * inputmsg) {
5757
char * replyto;
58-
int64_t sequence;
58+
int32_t sequence;
5959
if(json_unpack(inputmsg, "{ s:s s:i }",
6060
"pong_target", &replyto,
6161
"sequence", &sequence) != 0) {
@@ -93,14 +93,15 @@ void heartbeat_timeout_cb(struct ev_loop * loop, ev_timer * t, int event) {
9393
}
9494
heartbeat_curr_interval = heartbeat_interval;
9595

96-
if(last_recv_seq == last_sent_seq -1) {
96+
if(last_recv_seq == last_sent_seq) {
9797
logit(DEBUG, "Heartbeat didn't time out! Resetting timer.");
9898
send_heartbeat(loop);
9999
return;
100100
}
101101

102102
if(last_recv_seq != -1) {
103-
logit(DEBUG, "We recieved a pong message, but it wasn't right. Retrying.");
103+
logit(DEBUG, "We recieved a pong message, but it wasn't right. Retrying. (%08x != %08x)",
104+
last_recv_seq, last_sent_seq, last_recv_seq);
104105
send_heartbeat(loop);
105106
return;
106107
}
@@ -162,7 +163,7 @@ void send_heartbeat(struct ev_loop * loop) {
162163

163164
output = json_pack("{s:s s:i s:s}",
164165
"type", "ping",
165-
"sequence", last_sent_seq++,
166+
"sequence", ++last_sent_seq,
166167
"replyto", heartbeat_reply_string
167168
);
168169
if(output == NULL) {

dnxmq/mqexec.c

+8-8
Original file line numberDiff line numberDiff line change
@@ -250,7 +250,8 @@ void recv_job_cb(struct ev_loop * loop, ev_io * i, int event) {
250250
void sock_monitor_cb(struct ev_loop * loop, ev_io * i, int event) {
251251
while(1) {
252252
void * sock = i->data;
253-
zmq_event_t sockevent;
253+
uint16_t event;
254+
int32_t value;
254255
zmq_msg_t addrmsg, eventmsg;
255256
int rc, shouldlog = 1;
256257

@@ -274,9 +275,8 @@ void sock_monitor_cb(struct ev_loop * loop, ev_io * i, int event) {
274275
}
275276

276277
const char* eventdata = (char*)zmq_msg_data(&eventmsg);
277-
memcpy(&(sockevent.event), eventdata, sizeof(sockevent.event));
278-
memcpy(&(sockevent.value), eventdata + sizeof(sockevent.event),
279-
sizeof(sockevent.value));
278+
memcpy(&event, eventdata, sizeof(event));
279+
memcpy(&value, eventdata + sizeof(event), sizeof(value));
280280
zmq_msg_close(&eventmsg);
281281

282282
zmq_msg_init(&addrmsg);
@@ -293,14 +293,14 @@ void sock_monitor_cb(struct ev_loop * loop, ev_io * i, int event) {
293293
}
294294
}
295295

296-
if(sockevent.event == 0) {
296+
if(event == 0) {
297297
zmq_close(sock);
298298
ev_io_stop(loop, i);
299299
return;
300300
}
301301

302302
// These are super chatting log messages, skip em.
303-
switch(sockevent.event) {
303+
switch(event) {
304304
case ZMQ_EVENT_CLOSED:
305305
case ZMQ_EVENT_CONNECT_DELAYED:
306306
shouldlog = 0;
@@ -313,7 +313,7 @@ void sock_monitor_cb(struct ev_loop * loop, ev_io * i, int event) {
313313
}
314314

315315
char * event_string;
316-
switch(sockevent.event) {
316+
switch(event) {
317317
case ZMQ_EVENT_CONNECTED:
318318
event_string = "Socket event on %.*s: connection established (fd: %d)";
319319
if(i == &pullmonio)
@@ -360,7 +360,7 @@ void sock_monitor_cb(struct ev_loop * loop, ev_io * i, int event) {
360360
}
361361

362362
logit(INFO, event_string, zmq_msg_size(&addrmsg),
363-
(char*)zmq_msg_data(&addrmsg), sockevent.value);
363+
(char*)zmq_msg_data(&addrmsg), value);
364364
zmq_msg_close(&addrmsg);
365365
}
366366
}

mods/nagmq_pull.c

+1-1
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ extern int errno;
2626

2727
static void process_ping(json_t * payload) {
2828
char * target;
29-
int64_t seq;
29+
int32_t seq;
3030
char * extra = NULL;
3131
struct timeval curtime;
3232

mods/socketstatus.c

+7-7
Original file line numberDiff line numberDiff line change
@@ -10,9 +10,10 @@ int pullmonfd = -1, reqmonfd = -1, pubmonfd;
1010
#if ZMQ_VERSION_MAJOR >= 3 && defined(HAVE_NAGIOS4)
1111
int sock_monitor_cb(int sd, int events, void * sock) {
1212
while(1) {
13-
zmq_event_t sockevent;
1413
zmq_msg_t addrmsg, eventmsg;
1514
int rc, shouldlog = 1;
15+
uint16_t event;
16+
int32_t value;
1617

1718
zmq_msg_init(&eventmsg);
1819
rc = zmq_msg_recv(&eventmsg, sock, ZMQ_DONTWAIT);
@@ -34,9 +35,8 @@ int sock_monitor_cb(int sd, int events, void * sock) {
3435
}
3536

3637
const char* eventdata = (char*)zmq_msg_data(&eventmsg);
37-
memcpy(&(sockevent.event), eventdata, sizeof(sockevent.event));
38-
memcpy(&(sockevent.value), eventdata + sizeof(sockevent.event),
39-
sizeof(sockevent.value));
38+
memcpy(&event, eventdata, sizeof(event));
39+
memcpy(&value, eventdata + sizeof(event), sizeof(value));
4040
zmq_msg_close(&eventmsg);
4141

4242
zmq_msg_init(&addrmsg);
@@ -54,7 +54,7 @@ int sock_monitor_cb(int sd, int events, void * sock) {
5454
}
5555

5656
// These are super chatting log messages, skip em.
57-
switch(sockevent.event) {
57+
switch(event) {
5858
case ZMQ_EVENT_CLOSED:
5959
case ZMQ_EVENT_CONNECT_DELAYED:
6060
shouldlog = 0;
@@ -67,7 +67,7 @@ int sock_monitor_cb(int sd, int events, void * sock) {
6767
}
6868

6969
char * event_string;
70-
switch(sockevent.event) {
70+
switch(event) {
7171
case ZMQ_EVENT_CONNECTED:
7272
event_string = "NagMQ socket event on %.*s: connection established (fd: %d)";
7373
break;
@@ -106,7 +106,7 @@ int sock_monitor_cb(int sd, int events, void * sock) {
106106
}
107107

108108
logit(NSLOG_INFO_MESSAGE, TRUE, event_string, zmq_msg_size(&addrmsg),
109-
(char*)zmq_msg_data(&addrmsg), sockevent.value);
109+
(char*)zmq_msg_data(&addrmsg), value);
110110
zmq_msg_close(&addrmsg);
111111
}
112112
return 0;

0 commit comments

Comments
 (0)