diff --git a/src/replication.c b/src/replication.c index b5ce77f5e0..bcad27110c 100644 --- a/src/replication.c +++ b/src/replication.c @@ -704,7 +704,16 @@ void replicationFeedMonitors(client *c, list *monitors, int dictid, robj **argv, listRewind(monitors, &li); while ((ln = listNext(&li))) { client *monitor = ln->value; - addReply(monitor, cmdobj); + if (monitor->resp > 2) { + struct ClientFlags old_flags = monitor->flag; + monitor->flag.pushing = 1; + addReplyPushLen(monitor, 2); + addReply(monitor, shared.monitorbulk); + addReply(monitor, cmdobj); + if (!old_flags.pushing) monitor->flag.pushing = 0; + } else { + addReply(monitor, cmdobj); + } updateClientMemUsageAndBucket(monitor); } decrRefCount(cmdobj); diff --git a/src/server.c b/src/server.c index 1e38b5ac69..a3e8857d0c 100644 --- a/src/server.c +++ b/src/server.c @@ -2035,6 +2035,7 @@ void createSharedObjects(void) { shared.ssubscribebulk = createStringObject("$10\r\nssubscribe\r\n", 17); shared.sunsubscribebulk = createStringObject("$12\r\nsunsubscribe\r\n", 19); shared.smessagebulk = createStringObject("$8\r\nsmessage\r\n", 14); + shared.monitorbulk = createStringObject("$7\r\nmonitor\r\n", 13); shared.psubscribebulk = createStringObject("$10\r\npsubscribe\r\n", 17); shared.punsubscribebulk = createStringObject("$12\r\npunsubscribe\r\n", 19); @@ -6181,8 +6182,11 @@ void monitorCommand(client *c) { return; } - /* ignore MONITOR if already replica or in monitor mode */ - if (c->flag.replica) return; + /* Gently notify the client that the monitor command has already been issued. */ + if (c->flag.replica) { + addReplyError(c, "The connection is already in monitoring mode."); + return; + } c->flag.replica = 1; c->flag.monitor = 1; diff --git a/src/server.h b/src/server.h index 14a16593b0..df7296f957 100644 --- a/src/server.h +++ b/src/server.h @@ -1437,7 +1437,7 @@ struct sharedObjectsStruct { *xgroup, *xclaim, *script, *replconf, *eval, *persist, *set, *pexpireat, *pexpire, *time, *pxat, *absttl, *retrycount, *force, *justid, *entriesread, *lastid, *ping, *setid, *keepttl, *load, *createconsumer, *getack, *special_asterick, *special_equals, *default_username, *redacted, *ssubscribebulk, *sunsubscribebulk, - *smessagebulk, *select[PROTO_SHARED_SELECT_CMDS], *integers[OBJ_SHARED_INTEGERS], + *smessagebulk, *monitorbulk, *select[PROTO_SHARED_SELECT_CMDS], *integers[OBJ_SHARED_INTEGERS], *mbulkhdr[OBJ_SHARED_BULKHDR_LEN], /* "*\r\n" */ *bulkhdr[OBJ_SHARED_BULKHDR_LEN], /* "$\r\n" */ *maphdr[OBJ_SHARED_BULKHDR_LEN], /* "%\r\n" */ diff --git a/tests/unit/introspection.tcl b/tests/unit/introspection.tcl index bafc46d4b7..483861517c 100644 --- a/tests/unit/introspection.tcl +++ b/tests/unit/introspection.tcl @@ -299,6 +299,60 @@ start_server {tags {"introspection"}} { set _ $res } {*"set" "foo"*"get" "foo"*} + test {MONITOR should support RESP3 protocol} { + set rd [valkey_deferring_client] + $rd HELLO 3 + $rd read ; # Consume the HELLO reply + + $rd monitor + $rd read ; # Consume the MONITOR reply + $rd readraw 1; + + r set foo bar + assert_equal ">2" [$rd read] + assert_equal "\$7" [$rd read] + assert_equal "monitor" [$rd read] + assert_match {*"set"*"foo"*"bar"*} [$rd read] + + $rd close + } + + test {multiple MONITOR commands should result in ERR} { + set rd [valkey_deferring_client] + $rd HELLO 3 + $rd read ; # Consume the HELLO reply + + $rd readraw 1; + + $rd monitor + assert_equal "+OK" [$rd read] + + $rd monitor + assert_equal "-ERR The connection is already in monitoring mode." [$rd read] + + $rd close + } + + test {MONITOR should came after PONG reply} { + set rd [valkey_deferring_client] + $rd HELLO 3 + $rd read ; # Consume the HELLO reply + + $rd monitor + $rd read ; # Consume the MONITOR reply + $rd readraw 1; + + $rd ping + + assert_equal "+PONG" [$rd read] + assert_equal ">2" [$rd read] + assert_equal "\$7" [$rd read] + assert_equal "monitor" [$rd read] + assert_match {*"ping"*} [$rd read] + + $rd close + } + test {MONITOR can log commands issued by the scripting engine} { set rd [valkey_deferring_client] $rd monitor