Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Off-chain] feat: in-memory query cache(s) #994

Draft
wants to merge 15 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions pkg/client/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -360,3 +360,20 @@ type BankQueryClient interface {
// GetBalance queries the chain for the uPOKT balance of the account provided
GetBalance(ctx context.Context, address string) (*cosmostypes.Coin, error)
}

// QueryCache is a key/value cache interface for values of a single type T,
// indexed by string keys.
type QueryCache[T any] interface {
// Get returns the value cached under key, or an error if no usable value is
// available (the in-memory implementation returns ErrCacheMiss when the key
// is absent or its entry has outlived the configured TTL).
Get(key string) (T, error)
// Set adds or updates the value cached under key.
Set(key string, value T) error
// Delete removes the entry cached under key.
Delete(key string)
// Clear removes all entries from the cache.
Clear()
}

// HistoricalQueryCache extends QueryCache to support historical values at
// different heights. Every QueryCache method is available through the embedded
// interface.
type HistoricalQueryCache[T any] interface {
QueryCache[T]
// GetAtHeight retrieves the nearest value <= the specified height, i.e. the
// value recorded at the greatest height that does not exceed height.
GetAtHeight(key string, height int64) (T, error)
// SetAtHeight adds or updates a value at a specific height.
SetAtHeight(key string, value T, height int64) error
}
76 changes: 76 additions & 0 deletions pkg/client/query/cache/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
package cache

import (
"time"
)

// EvictionPolicy selects which entry a cache discards once it is full.
type EvictionPolicy int64

// Supported eviction policies. The zero value is FirstInFirstOut.
const (
	// FirstInFirstOut discards the entry carrying the oldest timestamp.
	FirstInFirstOut EvictionPolicy = iota
	// LeastRecentlyUsed is declared for future use; InMemoryCache does not
	// implement it yet.
	LeastRecentlyUsed
	// LeastFrequentlyUsed is declared for future use; InMemoryCache does not
	// implement it yet.
	LeastFrequentlyUsed
)

// CacheConfig is the configuration options for a cache.
type CacheConfig struct {
// MaxKeys is the maximum number of items the cache can hold.
MaxKeys int64
// EvictionPolicy is how items should be removed when the cache is full.
EvictionPolicy EvictionPolicy
bryanchriswhite marked this conversation as resolved.
Show resolved Hide resolved
// TTL is how long items should remain in the cache
TTL time.Duration

// historical is whether the cache will cache a single value for each key
// (false) or whether it will cache a history of values for each key (true).
historical bool
// pruneOlderThan is the number of past blocks for which to keep historical
bryanchriswhite marked this conversation as resolved.
Show resolved Hide resolved
// values. If 0, no historical pruning is performed.
pruneOlderThan int64
}

// CacheOption defines a function that configures a CacheConfig. Options are
// applied, in the order given, by NewInMemoryCache.
type CacheOption func(*CacheConfig)

// HistoricalCacheConfig extends the basic CacheConfig with historical settings.
//
// NOTE(review): nothing else visible in this change references this type;
// WithHistoricalMode stores its pruning setting on CacheConfig instead. Confirm
// whether this struct is still needed.
type HistoricalCacheConfig struct {
CacheConfig

// MaxHeightsPerKey is the maximum number of different heights to store per key.
MaxHeightsPerKey int
// PruneOlderThan specifies how many blocks back to maintain in history.
// If 0, no historical pruning is performed.
PruneOlderThan int64
}

// WithHistoricalMode enables historical caching with the given pruneOlderThan
bryanchriswhite marked this conversation as resolved.
Show resolved Hide resolved
// configuration, if 0 no historical pruning is performed.
func WithHistoricalMode(pruneOlderThan int64) CacheOption {
return func(cfg *CacheConfig) {
cfg.historical = true
cfg.pruneOlderThan = pruneOlderThan
}
}

// WithMaxKeys sets the maximum number of distinct key/value pairs the cache will
Olshansk marked this conversation as resolved.
Show resolved Hide resolved
// hold before evicting according to the configured eviction policy.
func WithMaxKeys(size int64) CacheOption {
return func(cfg *CacheConfig) {
cfg.MaxKeys = size
}
}

// WithEvictionPolicy returns an option that selects how the cache chooses the
// entry to drop when it is full.
func WithEvictionPolicy(policy EvictionPolicy) CacheOption {
	var opt CacheOption = func(target *CacheConfig) {
		target.EvictionPolicy = policy
	}
	return opt
}

// WithTTL returns an option that sets the time-to-live applied to cache
// entries.
func WithTTL(ttl time.Duration) CacheOption {
	applyTTL := func(target *CacheConfig) {
		target.TTL = ttl
	}
	return applyTTL
}
10 changes: 10 additions & 0 deletions pkg/client/query/cache/errors.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package cache

import "cosmossdk.io/errors"

// codespace namespaces the errors registered by this package.
const codespace = "client/query/cache"

var (
	// ErrCacheMiss is returned when no usable value is cached for a key: the
	// key is absent, no version exists at or below the requested height, or
	// the entry has expired.
	ErrCacheMiss = errors.Register(codespace, 1, "cache miss")
	// ErrHistoricalModeNotEnabled is returned when a height-aware operation is
	// invoked on a cache that was not configured with WithHistoricalMode.
	ErrHistoricalModeNotEnabled = errors.Register(codespace, 2, "historical mode not enabled")
)
245 changes: 245 additions & 0 deletions pkg/client/query/cache/memory.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,245 @@
package cache

import (
"sync"
"sync/atomic"
"time"

"github.com/pokt-network/poktroll/pkg/client"
)

// Compile-time assertions that *InMemoryCache satisfies both cache interfaces.
var (
_ client.QueryCache[any] = (*InMemoryCache[any])(nil)
_ client.HistoricalQueryCache[any] = (*InMemoryCache[any])(nil)
)

// InMemoryCache provides a concurrency-safe in-memory cache implementation with
Olshansk marked this conversation as resolved.
Show resolved Hide resolved
// optional historical value support.
type InMemoryCache[T any] struct {
bryanchriswhite marked this conversation as resolved.
Show resolved Hide resolved
config CacheConfig
latestHeight atomic.Int64

itemsMu sync.RWMutex
// items type depends on historical mode:
// | historical mode | type |
// | --------------- | --------------------------------------- |
// | false | map[string]cacheItem[T] |
// | true | map[string]map[int64]heightCacheItem[T] |
items map[string]any
bryanchriswhite marked this conversation as resolved.
Show resolved Hide resolved
}

// cacheItem wraps a cached value with metadata; it is the entry type used when
// the cache is not in historical mode.
type cacheItem[T any] struct {
// value is the cached value.
value T
// timestamp is when the entry was written; reads compare it against the TTL.
timestamp time.Time
}

// heightCacheItem is used when the cache is in historical mode; one is stored
// per (key, height) pair.
type heightCacheItem[T any] struct {
// value is the cached value at this height.
value T
// timestamp is when the entry was written; reads compare it against the TTL.
timestamp time.Time
}

// NewInMemoryCache builds an empty cache. It starts from a configuration whose
// eviction policy is FirstInFirstOut and then applies opts in order.
func NewInMemoryCache[T any](opts ...CacheOption) *InMemoryCache[T] {
	cache := &InMemoryCache[T]{
		items: make(map[string]any),
	}
	cache.config.EvictionPolicy = FirstInFirstOut

	for i := range opts {
		opts[i](&cache.config)
	}

	return cache
}

// Get retrieves an item from the cache
func (c *InMemoryCache[T]) Get(key string) (T, error) {
if c.config.historical {
bryanchriswhite marked this conversation as resolved.
Show resolved Hide resolved
return c.GetAtHeight(key, c.latestHeight.Load())
}

c.itemsMu.RLock()
defer c.itemsMu.RUnlock()

var zero T

item, exists := c.items[key]
if !exists {
return zero, ErrCacheMiss
}

cItem := item.(cacheItem[T])
bryanchriswhite marked this conversation as resolved.
Show resolved Hide resolved
if c.config.TTL > 0 && time.Since(cItem.timestamp) > c.config.TTL {
bryanchriswhite marked this conversation as resolved.
Show resolved Hide resolved
// TODO_QUESTION: should we prune here?
return zero, ErrCacheMiss
}

return cItem.value, nil
}

// GetAtHeight retrieves an item from the cache at or before the specified height
func (c *InMemoryCache[T]) GetAtHeight(key string, height int64) (T, error) {
var zero T

if !c.config.historical {
return zero, ErrHistoricalModeNotEnabled
}

c.itemsMu.RLock()
defer c.itemsMu.RUnlock()

heightMap, exists := c.items[key]
if !exists {
return zero, ErrCacheMiss
}

versions := heightMap.(map[int64]heightCacheItem[T])
var nearestHeight int64 = -1
for h := range versions {
bryanchriswhite marked this conversation as resolved.
Show resolved Hide resolved
if h <= height && h > nearestHeight {
nearestHeight = h
}
}

if nearestHeight == -1 {
return zero, ErrCacheMiss
}

item := versions[nearestHeight]
if c.config.TTL > 0 && time.Since(item.timestamp) > c.config.TTL {
bryanchriswhite marked this conversation as resolved.
Show resolved Hide resolved
return zero, ErrCacheMiss
}

return item.value, nil
}

// Set adds or updates an item in the cache
func (c *InMemoryCache[T]) Set(key string, value T) error {
if c.config.historical {
return c.SetAtHeight(key, value, c.latestHeight.Load())
bryanchriswhite marked this conversation as resolved.
Show resolved Hide resolved
}

if c.config.MaxKeys > 0 && int64(len(c.items)) >= c.config.MaxKeys {
bryanchriswhite marked this conversation as resolved.
Show resolved Hide resolved
c.evict()
}

c.itemsMu.Lock()
defer c.itemsMu.Unlock()

c.items[key] = cacheItem[T]{
value: value,
timestamp: time.Now(),
}

return nil
}

// SetAtHeight adds or updates the value cached under key at the given height.
//
// It returns ErrHistoricalModeNotEnabled if the cache is not in historical
// mode. If height exceeds the latest height seen so far, the latest height is
// raised to it. When MaxKeys is configured and key is new while the cache is
// full, one key is evicted first. When pruneOlderThan is configured, versions
// of key more than pruneOlderThan blocks below height are removed.
func (c *InMemoryCache[T]) SetAtHeight(key string, value T, height int64) error {
	if !c.config.historical {
		return ErrHistoricalModeNotEnabled
	}

	// Raise latestHeight to height if it is newer. Retry when the CAS loses to
	// a concurrent writer so that a still-greater height is never dropped.
	for {
		latestHeight := c.latestHeight.Load()
		if height <= latestHeight || c.latestHeight.CompareAndSwap(latestHeight, height) {
			break
		}
	}

	c.itemsMu.Lock()
	defer c.itemsMu.Unlock()

	var history map[int64]heightCacheItem[T]
	if existing, exists := c.items[key]; exists {
		history = existing.(map[int64]heightCacheItem[T])
	} else {
		// A new key grows the cache, so enforce MaxKeys before inserting it.
		if c.config.MaxKeys > 0 && int64(len(c.items)) >= c.config.MaxKeys {
			c.evict()
		}
		history = make(map[int64]heightCacheItem[T])
		c.items[key] = history
	}

	// Prune old heights if configured.
	if c.config.pruneOlderThan > 0 {
		for h := range history {
			if height-h > c.config.pruneOlderThan {
				delete(history, h)
			}
		}
	}

	history[height] = heightCacheItem[T]{
		value:     value,
		timestamp: time.Now(),
	}

	return nil
}

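// Delete removes the entry stored under key. In historical mode this drops the
// key's entire height history; latestHeight is left unchanged.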
func (c *InMemoryCache[T]) Delete(key string) {
c.itemsMu.Lock()
defer c.itemsMu.Unlock()

delete(c.items, key)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What if we delete the (key,value) pair for latestHeight?

Copy link
Contributor Author

@bryanchriswhite bryanchriswhite Dec 16, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you elaborate? I'm not quite following.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yea, this comment doesn't make sense if you don't know where I was looking or what I was thinking...

If we delete the key for the latest height, do we need to also make an appropriate call to c.latestHeight.Store(??)?

If this is indeed the case, please add a unit test for it.

If not, would just appreciate a quick (no need for anything big) explanation.

Copy link
Contributor Author

@bryanchriswhite bryanchriswhite Dec 18, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Gotcha, good question. I don't think we need to update c.latestHeight, regardless of which key is being deleted. For Gets, this should be completely transparent to the caller; for Sets, it means that if you call Delete, followed by Set (as opposed to SetAtHeight), then the height you're setting will be the last c.latestHeight. I think this behavior is correct.

If not, would just appreciate a quick (no need for anything big) explanation.

I did see 👆 but had to think it through for myself anyways:

flowchart LR

subgraph hc2[Historical Cache]
    lh2["latestHeight: 2"]
    subgraph k2_2h["key2 history"]
        k2_2h1["height: 1; value 100"]
    end
    subgraph k2_1h["key1 history"]
        k2_1h1["height: 1; value 10"]
        k2_1h2["height: 2; value 20"]
    end
end

subgraph hc3[Historical Cache]
    lh3["latestHeight: 2"]
    subgraph k3_2h["key2 history"]
        k3_2h1["height: 1; value 100"]
    end
end


subgraph hc4[Historical Cache]
    lh4["latestHeight: 2"]
    subgraph k4_1h["key1 history"]
        k4_1h2["height: 2; value 30"]
    end
end

hc2 --"Delete(key1)"--> hc3
hc3 --"Set(key1, 30)"--> hc4

lh2 --> k2_1h2
lh4 --> k4_1h2
Loading

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Given the explanation and diagram, I don't fully understand the need for latestHeight at all. In fact, I think we should delete it now if the behaviour you described is expected.

Copy link
Contributor Author

@bryanchriswhite bryanchriswhite Dec 20, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I failed to reconcile with this thread but after discussing with @red-0ne, we decided that it should be an error to call Set() on a HistoricalQueryCache. I think this indicates that HistoricalQueryCache SHOULD NOT embed QueryCache after all. Subsequently, latestVersion is no longer necessary and can be removed.

Copy link
Contributor Author

@bryanchriswhite bryanchriswhite Dec 20, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

// QueryCache is a key/value store style interface for a cache of a single type.
// It is intended to be used to cache query responses (or derivatives thereof),
// where each key uniquely indexes the most recent query response.
type QueryCache[T any] interface {
	Get(key string) (T, error)
	Set(key string, value T) error
	Delete(key string)
	Clear()
}

// HistoricalQueryCache is a key/value store style interface for a cache of a
// single type which supports getting and setting values at multiple version
// numbers for a given key. It does not embed QueryCache.
type HistoricalQueryCache[T any] interface {
	// GetLatestVersion retrieves the historical value with the highest version number.
	GetLatestVersion(key string) (T, error)
	// GetAsOfVersion retrieves the nearest value <= the specified version number.
	GetAsOfVersion(key string, version int64) (T, error)
	// SetAsOfVersion adds or updates a value at a specific version number.
	SetAsOfVersion(key string, value T, version int64) error
}

(class diagrams also updated)

}

// Clear discards every cached entry by swapping in a fresh map, and resets the
// latest height to zero.
func (c *InMemoryCache[T]) Clear() {
	c.itemsMu.Lock()
	c.latestHeight.Store(0)
	c.items = make(map[string]any)
	c.itemsMu.Unlock()
}

// evict removes one entry according to the configured eviction policy.
//
// It reads and mutates c.items without taking c.itemsMu; callers are
// responsible for holding the write lock.
//
// FirstInFirstOut, and any unrecognized policy, remove the entry with the
// oldest timestamp. It panics for LeastRecentlyUsed and LeastFrequentlyUsed,
// which are not implemented yet.
func (c *InMemoryCache[T]) evict() {
	switch c.config.EvictionPolicy {
	case LeastRecentlyUsed:
		// TODO: Implement LRU eviction
		// This will require tracking access times
		panic("LRU eviction not implemented")

	case LeastFrequentlyUsed:
		// TODO: Implement LFU eviction
		// This will require tracking access counts
		panic("LFU eviction not implemented")

	default:
		// FirstInFirstOut; unrecognized policies also default to FIFO rather
		// than deleting whichever key map iteration happens to yield first.
		c.evictOldest()
	}
}

// evictOldest removes the key whose entry carries the oldest timestamp. In
// historical mode a key's age is the oldest timestamp among its versions.
// It is a no-op on an empty cache.
func (c *InMemoryCache[T]) evictOldest() {
	var (
		oldestKey  string
		oldestTime time.Time
		found      bool
	)

	for key, item := range c.items {
		var itemTime time.Time
		if c.config.historical {
			versions := item.(map[int64]heightCacheItem[T])
			for _, v := range versions {
				if itemTime.IsZero() || v.timestamp.Before(itemTime) {
					itemTime = v.timestamp
				}
			}
		} else {
			itemTime = item.(cacheItem[T]).timestamp
		}

		if !found || itemTime.Before(oldestTime) {
			oldestKey, oldestTime, found = key, itemTime, true
		}
	}

	if found {
		delete(c.items, oldestKey)
	}
}
Loading
Loading