From 9bf83e231eb1756f4fffc7182d99ef4840b9bb5d Mon Sep 17 00:00:00 2001 From: Jean-Philippe Moal Date: Sat, 26 Oct 2019 18:33:17 +0200 Subject: [PATCH 1/4] Implement XADD and XLEN --- README.md | 3 + cmd_stream.go | 114 ++++++++++++++++++++++++++ cmd_stream_test.go | 158 +++++++++++++++++++++++++++++++++++++ db.go | 76 ++++++++++++++++++ direct.go | 51 +++++++++++- go.mod | 2 + integration/stream_test.go | 36 +++++++++ miniredis.go | 10 +++ miniredis_test.go | 27 +++++++ stream.go | 106 +++++++++++++++++++++++++ 10 files changed, 582 insertions(+), 1 deletion(-) create mode 100644 cmd_stream.go create mode 100644 cmd_stream_test.go create mode 100644 integration/stream_test.go create mode 100644 stream.go diff --git a/README.md b/README.md index 748dc6af..8dd1dca5 100644 --- a/README.md +++ b/README.md @@ -160,6 +160,9 @@ Implemented commands: - ZSCORE - ZUNIONSTORE - ZSCAN + - Stream keys + - XADD + - XLEN - Scripting - EVAL - EVALSHA diff --git a/cmd_stream.go b/cmd_stream.go new file mode 100644 index 00000000..cc63a944 --- /dev/null +++ b/cmd_stream.go @@ -0,0 +1,114 @@ +// Commands from https://redis.io/commands#stream + +package miniredis + +import ( + "strings" + + "github.com/alicebob/miniredis/v2/server" +) + +// commandsStream handles all stream operations. +func commandsStream(m *Miniredis) { + m.srv.Register("XADD", m.cmdXadd) + // XRANGE key start end [COUNT count] + // XREVRANGE key end start [COUNT count] + m.srv.Register("XLEN", m.cmdXlen) +} + +// XADD +func (m *Miniredis) cmdXadd(c *server.Peer, cmd string, args []string) { + if len(args) < 4 { + setDirty(c) + c.WriteError(errWrongNumber(cmd)) + return + } + if !m.handleAuth(c) { + return + } + if m.checkPubsub(c) { + return + } + + key, id, args := args[0], args[1], args[2:] + var entryID streamEntryID + + if strings.ToLower(id) == "maxlen" { + setDirty(c) + c.WriteError("ERR option MAXLEN is not supported") + return + } + if id != "*" { + var err error + entryID, err = formatStreamEntryID(id) + if err != nil { + setDirty(c) + c.WriteError(err.Error()) + return + } + } + + // args must be composed of field/value pairs. + if len(args) == 0 || len(args)%2 != 0 { + setDirty(c) + c.WriteError(errWrongNumber(cmd)) + return + } + + entryDict := make(map[string]string) + for len(args) > 0 { + entryDict[args[0]] = args[1] + args = args[2:] + } + + withTx(m, c, func(c *server.Peer, ctx *connCtx) { + db := m.db(ctx.selectedDB) + + if db.exists(key) && db.t(key) != "stream" { + c.WriteError(ErrWrongType.Error()) + return + } + + newID, err := db.streamAdd(key, entryID, entryDict) + if err != nil { + c.WriteError(err.Error()) + return + } + + c.WriteBulk(newID) + }) +} + +// XLEN +func (m *Miniredis) cmdXlen(c *server.Peer, cmd string, args []string) { + if len(args) != 1 { + setDirty(c) + c.WriteError(errWrongNumber(cmd)) + return + } + if !m.handleAuth(c) { + return + } + if m.checkPubsub(c) { + return + } + + key := args[0] + + withTx(m, c, func(c *server.Peer, ctx *connCtx) { + db := m.db(ctx.selectedDB) + + t, ok := db.keys[key] + if !ok { + // No such key. That's zero length. + c.WriteInt(0) + return + } + if t != "stream" { + c.WriteError(msgWrongType) + return + } + + c.WriteInt(len(db.streamKeys[key])) + }) +} diff --git a/cmd_stream_test.go b/cmd_stream_test.go new file mode 100644 index 00000000..be879628 --- /dev/null +++ b/cmd_stream_test.go @@ -0,0 +1,158 @@ +package miniredis + +import ( + "fmt" + "math" + "regexp" + "testing" + + "github.com/gomodule/redigo/redis" +) + +// Test XADD / XLEN +func TestStream(t *testing.T) { + s, err := Run() + ok(t, err) + defer s.Close() + c, err := redis.Dial("tcp", s.Addr()) + ok(t, err) + defer c.Close() + + res, err := redis.String(c.Do("XADD", "s", "1234567-89", "one", "1", "two", "2")) + ok(t, err) + equals(t, "1234567-89", res) + + count, err := redis.Int(c.Do("XLEN", "s")) + ok(t, err) + equals(t, 1, count) + + t.Run("TYPE", func(t *testing.T) { + s, err := redis.String(c.Do("TYPE", "s")) + ok(t, err) + equals(t, "stream", s) + }) + + t.Run("direct usage", func(t *testing.T) { + _, err := s.XAdd("s1", "0-0", map[string]string{"name": "foo"}) + assert(t, err != nil, "XAdd error") + + id, err := s.XAdd("s1", "12345-67", map[string]string{"name": "bar"}) + ok(t, err) + equals(t, "12345-67", id) + + id, err = s.XAdd("s1", "12345-0", map[string]string{"name": "foo"}) + ok(t, err) + + id, err = s.XAdd("s1", "*", map[string]string{"name": "baz"}) + ok(t, err) + exp := `\d+-0` + matched, err := regexp.MatchString(exp, id) + ok(t, err) + assert(t, matched, "expected: %#v got: %#v", exp, id) + + stream, err := s.Stream("s1") + ok(t, err) + equals(t, 3, len(stream)) + equals(t, map[string]string{"name": "bar"}, stream[1]["12345-67"]) + }) +} + +// Test XADD +func TestStreamAdd(t *testing.T) { + s, err := Run() + ok(t, err) + defer s.Close() + c, err := redis.Dial("tcp", s.Addr()) + ok(t, err) + defer c.Close() + + t.Run("XADD", func(t *testing.T) { + res, err := redis.String(c.Do("XADD", "s", "123456", "one", "11", "two", "22")) + ok(t, err) + equals(t, "123456-0", res) + + res, err = redis.String(c.Do("XADD", "s", "*", "one", "1", "two", "2")) + ok(t, err) + exp := `\d+-0` + matched, err := regexp.MatchString(exp, res) + ok(t, err) + assert(t, matched, "expected: %#v got: %#v", exp, res) + + k := fmt.Sprintf("%d-0", uint64(math.MaxUint64-100)) + res, err = redis.String(c.Do("XADD", "s", k, "one", "11", "two", "22")) + ok(t, err) + equals(t, k, res) + + res, err = redis.String(c.Do("XADD", "s", "*", "one", "111", "two", "222")) + ok(t, err) + equals(t, fmt.Sprintf("%d-1", uint64(math.MaxUint64-100)), res) + }) + + t.Run("error cases", func(t *testing.T) { + // Wrong type of key + _, err := redis.String(c.Do("SET", "str", "value")) + ok(t, err) + + _, err = s.XAdd("str", "*", map[string]string{"hi": "1"}) + mustFail(t, err, msgWrongType) + + _, err = redis.String(c.Do("XADD", "str", "*", "hi", "1")) + mustFail(t, err, msgWrongType) + _, err = redis.String(c.Do("XADD")) + assert(t, err != nil, "XADD error") + _, err = redis.String(c.Do("XADD", "s")) + assert(t, err != nil, "XADD error") + _, err = redis.String(c.Do("XADD", "s", "*")) + assert(t, err != nil, "XADD error") + _, err = redis.String(c.Do("XADD", "s", "*", "key")) // odd + assert(t, err != nil, "XADD error") + _, err = redis.String(c.Do("XADD", "s", "MAXLEN", "~", "1000", "*", "key")) // MAXLEN + assert(t, err != nil, "XADD error") + + _, err = redis.String(c.Do("XADD", "s", "a-b", "one", "111", "two", "222")) // invalid id format + assert(t, err != nil, "XADD error") + _, err = redis.String(c.Do("XADD", "s", "0-0", "one", "111", "two", "222")) // invalid id format + assert(t, err != nil, "XADD error") + _, err = redis.String(c.Do("XADD", "s", "1234567-89", "one", "111", "two", "222")) // invalid id value + assert(t, err != nil, "XADD error") + _, err = redis.String(c.Do("XADD", "s", fmt.Sprintf("%d-0", uint64(math.MaxUint64-100)), "one", "111", "two", "222")) // invalid id value + assert(t, err != nil, "XADD error") + }) +} + +// Test XLEN +func TestStreamLen(t *testing.T) { + s, err := Run() + ok(t, err) + defer s.Close() + c, err := redis.Dial("tcp", s.Addr()) + ok(t, err) + defer c.Close() + + _, err = redis.String(c.Do("XADD", "s", "*", "one", "1", "two", "2")) + ok(t, err) + _, err = redis.String(c.Do("XADD", "s", "*", "one", "11", "two", "22")) + ok(t, err) + + t.Run("XLEN", func(t *testing.T) { + count, err := redis.Int(c.Do("XLEN", "s")) + ok(t, err) + equals(t, 2, count) + + count, err = redis.Int(c.Do("XLEN", "s3")) + ok(t, err) + equals(t, 0, count) + }) + + t.Run("error cases", func(t *testing.T) { + // Wrong type of key + _, err := redis.String(c.Do("SET", "str", "value")) + ok(t, err) + + _, err = redis.Int(c.Do("XLEN")) + mustFail(t, err, errWrongNumber("xlen")) + + _, err = redis.Int(c.Do("XLEN", "str")) + mustFail(t, err, msgWrongType) + }) +} diff --git a/db.go b/db.go index 0fb8db87..4fb6c466 100644 --- a/db.go +++ b/db.go @@ -59,6 +59,8 @@ func (db *RedisDB) move(key string, to *RedisDB) bool { to.setKeys[key] = db.setKeys[key] case "zset": to.sortedsetKeys[key] = db.sortedsetKeys[key] + case "stream": + to.streamKeys[key] = db.streamKeys[key] default: panic("unhandled key type") } @@ -83,6 +85,8 @@ func (db *RedisDB) rename(from, to string) { db.setKeys[to] = db.setKeys[from] case "zset": db.sortedsetKeys[to] = db.sortedsetKeys[from] + case "stream": + db.streamKeys[to] = db.streamKeys[from] default: panic("missing case") } @@ -116,6 +120,8 @@ func (db *RedisDB) del(k string, delTTL bool) { delete(db.setKeys, k) case "zset": delete(db.sortedsetKeys, k) + case "stream": + delete(db.streamKeys, k) default: panic("Unknown key type: " + t) } @@ -536,6 +542,76 @@ func (db *RedisDB) setUnion(keys []string) (setKey, error) { return s, nil } +// stream set returns a stream as a slice. +func (db *RedisDB) stream(key string) []map[string]map[string]string { + stream := db.streamKeys[key] + if len(stream) == 0 { + return nil + } + + rawStream := make([]map[string]map[string]string, len(stream)) + for i, entry := range stream { + rawStream[i] = map[string]map[string]string{entry.id.String(): entry.values} + } + + return rawStream +} + +// streamAdd adds an entry to a stream. Returns entry ID. +// If entryID corresponds to the zero value, the ID will be generated automatically. +func (db *RedisDB) streamAdd(key string, entryID streamEntryID, values map[string]string) (string, error) { + stream, ok := db.streamKeys[key] + if !ok { + stream = newStream() + db.keys[key] = "stream" + } + + if entryID[0] == 0 && entryID[1] == 0 { + entryID = stream.nextEntryID() + } + + if err := stream.isValidNextEntryID(entryID); err != nil { + return "", err + } + + stream.append(entryID, values) + db.streamKeys[key] = stream + db.keyVersion[key]++ + + return entryID.String(), nil +} + +// streamForceInsert adds an entry to a stream, regardless of whether the provided entry would be the last. +// Returns entry ID. +func (db *RedisDB) streamForceAdd(key string, entryID streamEntryID, values map[string]string) (string, error) { + stream, ok := db.streamKeys[key] + if !ok { + return db.streamAdd(key, entryID, values) + } + + newStream := make(streamKey, 0, len(stream)+1) + + for i, entry := range stream { + if entryID.Less(entry.id) { + newStream = append(newStream, streamEntry{id: entryID, values: values}) + newStream = append(newStream, stream[i:]...) + + break + } + + newStream = append(newStream, entry) + + if i == len(stream)-1 { + newStream = append(newStream, streamEntry{id: entryID, values: values}) + } + } + + db.streamKeys[key] = newStream + db.keyVersion[key]++ + + return entryID.String(), nil +} + // fastForward proceeds the current timestamp with duration, works as a time machine func (db *RedisDB) fastForward(duration time.Duration) { for _, key := range db.allKeys() { diff --git a/direct.go b/direct.go index f0a179cb..bb041ad3 100644 --- a/direct.go +++ b/direct.go @@ -382,7 +382,6 @@ func (db *RedisDB) Unlink(k string) bool { return db.Del(k) } - // TTL is the left over time to live. As set via EXPIRE, PEXPIRE, EXPIREAT, // PEXPIREAT. // 0 if not set. @@ -652,6 +651,56 @@ func (db *RedisDB) ZScore(k, member string) (float64, error) { return db.ssetScore(k, member), nil } +// XAdd adds an entry to a stream. +func (m *Miniredis) XAdd(k string, id string, values map[string]string) (string, error) { + return m.DB(m.selectedDB).XAdd(k, id, values) +} + +// XAdd adds an entry to a stream. +// Any valid ID is accepted regardless of the current latest ID. +func (db *RedisDB) XAdd(k string, id string, values map[string]string) (string, error) { + db.master.Lock() + defer db.master.Unlock() + defer db.master.signal.Broadcast() + + if db.exists(k) && db.t(k) != "stream" { + return "", ErrWrongType + } + + var entryID streamEntryID + + if id == "*" { + return db.streamAdd(k, entryID, values) + } + + var err error + entryID, err = formatStreamEntryID(id) + if err != nil { + return "", err + } + + return db.streamForceAdd(k, entryID, values) +} + +// Stream returns a slice of stream entries id->values maps. +func (m *Miniredis) Stream(k string) ([]map[string]map[string]string, error) { + return m.DB(m.selectedDB).Stream(k) +} + +// Stream returns a slice of stream entries id->values maps. +func (db *RedisDB) Stream(k string) ([]map[string]map[string]string, error) { + db.master.Lock() + defer db.master.Unlock() + + if !db.exists(k) { + return nil, ErrKeyNotFound + } + if db.t(k) != "stream" { + return nil, ErrWrongType + } + return db.stream(k), nil +} + // Publish a message to subscribers. Returns the number of receivers. func (m *Miniredis) Publish(channel, message string) int { m.Lock() diff --git a/go.mod b/go.mod index e59b0ade..8a8dc65f 100644 --- a/go.mod +++ b/go.mod @@ -5,3 +5,5 @@ require ( github.com/gomodule/redigo v1.7.1-0.20190322064113-39e2c31b7ca3 github.com/yuin/gopher-lua v0.0.0-20190206043414-8bfc7677f583 ) + +go 1.13 diff --git a/integration/stream_test.go b/integration/stream_test.go new file mode 100644 index 00000000..8088a231 --- /dev/null +++ b/integration/stream_test.go @@ -0,0 +1,36 @@ +// +build int + +package main + +import ( + "testing" +) + +func TestStream(t *testing.T) { + testCommands(t, + succ("XADD", + "planets", + "0-1", + "name", "Mercury", + ), + succLoosely("XADD", + "planets", + "*", + "name", "Venus", + ), + succ("XADD", + "planets", + "18446744073709551000-0", + "name", "Earth", + ), + fail("XADD", + "planets", + "18446744073709551000-0", + "name", "Earth", + ), + succ("XLEN", "planets"), + succ("RENAME", "planets", "planets2"), + succ("DEL", "planets2"), + succ("XLEN", "planets"), + ) +} diff --git a/miniredis.go b/miniredis.go index caafa136..dc5b1323 100644 --- a/miniredis.go +++ b/miniredis.go @@ -41,6 +41,7 @@ type RedisDB struct { listKeys map[string]listKey // LPUSH &c. keys setKeys map[string]setKey // SADD &c. keys sortedsetKeys map[string]sortedSet // ZADD &c. keys + streamKeys map[string]streamKey // XADD &c. keys ttl map[string]time.Duration // effective TTL values keyVersion map[string]uint // used to watch values } @@ -99,6 +100,7 @@ func newRedisDB(id int, m *Miniredis) RedisDB { listKeys: map[string]listKey{}, setKeys: map[string]setKey{}, sortedsetKeys: map[string]sortedSet{}, + streamKeys: map[string]streamKey{}, ttl: map[string]time.Duration{}, keyVersion: map[string]uint{}, } @@ -145,6 +147,7 @@ func (m *Miniredis) start(s *server.Server) error { commandsPubsub(m) commandsSet(m) commandsSortedSet(m) + commandsStream(m) commandsTransaction(m) commandsScripting(m) commandsGeo(m) @@ -333,6 +336,13 @@ func (m *Miniredis) Dump() string { for _, el := range db.ssetElements(k) { r += fmt.Sprintf("%s%f: %s\n", indent, el.score, v(el.member)) } + case "stream": + for _, entry := range db.streamKeys[k] { + r += fmt.Sprintf("%s%s\n", indent, entry.id.String()) + for key, val := range entry.values { + r += fmt.Sprintf("%s%s%s: %s\n", indent, indent, v(key), v(val)) + } + } default: r += fmt.Sprintf("%s(a %s, fixme!)\n", indent, t) } diff --git a/miniredis_test.go b/miniredis_test.go index 674a7881..f8a610bf 100644 --- a/miniredis_test.go +++ b/miniredis_test.go @@ -1,6 +1,7 @@ package miniredis import ( + "strings" "testing" "time" @@ -171,6 +172,32 @@ func TestDumpSortedSet(t *testing.T) { } } +func TestDumpStream(t *testing.T) { + s, err := Run() + ok(t, err) + s.XAdd("elements", "123456789-0", map[string]string{"name": "wind"}) + s.XAdd("elements", "0-1", map[string]string{"name": "earth"}) + s.XAdd("elements", "123456789-1", map[string]string{"name": "fire"}) + if have, want := s.Dump(), `- elements + 0-1 + "name": "earth" + 123456789-0 + "name": "wind" + 123456789-1 + "name": "fire" +`; have != want { + t.Errorf("have: %q, want: %q", have, want) + } + + s.XAdd("elements", "*", map[string]string{"name": "Leeloo"}) + fullHave := s.Dump() + have := strings.Split(fullHave, "\n")[8] + want := ` "name": "Leeloo"` + if have != want { + t.Errorf("have: %q, want: %q", have, want) + } +} + func TestKeysAndFlush(t *testing.T) { s, err := Run() ok(t, err) diff --git a/stream.go b/stream.go new file mode 100644 index 00000000..e0e6514a --- /dev/null +++ b/stream.go @@ -0,0 +1,106 @@ +package miniredis + +import ( + "errors" + "fmt" + "strconv" + "strings" + "time" +) + +// Basic stream implementation. + +var ( + errInvalidStreamIDFormat = errors.New("ERR Invalid stream ID specified as stream command argument") +) + +type streamKey []streamEntry + +type streamEntry struct { + id streamEntryID + values map[string]string +} + +type streamEntryID [2]uint64 + +func (id *streamEntryID) Less(other streamEntryID) bool { + if other[0] > id[0] { + return true + } + + if other[0] == id[0] { + return other[1] > id[1] + } + + return false +} + +func (id *streamEntryID) String() string { + return fmt.Sprintf("%d-%d", id[0], id[1]) +} + +func newStream() streamKey { + return streamKey{} +} + +func (ss *streamKey) append(id streamEntryID, values map[string]string) { + *ss = append(*ss, streamEntry{id: id, values: values}) +} + +func (ss *streamKey) nextEntryID() streamEntryID { + curTime := uint64(time.Now().UnixNano() / int64(time.Millisecond)) + + lastID := ss.getLastEntryID() + + if lastID[0] < curTime { + return streamEntryID{curTime, 0} + } + + return streamEntryID{lastID[0], lastID[1] + 1} +} + +func (ss *streamKey) isValidNextEntryID(next streamEntryID) error { + last := ss.getLastEntryID() + + if !last.Less(next) { + return errors.New("ERR The ID specified in XADD is equal or smaller than the target stream top item") + } + + return nil +} + +func (ss *streamKey) getLastEntryID() streamEntryID { + // Return a zero value in case there is no entry + // Note that deleted entries will also need to be tracked + if len(*ss) == 0 { + return streamEntryID{} + } + + return (*ss)[len(*ss)-1].id +} + +func formatStreamEntryID(id string) (fmtid streamEntryID, err error) { + parts := strings.Split(id, "-") + if len(parts) != 1 && len(parts) != 2 { + return fmtid, errInvalidStreamIDFormat + } + + ts, err := strconv.ParseUint(parts[0], 10, 64) + if err != nil { + return fmtid, errInvalidStreamIDFormat + } + + var seq uint64 + if len(parts) == 2 { + seq, err = strconv.ParseUint(parts[1], 10, 64) + if err != nil { + return fmtid, errInvalidStreamIDFormat + } + } + + if ts == 0 && seq == 0 { + return fmtid, errInvalidStreamIDFormat + } + + return streamEntryID{ts, seq}, nil +} From 8fe1bde06d9b56e57c60a77f8d16a1826dbf8de9 Mon Sep 17 00:00:00 2001 From: Jean-Philippe Moal Date: Sun, 27 Oct 2019 01:28:20 +0200 Subject: [PATCH 2/4] Implement XRANGE/XREVRANGE --- README.md | 2 + cmd_stream.go | 133 ++++++++++++++++++++++++++++++++++++- cmd_stream_test.go | 82 ++++++++++++++++++++++- integration/stream_test.go | 56 ++++++++++++++++ stream.go | 42 ++++++++++++ 5 files changed, 312 insertions(+), 3 deletions(-) diff --git a/README.md b/README.md index 8dd1dca5..2302974d 100644 --- a/README.md +++ b/README.md @@ -163,6 +163,8 @@ Implemented commands: - Stream keys - XADD - XLEN + - XRANGE + - XREVRANGE - Scripting - EVAL - EVALSHA diff --git a/cmd_stream.go b/cmd_stream.go index cc63a944..b7147f9e 100644 --- a/cmd_stream.go +++ b/cmd_stream.go @@ -3,6 +3,7 @@ package miniredis import ( + "strconv" "strings" "github.com/alicebob/miniredis/v2/server" @@ -11,9 +12,9 @@ import ( // commandsStream handles all stream operations. func commandsStream(m *Miniredis) { m.srv.Register("XADD", m.cmdXadd) - // XRANGE key start end [COUNT count] - // XREVRANGE key end start [COUNT count] m.srv.Register("XLEN", m.cmdXlen) + m.srv.Register("XRANGE", m.makeCmdXrange(false)) + m.srv.Register("XREVRANGE", m.makeCmdXrange(true)) } // XADD @@ -112,3 +113,131 @@ func (m *Miniredis) cmdXlen(c *server.Peer, cmd string, args []string) { c.WriteInt(len(db.streamKeys[key])) }) } + +// XRANGE and XREVRANGE +func (m *Miniredis) makeCmdXrange(reverse bool) server.Cmd { + return func(c *server.Peer, cmd string, args []string) { + if len(args) < 3 { + setDirty(c) + c.WriteError(errWrongNumber(cmd)) + return + } + if len(args) == 4 || len(args) > 5 { + setDirty(c) + c.WriteError(msgSyntaxError) + return + } + if !m.handleAuth(c) { + return + } + if m.checkPubsub(c) { + return + } + + key := args[0] + + var start streamEntryID + start, err := formatStreamRangeBound(args[1], true, reverse) + if err != nil { + setDirty(c) + c.WriteError(err.Error()) + return + } + var end streamEntryID + end, err = formatStreamRangeBound(args[2], false, reverse) + if err != nil { + setDirty(c) + c.WriteError(err.Error()) + return + } + + count := 0 + if len(args) == 5 { + if strings.ToLower(args[3]) != "count" { + setDirty(c) + c.WriteError(msgSyntaxError) + return + } + + count, err = strconv.Atoi(args[4]) + if err != nil { + setDirty(c) + c.WriteError(msgInvalidInt) + return + } + + if count == 0 { + c.WriteLen(0) + return + } + } + + withTx(m, c, func(c *server.Peer, ctx *connCtx) { + db := m.db(ctx.selectedDB) + + if !db.exists(key) { + c.WriteLen(0) + return + } + + if db.t(key) != "stream" { + c.WriteError(ErrWrongType.Error()) + return + } + + var entries []streamEntry = db.streamKeys[key] + if reverse { + entries = reversedStreamEntries(entries) + } + + if count == 0 { + count = len(entries) + } + + returnedEntries := make([]streamEntry, 0, count) + returnedItemsCount := 0 + + for _, entry := range entries { + if len(returnedEntries) == count { + break + } + + if !reverse { + // Break if entry ID > end + if end.Less(entry.id) { + break + } + + // Continue if entry ID < start + if entry.id.Less(start) { + continue + } + } else { + // Break if entry iD < end + if entry.id.Less(end) { + break + } + + // Continue if entry ID > start. + if start.Less(entry.id) { + continue + } + } + + returnedEntries = append(returnedEntries, entry) + returnedItemsCount += 1 + len(entry.values) + } + + c.WriteLen(len(returnedEntries)) + for _, entry := range returnedEntries { + c.WriteLen(2) + c.WriteBulk(entry.id.String()) + c.WriteLen(2 * len(entry.values)) + for k, v := range entry.values { + c.WriteBulk(k) + c.WriteBulk(v) + } + } + }) + } +} diff --git a/cmd_stream_test.go b/cmd_stream_test.go index be879628..0283ce53 100644 --- a/cmd_stream_test.go +++ b/cmd_stream_test.go @@ -9,7 +9,7 @@ import ( "github.com/gomodule/redigo/redis" ) -// Test XADD / XLEN +// Test XADD / XLEN / XRANGE func TestStream(t *testing.T) { s, err := Run() ok(t, err) @@ -156,3 +156,83 @@ func TestStreamLen(t *testing.T) { mustFail(t, err, msgWrongType) }) } + +// Test XRANGE / XREVRANGE +func TestStreamRange(t *testing.T) { + s, err := Run() + ok(t, err) + defer s.Close() + c, err := redis.Dial("tcp", s.Addr()) + ok(t, err) + defer c.Close() + + _, err = redis.String(c.Do("XADD", "planets", "0-1", "name", "Mercury")) + ok(t, err) + _, err = redis.String(c.Do("XADD", "planets", "1-0", "name", "Venus")) + ok(t, err) + _, err = redis.String(c.Do("XADD", "planets", "2-1", "name", "Earth")) + ok(t, err) + _, err = redis.String(c.Do("XADD", "planets", "3-0", "name", "Mars")) + ok(t, err) + _, err = redis.String(c.Do("XADD", "planets", "4-1", "name", "Jupiter")) + ok(t, err) + + t.Run("XRANGE", func(t *testing.T) { + res, err := redis.Values(c.Do("XRANGE", "planets", "1", "+")) + ok(t, err) + equals(t, 4, len(res)) + + item := res[1].([]interface{}) + id := string(item[0].([]byte)) + + vals := item[1].([]interface{}) + field := string(vals[0].([]byte)) + value := string(vals[1].([]byte)) + + equals(t, "2-1", id) + equals(t, "name", field) + equals(t, "Earth", value) + + res, err = redis.Values(c.Do("XREVRANGE", "planets", "3", "1")) + ok(t, err) + equals(t, 3, len(res)) + + item = res[2].([]interface{}) + id = string(item[0].([]byte)) + + vals = item[1].([]interface{}) + field = string(vals[0].([]byte)) + value = string(vals[1].([]byte)) + + equals(t, "1-0", id) + equals(t, "name", field) + equals(t, "Venus", value) + }) + + t.Run("error cases", func(t *testing.T) { + // Wrong type of key + _, err := redis.String(c.Do("SET", "str", "value")) + ok(t, err) + + _, err = redis.Int(c.Do("XRANGE", "str", "-", "+")) + mustFail(t, err, msgWrongType) + + _, err = redis.Int(c.Do("XRANGE", "str", "-", "+")) + mustFail(t, err, msgWrongType) + + _, err = redis.Int(c.Do("XRANGE")) + mustFail(t, err, errWrongNumber("xrange")) + _, err = redis.Int(c.Do("XRANGE", "foo")) + mustFail(t, err, errWrongNumber("xrange")) + _, err = redis.Int(c.Do("XRANGE", "foo", 1)) + mustFail(t, err, errWrongNumber("xrange")) + _, err = redis.Int(c.Do("XRANGE", "foo", 2, 3, "toomany")) + mustFail(t, err, msgSyntaxError) + _, err = redis.Int(c.Do("XRANGE", "foo", 2, 3, "COUNT", "noint")) + mustFail(t, err, msgInvalidInt) + _, err = redis.Int(c.Do("XRANGE", "foo", 2, 3, "COUNT", 1, "toomany")) + mustFail(t, err, msgSyntaxError) + _, err = redis.Int(c.Do("XRANGE", "foo", "-", "noint")) + mustFail(t, err, errInvalidStreamIDFormat.Error()) + }) +} diff --git a/integration/stream_test.go b/integration/stream_test.go index 8088a231..b028342a 100644 --- a/integration/stream_test.go +++ b/integration/stream_test.go @@ -34,3 +34,59 @@ func TestStream(t *testing.T) { succ("XLEN", "planets"), ) } + +func TestStreamRange(t *testing.T) { + testCommands(t, + succ("XADD", + "ordplanets", + "0-1", + "name", "Mercury", + ), + succ("XADD", + "ordplanets", + "1-0", + "name", "Venus", + ), + succ("XADD", + "ordplanets", + "2-1", + "name", "Earth", + ), + succ("XADD", + "ordplanets", + "3-0", + "name", "Mars", + ), + succ("XADD", + "ordplanets", + "4-1", + "name", "Jupiter", + ), + succ("XRANGE", "ordplanets", "-", "+"), + succ("XRANGE", "ordplanets", "+", "-"), + succ("XRANGE", "ordplanets", "-", "99"), + succ("XRANGE", "ordplanets", "0", "4"), + succ("XRANGE", "ordplanets", "0", "1-0"), + succ("XRANGE", "ordplanets", "0", "1-99"), + succ("XRANGE", "ordplanets", "0", "2", "COUNT", "1"), + succ("XRANGE", "ordplanets", "1-42", "3-42", "COUNT", "1"), + + succ("XREVRANGE", "ordplanets", "+", "-"), + succ("XREVRANGE", "ordplanets", "-", "+"), + succ("XREVRANGE", "ordplanets", "4", "0"), + succ("XREVRANGE", "ordplanets", "1-0", "0"), + succ("XREVRANGE", "ordplanets", "3-42", "1-0", "COUNT", "2"), + succ("DEL", "ordplanets"), + + // // failure cases + fail("XRANGE"), + fail("XRANGE", "foo"), + fail("XRANGE", "foo", 1), + fail("XRANGE", "foo", 2, 3, "toomany"), + fail("XRANGE", "foo", 2, 3, "COUNT", "noint"), + fail("XRANGE", "foo", 2, 3, "COUNT", 1, "toomany"), + fail("XRANGE", "foo", "-", "noint"), + succ("SET", "str", "I am a string"), + fail("XRANGE", "str", "-", "+"), + ) +} diff --git a/stream.go b/stream.go index e0e6514a..92e044ae 100644 --- a/stream.go +++ b/stream.go @@ -3,6 +3,7 @@ package miniredis import ( "errors" "fmt" + "math" "strconv" "strings" "time" @@ -104,3 +105,44 @@ func formatStreamEntryID(id string) (fmtid streamEntryID, err error) { return streamEntryID{ts, seq}, nil } + +func formatStreamRangeBound(id string, start bool, reverse bool) (fmtid streamEntryID, err error) { + if id == "-" { + return streamEntryID{0, 0}, nil + } + + if id == "+" { + return streamEntryID{math.MaxUint64, math.MaxUint64}, nil + } + + if id == "0" { + return streamEntryID{0, 0}, nil + } + + parts := strings.Split(id, "-") + if len(parts) == 2 { + return formatStreamEntryID(id) + } + + // Incomplete IDs case + ts, err := strconv.ParseUint(parts[0], 10, 64) + if err != nil { + return fmtid, errInvalidStreamIDFormat + } + + if (!start && !reverse) || (start && reverse) { + return streamEntryID{ts, math.MaxUint64}, nil + } + + return streamEntryID{ts, 0}, nil +} + +func reversedStreamEntries(o []streamEntry) []streamEntry { + newStream := make([]streamEntry, len(o)) + + for i, e := range o { + newStream[len(o)-i-1] = e + } + + return newStream +} From 721aeeabd3cff68d3a8e5b6744c3392b8ff2e118 Mon Sep 17 00:00:00 2001 From: Jean-Philippe Moal Date: Sun, 27 Oct 2019 13:31:47 +0100 Subject: [PATCH 3/4] Change stream values data type to guaranteed insertion order --- cmd_stream.go | 10 +++++----- cmd_stream_test.go | 22 +++++++++++----------- db.go | 10 +++++----- direct.go | 8 ++++---- integration/stream_test.go | 5 +++++ miniredis.go | 4 ++-- miniredis_test.go | 8 ++++---- stream.go | 4 ++-- 8 files changed, 38 insertions(+), 33 deletions(-) diff --git a/cmd_stream.go b/cmd_stream.go index b7147f9e..983e3432 100644 --- a/cmd_stream.go +++ b/cmd_stream.go @@ -56,9 +56,9 @@ func (m *Miniredis) cmdXadd(c *server.Peer, cmd string, args []string) { return } - entryDict := make(map[string]string) + entryDict := make([][2]string, 0, len(args)/2) for len(args) > 0 { - entryDict[args[0]] = args[1] + entryDict = append(entryDict, [2]string{args[0], args[1]}) args = args[2:] } @@ -233,9 +233,9 @@ func (m *Miniredis) makeCmdXrange(reverse bool) server.Cmd { c.WriteLen(2) c.WriteBulk(entry.id.String()) c.WriteLen(2 * len(entry.values)) - for k, v := range entry.values { - c.WriteBulk(k) - c.WriteBulk(v) + for _, kv := range entry.values { + c.WriteBulk(kv[0]) + c.WriteBulk(kv[1]) } } }) diff --git a/cmd_stream_test.go b/cmd_stream_test.go index 0283ce53..39ff36bb 100644 --- a/cmd_stream_test.go +++ b/cmd_stream_test.go @@ -33,17 +33,17 @@ func TestStream(t *testing.T) { }) t.Run("direct usage", func(t *testing.T) { - _, err := s.XAdd("s1", "0-0", map[string]string{"name": "foo"}) + _, err := s.XAdd("s1", "0-0", [][2]string{{"name", "foo"}}) assert(t, err != nil, "XAdd error") - id, err := s.XAdd("s1", "12345-67", map[string]string{"name": "bar"}) + id, err := s.XAdd("s1", "12345-67", [][2]string{{"name", "bar"}}) ok(t, err) equals(t, "12345-67", id) - id, err = s.XAdd("s1", "12345-0", map[string]string{"name": "foo"}) + id, err = s.XAdd("s1", "12345-0", [][2]string{{"name", "foo"}}) ok(t, err) - id, err = s.XAdd("s1", "*", map[string]string{"name": "baz"}) + id, err = s.XAdd("s1", "*", [][2]string{{"name", "baz"}}) ok(t, err) exp := `\d+-0` matched, err := regexp.MatchString(exp, id) @@ -53,7 +53,7 @@ func TestStream(t *testing.T) { stream, err := s.Stream("s1") ok(t, err) equals(t, 3, len(stream)) - equals(t, map[string]string{"name": "bar"}, stream[1]["12345-67"]) + equals(t, [][2]string{{"name", "bar"}}, stream[1]["12345-67"]) }) } @@ -93,7 +93,7 @@ func TestStreamAdd(t *testing.T) { _, err := redis.String(c.Do("SET", "str", "value")) ok(t, err) - _, err = s.XAdd("str", "*", map[string]string{"hi": "1"}) + _, err = s.XAdd("str", "*", [][2]string{{"hi", "1"}}) mustFail(t, err, msgWrongType) _, err = redis.String(c.Do("XADD", "str", "*", "hi", "1")) @@ -166,15 +166,15 @@ func TestStreamRange(t *testing.T) { ok(t, err) defer c.Close() - _, err = redis.String(c.Do("XADD", "planets", "0-1", "name", "Mercury")) + _, err = redis.String(c.Do("XADD", "planets", "0-1", "name", "Mercury", "greek-god", "Hermes", "idx", "1")) ok(t, err) - _, err = redis.String(c.Do("XADD", "planets", "1-0", "name", "Venus")) + _, err = redis.String(c.Do("XADD", "planets", "1-0", "name", "Venus", "greek-god", "Aphrodite", "idx", "2")) ok(t, err) - _, err = redis.String(c.Do("XADD", "planets", "2-1", "name", "Earth")) + _, err = redis.String(c.Do("XADD", "planets", "2-1", "name", "Earth", "greek-god", "", "idx", "3")) ok(t, err) - _, err = redis.String(c.Do("XADD", "planets", "3-0", "name", "Mars")) + _, err = redis.String(c.Do("XADD", "planets", "3-0", "greek-god", "Ares", "name", "Mars", "idx", "4")) ok(t, err) - _, err = redis.String(c.Do("XADD", "planets", "4-1", "name", "Jupiter")) + _, err = redis.String(c.Do("XADD", "planets", "4-1", "name", "Jupiter", "greek-god", "Dias", "idx", "5")) ok(t, err) t.Run("XRANGE", func(t *testing.T) { diff --git a/db.go b/db.go index 4fb6c466..a10cee0b 100644 --- a/db.go +++ b/db.go @@ -543,15 +543,15 @@ func (db *RedisDB) setUnion(keys []string) (setKey, error) { } // stream set returns a stream as a slice. -func (db *RedisDB) stream(key string) []map[string]map[string]string { +func (db *RedisDB) stream(key string) []map[string][][2]string { stream := db.streamKeys[key] if len(stream) == 0 { return nil } - rawStream := make([]map[string]map[string]string, len(stream)) + rawStream := make([]map[string][][2]string, len(stream)) for i, entry := range stream { - rawStream[i] = map[string]map[string]string{entry.id.String(): entry.values} + rawStream[i] = map[string][][2]string{entry.id.String(): entry.values} } return rawStream @@ -559,7 +559,7 @@ func (db *RedisDB) stream(key string) []map[string]map[string]string { // streamAdd adds an entry to a stream. Returns entry ID. // If entryID corresponds to the zero value, the ID will be generated automatically. -func (db *RedisDB) streamAdd(key string, entryID streamEntryID, values map[string]string) (string, error) { +func (db *RedisDB) streamAdd(key string, entryID streamEntryID, values [][2]string) (string, error) { stream, ok := db.streamKeys[key] if !ok { stream = newStream() @@ -583,7 +583,7 @@ func (db *RedisDB) streamAdd(key string, entryID streamEntryID, values map[strin // streamForceInsert adds an entry to a stream, regardless of whether the provided entry would be the last. // Returns entry ID. -func (db *RedisDB) streamForceAdd(key string, entryID streamEntryID, values map[string]string) (string, error) { +func (db *RedisDB) streamForceAdd(key string, entryID streamEntryID, values [][2]string) (string, error) { stream, ok := db.streamKeys[key] if !ok { return db.streamAdd(key, entryID, values) diff --git a/direct.go b/direct.go index bb041ad3..f4134bbb 100644 --- a/direct.go +++ b/direct.go @@ -652,13 +652,13 @@ func (db *RedisDB) ZScore(k, member string) (float64, error) { } // XAdd adds an entry to a stream. -func (m *Miniredis) XAdd(k string, id string, values map[string]string) (string, error) { +func (m *Miniredis) XAdd(k string, id string, values [][2]string) (string, error) { return m.DB(m.selectedDB).XAdd(k, id, values) } // XAdd adds an entry to a stream. // Any valid ID is accepted regardless of the current latest ID. -func (db *RedisDB) XAdd(k string, id string, values map[string]string) (string, error) { +func (db *RedisDB) XAdd(k string, id string, values [][2]string) (string, error) { db.master.Lock() defer db.master.Unlock() defer db.master.signal.Broadcast() @@ -683,12 +683,12 @@ func (db *RedisDB) XAdd(k string, id string, values map[string]string) (string, } // Stream returns a slice of stream entries id->values maps. -func (m *Miniredis) Stream(k string) ([]map[string]map[string]string, error) { +func (m *Miniredis) Stream(k string) ([]map[string][][2]string, error) { return m.DB(m.selectedDB).Stream(k) } // Stream returns a slice of stream entries id->values maps. -func (db *RedisDB) Stream(k string) ([]map[string]map[string]string, error) { +func (db *RedisDB) Stream(k string) ([]map[string][][2]string, error) { db.master.Lock() defer db.master.Unlock() diff --git a/integration/stream_test.go b/integration/stream_test.go index b028342a..e850e48d 100644 --- a/integration/stream_test.go +++ b/integration/stream_test.go @@ -41,25 +41,30 @@ func TestStreamRange(t *testing.T) { "ordplanets", "0-1", "name", "Mercury", + "greek-god", "Hermes", ), succ("XADD", "ordplanets", "1-0", "name", "Venus", + "greek-god", "Aphrodite", ), succ("XADD", "ordplanets", "2-1", + "greek-god", "", "name", "Earth", ), succ("XADD", "ordplanets", "3-0", "name", "Mars", + "greek-god", "Ares", ), succ("XADD", "ordplanets", "4-1", + "greek-god", "Dias", "name", "Jupiter", ), succ("XRANGE", "ordplanets", "-", "+"), diff --git a/miniredis.go b/miniredis.go index dc5b1323..eac8c6fe 100644 --- a/miniredis.go +++ b/miniredis.go @@ -339,8 +339,8 @@ func (m *Miniredis) Dump() string { case "stream": for _, entry := range db.streamKeys[k] { r += fmt.Sprintf("%s%s\n", indent, entry.id.String()) - for key, val := range entry.values { - r += fmt.Sprintf("%s%s%s: %s\n", indent, indent, v(key), v(val)) + for _, kv := range entry.values { + r += fmt.Sprintf("%s%s%s: %s\n", indent, indent, v(kv[0]), v(kv[1])) } } default: diff --git a/miniredis_test.go b/miniredis_test.go index f8a610bf..0e9df041 100644 --- a/miniredis_test.go +++ b/miniredis_test.go @@ -175,9 +175,9 @@ func TestDumpSortedSet(t *testing.T) { func TestDumpStream(t *testing.T) { s, err := Run() ok(t, err) - s.XAdd("elements", "123456789-0", map[string]string{"name": "wind"}) - s.XAdd("elements", "0-1", map[string]string{"name": "earth"}) - s.XAdd("elements", "123456789-1", map[string]string{"name": "fire"}) + s.XAdd("elements", "123456789-0", [][2]string{{"name", "wind"}}) + s.XAdd("elements", "0-1", [][2]string{{"name", "earth"}}) + s.XAdd("elements", "123456789-1", [][2]string{{"name", "fire"}}) if have, want := s.Dump(), `- elements 0-1 "name": "earth" @@ -189,7 +189,7 @@ func TestDumpStream(t *testing.T) { t.Errorf("have: %q, want: %q", have, want) } - s.XAdd("elements", "*", map[string]string{"name": "Leeloo"}) + s.XAdd("elements", "*", [][2]string{{"name", "Leeloo"}}) fullHave := s.Dump() have := strings.Split(fullHave, "\n")[8] want := ` "name": "Leeloo"` diff --git a/stream.go b/stream.go index 92e044ae..155e6b96 100644 --- a/stream.go +++ b/stream.go @@ -19,7 +19,7 @@ type streamKey []streamEntry type streamEntry struct { id streamEntryID - values map[string]string + values [][2]string } type streamEntryID [2]uint64 @@ -44,7 +44,7 @@ func newStream() streamKey { return streamKey{} } -func (ss *streamKey) append(id streamEntryID, values map[string]string) { +func (ss *streamKey) append(id streamEntryID, values [][2]string) { *ss = append(*ss, streamEntry{id: id, values: values}) } From f467a9d35f0b0335f539ec4c854e9b9203370c34 Mon Sep 17 00:00:00 2001 From: Harmen Date: Wed, 30 Oct 2019 17:03:27 +0100 Subject: [PATCH 4/4] use time set by SetTime() for stream IDs, if any is set This way the IDs themselves are predictable and testable. --- cmd_generic.go | 5 +---- cmd_server.go | 6 +----- cmd_stream_test.go | 13 +++++++++++++ db.go | 2 +- miniredis.go | 13 ++++++++++--- stream.go | 4 ++-- 6 files changed, 28 insertions(+), 15 deletions(-) diff --git a/cmd_generic.go b/cmd_generic.go index b0744f15..63197e70 100644 --- a/cmd_generic.go +++ b/cmd_generic.go @@ -80,10 +80,7 @@ func makeCmdExpire(m *Miniredis, unix bool, d time.Duration) func(*server.Peer, default: panic("invalid time unit (d). Fixme!") } - now := m.now - if now.IsZero() { - now = time.Now().UTC() - } + now := m.effectiveNow() db.ttl[key] = ts.Sub(now) } else { db.ttl[key] = time.Duration(i) * d diff --git a/cmd_server.go b/cmd_server.go index 2f4b7156..10e73980 100644 --- a/cmd_server.go +++ b/cmd_server.go @@ -5,7 +5,6 @@ package miniredis import ( "strconv" "strings" - "time" "github.com/alicebob/miniredis/v2/server" ) @@ -99,10 +98,7 @@ func (m *Miniredis) cmdTime(c *server.Peer, cmd string, args []string) { } withTx(m, c, func(c *server.Peer, ctx *connCtx) { - now := m.now - if now.IsZero() { - now = time.Now() - } + now := m.effectiveNow() nanos := now.UnixNano() seconds := nanos / 1000000000 microseconds := (nanos / 1000) % 1000000 diff --git a/cmd_stream_test.go b/cmd_stream_test.go index 39ff36bb..bd6724de 100644 --- a/cmd_stream_test.go +++ b/cmd_stream_test.go @@ -5,6 +5,7 @@ import ( "math" "regexp" "testing" + "time" "github.com/gomodule/redigo/redis" ) @@ -88,6 +89,18 @@ func TestStreamAdd(t *testing.T) { equals(t, fmt.Sprintf("%d-1", uint64(math.MaxUint64-100)), res) }) + t.Run("XADD SetTime", func(t *testing.T) { + now := time.Date(2001, 1, 1, 4, 4, 5, 4000, time.UTC) + s.SetTime(now) + id, err := redis.String(c.Do("XADD", "now", "*", "one", "1")) + ok(t, err) + equals(t, "978321845000004-0", id) + + id, err = redis.String(c.Do("XADD", "now", "*", "two", "2")) + ok(t, err) + equals(t, "978321845000004-1", id) + }) + t.Run("error cases", func(t *testing.T) { // Wrong type of key _, err := redis.String(c.Do("SET", "str", "value")) diff --git a/db.go b/db.go index a10cee0b..1139af26 100644 --- a/db.go +++ b/db.go @@ -567,7 +567,7 @@ func (db *RedisDB) streamAdd(key string, entryID streamEntryID, values [][2]stri } if entryID[0] == 0 && entryID[1] == 0 { - entryID = stream.nextEntryID() + entryID = stream.nextEntryID(db.master.effectiveNow()) } if err := stream.isValidNextEntryID(entryID); err != nil { diff --git a/miniredis.go b/miniredis.go index eac8c6fe..ee380d38 100644 --- a/miniredis.go +++ b/miniredis.go @@ -56,7 +56,7 @@ type Miniredis struct { selectedDB int // DB id used in the direct Get(), Set() &c. scripts map[string]string // sha1 -> lua src signal *sync.Cond - now time.Time // used to make a duration from EXPIREAT. time.Now() if not set. + now time.Time // time.Now() if not set. subscribers map[*Subscriber]struct{} rand *rand.Rand } @@ -350,8 +350,8 @@ func (m *Miniredis) Dump() string { return r } -// SetTime sets the time against which EXPIREAT values are compared. EXPIREAT -// will use time.Now() if this is not set. +// SetTime sets the time against which EXPIREAT values are compared, and the +// time used in stream entry IDs. Will use time.Now() if this is not set. func (m *Miniredis) SetTime(t time.Time) { m.Lock() defer m.Unlock() @@ -537,3 +537,10 @@ func (m *Miniredis) shuffle(l []string) { l[i], l[j] = l[j], l[i] } } + +func (m *Miniredis) effectiveNow() time.Time { + if !m.now.IsZero() { + return m.now + } + return time.Now().UTC() +} diff --git a/stream.go b/stream.go index 155e6b96..dc54b125 100644 --- a/stream.go +++ b/stream.go @@ -48,8 +48,8 @@ func (ss *streamKey) append(id streamEntryID, values [][2]string) { *ss = append(*ss, streamEntry{id: id, values: values}) } -func (ss *streamKey) nextEntryID() streamEntryID { - curTime := uint64(time.Now().UnixNano() / int64(time.Millisecond)) +func (ss *streamKey) nextEntryID(now time.Time) streamEntryID { + curTime := uint64(now.UnixNano()) / 1000 lastID := ss.getLastEntryID()