Skip to content

Commit

Permalink
Allow configuring caching directly via Pool struct
Browse files Browse the repository at this point in the history
  • Loading branch information
holykol committed Oct 2, 2020
1 parent 105000c commit d62ccb8
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 25 deletions.
38 changes: 18 additions & 20 deletions redis/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ type Pool struct {

// Controls how Pool handles server-assisted caching
// introduced in Redis 6.0. This is disabled by default
cacheStrategy cacheStrategy
CacheStrategy cacheStrategy

chInitialized uint32 // set to 1 when field ch is initialized

Expand Down Expand Up @@ -218,6 +218,17 @@ func (p *Pool) Get() Conn {
// If the function completes without error, then the application must close the
// returned connection.
func (p *Pool) GetContext(ctx context.Context) (Conn, error) {
// Start cache invalidator on first call to the pool
p.mu.Lock()
if p.CacheStrategy == CacheStrategyInvalidate && p.invalidatorID == 0 {
id, err := p.cacheInvalidator()
if err != nil {
return errorConn{err}, err
}
p.invalidatorID = id
}
p.mu.Unlock()

// Wait until there is a vacant connection in the pool.
waited, err := p.waitVacantConn(ctx)
if err != nil {
Expand Down Expand Up @@ -289,7 +300,7 @@ func (p *Pool) GetContext(ctx context.Context) (Conn, error) {
invalidatorID := p.invalidatorID
p.mu.Unlock()

if p.cacheStrategy == CacheStrategyInvalidate && invalidatorID != 0 {
if p.CacheStrategy == CacheStrategyInvalidate && invalidatorID != 0 {
// Ask server to watch the keys requested by this connection and
// send invalidation notifications to our housekeeper process
_, err = c.Do("CLIENT", "TRACKING", "on", "REDIRECT", invalidatorID)
Expand All @@ -308,15 +319,6 @@ func (p *Pool) GetContext(ctx context.Context) (Conn, error) {
return &activeConn{p: p, pc: &poolConn{c: c, created: nowFunc()}}, nil
}

// EnableCaching method
func (p *Pool) EnableCaching(strategy cacheStrategy) error {
p.cacheStrategy = strategy
if strategy == CacheStrategyInvalidate {
return p.cacheInvalidator()
}
return nil
}

// PoolStats contains pool statistics.
type PoolStats struct {
// ActiveCount is the number of connections in the pool. The count includes
Expand Down Expand Up @@ -498,22 +500,18 @@ func (p *Pool) put(pc *poolConn, forceClose bool) error {
}

// cacheInvalidator start housekeeping goroutine handling cache invalidation
func (p *Pool) cacheInvalidator() error {
func (p *Pool) cacheInvalidator() (int, error) {
// Connection used for invalidation
conn, err := p.Dial()
if err != nil {
return err
return 0, err
}

id, err := Int(conn.Do("CLIENT", "ID"))
if err != nil {
return err
return 0, err
}

p.mu.Lock()
p.invalidatorID = id
p.mu.Unlock()

// Subscribe to revocation channel
conn.Send("SUBSCRIBE", "__redis__:invalidate")
conn.Flush()
Expand Down Expand Up @@ -549,7 +547,7 @@ func (p *Pool) cacheInvalidator() error {
}
}()

return nil
return id, nil
}

type activeConn struct {
Expand Down Expand Up @@ -653,7 +651,7 @@ func (ac *activeConn) DoWithTimeout(timeout time.Duration, commandName string, a

// Loader-style middleware to cache requests. For now it only caches GET commands
func (ac *activeConn) cacheLoader(commandName string, args []interface{}, f func() (interface{}, error)) (interface{}, error) {
if ac.p.cacheStrategy == CacheStrategyInvalidate && commandName == "GET" {
if ac.p.CacheStrategy == CacheStrategyInvalidate && commandName == "GET" {
key := args[0].(string)

// Happy path
Expand Down
7 changes: 2 additions & 5 deletions redis/pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -938,11 +938,8 @@ func TestWaitPoolGetCanceledContext(t *testing.T) {
func TestPoolServerAssistedCaching(t *testing.T) {
d := poolDialer{t: t}
p := &redis.Pool{
Dial: d.dial,
}

if err := p.EnableCaching(redis.CacheStrategyInvalidate); err != nil {
d.t.Fatal("Unexpected error: ", err)
Dial: d.dial,
CacheStrategy: redis.CacheStrategyInvalidate,
}

defer p.Close()
Expand Down

0 comments on commit d62ccb8

Please sign in to comment.