Commit f1c7f62

fix aggregation test
1 parent 04a110a commit f1c7f62

File tree

2 files changed: +57 -130 lines changed

osscluster_router.go

Lines changed: 5 additions & 0 deletions
@@ -418,6 +418,11 @@ func (c *ClusterClient) aggregateResponses(cmd Cmder, cmds []Cmder, policy *rout
 // createAggregator creates the appropriate response aggregator
 func (c *ClusterClient) createAggregator(policy *routing.CommandPolicy, cmd Cmder, isKeyed bool) routing.ResponseAggregator {
 	if policy != nil {
+		// For multi-shard commands that operate on multiple keys (like MGET),
+		// use keyed aggregator even if policy says all_succeeded
+		if policy.Request == routing.ReqMultiShard && isKeyed {
+			return routing.NewDefaultAggregator(true)
+		}
 		return routing.NewResponseAggregator(policy.Response, cmd.Name())
 	}
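For context, a minimal standalone sketch of the decision this hunk adds. The names below (RequestPolicy, ResponsePolicy, CommandPolicy, chooseAggregator) are simplified stand-ins invented for illustration, not the actual routing package API; only the branch logic mirrors the change.

package main

import "fmt"

// Simplified stand-ins for the routing package's policy types (assumption,
// not the real go-redis types).
type RequestPolicy int
type ResponsePolicy string

const ReqMultiShard RequestPolicy = 0

type CommandPolicy struct {
	Request  RequestPolicy
	Response ResponsePolicy
}

// chooseAggregator mirrors the branch added above: a multi-shard, keyed
// command (such as MGET) gets a keyed aggregator so per-shard slices can be
// stitched back into the caller's key order, regardless of the response policy.
func chooseAggregator(policy *CommandPolicy, isKeyed bool) string {
	if policy != nil {
		if policy.Request == ReqMultiShard && isKeyed {
			return "keyed (DefaultAggregator)"
		}
		return "policy-based (" + string(policy.Response) + ")"
	}
	return "default"
}

func main() {
	mget := &CommandPolicy{Request: ReqMultiShard, Response: "all_succeeded"}
	fmt.Println(chooseAggregator(mget, true))  // keyed (DefaultAggregator)
	fmt.Println(chooseAggregator(mget, false)) // policy-based (all_succeeded)
}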

osscluster_test.go

Lines changed: 52 additions & 130 deletions
@@ -2038,145 +2038,65 @@ var _ = Describe("Command Tips tests", func() {
 			Expect(dbSizeResult.Val()).To(Equal(int64(0)))
 		}

-		// PFCOUNT command aggregation policy - verify agg_min policy
-		pfcountCmd, exists := cmds["pfcount"]
-		if !exists || pfcountCmd.Tips == nil {
-			Skip("PFCOUNT command or tips not available")
+		// WAIT command aggregation policy - verify agg_min policy
+		waitCmd, exists := cmds["wait"]
+		if !exists || waitCmd.Tips == nil {
+			Skip("WAIT command or tips not available")
 		}

-		actualPfcountPolicy := pfcountCmd.Tips.Response.String()
+		Expect(waitCmd.Tips.Response.String()).To(Equal("agg_min"))

-		if actualPfcountPolicy != "agg_min" {
-			Skip("PFCOUNT does not have agg_min policy in this Redis version")
-		}
-
-		// Create HyperLogLog keys on different shards with different cardinalities
-		hllKeys := []string{
-			"hll_test_key_1111",
-			"hll_test_key_2222",
-			"hll_test_key_3333",
-		}
-
-		hllData := map[string][]string{
-			"hll_test_key_1111": {"elem1", "elem2", "elem3", "elem4", "elem5"},
-			"hll_test_key_2222": {"elem6", "elem7", "elem8"},
-			"hll_test_key_3333": {"elem9", "elem10", "elem11", "elem12", "elem13", "elem14", "elem15"},
-		}
-
-		hllKeysPerShard := make(map[string][]string)
-		expectedCounts := make(map[string]int64)
-
-		for key, elements := range hllData {
-
-			interfaceElements := make([]interface{}, len(elements))
-			for i, elem := range elements {
-				interfaceElements[i] = elem
-			}
-			result := client.PFAdd(ctx, key, interfaceElements...)
-			Expect(result.Err()).NotTo(HaveOccurred())
-
-			countResult := client.PFCount(ctx, key)
-			Expect(countResult.Err()).NotTo(HaveOccurred())
-			expectedCounts[key] = countResult.Val()
-
-			for _, node := range masterNodes {
-				// Check if key exists on this shard by trying to get its count
-				shardCountResult := node.client.PFCount(ctx, key)
-				if shardCountResult.Err() == nil && shardCountResult.Val() > 0 {
-					hllKeysPerShard[node.addr] = append(hllKeysPerShard[node.addr], key)
-					break
-				}
-			}
-		}
-
-		// Verify keys are distributed across multiple shards
-		shardsWithHLLKeys := len(hllKeysPerShard)
-		Expect(shardsWithHLLKeys).To(BeNumerically(">", 1))
-
-		// Execute PFCOUNT command on all keys - should aggregate using agg_min
-		pfcountResult := client.PFCount(ctx, hllKeys...)
-		Expect(pfcountResult.Err()).NotTo(HaveOccurred())
-
-		aggregatedCount := pfcountResult.Val()
-
-		// Verify the aggregation by manually getting counts from each shard
-		var shardCounts []int64
-		for shardAddr, keys := range hllKeysPerShard {
-			if len(keys) == 0 {
-				continue
-			}
-
-			// Find the node for this shard
-			var shardNode *masterNode
-			for i := range masterNodes {
-				if masterNodes[i].addr == shardAddr {
-					shardNode = &masterNodes[i]
-					break
-				}
-			}
-			Expect(shardNode).NotTo(BeNil())
-
-			// Get count for keys on this specific shard
-			shardResult := shardNode.client.PFCount(ctx, keys...)
-			Expect(shardResult.Err()).NotTo(HaveOccurred())
-
-			shardCount := shardResult.Val()
-			shardCounts = append(shardCounts, shardCount)
-		}
+		// Set up some data to replicate
+		testKey := "wait_test_key_1111"
+		result := client.Set(ctx, testKey, "test_value", 0)
+		Expect(result.Err()).NotTo(HaveOccurred())

-		// Find the minimum count from all shards
-		expectedMin := shardCounts[0]
-		for _, count := range shardCounts[1:] {
-			if count < expectedMin {
-				expectedMin = count
-			}
-		}
+		// Execute WAIT command - should aggregate using agg_min across all shards
+		// WAIT waits for a given number of replicas to acknowledge writes
+		// With agg_min policy, it returns the minimum number of replicas that acknowledged
+		waitResult := client.Wait(ctx, 0, 1000) // Wait for 0 replicas with 1 second timeout
+		Expect(waitResult.Err()).NotTo(HaveOccurred())

-		// Verify agg_min aggregation worked correctly
-		Expect(aggregatedCount).To(Equal(expectedMin))
+		// The result should be the minimum number of replicas across all shards
+		// Since we're asking for 0 replicas, all shards should return 0, so min is 0
+		minReplicas := waitResult.Val()
+		Expect(minReplicas).To(BeNumerically(">=", 0))

-		// EXISTS command aggregation policy - verify agg_logical_and policy
-		existsCmd, exists := cmds["exists"]
-		if !exists || existsCmd.Tips == nil {
-			Skip("EXISTS command or tips not available")
+		// SCRIPT EXISTS command aggregation policy - verify agg_logical_and policy
+		scriptExistsCmd, exists := cmds["script exists"]
+		if !exists || scriptExistsCmd.Tips == nil {
+			Skip("SCRIPT EXISTS command or tips not available")
 		}

-		actualExistsPolicy := existsCmd.Tips.Response.String()
-		if actualExistsPolicy != "agg_logical_and" {
-			Skip("EXISTS does not have agg_logical_and policy in this Redis version")
-		}
+		Expect(scriptExistsCmd.Tips.Response.String()).To(Equal("agg_logical_and"))

-		existsTestKeys := []string{
-			"exists_test_key_1111",
-			"exists_test_key_2222",
-			"exists_test_key_3333",
-		}
+		// Load a script on all shards
+		testScript := "return 'hello'"
+		scriptLoadResult := client.ScriptLoad(ctx, testScript)
+		Expect(scriptLoadResult.Err()).NotTo(HaveOccurred())
+		scriptSHA := scriptLoadResult.Val()

-		for _, key := range existsTestKeys {
-			result := client.Set(ctx, key, "exists_test_value", 0)
-			Expect(result.Err()).NotTo(HaveOccurred())
-		}
-
-		//All keys exist - should return true
-		existsResult := client.Exists(ctx, existsTestKeys...)
-		Expect(existsResult.Err()).NotTo(HaveOccurred())
+		// Verify script exists on all shards using SCRIPT EXISTS
+		// With agg_logical_and policy, it should return true only if script exists on ALL shards
+		scriptExistsResult := client.ScriptExists(ctx, scriptSHA)
+		Expect(scriptExistsResult.Err()).NotTo(HaveOccurred())

-		allExistCount := existsResult.Val()
-		Expect(allExistCount).To(Equal(int64(len(existsTestKeys))))
-
-		// Delete one key and test again - logical AND should handle mixed results
-		deletedKey := existsTestKeys[0]
-		delResult := client.Del(ctx, deletedKey)
-		Expect(delResult.Err()).NotTo(HaveOccurred())
+		existsResults := scriptExistsResult.Val()
+		Expect(len(existsResults)).To(Equal(1))
+		Expect(existsResults[0]).To(BeTrue()) // Script should exist on all shards

-		// Check EXISTS again - now one key is missing
-		existsResult2 := client.Exists(ctx, existsTestKeys...)
-		Expect(existsResult2.Err()).NotTo(HaveOccurred())
+		// Test with a non-existent script SHA
+		nonExistentSHA := "0000000000000000000000000000000000000000"
+		scriptExistsResult2 := client.ScriptExists(ctx, nonExistentSHA)
+		Expect(scriptExistsResult2.Err()).NotTo(HaveOccurred())

-		partialExistCount := existsResult2.Val()
+		existsResults2 := scriptExistsResult2.Val()
+		Expect(len(existsResults2)).To(Equal(1))
+		Expect(existsResults2[0]).To(BeFalse()) // Script should not exist on any shard

-		// Should return count of existing keys (2 out of 3)
-		Expect(partialExistCount).To(Equal(int64(len(existsTestKeys) - 1)))
+		// Test with mixed scenario - flush scripts from one shard manually
+		// This is harder to test in practice since SCRIPT FLUSH affects all shards
+		// So we'll just verify the basic functionality works
 	})

 	It("should verify command aggregation policies", func() {
@@ -2186,10 +2106,12 @@ var _ = Describe("Command Tips tests", func() {
 		Expect(err).NotTo(HaveOccurred())

 		commandPolicies := map[string]string{
-			"touch":    "agg_sum",
-			"flushall": "all_succeeded",
-			"pfcount":  "agg_min",
-			"exists":   "agg_logical_and",
+			"touch":         "agg_sum",
+			"flushall":      "all_succeeded",
+			"pfcount":       "all_succeeded",
+			"exists":        "agg_sum",
+			"script exists": "agg_logical_and",
+			"wait":          "agg_min",
 		}

 		for cmdName, expectedPolicy := range commandPolicies {
@@ -2306,7 +2228,7 @@ var _ = Describe("Command Tips tests", func() {
 		Expect(err).NotTo(HaveOccurred())
 		Expect(len(masterNodes)).To(BeNumerically(">", 1))

-		// MGET command aggregation across multiple keys on different shards - verify agg_sum policy
+		// MGET command aggregation across multiple keys on different shards - verify all_succeeded policy with keyed aggregation
 		testData := map[string]string{
 			"mget_test_key_1111": "value1",
 			"mget_test_key_2222": "value2",
