From c21bf9d571f9a108d981343b9a6b1ffe45b90f61 Mon Sep 17 00:00:00 2001 From: Nikhil Manglore Date: Thu, 12 Dec 2024 21:56:58 +0000 Subject: [PATCH 01/15] Fixed issue with exiting subscribe mode Signed-off-by: Nikhil Manglore --- src/valkey-cli.c | 33 +++++++++++++++++++ tests/unit/pubsub.tcl | 75 +++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 108 insertions(+) diff --git a/src/valkey-cli.c b/src/valkey-cli.c index 4416e09431..24334e55e0 100644 --- a/src/valkey-cli.c +++ b/src/valkey-cli.c @@ -218,6 +218,10 @@ static struct config { int shutdown; int monitor_mode; int pubsub_mode; + int pubsub_channel_count; + int pubsub_pattern_count; + int pubsub_shard_count; + int pubsub_total_count; int blocking_state_aborted; /* used to abort monitor_mode and pubsub_mode. */ int latency_mode; int latency_dist_mode; @@ -2224,6 +2228,31 @@ static int cliReadReply(int output_raw_strings) { fflush(stdout); sdsfree(out); } + + /* Handle pubsub mode */ + if (config.pubsub_mode) { + if (isPubsubPush(reply)) { + if (reply->elements >= 3) { + char *cmd = reply->element[0]->str; + int count = reply->element[2]->integer; + + if (strcmp(cmd, "subscribe") == 0 || strcmp(cmd, "unsubscribe") == 0) { + config.pubsub_channel_count = count; + } else if (strcmp(cmd, "psubscribe") == 0 || strcmp(cmd, "punsubscribe") == 0) { + config.pubsub_pattern_count = count; + } else if (strcmp(cmd, "ssubscribe") == 0 || strcmp(cmd, "sunsubscribe") == 0) { + config.pubsub_shard_count = count; + } + config.pubsub_total_count = config.pubsub_channel_count + config.pubsub_pattern_count + config.pubsub_shard_count; + + if (config.pubsub_total_count == 0) { + config.pubsub_mode = 0; + cliRefreshPrompt(); + } + } + } + } + return REDIS_OK; } @@ -9493,6 +9522,10 @@ int main(int argc, char **argv) { config.shutdown = 0; config.monitor_mode = 0; config.pubsub_mode = 0; + config.pubsub_channel_count = 0; + config.pubsub_pattern_count = 0; + config.pubsub_shard_count = 0; + config.pubsub_total_count = 0; config.blocking_state_aborted = 0; config.latency_mode = 0; config.latency_dist_mode = 0; diff --git a/tests/unit/pubsub.tcl b/tests/unit/pubsub.tcl index 24b78b6e5a..eb3b802669 100644 --- a/tests/unit/pubsub.tcl +++ b/tests/unit/pubsub.tcl @@ -514,4 +514,79 @@ start_server {tags {"pubsub network"}} { assert_equal [r read] {message foo vaz} } {} {resp3} + test "SUBSCRIBE and UNSUBSCRIBE with multiple channels" { + # Note: this is testing whether the client exits pubsub mode when subscribed to 0 channels. + set rd1 [valkey_deferring_client] + + assert_equal {1 2 3} [subscribe $rd1 {chan1 chan2 chan3}] + assert_equal {chan1 1 chan2 1 chan3 1} [r pubsub numsub chan1 chan2 chan3] + assert_equal {2} [unsubscribe $rd1 {chan2}] + assert_equal {chan1 1 chan2 0 chan3 1} [r pubsub numsub chan1 chan2 chan3] + unsubscribe $rd1 + + set unsub1 [$rd1 read] + set unsub2 [$rd1 read] + + assert {[lindex $unsub1 0] eq "unsubscribe" && [lindex $unsub2 0] eq "unsubscribe"} + assert {([lindex $unsub1 1] eq "chan1" && [lindex $unsub2 1] eq "chan3") || + ([lindex $unsub1 1] eq "chan3" && [lindex $unsub2 1] eq "chan1")} + assert {[lindex $unsub1 2] == 1 && [lindex $unsub2 2] == 0} + assert_equal {chan1 0 chan2 0 chan3 0} [r pubsub numsub chan1 chan2 chan3] + + $rd1 ping + assert_equal {PONG} [$rd1 read] + + $rd1 close + } + + test "PSUBSCRIBE and PUNSUBSCRIBE with multiple patterns" { + # Note: this is testing whether the client exits pubsub mode when subscribed to 0 channels. + set rd1 [valkey_deferring_client] + + assert_equal {1 2 3} [psubscribe $rd1 {chan1.* chan2.* chan3.*}] + assert_equal 3 [r pubsub numpat] + assert_equal {2} [punsubscribe $rd1 {chan2.*}] + assert_equal 2 [r pubsub numpat] + punsubscribe $rd1 + + set unsub1 [$rd1 read] + set unsub2 [$rd1 read] + + assert {[lindex $unsub1 0] eq "punsubscribe" && [lindex $unsub2 0] eq "punsubscribe"} + assert {([lindex $unsub1 1] eq "chan1.*" && [lindex $unsub2 1] eq "chan3.*") || + ([lindex $unsub1 1] eq "chan3.*" && [lindex $unsub2 1] eq "chan1.*")} + assert {[lindex $unsub1 2] == 1 && [lindex $unsub2 2] == 0} + assert_equal 0 [r pubsub numpat] + + $rd1 ping + assert_equal {PONG} [$rd1 read] + + $rd1 close + } + + test "SSUBSCRIBE and SUNSUBSCRIBE with multiple shard channels" { + # Note: this is testing whether the client exits pubsub mode when subscribed to 0 channels. + set rd1 [valkey_deferring_client] + + assert_equal {1 2 3} [ssubscribe $rd1 {schan1 schan2 schan3}] + assert_equal {schan1 1 schan2 1 schan3 1} [r pubsub shardnumsub schan1 schan2 schan3] + assert_equal {2} [sunsubscribe $rd1 {schan2}] + assert_equal {schan1 1 schan2 0 schan3 1} [r pubsub shardnumsub schan1 schan2 schan3] + sunsubscribe $rd1 + + set unsub1 [$rd1 read] + set unsub2 [$rd1 read] + + assert {[lindex $unsub1 0] eq "sunsubscribe" && [lindex $unsub2 0] eq "sunsubscribe"} + assert {([lindex $unsub1 1] eq "schan1" && [lindex $unsub2 1] eq "schan3") || + ([lindex $unsub1 1] eq "schan3" && [lindex $unsub2 1] eq "schan1")} + assert {[lindex $unsub1 2] == 1 && [lindex $unsub2 2] == 0} + + assert_equal {schan1 0 schan2 0 schan3 0} [r pubsub shardnumsub schan1 schan2 schan3] + + $rd1 ping + assert_equal {PONG} [$rd1 read] + + $rd1 close + } } From cb717183ecb8c978b0ca84e29211dc96615b88d3 Mon Sep 17 00:00:00 2001 From: Nikhil Manglore Date: Wed, 18 Dec 2024 21:32:47 +0000 Subject: [PATCH 02/15] fixed issue with the counters when exit subscribe mode Signed-off-by: Nikhil Manglore --- src/valkey-cli.c | 26 ++++++++++---------------- 1 file changed, 10 insertions(+), 16 deletions(-) diff --git a/src/valkey-cli.c b/src/valkey-cli.c index 24334e55e0..0b8ad3e70a 100644 --- a/src/valkey-cli.c +++ b/src/valkey-cli.c @@ -218,10 +218,8 @@ static struct config { int shutdown; int monitor_mode; int pubsub_mode; - int pubsub_channel_count; - int pubsub_pattern_count; - int pubsub_shard_count; - int pubsub_total_count; + int pubsub_unsharded_count; /* channels and patterns */ + int pubsub_sharded_count; /* shard channels */ int blocking_state_aborted; /* used to abort monitor_mode and pubsub_mode. */ int latency_mode; int latency_dist_mode; @@ -2236,16 +2234,14 @@ static int cliReadReply(int output_raw_strings) { char *cmd = reply->element[0]->str; int count = reply->element[2]->integer; - if (strcmp(cmd, "subscribe") == 0 || strcmp(cmd, "unsubscribe") == 0) { - config.pubsub_channel_count = count; - } else if (strcmp(cmd, "psubscribe") == 0 || strcmp(cmd, "punsubscribe") == 0) { - config.pubsub_pattern_count = count; + if (strcmp(cmd, "subscribe") == 0 || strcmp(cmd, "unsubscribe") == 0 || + strcmp(cmd, "psubscribe") == 0 || strcmp(cmd, "punsubscribe") == 0) { + config.pubsub_unsharded_count = count; } else if (strcmp(cmd, "ssubscribe") == 0 || strcmp(cmd, "sunsubscribe") == 0) { - config.pubsub_shard_count = count; + config.pubsub_sharded_count = count; } - config.pubsub_total_count = config.pubsub_channel_count + config.pubsub_pattern_count + config.pubsub_shard_count; - - if (config.pubsub_total_count == 0) { + + if (config.pubsub_unsharded_count == 0 && config.pubsub_sharded_count == 0) { config.pubsub_mode = 0; cliRefreshPrompt(); } @@ -9522,10 +9518,8 @@ int main(int argc, char **argv) { config.shutdown = 0; config.monitor_mode = 0; config.pubsub_mode = 0; - config.pubsub_channel_count = 0; - config.pubsub_pattern_count = 0; - config.pubsub_shard_count = 0; - config.pubsub_total_count = 0; + config.pubsub_unsharded_count = 0; + config.pubsub_sharded_count = 0; config.blocking_state_aborted = 0; config.latency_mode = 0; config.latency_dist_mode = 0; From 926a713c253717605ebf986f5b57e8ca8aa7ac96 Mon Sep 17 00:00:00 2001 From: Nikhil Manglore Date: Thu, 19 Dec 2024 00:17:17 +0000 Subject: [PATCH 03/15] refactored code in valkey-cli.cand fixed the test cases Signed-off-by: Nikhil Manglore --- src/valkey-cli.c | 41 ++++++++--------- tests/integration/valkey-cli.tcl | 41 +++++++++++++++++ tests/unit/pubsub.tcl | 75 -------------------------------- 3 files changed, 60 insertions(+), 97 deletions(-) diff --git a/src/valkey-cli.c b/src/valkey-cli.c index 0b8ad3e70a..d51cc4e91d 100644 --- a/src/valkey-cli.c +++ b/src/valkey-cli.c @@ -2227,28 +2227,6 @@ static int cliReadReply(int output_raw_strings) { sdsfree(out); } - /* Handle pubsub mode */ - if (config.pubsub_mode) { - if (isPubsubPush(reply)) { - if (reply->elements >= 3) { - char *cmd = reply->element[0]->str; - int count = reply->element[2]->integer; - - if (strcmp(cmd, "subscribe") == 0 || strcmp(cmd, "unsubscribe") == 0 || - strcmp(cmd, "psubscribe") == 0 || strcmp(cmd, "punsubscribe") == 0) { - config.pubsub_unsharded_count = count; - } else if (strcmp(cmd, "ssubscribe") == 0 || strcmp(cmd, "sunsubscribe") == 0) { - config.pubsub_sharded_count = count; - } - - if (config.pubsub_unsharded_count == 0 && config.pubsub_sharded_count == 0) { - config.pubsub_mode = 0; - cliRefreshPrompt(); - } - } - } - } - return REDIS_OK; } @@ -2427,6 +2405,25 @@ static int cliSendCommand(int argc, char **argv, long repeat) { config.pubsub_mode = 1; cliRefreshPrompt(); } + + /* Handle pubsub mode */ + if (config.last_reply->elements >= 3) { + char *cmd = config.last_reply->element[0]->str; + int count = config.last_reply->element[2]->integer; + + if (strcmp(cmd, "subscribe") == 0 || strcmp(cmd, "unsubscribe") == 0 || + strcmp(cmd, "psubscribe") == 0 || strcmp(cmd, "punsubscribe") == 0) { + config.pubsub_unsharded_count = count; + } else if (strcmp(cmd, "ssubscribe") == 0 || strcmp(cmd, "sunsubscribe") == 0) { + config.pubsub_sharded_count = count; + } + + if (config.pubsub_unsharded_count == 0 && config.pubsub_sharded_count == 0) { + config.pubsub_mode = 0; + cliRefreshPrompt(); + } + } + if (--num_expected_pubsub_push > 0) { continue; /* We need more of these. */ } diff --git a/tests/integration/valkey-cli.tcl b/tests/integration/valkey-cli.tcl index 0c15af74f9..f81af5d9c6 100644 --- a/tests/integration/valkey-cli.tcl +++ b/tests/integration/valkey-cli.tcl @@ -608,6 +608,47 @@ if {!$::tls} { ;# fake_redis_node doesn't support TLS assert_equal "a\n1\nb\n2\nc\n3" [exec {*}$cmdline ZRANGE new_zset 0 -1 WITHSCORES] } + test {valkey-cli pubsub mode with multiple subscription types} { + set fd [open_cli] + + # Subscribe to a regular channel + write_cli $fd "SUBSCRIBE channel1" + assert_match "*subscribe*channel1*" [read_cli $fd] + + # Subscribe to a pattern + write_cli $fd "PSUBSCRIBE pattern*" + assert_match "*psubscribe*pattern**" [read_cli $fd] + + # Subscribe to a shard channel + write_cli $fd "SSUBSCRIBE schannel1" + assert_match "*ssubscribe*schannel1*" [read_cli $fd] + + # Unsubscribe from regular channel + write_cli $fd "UNSUBSCRIBE channel1" + assert_match "*unsubscribe*channel1*" [read_cli $fd] + + # Verify still in pubsub mode + catch {run_command $fd "SET key value"} err + assert_match "*ERR*only*SUBSCRIBE*UNSUBSCRIBE*allowed*" $err + + # Unsubscribe from patternvt + write_cli $fd "PUNSUBSCRIBE pattern*" + assert_match "*punsubscribe*pattern**" [read_cli $fd] + + # Verify still in pubsub mode + catch {run_command $fd "GET key"} err + assert_match "*ERR*only*SUBSCRIBE*UNSUBSCRIBE*allowed*" $err + + # Unsubscribe from shard channel + write_cli $fd "SUNSUBSCRIBE schannel1" + assert_match "*sunsubscribe*schannel1*" [read_cli $fd] + + # Verify that we've exited pubsub mode + assert_equal "PONG" [run_command $fd "PING"] + + close_cli $fd + } + test "Valid Connection Scheme: redis://" { set cmdline [valkeycliuri "redis://" [srv host] [srv port]] assert_equal {PONG} [exec {*}$cmdline PING] diff --git a/tests/unit/pubsub.tcl b/tests/unit/pubsub.tcl index eb3b802669..24b78b6e5a 100644 --- a/tests/unit/pubsub.tcl +++ b/tests/unit/pubsub.tcl @@ -514,79 +514,4 @@ start_server {tags {"pubsub network"}} { assert_equal [r read] {message foo vaz} } {} {resp3} - test "SUBSCRIBE and UNSUBSCRIBE with multiple channels" { - # Note: this is testing whether the client exits pubsub mode when subscribed to 0 channels. - set rd1 [valkey_deferring_client] - - assert_equal {1 2 3} [subscribe $rd1 {chan1 chan2 chan3}] - assert_equal {chan1 1 chan2 1 chan3 1} [r pubsub numsub chan1 chan2 chan3] - assert_equal {2} [unsubscribe $rd1 {chan2}] - assert_equal {chan1 1 chan2 0 chan3 1} [r pubsub numsub chan1 chan2 chan3] - unsubscribe $rd1 - - set unsub1 [$rd1 read] - set unsub2 [$rd1 read] - - assert {[lindex $unsub1 0] eq "unsubscribe" && [lindex $unsub2 0] eq "unsubscribe"} - assert {([lindex $unsub1 1] eq "chan1" && [lindex $unsub2 1] eq "chan3") || - ([lindex $unsub1 1] eq "chan3" && [lindex $unsub2 1] eq "chan1")} - assert {[lindex $unsub1 2] == 1 && [lindex $unsub2 2] == 0} - assert_equal {chan1 0 chan2 0 chan3 0} [r pubsub numsub chan1 chan2 chan3] - - $rd1 ping - assert_equal {PONG} [$rd1 read] - - $rd1 close - } - - test "PSUBSCRIBE and PUNSUBSCRIBE with multiple patterns" { - # Note: this is testing whether the client exits pubsub mode when subscribed to 0 channels. - set rd1 [valkey_deferring_client] - - assert_equal {1 2 3} [psubscribe $rd1 {chan1.* chan2.* chan3.*}] - assert_equal 3 [r pubsub numpat] - assert_equal {2} [punsubscribe $rd1 {chan2.*}] - assert_equal 2 [r pubsub numpat] - punsubscribe $rd1 - - set unsub1 [$rd1 read] - set unsub2 [$rd1 read] - - assert {[lindex $unsub1 0] eq "punsubscribe" && [lindex $unsub2 0] eq "punsubscribe"} - assert {([lindex $unsub1 1] eq "chan1.*" && [lindex $unsub2 1] eq "chan3.*") || - ([lindex $unsub1 1] eq "chan3.*" && [lindex $unsub2 1] eq "chan1.*")} - assert {[lindex $unsub1 2] == 1 && [lindex $unsub2 2] == 0} - assert_equal 0 [r pubsub numpat] - - $rd1 ping - assert_equal {PONG} [$rd1 read] - - $rd1 close - } - - test "SSUBSCRIBE and SUNSUBSCRIBE with multiple shard channels" { - # Note: this is testing whether the client exits pubsub mode when subscribed to 0 channels. - set rd1 [valkey_deferring_client] - - assert_equal {1 2 3} [ssubscribe $rd1 {schan1 schan2 schan3}] - assert_equal {schan1 1 schan2 1 schan3 1} [r pubsub shardnumsub schan1 schan2 schan3] - assert_equal {2} [sunsubscribe $rd1 {schan2}] - assert_equal {schan1 1 schan2 0 schan3 1} [r pubsub shardnumsub schan1 schan2 schan3] - sunsubscribe $rd1 - - set unsub1 [$rd1 read] - set unsub2 [$rd1 read] - - assert {[lindex $unsub1 0] eq "sunsubscribe" && [lindex $unsub2 0] eq "sunsubscribe"} - assert {([lindex $unsub1 1] eq "schan1" && [lindex $unsub2 1] eq "schan3") || - ([lindex $unsub1 1] eq "schan3" && [lindex $unsub2 1] eq "schan1")} - assert {[lindex $unsub1 2] == 1 && [lindex $unsub2 2] == 0} - - assert_equal {schan1 0 schan2 0 schan3 0} [r pubsub shardnumsub schan1 schan2 schan3] - - $rd1 ping - assert_equal {PONG} [$rd1 read] - - $rd1 close - } } From 2278fce24f39486f39ae7fbebb60077e737824cb Mon Sep 17 00:00:00 2001 From: Nikhil Manglore Date: Thu, 19 Dec 2024 00:20:57 +0000 Subject: [PATCH 04/15] fixed typo in the comments Signed-off-by: Nikhil Manglore --- tests/integration/valkey-cli.tcl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/integration/valkey-cli.tcl b/tests/integration/valkey-cli.tcl index f81af5d9c6..d52df400e1 100644 --- a/tests/integration/valkey-cli.tcl +++ b/tests/integration/valkey-cli.tcl @@ -631,7 +631,7 @@ if {!$::tls} { ;# fake_redis_node doesn't support TLS catch {run_command $fd "SET key value"} err assert_match "*ERR*only*SUBSCRIBE*UNSUBSCRIBE*allowed*" $err - # Unsubscribe from patternvt + # Unsubscribe from pattern write_cli $fd "PUNSUBSCRIBE pattern*" assert_match "*punsubscribe*pattern**" [read_cli $fd] From 0c4a1773a3bc6a5a9159af75901e787613d6a769 Mon Sep 17 00:00:00 2001 From: Nikhil Manglore Date: Thu, 19 Dec 2024 23:18:06 +0000 Subject: [PATCH 05/15] Refactored code again and added new tests to the test case Signed-off-by: Nikhil Manglore --- src/valkey-cli.c | 46 ++++++++++++++++++------------ tests/integration/valkey-cli.tcl | 48 ++++++++++++++++++++++++-------- 2 files changed, 64 insertions(+), 30 deletions(-) diff --git a/src/valkey-cli.c b/src/valkey-cli.c index d51cc4e91d..bf75d56aaf 100644 --- a/src/valkey-cli.c +++ b/src/valkey-cli.c @@ -2398,6 +2398,34 @@ static int cliSendCommand(int argc, char **argv, long repeat) { fflush(stdout); if (config.pubsub_mode || num_expected_pubsub_push > 0) { if (isPubsubPush(config.last_reply)) { + + /* Handle pubsub mode */ + if (config.last_reply->elements >= 3) { + char *cmd = config.last_reply->element[0]->str; + int count = config.last_reply->element[2]->integer; + + if (strcmp(cmd, "subscribe") == 0) { + config.pubsub_unsharded_count++; + } else if (strcmp(cmd, "unsubscribe") == 0) { + config.pubsub_unsharded_count = count; + } else if (strcmp(cmd, "psubscribe") == 0) { + config.pubsub_unsharded_count++; + } else if (strcmp(cmd, "punsubscribe") == 0) { + config.pubsub_unsharded_count = count; + } else if (strcmp(cmd, "ssubscribe") == 0) { + config.pubsub_sharded_count++; + } else if (strcmp(cmd, "sunsubscribe") == 0) { + config.pubsub_sharded_count = count; + } + + if (config.pubsub_unsharded_count == 0 && config.pubsub_sharded_count == 0) { + config.pubsub_mode = 0; + cliRefreshPrompt(); + } else { + config.pubsub_mode = 1; + } + } + if (num_expected_pubsub_push > 0 && !strcasecmp(config.last_reply->element[0]->str, command)) { /* This pushed message confirms the * [p|s][un]subscribe command. */ @@ -2406,24 +2434,6 @@ static int cliSendCommand(int argc, char **argv, long repeat) { cliRefreshPrompt(); } - /* Handle pubsub mode */ - if (config.last_reply->elements >= 3) { - char *cmd = config.last_reply->element[0]->str; - int count = config.last_reply->element[2]->integer; - - if (strcmp(cmd, "subscribe") == 0 || strcmp(cmd, "unsubscribe") == 0 || - strcmp(cmd, "psubscribe") == 0 || strcmp(cmd, "punsubscribe") == 0) { - config.pubsub_unsharded_count = count; - } else if (strcmp(cmd, "ssubscribe") == 0 || strcmp(cmd, "sunsubscribe") == 0) { - config.pubsub_sharded_count = count; - } - - if (config.pubsub_unsharded_count == 0 && config.pubsub_sharded_count == 0) { - config.pubsub_mode = 0; - cliRefreshPrompt(); - } - } - if (--num_expected_pubsub_push > 0) { continue; /* We need more of these. */ } diff --git a/tests/integration/valkey-cli.tcl b/tests/integration/valkey-cli.tcl index d52df400e1..cd9fcece36 100644 --- a/tests/integration/valkey-cli.tcl +++ b/tests/integration/valkey-cli.tcl @@ -611,19 +611,25 @@ if {!$::tls} { ;# fake_redis_node doesn't support TLS test {valkey-cli pubsub mode with multiple subscription types} { set fd [open_cli] - # Subscribe to a regular channel - write_cli $fd "SUBSCRIBE channel1" - assert_match "*subscribe*channel1*" [read_cli $fd] - - # Subscribe to a pattern + write_cli $fd "SUBSCRIBE channel1 channel2 channel3" + set response [read_cli $fd] + + foreach line [split $response "\n"] { + if {[string match "*subscribe*channel1*" $line]} { + assert_match "*subscribe*channel1*" $line + } elseif {[string match "*subscribe*channel2*" $line]} { + assert_match "*subscribe*channel2*" $line + } elseif {[string match "*subscribe*channel3*" $line]} { + assert_match "*subscribe*channel3*" $line + } + } + write_cli $fd "PSUBSCRIBE pattern*" assert_match "*psubscribe*pattern**" [read_cli $fd] - # Subscribe to a shard channel write_cli $fd "SSUBSCRIBE schannel1" assert_match "*ssubscribe*schannel1*" [read_cli $fd] - # Unsubscribe from regular channel write_cli $fd "UNSUBSCRIBE channel1" assert_match "*unsubscribe*channel1*" [read_cli $fd] @@ -631,21 +637,39 @@ if {!$::tls} { ;# fake_redis_node doesn't support TLS catch {run_command $fd "SET key value"} err assert_match "*ERR*only*SUBSCRIBE*UNSUBSCRIBE*allowed*" $err - # Unsubscribe from pattern write_cli $fd "PUNSUBSCRIBE pattern*" assert_match "*punsubscribe*pattern**" [read_cli $fd] - # Verify still in pubsub mode + # Verify still in pubsub mode catch {run_command $fd "GET key"} err assert_match "*ERR*only*SUBSCRIBE*UNSUBSCRIBE*allowed*" $err - # Unsubscribe from shard channel write_cli $fd "SUNSUBSCRIBE schannel1" assert_match "*sunsubscribe*schannel1*" [read_cli $fd] - # Verify that we've exited pubsub mode - assert_equal "PONG" [run_command $fd "PING"] + catch {run_command $fd "INCR counter"} err + assert_match "*ERR*only*SUBSCRIBE*UNSUBSCRIBE*allowed*" $err + + # Unsubscribe from all remaining channels + write_cli $fd "UNSUBSCRIBE" + set combined_response [read_cli $fd] + + foreach line [split $combined_response "\n"] { + if {[string match "*unsubscribe*channel2*" $line]} { + assert_match "*unsubscribe*channel2*" $line + } elseif {[string match "*unsubscribe*channel3*" $line]} { + assert_match "*unsubscribe*channel3*" $line + } + } + + # Verify that we've exited pubsub mode + set response [run_command $fd "PING"] + + if {[string first "(subscribed mode)" $response] >= 0} { + return -code error "Client is still in pubsub mode" + } + close_cli $fd } From 79aff57a170c716fb1b13b5cab1deeafc6ef37a9 Mon Sep 17 00:00:00 2001 From: Nikhil Manglore Date: Fri, 3 Jan 2025 19:50:40 +0000 Subject: [PATCH 06/15] Fixed all issues with subscribe mode, refactored code, and finished implementing test cases Signed-off-by: Nikhil Manglore --- src/valkey-cli.c | 92 ++++++++++++++++++++++---------- tests/integration/valkey-cli.tcl | 92 +++++++++++++++----------------- 2 files changed, 107 insertions(+), 77 deletions(-) diff --git a/src/valkey-cli.c b/src/valkey-cli.c index bf75d56aaf..566e12297d 100644 --- a/src/valkey-cli.c +++ b/src/valkey-cli.c @@ -2226,7 +2226,7 @@ static int cliReadReply(int output_raw_strings) { fflush(stdout); sdsfree(out); } - + return REDIS_OK; } @@ -2247,7 +2247,32 @@ static void cliWaitForMessagesOrStdin(void) { sds out = cliFormatReply(reply, config.output, 0); fwrite(out, sdslen(out), 1, stdout); fflush(stdout); + + // Handle unsubscribe responses + if (isPubsubPush(reply) && reply->elements >= 3) { + char *cmd = reply->element[0]->str; + int count = reply->element[2]->integer; + + if (strcmp(cmd, "unsubscribe") == 0 || + strcmp(cmd, "punsubscribe") == 0 || + strcmp(cmd, "sunsubscribe") == 0) { + + if (strcmp(cmd, "unsubscribe") == 0) { + config.pubsub_unsharded_count = count; + } else if (strcmp(cmd, "sunsubscribe") == 0) { + config.pubsub_sharded_count = count; + } + + if (config.pubsub_unsharded_count == 0 && config.pubsub_sharded_count == 0) { + config.pubsub_mode = 0; + cliRefreshPrompt(); + break; + } + } + } + sdsfree(out); + freeReplyObject(reply); } } while (reply); @@ -2398,34 +2423,37 @@ static int cliSendCommand(int argc, char **argv, long repeat) { fflush(stdout); if (config.pubsub_mode || num_expected_pubsub_push > 0) { if (isPubsubPush(config.last_reply)) { - /* Handle pubsub mode */ - if (config.last_reply->elements >= 3) { - char *cmd = config.last_reply->element[0]->str; - int count = config.last_reply->element[2]->integer; - - if (strcmp(cmd, "subscribe") == 0) { - config.pubsub_unsharded_count++; - } else if (strcmp(cmd, "unsubscribe") == 0) { - config.pubsub_unsharded_count = count; - } else if (strcmp(cmd, "psubscribe") == 0) { - config.pubsub_unsharded_count++; - } else if (strcmp(cmd, "punsubscribe") == 0) { - config.pubsub_unsharded_count = count; - } else if (strcmp(cmd, "ssubscribe") == 0) { - config.pubsub_sharded_count++; - } else if (strcmp(cmd, "sunsubscribe") == 0) { - config.pubsub_sharded_count = count; - } - - if (config.pubsub_unsharded_count == 0 && config.pubsub_sharded_count == 0) { - config.pubsub_mode = 0; - cliRefreshPrompt(); - } else { - config.pubsub_mode = 1; - } - } - + if (config.pubsub_mode) { + if (config.last_reply->elements >= 3) { + char *cmd = config.last_reply->element[0]->str; + int count = config.last_reply->element[2]->integer; + + if (strcmp(cmd, "subscribe") == 0) { + config.pubsub_unsharded_count++; + } else if (strcmp(cmd, "unsubscribe") == 0) { + config.pubsub_unsharded_count = count; + } else if (strcmp(cmd, "psubscribe") == 0) { + config.pubsub_unsharded_count++; + } else if (strcmp(cmd, "punsubscribe") == 0) { + config.pubsub_unsharded_count = count; + } else if (strcmp(cmd, "ssubscribe") == 0) { + config.pubsub_sharded_count++; + } else if (strcmp(cmd, "sunsubscribe") == 0) { + config.pubsub_sharded_count = count; + } + + if (config.pubsub_unsharded_count == 0 && config.pubsub_sharded_count == 0) { + if (config.pubsub_mode) { + config.pubsub_mode = 0; + cliRefreshPrompt(); + } + } else { + config.pubsub_mode = 1; + cliRefreshPrompt(); + } + } + } if (num_expected_pubsub_push > 0 && !strcasecmp(config.last_reply->element[0]->str, command)) { /* This pushed message confirms the * [p|s][un]subscribe command. */ @@ -2433,7 +2461,6 @@ static int cliSendCommand(int argc, char **argv, long repeat) { config.pubsub_mode = 1; cliRefreshPrompt(); } - if (--num_expected_pubsub_push > 0) { continue; /* We need more of these. */ } @@ -3147,6 +3174,13 @@ void cliSetPreferences(char **argv, int argc, int interactive) { else { printf("%sunknown valkey-cli preference '%s'\n", interactive ? "" : ".valkeyclirc: ", argv[1]); } + } else if (!strcasecmp(argv[0], ":get") && argc >= 2) { + if (!strcasecmp(argv[1], "pubsub")) { + printf("%d\n", config.pubsub_mode); + } else { + printf("%sunknown valkey-cli get option '%s'\n", interactive ? "" : ".valkeyclirc: ", argv[1]); + } + fflush(stdout); } else { printf("%sunknown valkey-cli internal command '%s'\n", interactive ? "" : ".valkeyclirc: ", argv[0]); } diff --git a/tests/integration/valkey-cli.tcl b/tests/integration/valkey-cli.tcl index cd9fcece36..aeabcc452f 100644 --- a/tests/integration/valkey-cli.tcl +++ b/tests/integration/valkey-cli.tcl @@ -608,68 +608,64 @@ if {!$::tls} { ;# fake_redis_node doesn't support TLS assert_equal "a\n1\nb\n2\nc\n3" [exec {*}$cmdline ZRANGE new_zset 0 -1 WITHSCORES] } - test {valkey-cli pubsub mode with multiple subscription types} { + test "valkey-cli pubsub mode with multiple subscription types" { set fd [open_cli] - - write_cli $fd "SUBSCRIBE channel1 channel2 channel3" - set response [read_cli $fd] - foreach line [split $response "\n"] { - if {[string match "*subscribe*channel1*" $line]} { - assert_match "*subscribe*channel1*" $line - } elseif {[string match "*subscribe*channel2*" $line]} { - assert_match "*subscribe*channel2*" $line - } elseif {[string match "*subscribe*channel3*" $line]} { - assert_match "*subscribe*channel3*" $line - } - } - - write_cli $fd "PSUBSCRIBE pattern*" - assert_match "*psubscribe*pattern**" [read_cli $fd] + write_cli $fd ":get pubsub" + set pubsub_status [string trim [read_cli $fd]] + assert_equal "0" $pubsub_status + + write_cli $fd "SUBSCRIBE ch1 ch2 ch3" + set response [read_cli $fd] - write_cli $fd "SSUBSCRIBE schannel1" - assert_match "*ssubscribe*schannel1*" [read_cli $fd] + write_cli $fd ":get pubsub" + set pubsub_status [string trim [read_cli $fd]] + assert_equal "1" $pubsub_status - write_cli $fd "UNSUBSCRIBE channel1" - assert_match "*unsubscribe*channel1*" [read_cli $fd] + write_cli $fd "PSUBSCRIBE pattern*" + set response [read_cli $fd] + set lines [split $response "\n"] + assert_equal "psubscribe" [lindex $lines 0] + assert_equal "pattern*" [lindex $lines 1] + assert_equal "4" [lindex $lines 2] - # Verify still in pubsub mode - catch {run_command $fd "SET key value"} err - assert_match "*ERR*only*SUBSCRIBE*UNSUBSCRIBE*allowed*" $err + write_cli $fd "SSUBSCRIBE schannel" + set response [read_cli $fd] + set lines [split $response "\n"] + assert_equal "ssubscribe" [lindex $lines 0] + assert_equal "schannel" [lindex $lines 1] + assert_equal "1" [lindex $lines 2] write_cli $fd "PUNSUBSCRIBE pattern*" - assert_match "*punsubscribe*pattern**" [read_cli $fd] + set response [read_cli $fd] + set lines [split $response "\n"] + assert_equal "punsubscribe" [lindex $lines 0] + assert_equal "pattern*" [lindex $lines 1] + assert_equal "3" [lindex $lines 2] - # Verify still in pubsub mode - catch {run_command $fd "GET key"} err - assert_match "*ERR*only*SUBSCRIBE*UNSUBSCRIBE*allowed*" $err + write_cli $fd ":get pubsub" + set pubsub_status [string trim [read_cli $fd]] + assert_equal "1" $pubsub_status - write_cli $fd "SUNSUBSCRIBE schannel1" - assert_match "*sunsubscribe*schannel1*" [read_cli $fd] + write_cli $fd "SUNSUBSCRIBE schannel" + set response [read_cli $fd] + set lines [split $response "\n"] + assert_equal "sunsubscribe" [lindex $lines 0] + assert_equal "schannel" [lindex $lines 1] + assert_equal "0" [lindex $lines 2] - catch {run_command $fd "INCR counter"} err - assert_match "*ERR*only*SUBSCRIBE*UNSUBSCRIBE*allowed*" $err + write_cli $fd ":get pubsub" + set pubsub_status [string trim [read_cli $fd]] + assert_equal "1" $pubsub_status - # Unsubscribe from all remaining channels write_cli $fd "UNSUBSCRIBE" + set response [read_cli $fd] - set combined_response [read_cli $fd] - - foreach line [split $combined_response "\n"] { - if {[string match "*unsubscribe*channel2*" $line]} { - assert_match "*unsubscribe*channel2*" $line - } elseif {[string match "*unsubscribe*channel3*" $line]} { - assert_match "*unsubscribe*channel3*" $line - } - } + # Verify pubsub mode is no longer active + write_cli $fd ":get pubsub" + set pubsub_status [string trim [read_cli $fd]] + assert_equal "0" $pubsub_status - # Verify that we've exited pubsub mode - set response [run_command $fd "PING"] - - if {[string first "(subscribed mode)" $response] >= 0} { - return -code error "Client is still in pubsub mode" - } - close_cli $fd } From b09ef3274a3edbb899f987a92059f8503b70eebe Mon Sep 17 00:00:00 2001 From: Nikhil Manglore Date: Mon, 6 Jan 2025 08:43:49 +0000 Subject: [PATCH 07/15] Refactored code, added punsubscribe functionality, and made test cases more robust Signed-off-by: Nikhil Manglore --- src/valkey-cli.c | 91 +++++++++++------------- tests/integration/valkey-cli.tcl | 114 +++++++++++++++++++++++++++++++ 2 files changed, 153 insertions(+), 52 deletions(-) diff --git a/src/valkey-cli.c b/src/valkey-cli.c index 92b753bb95..627e7d2f38 100644 --- a/src/valkey-cli.c +++ b/src/valkey-cli.c @@ -2232,6 +2232,28 @@ static int cliReadReply(int output_raw_strings) { return REDIS_OK; } +/* Helper method to process unsubscribe responses */ +static void processUnsubscribeReply(redisReply *reply) { + if (reply->elements >= 3) { + char *cmd = reply->element[0]->str; + int count = reply->element[2]->integer; + + if (strcmp(cmd, "unsubscribe") == 0 || strcmp(cmd, "punsubscribe") == 0 || strcmp(cmd, "sunsubscribe") == 0) { + + if (strcmp(cmd, "unsubscribe") == 0 || strcmp(cmd, "punsubscribe") == 0) { + config.pubsub_unsharded_count = count; + } else if (strcmp(cmd, "sunsubscribe") == 0) { + config.pubsub_sharded_count = count; + } + + if (config.pubsub_unsharded_count == 0 && config.pubsub_sharded_count == 0) { + config.pubsub_mode = 0; + cliRefreshPrompt(); + } + } + } +} + /* Simultaneously wait for pubsub messages from the server and input on stdin. */ static void cliWaitForMessagesOrStdin(void) { int show_info = config.output != OUTPUT_RAW && (isatty(STDOUT_FILENO) || getenv("FAKETTY")); @@ -2250,28 +2272,7 @@ static void cliWaitForMessagesOrStdin(void) { fwrite(out, sdslen(out), 1, stdout); fflush(stdout); - // Handle unsubscribe responses - if (isPubsubPush(reply) && reply->elements >= 3) { - char *cmd = reply->element[0]->str; - int count = reply->element[2]->integer; - - if (strcmp(cmd, "unsubscribe") == 0 || - strcmp(cmd, "punsubscribe") == 0 || - strcmp(cmd, "sunsubscribe") == 0) { - - if (strcmp(cmd, "unsubscribe") == 0) { - config.pubsub_unsharded_count = count; - } else if (strcmp(cmd, "sunsubscribe") == 0) { - config.pubsub_sharded_count = count; - } - - if (config.pubsub_unsharded_count == 0 && config.pubsub_sharded_count == 0) { - config.pubsub_mode = 0; - cliRefreshPrompt(); - break; - } - } - } + processUnsubscribeReply(reply); sdsfree(out); freeReplyObject(reply); @@ -2426,36 +2427,22 @@ static int cliSendCommand(int argc, char **argv, long repeat) { if (config.pubsub_mode || num_expected_pubsub_push > 0) { if (isPubsubPush(config.last_reply)) { /* Handle pubsub mode */ - if (config.pubsub_mode) { - if (config.last_reply->elements >= 3) { - char *cmd = config.last_reply->element[0]->str; - int count = config.last_reply->element[2]->integer; - - if (strcmp(cmd, "subscribe") == 0) { - config.pubsub_unsharded_count++; - } else if (strcmp(cmd, "unsubscribe") == 0) { - config.pubsub_unsharded_count = count; - } else if (strcmp(cmd, "psubscribe") == 0) { - config.pubsub_unsharded_count++; - } else if (strcmp(cmd, "punsubscribe") == 0) { - config.pubsub_unsharded_count = count; - } else if (strcmp(cmd, "ssubscribe") == 0) { - config.pubsub_sharded_count++; - } else if (strcmp(cmd, "sunsubscribe") == 0) { - config.pubsub_sharded_count = count; - } - - if (config.pubsub_unsharded_count == 0 && config.pubsub_sharded_count == 0) { - if (config.pubsub_mode) { - config.pubsub_mode = 0; - cliRefreshPrompt(); - } - } else { - config.pubsub_mode = 1; - cliRefreshPrompt(); - } - } - } + if (config.last_reply->elements >= 3) { + char *cmd = config.last_reply->element[0]->str; + + /* If it's an unsubscribe command, call the helper */ + if (strcmp(cmd, "unsubscribe") == 0 || strcmp(cmd, "punsubscribe") == 0 || strcmp(cmd, "sunsubscribe") == 0) { + processUnsubscribeReply(config.last_reply); + } + + /* Handle subscribe commands */ + else if (strcmp(cmd, "subscribe") == 0 || strcmp(cmd, "psubscribe") == 0) { + config.pubsub_unsharded_count++; + } else if (strcmp(cmd, "ssubscribe") == 0) { + config.pubsub_sharded_count++; + } + } + if (num_expected_pubsub_push > 0 && !strcasecmp(config.last_reply->element[0]->str, command)) { /* This pushed message confirms the * [p|s][un]subscribe command. */ diff --git a/tests/integration/valkey-cli.tcl b/tests/integration/valkey-cli.tcl index aeabcc452f..d8994a1d00 100644 --- a/tests/integration/valkey-cli.tcl +++ b/tests/integration/valkey-cli.tcl @@ -608,6 +608,120 @@ if {!$::tls} { ;# fake_redis_node doesn't support TLS assert_equal "a\n1\nb\n2\nc\n3" [exec {*}$cmdline ZRANGE new_zset 0 -1 WITHSCORES] } + test "valkey-cli pubsub mode with standard channel subscription" { + set fd [open_cli] + + write_cli $fd ":get pubsub" + set pubsub_status [string trim [read_cli $fd]] + assert_equal "0" $pubsub_status + + write_cli $fd "SUBSCRIBE ch1" + set response [read_cli $fd] + + write_cli $fd ":get pubsub" + set pubsub_status [string trim [read_cli $fd]] + assert_equal "1" $pubsub_status + + write_cli $fd "UNSUBSCRIBE ch1" + set response [read_cli $fd] + + write_cli $fd ":get pubsub" + set pubsub_status [string trim [read_cli $fd]] + assert_equal "0" $pubsub_status + + write_cli $fd "SUBSCRIBE ch1 ch2 ch3" + set response [read_cli $fd] + + write_cli $fd ":get pubsub" + set pubsub_status [string trim [read_cli $fd]] + assert_equal "1" $pubsub_status + + write_cli $fd "UNSUBSCRIBE" + set response [read_cli $fd] + + write_cli $fd ":get pubsub" + set pubsub_status [string trim [read_cli $fd]] + assert_equal "0" $pubsub_status + + close_cli $fd + } + + test "valkey-cli pubsub mode with shard channel subscription" { + set fd [open_cli] + + write_cli $fd ":get pubsub" + set pubsub_status [string trim [read_cli $fd]] + assert_equal "0" $pubsub_status + + write_cli $fd "SSUBSCRIBE schannel1" + set response [read_cli $fd] + + write_cli $fd ":get pubsub" + set pubsub_status [string trim [read_cli $fd]] + assert_equal "1" $pubsub_status + + write_cli $fd "SUNSUBSCRIBE schannel1" + set response [read_cli $fd] + + write_cli $fd ":get pubsub" + set pubsub_status [string trim [read_cli $fd]] + assert_equal "0" $pubsub_status + + write_cli $fd "SSUBSCRIBE schannel1 schannel2 schannel3" + set response [read_cli $fd] + + write_cli $fd ":get pubsub" + set pubsub_status [string trim [read_cli $fd]] + assert_equal "1" $pubsub_status + + write_cli $fd "SUNSUBSCRIBE" + set response [read_cli $fd] + + write_cli $fd ":get pubsub" + set pubsub_status [string trim [read_cli $fd]] + assert_equal "0" $pubsub_status + + close_cli $fd + } + + test "valkey-cli pubsub mode with pattern channel subscription" { + set fd [open_cli] + + write_cli $fd ":get pubsub" + set pubsub_status [string trim [read_cli $fd]] + assert_equal "0" $pubsub_status + + write_cli $fd "PSUBSCRIBE pattern1*" + set response [read_cli $fd] + + write_cli $fd ":get pubsub" + set pubsub_status [string trim [read_cli $fd]] + assert_equal "1" $pubsub_status + + write_cli $fd "PUNSUBSCRIBE pattern1*" + set response [read_cli $fd] + + write_cli $fd ":get pubsub" + set pubsub_status [string trim [read_cli $fd]] + assert_equal "0" $pubsub_status + + write_cli $fd "PSUBSCRIBE pattern1* pattern2* pattern3*" + set response [read_cli $fd] + + write_cli $fd ":get pubsub" + set pubsub_status [string trim [read_cli $fd]] + assert_equal "1" $pubsub_status + + write_cli $fd "PUNSUBSCRIBE" + set response [read_cli $fd] + + write_cli $fd ":get pubsub" + set pubsub_status [string trim [read_cli $fd]] + assert_equal "0" $pubsub_status + + close_cli $fd + } + test "valkey-cli pubsub mode with multiple subscription types" { set fd [open_cli] From 930b4dbb4f4937f39806d97636994210ffd8f844 Mon Sep 17 00:00:00 2001 From: Nikhil Manglore Date: Mon, 6 Jan 2025 09:18:55 +0000 Subject: [PATCH 08/15] cleaned the code up to make it more readable Signed-off-by: Nikhil Manglore --- src/valkey-cli.c | 29 +++++++++++++++-------------- 1 file changed, 15 insertions(+), 14 deletions(-) diff --git a/src/valkey-cli.c b/src/valkey-cli.c index 627e7d2f38..2b7f4dca87 100644 --- a/src/valkey-cli.c +++ b/src/valkey-cli.c @@ -2237,19 +2237,16 @@ static void processUnsubscribeReply(redisReply *reply) { if (reply->elements >= 3) { char *cmd = reply->element[0]->str; int count = reply->element[2]->integer; + + if (strcmp(cmd, "unsubscribe") == 0 || strcmp(cmd, "punsubscribe") == 0) { + config.pubsub_unsharded_count = count; + } else if (strcmp(cmd, "sunsubscribe") == 0) { + config.pubsub_sharded_count = count; + } - if (strcmp(cmd, "unsubscribe") == 0 || strcmp(cmd, "punsubscribe") == 0 || strcmp(cmd, "sunsubscribe") == 0) { - - if (strcmp(cmd, "unsubscribe") == 0 || strcmp(cmd, "punsubscribe") == 0) { - config.pubsub_unsharded_count = count; - } else if (strcmp(cmd, "sunsubscribe") == 0) { - config.pubsub_sharded_count = count; - } - - if (config.pubsub_unsharded_count == 0 && config.pubsub_sharded_count == 0) { - config.pubsub_mode = 0; - cliRefreshPrompt(); - } + if (config.pubsub_unsharded_count == 0 && config.pubsub_sharded_count == 0) { + config.pubsub_mode = 0; + cliRefreshPrompt(); } } } @@ -2272,7 +2269,9 @@ static void cliWaitForMessagesOrStdin(void) { fwrite(out, sdslen(out), 1, stdout); fflush(stdout); - processUnsubscribeReply(reply); + if (isPubsubPush(reply)) { + processUnsubscribeReply(reply); + } sdsfree(out); freeReplyObject(reply); @@ -2432,7 +2431,9 @@ static int cliSendCommand(int argc, char **argv, long repeat) { /* If it's an unsubscribe command, call the helper */ if (strcmp(cmd, "unsubscribe") == 0 || strcmp(cmd, "punsubscribe") == 0 || strcmp(cmd, "sunsubscribe") == 0) { - processUnsubscribeReply(config.last_reply); + if (config.pubsub_mode && isPubsubPush(config.last_reply)) { + processUnsubscribeReply(config.last_reply); + } } /* Handle subscribe commands */ From 1849195f78f7a1ae2449a549fc4fbd8fdbfbb690 Mon Sep 17 00:00:00 2001 From: Nikhil Manglore Date: Mon, 6 Jan 2025 23:08:44 +0000 Subject: [PATCH 09/15] Fixed corner cases, added test case and split up existing test cases Signed-off-by: Nikhil Manglore --- src/valkey-cli.c | 6 ++-- tests/integration/valkey-cli.tcl | 57 ++++++++++++++++++++++++++++++-- 2 files changed, 58 insertions(+), 5 deletions(-) diff --git a/src/valkey-cli.c b/src/valkey-cli.c index 2b7f4dca87..8b939e00fa 100644 --- a/src/valkey-cli.c +++ b/src/valkey-cli.c @@ -2428,6 +2428,8 @@ static int cliSendCommand(int argc, char **argv, long repeat) { /* Handle pubsub mode */ if (config.last_reply->elements >= 3) { char *cmd = config.last_reply->element[0]->str; + int count = config.last_reply->element[2]->integer; + /* If it's an unsubscribe command, call the helper */ if (strcmp(cmd, "unsubscribe") == 0 || strcmp(cmd, "punsubscribe") == 0 || strcmp(cmd, "sunsubscribe") == 0) { @@ -2438,9 +2440,9 @@ static int cliSendCommand(int argc, char **argv, long repeat) { /* Handle subscribe commands */ else if (strcmp(cmd, "subscribe") == 0 || strcmp(cmd, "psubscribe") == 0) { - config.pubsub_unsharded_count++; + config.pubsub_unsharded_count = count; } else if (strcmp(cmd, "ssubscribe") == 0) { - config.pubsub_sharded_count++; + config.pubsub_sharded_count = count; } } diff --git a/tests/integration/valkey-cli.tcl b/tests/integration/valkey-cli.tcl index d8994a1d00..a56818b8c2 100644 --- a/tests/integration/valkey-cli.tcl +++ b/tests/integration/valkey-cli.tcl @@ -608,7 +608,7 @@ if {!$::tls} { ;# fake_redis_node doesn't support TLS assert_equal "a\n1\nb\n2\nc\n3" [exec {*}$cmdline ZRANGE new_zset 0 -1 WITHSCORES] } - test "valkey-cli pubsub mode with standard channel subscription" { + test "valkey-cli pubsub mode with single standard channel subscription" { set fd [open_cli] write_cli $fd ":get pubsub" @@ -629,6 +629,16 @@ if {!$::tls} { ;# fake_redis_node doesn't support TLS set pubsub_status [string trim [read_cli $fd]] assert_equal "0" $pubsub_status + close_cli $fd + } + + test "valkey-cli pubsub mode with multiple standard channel subscriptions" { + set fd [open_cli] + + write_cli $fd ":get pubsub" + set pubsub_status [string trim [read_cli $fd]] + assert_equal "0" $pubsub_status + write_cli $fd "SUBSCRIBE ch1 ch2 ch3" set response [read_cli $fd] @@ -646,7 +656,7 @@ if {!$::tls} { ;# fake_redis_node doesn't support TLS close_cli $fd } - test "valkey-cli pubsub mode with shard channel subscription" { + test "valkey-cli pubsub mode with single shard channel subscription" { set fd [open_cli] write_cli $fd ":get pubsub" @@ -667,6 +677,17 @@ if {!$::tls} { ;# fake_redis_node doesn't support TLS set pubsub_status [string trim [read_cli $fd]] assert_equal "0" $pubsub_status + close_cli $fd + } + + test "valkey-cli pubsub mode with multiple shard channel subscriptions" { + + set fd [open_cli] + + write_cli $fd ":get pubsub" + set pubsub_status [string trim [read_cli $fd]] + assert_equal "0" $pubsub_status + write_cli $fd "SSUBSCRIBE schannel1 schannel2 schannel3" set response [read_cli $fd] @@ -684,7 +705,7 @@ if {!$::tls} { ;# fake_redis_node doesn't support TLS close_cli $fd } - test "valkey-cli pubsub mode with pattern channel subscription" { + test "valkey-cli pubsub mode with single pattern channel subscription" { set fd [open_cli] write_cli $fd ":get pubsub" @@ -705,6 +726,12 @@ if {!$::tls} { ;# fake_redis_node doesn't support TLS set pubsub_status [string trim [read_cli $fd]] assert_equal "0" $pubsub_status + close_cli $fd + } + + test "valkey-cli pubsub mode with multiple pattern channel subscriptions" { + set fd [open_cli] + write_cli $fd "PSUBSCRIBE pattern1* pattern2* pattern3*" set response [read_cli $fd] @@ -722,6 +749,30 @@ if {!$::tls} { ;# fake_redis_node doesn't support TLS close_cli $fd } + test "valkey-cli pubsub mode when subscribing to the same channel" { + set fd [open_cli] + + write_cli $fd ":get pubsub" + set pubsub_status [string trim [read_cli $fd]] + assert_equal "0" $pubsub_status + + write_cli $fd "SUBSCRIBE ch1 ch1" + set response [read_cli $fd] + + write_cli $fd ":get pubsub" + set pubsub_status [string trim [read_cli $fd]] + assert_equal "1" $pubsub_status + + write_cli $fd "UNSUBSCRIBE" + set response [read_cli $fd] + + write_cli $fd ":get pubsub" + set pubsub_status [string trim [read_cli $fd]] + assert_equal "0" $pubsub_status + + close_cli $fd + } + test "valkey-cli pubsub mode with multiple subscription types" { set fd [open_cli] From 9f99a514909dfe112496bddddb0e4a7290e3f059 Mon Sep 17 00:00:00 2001 From: Nikhil Manglore Date: Tue, 7 Jan 2025 19:24:31 +0000 Subject: [PATCH 10/15] Refactored code Signed-off-by: Nikhil Manglore --- src/valkey-cli.c | 38 ++++++++++++++++++-------------------- 1 file changed, 18 insertions(+), 20 deletions(-) diff --git a/src/valkey-cli.c b/src/valkey-cli.c index 8b939e00fa..9b505f263f 100644 --- a/src/valkey-cli.c +++ b/src/valkey-cli.c @@ -2233,20 +2233,26 @@ static int cliReadReply(int output_raw_strings) { } /* Helper method to process unsubscribe responses */ -static void processUnsubscribeReply(redisReply *reply) { +static void handlePubSubMode(redisReply *reply) { if (reply->elements >= 3) { char *cmd = reply->element[0]->str; int count = reply->element[2]->integer; - if (strcmp(cmd, "unsubscribe") == 0 || strcmp(cmd, "punsubscribe") == 0) { + /* Update counts based on the command type */ + if (strcmp(cmd, "subscribe") == 0 || strcmp(cmd, "psubscribe") == 0 || + strcmp(cmd, "unsubscribe") == 0 || strcmp(cmd, "punsubscribe") == 0) { config.pubsub_unsharded_count = count; - } else if (strcmp(cmd, "sunsubscribe") == 0) { + } else if (strcmp(cmd, "ssubscribe") == 0 || strcmp(cmd, "sunsubscribe") == 0) { config.pubsub_sharded_count = count; } - if (config.pubsub_unsharded_count == 0 && config.pubsub_sharded_count == 0) { + /* Update pubsub mode based on the current counts */ + if (config.pubsub_unsharded_count == 0 && config.pubsub_sharded_count == 0 && config.pubsub_mode) { config.pubsub_mode = 0; cliRefreshPrompt(); + } else if (config.pubsub_unsharded_count + config.pubsub_sharded_count > 0 && !config.pubsub_mode) { + config.pubsub_mode = 1; + cliRefreshPrompt(); } } } @@ -2270,7 +2276,7 @@ static void cliWaitForMessagesOrStdin(void) { fflush(stdout); if (isPubsubPush(reply)) { - processUnsubscribeReply(reply); + handlePubSubMode(reply); } sdsfree(out); @@ -2428,22 +2434,14 @@ static int cliSendCommand(int argc, char **argv, long repeat) { /* Handle pubsub mode */ if (config.last_reply->elements >= 3) { char *cmd = config.last_reply->element[0]->str; - int count = config.last_reply->element[2]->integer; - - - /* If it's an unsubscribe command, call the helper */ - if (strcmp(cmd, "unsubscribe") == 0 || strcmp(cmd, "punsubscribe") == 0 || strcmp(cmd, "sunsubscribe") == 0) { - if (config.pubsub_mode && isPubsubPush(config.last_reply)) { - processUnsubscribeReply(config.last_reply); - } - } - - /* Handle subscribe commands */ - else if (strcmp(cmd, "subscribe") == 0 || strcmp(cmd, "psubscribe") == 0) { - config.pubsub_unsharded_count = count; - } else if (strcmp(cmd, "ssubscribe") == 0) { - config.pubsub_sharded_count = count; + + /* Call the helper to handle both subscribe and unsubscribe commands */ + if (strcmp(cmd, "subscribe") == 0 || strcmp(cmd, "psubscribe") == 0 || + strcmp(cmd, "ssubscribe") == 0 || strcmp(cmd, "unsubscribe") == 0 || + strcmp(cmd, "punsubscribe") == 0 || strcmp(cmd, "sunsubscribe") == 0) { + handlePubSubMode(config.last_reply); } + } if (num_expected_pubsub_push > 0 && !strcasecmp(config.last_reply->element[0]->str, command)) { From 022b2ccd6d444506c2a08904655e2d02fdcf5763 Mon Sep 17 00:00:00 2001 From: Nikhil Manglore Date: Tue, 7 Jan 2025 23:56:21 +0000 Subject: [PATCH 11/15] Removed redundant code Signed-off-by: Nikhil Manglore --- src/valkey-cli.c | 12 +----------- 1 file changed, 1 insertion(+), 11 deletions(-) diff --git a/src/valkey-cli.c b/src/valkey-cli.c index 9b505f263f..9e6e68241f 100644 --- a/src/valkey-cli.c +++ b/src/valkey-cli.c @@ -2432,17 +2432,7 @@ static int cliSendCommand(int argc, char **argv, long repeat) { if (config.pubsub_mode || num_expected_pubsub_push > 0) { if (isPubsubPush(config.last_reply)) { /* Handle pubsub mode */ - if (config.last_reply->elements >= 3) { - char *cmd = config.last_reply->element[0]->str; - - /* Call the helper to handle both subscribe and unsubscribe commands */ - if (strcmp(cmd, "subscribe") == 0 || strcmp(cmd, "psubscribe") == 0 || - strcmp(cmd, "ssubscribe") == 0 || strcmp(cmd, "unsubscribe") == 0 || - strcmp(cmd, "punsubscribe") == 0 || strcmp(cmd, "sunsubscribe") == 0) { - handlePubSubMode(config.last_reply); - } - - } + handlePubSubMode(config.last_reply); if (num_expected_pubsub_push > 0 && !strcasecmp(config.last_reply->element[0]->str, command)) { /* This pushed message confirms the From fe091cf0d292c463a1cb97e3f7a1bb538e6e629a Mon Sep 17 00:00:00 2001 From: Nikhil Manglore Date: Wed, 8 Jan 2025 00:22:07 +0000 Subject: [PATCH 12/15] Removed redundant code Signed-off-by: Nikhil Manglore --- src/valkey-cli.c | 4 ---- 1 file changed, 4 deletions(-) diff --git a/src/valkey-cli.c b/src/valkey-cli.c index 9e6e68241f..c4fdbc3d4b 100644 --- a/src/valkey-cli.c +++ b/src/valkey-cli.c @@ -2437,10 +2437,6 @@ static int cliSendCommand(int argc, char **argv, long repeat) { if (num_expected_pubsub_push > 0 && !strcasecmp(config.last_reply->element[0]->str, command)) { /* This pushed message confirms the * [p|s][un]subscribe command. */ - if (is_subscribe && !config.pubsub_mode) { - config.pubsub_mode = 1; - cliRefreshPrompt(); - } if (--num_expected_pubsub_push > 0) { continue; /* We need more of these. */ } From 9aff4d8448b2c092b59c748e12935d6cbf681a23 Mon Sep 17 00:00:00 2001 From: Nikhil Manglore Date: Wed, 8 Jan 2025 18:07:48 +0000 Subject: [PATCH 13/15] removed if statement and updated comments Signed-off-by: Nikhil Manglore --- src/valkey-cli.c | 38 ++++++++++++++++++-------------------- 1 file changed, 18 insertions(+), 20 deletions(-) diff --git a/src/valkey-cli.c b/src/valkey-cli.c index c4fdbc3d4b..01744ad37d 100644 --- a/src/valkey-cli.c +++ b/src/valkey-cli.c @@ -2232,29 +2232,28 @@ static int cliReadReply(int output_raw_strings) { return REDIS_OK; } -/* Helper method to process unsubscribe responses */ +/* Helper method to handle pubsub subscription/unsubscription. */ static void handlePubSubMode(redisReply *reply) { - if (reply->elements >= 3) { - char *cmd = reply->element[0]->str; - int count = reply->element[2]->integer; + + char *cmd = reply->element[0]->str; + int count = reply->element[2]->integer; - /* Update counts based on the command type */ - if (strcmp(cmd, "subscribe") == 0 || strcmp(cmd, "psubscribe") == 0 || - strcmp(cmd, "unsubscribe") == 0 || strcmp(cmd, "punsubscribe") == 0) { - config.pubsub_unsharded_count = count; - } else if (strcmp(cmd, "ssubscribe") == 0 || strcmp(cmd, "sunsubscribe") == 0) { - config.pubsub_sharded_count = count; - } + /* Update counts based on the command type */ + if (strcmp(cmd, "subscribe") == 0 || strcmp(cmd, "psubscribe") == 0 || strcmp(cmd, "unsubscribe") == 0 || strcmp(cmd, "punsubscribe") == 0) { + config.pubsub_unsharded_count = count; + } else if (strcmp(cmd, "ssubscribe") == 0 || strcmp(cmd, "sunsubscribe") == 0) { + config.pubsub_sharded_count = count; + } - /* Update pubsub mode based on the current counts */ - if (config.pubsub_unsharded_count == 0 && config.pubsub_sharded_count == 0 && config.pubsub_mode) { - config.pubsub_mode = 0; - cliRefreshPrompt(); - } else if (config.pubsub_unsharded_count + config.pubsub_sharded_count > 0 && !config.pubsub_mode) { - config.pubsub_mode = 1; - cliRefreshPrompt(); - } + /* Update pubsub mode based on the current counts */ + if (config.pubsub_unsharded_count + config.pubsub_sharded_count == 0 && config.pubsub_mode) { + config.pubsub_mode = 0; + cliRefreshPrompt(); + } else if (config.pubsub_unsharded_count + config.pubsub_sharded_count > 0 && !config.pubsub_mode) { + config.pubsub_mode = 1; + cliRefreshPrompt(); } + } /* Simultaneously wait for pubsub messages from the server and input on stdin. */ @@ -2431,7 +2430,6 @@ static int cliSendCommand(int argc, char **argv, long repeat) { fflush(stdout); if (config.pubsub_mode || num_expected_pubsub_push > 0) { if (isPubsubPush(config.last_reply)) { - /* Handle pubsub mode */ handlePubSubMode(config.last_reply); if (num_expected_pubsub_push > 0 && !strcasecmp(config.last_reply->element[0]->str, command)) { From 6aebdb9908c0cf8e670e938a182f378e44d929d3 Mon Sep 17 00:00:00 2001 From: Nikhil Manglore Date: Wed, 8 Jan 2025 19:13:21 +0000 Subject: [PATCH 14/15] Fixed formatting errors Signed-off-by: Nikhil Manglore --- src/valkey-cli.c | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/src/valkey-cli.c b/src/valkey-cli.c index 01744ad37d..d6ddeda419 100644 --- a/src/valkey-cli.c +++ b/src/valkey-cli.c @@ -2228,13 +2228,11 @@ static int cliReadReply(int output_raw_strings) { fflush(stdout); sdsfree(out); } - return REDIS_OK; } /* Helper method to handle pubsub subscription/unsubscription. */ static void handlePubSubMode(redisReply *reply) { - char *cmd = reply->element[0]->str; int count = reply->element[2]->integer; @@ -2253,7 +2251,6 @@ static void handlePubSubMode(redisReply *reply) { config.pubsub_mode = 1; cliRefreshPrompt(); } - } /* Simultaneously wait for pubsub messages from the server and input on stdin. */ @@ -2430,7 +2427,7 @@ static int cliSendCommand(int argc, char **argv, long repeat) { fflush(stdout); if (config.pubsub_mode || num_expected_pubsub_push > 0) { if (isPubsubPush(config.last_reply)) { - handlePubSubMode(config.last_reply); + handlePubSubMode(config.last_reply); if (num_expected_pubsub_push > 0 && !strcasecmp(config.last_reply->element[0]->str, command)) { /* This pushed message confirms the From fcc27426d01d477905e891573a19f2f8bc53b626 Mon Sep 17 00:00:00 2001 From: Nikhil Manglore Date: Wed, 8 Jan 2025 19:21:20 +0000 Subject: [PATCH 15/15] Fixed final formatting error Signed-off-by: Nikhil Manglore --- src/valkey-cli.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/valkey-cli.c b/src/valkey-cli.c index d6ddeda419..d2fa537036 100644 --- a/src/valkey-cli.c +++ b/src/valkey-cli.c @@ -2235,7 +2235,7 @@ static int cliReadReply(int output_raw_strings) { static void handlePubSubMode(redisReply *reply) { char *cmd = reply->element[0]->str; int count = reply->element[2]->integer; - + /* Update counts based on the command type */ if (strcmp(cmd, "subscribe") == 0 || strcmp(cmd, "psubscribe") == 0 || strcmp(cmd, "unsubscribe") == 0 || strcmp(cmd, "punsubscribe") == 0) { config.pubsub_unsharded_count = count;