Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add parsing for stream entries #557

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions redis/doc.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,8 +142,8 @@
//
// Reply Helpers
//
// The Bool, Int, Bytes, String, Strings and Values functions convert a reply
// to a value of a specific type. To allow convenient wrapping of calls to the
// The Bool, Int, Bytes, String, Strings, Values, Map and Entries functions convert
// a reply to a value of a specific type. To allow convenient wrapping of calls to the
// connection Do and Receive methods, the functions take a second argument of
// type error. If the error is non-nil, then the helper function returns the
// error. If the error is nil, the function converts the reply to the specified
Expand Down
6 changes: 6 additions & 0 deletions redis/redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,3 +136,9 @@ type SlowLog struct {
// ClientName is the name set via the CLIENT SETNAME command (4.0 only).
ClientName string
}

// Entry represents a single stream entry.
type Entry struct {
ID string
Fields map[string]string
}
30 changes: 30 additions & 0 deletions redis/reply.go
Original file line number Diff line number Diff line change
Expand Up @@ -581,3 +581,33 @@ func SlowLogs(result interface{}, err error) ([]SlowLog, error) {
}
return logs, nil
}

// Entries is a helper that converts an array of stream entries into Entry values.
// Requires two values in each entry result, and an even number of field values.
func Entries(reply interface{}, err error) ([]Entry, error) {
vs, err := Values(reply, err)
if err != nil {
return nil, err
}

entries := make([]Entry, len(vs))
for i, v := range vs {
evs, ok := v.([]interface{})
if !ok || len(evs) != 2 {
return nil, errors.New("redigo: Entry expects two value result")
}
id, err := String(evs[0], nil)
if err != nil {
return nil, err
}
sm, err := StringMap(evs[1], nil)
if err != nil {
return nil, err
}
entries[i] = Entry{
ID: id,
Fields: sm,
}
}
return entries, nil
}
74 changes: 74 additions & 0 deletions redis/reply_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,27 @@ var replyTests = []struct {
ve(getSlowLog()),
ve(redis.SlowLog{ID: 1, Time: time.Unix(1579625870, 0), ExecutionTime: time.Duration(3) * time.Microsecond, Args: []string{"set", "x", "y"}, ClientAddr: "localhost:1234", ClientName: "testClient"}, nil),
},
{
"[Entry(0-1, name, Anna), Entry(1-1, name, Bruce)]",
ve(redis.Entries([]interface{}{
[]interface{}{[]byte("0-1"), []interface{}{[]byte("name"), []byte("Anna")}},
[]interface{}{[]byte("1-1"), []interface{}{[]byte("name"), []byte("Bruce")}},
}, nil)),
ve([]redis.Entry{
{
ID: "0-1",
Fields: map[string]string{
"name": "Anna",
},
},
{
ID: "1-1",
Fields: map[string]string{
"name": "Bruce",
},
},
}, nil),
},
}

func getSlowLog() (redis.SlowLog, error) {
Expand Down Expand Up @@ -220,6 +241,59 @@ func TestSlowLog(t *testing.T) {
}
}

func TestEntries(t *testing.T) {
c, err := dial()
if err != nil {
t.Errorf("Failed during dial with error " + err.Error())
return
}
defer c.Close()

resultStr, err := redis.Strings(c.Do("CONFIG", "GET", "stream-node-max-entries"))
if err != nil {
t.Errorf("Failed during CONFIG GET stream-node-max-entries with error " + err.Error())
return
}
// in case of older verion < 5.0 where streams are not supported don't run the test
if len(resultStr) == 0 {
t.Skip("Skipped, stream feature not supported")
}

n := 3
for i := 0; i < n; i++ {
_, err = redis.String(c.Do("XADD", "teststream", fmt.Sprintf("0-%d", i+1), "index", fmt.Sprintf("%d", i)))
if err != nil {
t.Errorf("Failed during XADD with error " + err.Error())
return
}
}

entries, err := redis.Entries(c.Do("XRANGE", "teststream", "-", "+"))
if err != nil {
t.Errorf("Failed during XRANGE with error " + err.Error())
return
}
if len(entries) != n {
t.Errorf("Expected %d entries in result, got %d", n, len(entries))
return
}
for i, entry := range entries {
expectedID := fmt.Sprintf("0-%d", i+1)
expectedFields := map[string]string{
"index": fmt.Sprintf("%d", i),
}

if entry.ID != expectedID {
t.Errorf("Expected entry ID to equal %s, got %s", expectedID, entry.ID)
return
}
if !reflect.DeepEqual(expectedFields, entry.Fields) {
t.Errorf("Expected entry to have fields %v, got %v", expectedFields, entry.Fields)
return
}
}
}

// dial wraps DialDefaultServer() with a more suitable function name for examples.
func dial() (redis.Conn, error) {
return redis.DialDefaultServer()
Expand Down