diff --git a/redis/doc.go b/redis/doc.go index 69ad506c..3206094d 100644 --- a/redis/doc.go +++ b/redis/doc.go @@ -142,8 +142,8 @@ // // Reply Helpers // -// The Bool, Int, Bytes, String, Strings and Values functions convert a reply -// to a value of a specific type. To allow convenient wrapping of calls to the +// The Bool, Int, Bytes, String, Strings, Values, Map and Entries functions convert +// a reply to a value of a specific type. To allow convenient wrapping of calls to the // connection Do and Receive methods, the functions take a second argument of // type error. If the error is non-nil, then the helper function returns the // error. If the error is nil, the function converts the reply to the specified diff --git a/redis/redis.go b/redis/redis.go index e4464874..57f2dc25 100644 --- a/redis/redis.go +++ b/redis/redis.go @@ -136,3 +136,9 @@ type SlowLog struct { // ClientName is the name set via the CLIENT SETNAME command (4.0 only). ClientName string } + +// Entry represents a single stream entry. +type Entry struct { + ID string + Fields map[string]string +} diff --git a/redis/reply.go b/redis/reply.go index dfe6aff7..4118eb19 100644 --- a/redis/reply.go +++ b/redis/reply.go @@ -581,3 +581,33 @@ func SlowLogs(result interface{}, err error) ([]SlowLog, error) { } return logs, nil } + +// Entries is a helper that converts an array of stream entries into Entry values. +// Requires two values in each entry result, and an even number of field values. +func Entries(reply interface{}, err error) ([]Entry, error) { + vs, err := Values(reply, err) + if err != nil { + return nil, err + } + + entries := make([]Entry, len(vs)) + for i, v := range vs { + evs, ok := v.([]interface{}) + if !ok || len(evs) != 2 { + return nil, errors.New("redigo: Entry expects two value result") + } + id, err := String(evs[0], nil) + if err != nil { + return nil, err + } + sm, err := StringMap(evs[1], nil) + if err != nil { + return nil, err + } + entries[i] = Entry{ + ID: id, + Fields: sm, + } + } + return entries, nil +} diff --git a/redis/reply_test.go b/redis/reply_test.go index 8993bad1..17c44393 100644 --- a/redis/reply_test.go +++ b/redis/reply_test.go @@ -143,6 +143,27 @@ var replyTests = []struct { ve(getSlowLog()), ve(redis.SlowLog{ID: 1, Time: time.Unix(1579625870, 0), ExecutionTime: time.Duration(3) * time.Microsecond, Args: []string{"set", "x", "y"}, ClientAddr: "localhost:1234", ClientName: "testClient"}, nil), }, + { + "[Entry(0-1, name, Anna), Entry(1-1, name, Bruce)]", + ve(redis.Entries([]interface{}{ + []interface{}{[]byte("0-1"), []interface{}{[]byte("name"), []byte("Anna")}}, + []interface{}{[]byte("1-1"), []interface{}{[]byte("name"), []byte("Bruce")}}, + }, nil)), + ve([]redis.Entry{ + { + ID: "0-1", + Fields: map[string]string{ + "name": "Anna", + }, + }, + { + ID: "1-1", + Fields: map[string]string{ + "name": "Bruce", + }, + }, + }, nil), + }, } func getSlowLog() (redis.SlowLog, error) { @@ -220,6 +241,59 @@ func TestSlowLog(t *testing.T) { } } +func TestEntries(t *testing.T) { + c, err := dial() + if err != nil { + t.Errorf("Failed during dial with error " + err.Error()) + return + } + defer c.Close() + + resultStr, err := redis.Strings(c.Do("CONFIG", "GET", "stream-node-max-entries")) + if err != nil { + t.Errorf("Failed during CONFIG GET stream-node-max-entries with error " + err.Error()) + return + } + // in case of older verion < 5.0 where streams are not supported don't run the test + if len(resultStr) == 0 { + t.Skip("Skipped, stream feature not supported") + } + + n := 3 + for i := 0; i < n; i++ { + _, err = redis.String(c.Do("XADD", "teststream", fmt.Sprintf("0-%d", i+1), "index", fmt.Sprintf("%d", i))) + if err != nil { + t.Errorf("Failed during XADD with error " + err.Error()) + return + } + } + + entries, err := redis.Entries(c.Do("XRANGE", "teststream", "-", "+")) + if err != nil { + t.Errorf("Failed during XRANGE with error " + err.Error()) + return + } + if len(entries) != n { + t.Errorf("Expected %d entries in result, got %d", n, len(entries)) + return + } + for i, entry := range entries { + expectedID := fmt.Sprintf("0-%d", i+1) + expectedFields := map[string]string{ + "index": fmt.Sprintf("%d", i), + } + + if entry.ID != expectedID { + t.Errorf("Expected entry ID to equal %s, got %s", expectedID, entry.ID) + return + } + if !reflect.DeepEqual(expectedFields, entry.Fields) { + t.Errorf("Expected entry to have fields %v, got %v", expectedFields, entry.Fields) + return + } + } +} + // dial wraps DialDefaultServer() with a more suitable function name for examples. func dial() (redis.Conn, error) { return redis.DialDefaultServer()