Skip to content

Commit

Permalink
feat: extend the CacheStore interface to have a chance to cleanup on …
Browse files Browse the repository at this point in the history
…Update

Signed-off-by: Rueian <[email protected]>
  • Loading branch information
rueian committed Feb 12, 2025
1 parent 4fe09da commit 5c44807
Show file tree
Hide file tree
Showing 8 changed files with 52 additions and 48 deletions.
8 changes: 4 additions & 4 deletions cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ type CacheStore interface {
// Update is called when receiving the response of the request sent by the above Flight Case 1 from redis.
// It should not only update the store but also deliver the response to all CacheEntry.Wait and return a desired client side PXAT of the response.
// Note that the server side expire time can be retrieved from RedisMessage.CachePXAT.
Update(key, cmd string, val RedisMessage) (pxat int64)
Update(key, cmd string, val RedisMessage, now time.Time) (pxat int64)
// Cancel is called when the request sent by the above Flight Case 1 failed.
// It should not only deliver the error to all CacheEntry.Wait but also remove the CacheEntry from the store.
Cancel(key, cmd string, err error)
Expand Down Expand Up @@ -91,7 +91,7 @@ func (a *adapter) Flight(key, cmd string, ttl time.Duration, now time.Time) (Red
return RedisMessage{}, flight
}

func (a *adapter) Update(key, cmd string, val RedisMessage) (sxat int64) {
func (a *adapter) Update(key, cmd string, val RedisMessage, _ time.Time) (sxat int64) {
a.mu.Lock()
entries := a.flights[key]
if flight, ok := entries[cmd].(*adapterEntry); ok {
Expand Down Expand Up @@ -219,15 +219,15 @@ func (f *flatten) Flight(key, cmd string, ttl time.Duration, now time.Time) (Red
return RedisMessage{}, nil
}

func (f *flatten) Update(key, cmd string, val RedisMessage) (sxat int64) {
func (f *flatten) Update(key, cmd string, val RedisMessage, now time.Time) (sxat int64) {
if af, ok := f.flights.Find(key, cmd); ok {
sxat = val.getExpireAt()
if af.xat < sxat || sxat == 0 {
sxat = af.xat
val.setExpireAt(sxat)
}
bs := val.CacheMarshal(nil)
f.cache.Insert(key, cmd, int64(len(bs)+len(key)+len(cmd))+int64(cache.LRUEntrySize)+64, sxat, bs)
f.cache.Insert(key, cmd, int64(len(bs)+len(key)+len(cmd))+int64(cache.LRUEntrySize)+64, sxat, now.UnixMilli(), bs)
f.flights.Delete(key, cmd)
af.setVal(val)
}
Expand Down
8 changes: 4 additions & 4 deletions cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ func test(t *testing.T, storeFn func() CacheStore) {

v = RedisMessage{typ: '+', string: "val"}
v.setExpireAt(now.Add(time.Second).UnixMilli())
if pttl := store.Update("key", "cmd", v); pttl < now.Add(90*time.Millisecond).UnixMilli() || pttl > now.Add(100*time.Millisecond).UnixMilli() {
if pttl := store.Update("key", "cmd", v, now); pttl < now.Add(90*time.Millisecond).UnixMilli() || pttl > now.Add(100*time.Millisecond).UnixMilli() {
t.Fatal("Update should return a desired pttl")
}

Expand Down Expand Up @@ -104,8 +104,8 @@ func test(t *testing.T, storeFn func() CacheStore) {
} {
store.Flight("key", "cmd1", time.Millisecond*100, now)
store.Flight("key", "cmd2", time.Millisecond*100, now)
store.Update("key", "cmd1", RedisMessage{typ: '+', string: "val"})
store.Update("key", "cmd2", RedisMessage{typ: '+', string: "val"})
store.Update("key", "cmd1", RedisMessage{typ: '+', string: "val"}, now)
store.Update("key", "cmd2", RedisMessage{typ: '+', string: "val"}, now)

store.Delete(deletions)

Expand All @@ -130,7 +130,7 @@ func test(t *testing.T, storeFn func() CacheStore) {

v = RedisMessage{typ: '+', string: "val"}
v.setExpireAt(now.Add(time.Millisecond).UnixMilli())
store.Update("key", "cmd", v)
store.Update("key", "cmd", v, now)

v, e = store.Flight("key", "cmd", time.Second, now.Add(time.Millisecond))
if v.typ != 0 || e != nil {
Expand Down
8 changes: 6 additions & 2 deletions internal/cache/lru.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ func (m *LRUDoubleMap[V]) move(h *linked[V]) {
}
}

func (m *LRUDoubleMap[V]) Insert(key1, key2 string, size, ts int64, v V) {
func (m *LRUDoubleMap[V]) Insert(key1, key2 string, size, ts, now int64, v V) {
// TODO: a RLock fast path?
m.mu.Lock()
if m.ma == nil {
Expand All @@ -117,7 +117,7 @@ func (m *LRUDoubleMap[V]) Insert(key1, key2 string, size, ts int64, v V) {
m.total += size
for m.head != nil {
h := (*linked[V])(m.head)
if m.total <= m.limit && h.ts != 0 { // TODO: clear expired entries?
if m.total <= m.limit && h.ts != 0 && h.ts > now {
break
}
m.remove(h)
Expand All @@ -127,6 +127,10 @@ func (m *LRUDoubleMap[V]) Insert(key1, key2 string, size, ts int64, v V) {
if h == nil {
h = &linked[V]{key: key1, ts: ts, mark: m.mark}
m.ma[key1] = h
} else if h.ts <= now {
m.total -= h.size
h.size = 0
h.head = chain[V]{}
}
h.ts = ts
h.size += size
Expand Down
26 changes: 13 additions & 13 deletions internal/cache/lru_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,9 @@ func TestLRUDoubleMap(t *testing.T) {
if _, ok := m.Find("1", "a", 1); ok {
t.Fatal("should not find 1")
}
m.Insert("1", "a", 1, 2, 1)
m.Insert("1", "b", 1, 2, 2)
m.Insert("2", "c", 1, 2, 3)
m.Insert("1", "a", 1, 2, 1, 1)
m.Insert("1", "b", 1, 2, 1, 2)
m.Insert("2", "c", 1, 2, 1, 3)
if v, ok := m.Find("1", "a", 1); !ok || v != 1 {
t.Fatal("not find 1")
}
Expand Down Expand Up @@ -50,9 +50,9 @@ func TestLRUDoubleMap(t *testing.T) {
t.Fatal("should have 2 heads")
}

m.Insert("1", "d", 1, 2, 1)
m.Insert("1", "e", 1, 2, 2)
m.Insert("2", "f", 1, 2, 3)
m.Insert("1", "d", 1, 2, 1, 1)
m.Insert("1", "e", 1, 2, 1, 2)
m.Insert("2", "f", 1, 2, 1, 3)
if v, ok := m.Find("1", "d", 1); !ok || v != 1 {
t.Fatal("not find 1")
}
Expand All @@ -77,7 +77,7 @@ func TestLRUDoubleMap(t *testing.T) {
func TestLRUCache_LRU_1(t *testing.T) {
m := NewLRUDoubleMap[int](bpsize, bpsize)
for i := 0; i < bpsize; i++ {
m.Insert(strconv.Itoa(i), "a", 2, 2, i)
m.Insert(strconv.Itoa(i), "a", 2, 2, 1, i)
}
m.mu.Lock()
heads := len(m.ma)
Expand All @@ -100,7 +100,7 @@ func TestLRUCache_LRU_1(t *testing.T) {
func TestLRUCache_LRU_2(t *testing.T) {
m := NewLRUDoubleMap[int](bpsize*2, bpsize*2)
for i := 0; i < bpsize*2; i++ {
m.Insert(strconv.Itoa(i), "a", 1, 2, i)
m.Insert(strconv.Itoa(i), "a", 1, 2, 1, i)
}
m.mu.Lock()
heads := len(m.ma)
Expand All @@ -116,7 +116,7 @@ func TestLRUCache_LRU_2(t *testing.T) {
runtime.GC()
runtime.GC()
for i := bpsize * 2; i < bpsize*3; i++ {
m.Insert(strconv.Itoa(i), "a", 1, 2, i)
m.Insert(strconv.Itoa(i), "a", 1, 2, 1, i)
}
for i := 0; i < bpsize; i++ {
if v, ok := m.Find(strconv.Itoa(i), "a", 1); !ok || v != i {
Expand All @@ -138,14 +138,14 @@ func TestLRUCache_LRU_2(t *testing.T) {
func TestLRUCache_LRU_GC(t *testing.T) {
m := NewLRUDoubleMap[int](bpsize, bpsize)
for i := 0; i < bpsize; i++ {
m.Insert(strconv.Itoa(i), "a", 1, 2, i)
m.Insert(strconv.Itoa(i), "a", 1, 2, 1, i)
}
if v, ok := m.Find(strconv.Itoa(bpsize/2), "a", 1); !ok || v != bpsize/2 {
t.Fatal("not find")
}
runtime.GC()
runtime.GC()
m.Insert("a", "a", bpsize-1, 2, 0)
m.Insert("a", "a", bpsize-1, 2, 1, 0)
m.mu.Lock()
heads := len(m.ma)
total := m.total
Expand All @@ -172,15 +172,15 @@ func TestLRUCache_LRU_GC(t *testing.T) {
func TestLRUCache_LRU_GC_2(t *testing.T) {
m := NewLRUDoubleMap[int](bpsize, bpsize)
for i := 0; i < bpsize; i++ {
m.Insert(strconv.Itoa(i), "a", 1, 2, i)
m.Insert(strconv.Itoa(i), "a", 1, 2, 1, i)
}
if v, ok := m.Find(strconv.Itoa(bpsize/2), "a", 1); !ok || v != bpsize/2 {
t.Fatal("not find")
}
m.Reset()
runtime.GC()
runtime.GC()
m.Insert("a", "a", bpsize-1, 2, 0)
m.Insert("a", "a", bpsize-1, 2, 1, 0)
m.mu.Lock()
heads := len(m.ma)
total := m.total
Expand Down
2 changes: 1 addition & 1 deletion lru.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,7 @@ func (c *lru) Flights(now time.Time, multi []CacheableTTL, results []RedisResult
return missed[:j]
}

func (c *lru) Update(key, cmd string, value RedisMessage) (pxat int64) {
func (c *lru) Update(key, cmd string, value RedisMessage, _ time.Time) (pxat int64) {
var ch chan struct{}
c.mu.Lock()
if kc, ok := c.store[key]; ok {
Expand Down
40 changes: 20 additions & 20 deletions lru_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ func TestLRU(t *testing.T) {
}
m := RedisMessage{typ: '+', string: "0", values: []RedisMessage{{}}}
m.setExpireAt(time.Now().Add(PTTL * time.Millisecond).UnixMilli())
store.Update("0", "GET", m)
store.Update("0", "GET", m, time.Now())
return store.(*lru)
}

Expand All @@ -49,7 +49,7 @@ func TestLRU(t *testing.T) {
t.Fatalf("got unexpected value from the Flight after pttl: %v %v", v, entry)
}
m := RedisMessage{typ: '+', string: "1"}
lru.Update("1", "GET", m)
lru.Update("1", "GET", m, time.Now())
if v, _ := lru.Flight("1", "GET", TTL, time.Now()); v.typ == 0 {
t.Fatalf("did not get the value from the second Flight")
} else if v.string != "1" {
Expand Down Expand Up @@ -98,7 +98,7 @@ func TestLRU(t *testing.T) {
lru.Flight(strconv.Itoa(i), "GET", TTL, time.Now())
m := RedisMessage{typ: '+', string: strconv.Itoa(i)}
m.setExpireAt(time.Now().Add(PTTL * time.Millisecond).UnixMilli())
lru.Update(strconv.Itoa(i), "GET", m)
lru.Update(strconv.Itoa(i), "GET", m, time.Now())
}
if v, entry := lru.Flight("1", "GET", TTL, time.Now()); v.typ != 0 {
t.Fatalf("got evicted value from the first Flight: %v %v", v, entry)
Expand All @@ -123,7 +123,7 @@ func TestLRU(t *testing.T) {
for i := 1; i < Entries; i++ {
lru.Flight(strconv.Itoa(i), "GET", TTL, time.Now())
m := RedisMessage{typ: '+', string: strconv.Itoa(i)}
lru.Update(strconv.Itoa(i), "GET", m)
lru.Update(strconv.Itoa(i), "GET", m, time.Now())
}
for i := 1; i < Entries; i++ {
if v, _ := lru.Flight(strconv.Itoa(i), "GET", TTL, time.Now()); v.string != strconv.Itoa(i) {
Expand Down Expand Up @@ -157,7 +157,7 @@ func TestLRU(t *testing.T) {

m := RedisMessage{typ: '+', string: "this Update should have no effect"}
m.setExpireAt(time.Now().Add(PTTL * time.Millisecond).UnixMilli())
lru.Update("1", "GET", m)
lru.Update("1", "GET", m, time.Now())
for i := 0; i < 2; i++ { // entry should be always nil after the first call if Close
if v, entry := lru.Flight("1", "GET", TTL, time.Now()); v.typ != 0 || entry != nil {
t.Fatalf("got unexpected value from the first Flight: %v %v", v, entry)
Expand Down Expand Up @@ -194,7 +194,7 @@ func TestLRU(t *testing.T) {
lru.Flight("key", "cmd", time.Second, time.Now())
m := RedisMessage{typ: 1}
m.setExpireAt(time.Now().Add(time.Second).UnixMilli())
lru.Update("key", "cmd", m)
lru.Update("key", "cmd", m, time.Now())
if v := lru.GetTTL("key", "cmd"); !roughly(v, time.Second) {
t.Fatalf("unexpected %v", v)
}
Expand All @@ -206,7 +206,7 @@ func TestLRU(t *testing.T) {
lru.Flight("key", "cmd", 2*time.Second, time.Now())
m := RedisMessage{typ: 1}
m.setExpireAt(time.Now().Add(time.Second).UnixMilli())
lru.Update("key", "cmd", m)
lru.Update("key", "cmd", m, time.Now())
if v, _ := lru.Flight("key", "cmd", 2*time.Second, time.Now()); v.CacheTTL() != 1 {
t.Fatalf("unexpected %v", v.CacheTTL())
}
Expand All @@ -216,7 +216,7 @@ func TestLRU(t *testing.T) {
lru.Flight("key", "cmd", 2*time.Second, time.Now())
m := RedisMessage{typ: 1}
m.setExpireAt(time.Now().Add(3 * time.Second).UnixMilli())
lru.Update("key", "cmd", m)
lru.Update("key", "cmd", m, time.Now())
if v, _ := lru.Flight("key", "cmd", 2*time.Second, time.Now()); v.CacheTTL() != 2 {
t.Fatalf("unexpected %v", v.CacheTTL())
}
Expand All @@ -225,7 +225,7 @@ func TestLRU(t *testing.T) {
lru := setup(t)
lru.Flight("key", "cmd", 2*time.Second, time.Now())
m := RedisMessage{typ: 1}
lru.Update("key", "cmd", m)
lru.Update("key", "cmd", m, time.Now())
if v, _ := lru.Flight("key", "cmd", 2*time.Second, time.Now()); v.CacheTTL() != 2 {
t.Fatalf("unexpected %v", v.CacheTTL())
}
Expand All @@ -234,7 +234,7 @@ func TestLRU(t *testing.T) {
lru := setup(t)
lru.Flight("key", "cmd", 2*time.Second, time.Now())
m := RedisMessage{typ: 1}
lru.Update("key", "cmd", m)
lru.Update("key", "cmd", m, time.Now())
if v, _ := lru.Flight("key", "cmd", 2*time.Second, time.Now()); v.CacheTTL() != 2 {
t.Fatalf("unexpected %v", v.CacheTTL())
}
Expand All @@ -260,7 +260,7 @@ func TestLRU(t *testing.T) {
t.Fatalf("got unexpected value from the Flight after pttl: %v %v", v, entry)
}
m := RedisMessage{typ: '+', string: "1"}
lru.Update("1", "GET", m)
lru.Update("1", "GET", m, time.Now())
if v, _ := flights(lru, time.Now(), TTL, "GET", "1"); v.typ == 0 {
t.Fatalf("did not get the value from the second Flight")
} else if v.string != "1" {
Expand Down Expand Up @@ -309,7 +309,7 @@ func TestLRU(t *testing.T) {
flights(lru, time.Now(), TTL, "GET", strconv.Itoa(i))
m := RedisMessage{typ: '+', string: strconv.Itoa(i)}
m.setExpireAt(time.Now().Add(PTTL * time.Millisecond).UnixMilli())
lru.Update(strconv.Itoa(i), "GET", m)
lru.Update(strconv.Itoa(i), "GET", m, time.Now())
}
if v, entry := flights(lru, time.Now(), TTL, "GET", "1"); v.typ != 0 {
t.Fatalf("got evicted value from the first Flight: %v %v", v, entry)
Expand All @@ -334,7 +334,7 @@ func TestLRU(t *testing.T) {
for i := 1; i < Entries; i++ {
flights(lru, time.Now(), TTL, "GET", strconv.Itoa(i))
m := RedisMessage{typ: '+', string: strconv.Itoa(i)}
lru.Update(strconv.Itoa(i), "GET", m)
lru.Update(strconv.Itoa(i), "GET", m, time.Now())
}
for i := 1; i < Entries; i++ {
if v, _ := flights(lru, time.Now(), TTL, "GET", strconv.Itoa(i)); v.string != strconv.Itoa(i) {
Expand Down Expand Up @@ -368,7 +368,7 @@ func TestLRU(t *testing.T) {

m := RedisMessage{typ: '+', string: "this Update should have no effect"}
m.setExpireAt(time.Now().Add(PTTL * time.Millisecond).UnixMilli())
lru.Update("1", "GET", m)
lru.Update("1", "GET", m, time.Now())
for i := 0; i < 2; i++ { // entry should be always nil after the first call if Close
if v, entry := flights(lru, time.Now(), TTL, "GET", "1"); v.typ != 0 || entry != nil {
t.Fatalf("got unexpected value from the first Flight: %v %v", v, entry)
Expand Down Expand Up @@ -405,7 +405,7 @@ func TestLRU(t *testing.T) {
flights(lru, time.Now(), time.Second, "cmd", "key")
m := RedisMessage{typ: 1}
m.setExpireAt(time.Now().Add(time.Second).UnixMilli())
lru.Update("key", "cmd", m)
lru.Update("key", "cmd", m, time.Now())
if v := lru.GetTTL("key", "cmd"); !roughly(v, time.Second) {
t.Fatalf("unexpected %v", v)
}
Expand All @@ -417,7 +417,7 @@ func TestLRU(t *testing.T) {
flights(lru, time.Now(), time.Second*2, "cmd", "key")
m := RedisMessage{typ: 1}
m.setExpireAt(time.Now().Add(time.Second).UnixMilli())
lru.Update("key", "cmd", m)
lru.Update("key", "cmd", m, time.Now())
if v, _ := flights(lru, time.Now(), time.Second*2, "cmd", "key"); v.CacheTTL() != 1 {
t.Fatalf("unexpected %v", v.CacheTTL())
}
Expand All @@ -427,7 +427,7 @@ func TestLRU(t *testing.T) {
flights(lru, time.Now(), time.Second*2, "cmd", "key")
m := RedisMessage{typ: 1}
m.setExpireAt(time.Now().Add(3 * time.Second).UnixMilli())
lru.Update("key", "cmd", m)
lru.Update("key", "cmd", m, time.Now())
if v, _ := flights(lru, time.Now(), time.Second*2, "cmd", "key"); v.CacheTTL() != 2 {
t.Fatalf("unexpected %v", v.CacheTTL())
}
Expand All @@ -436,7 +436,7 @@ func TestLRU(t *testing.T) {
lru := setup(t)
flights(lru, time.Now(), time.Second*2, "cmd", "key")
m := RedisMessage{typ: 1}
lru.Update("key", "cmd", m)
lru.Update("key", "cmd", m, time.Now())
if v, _ := flights(lru, time.Now(), time.Second*2, "cmd", "key"); v.CacheTTL() != 2 {
t.Fatalf("unexpected %v", v.CacheTTL())
}
Expand All @@ -445,7 +445,7 @@ func TestLRU(t *testing.T) {
lru := setup(t)
flights(lru, time.Now(), time.Second*2, "cmd", "key")
m := RedisMessage{typ: 1}
lru.Update("key", "cmd", m)
lru.Update("key", "cmd", m, time.Now())
if v, _ := flights(lru, time.Now(), time.Second*2, "cmd", "key"); v.CacheTTL() != 2 {
t.Fatalf("unexpected %v", v.CacheTTL())
}
Expand Down Expand Up @@ -483,7 +483,7 @@ func BenchmarkLRU(b *testing.B) {
lru.Flight(key, "GET", TTL, time.Now())
m := RedisMessage{}
m.setExpireAt(time.Now().Add(PTTL * time.Millisecond).UnixMilli())
lru.Update(key, "GET", m)
lru.Update(key, "GET", m, time.Now())
}
})
}
Expand Down
4 changes: 2 additions & 2 deletions pipe.go
Original file line number Diff line number Diff line change
Expand Up @@ -569,7 +569,7 @@ func (p *pipe) _backgroundRead() (err error) {
if pttl := msg.values[i].integer; pttl >= 0 {
cp.setExpireAt(now.Add(time.Duration(pttl) * time.Millisecond).UnixMilli())
}
msgs[i].setExpireAt(p.cache.Update(ck, cc, cp))
msgs[i].setExpireAt(p.cache.Update(ck, cc, cp, now))
}
} else {
ck, cc := cmds.CacheKey(cacheable)
Expand All @@ -579,7 +579,7 @@ func (p *pipe) _backgroundRead() (err error) {
if pttl := msg.values[ci-1].integer; pttl >= 0 {
cp.setExpireAt(now.Add(time.Duration(pttl) * time.Millisecond).UnixMilli())
}
msg.values[ci].setExpireAt(p.cache.Update(ck, cc, cp))
msg.values[ci].setExpireAt(p.cache.Update(ck, cc, cp, now))
}
}
if prply {
Expand Down
Loading

0 comments on commit 5c44807

Please sign in to comment.