diff --git a/benchmark_test.go b/benchmark_test.go index a6a5f0f..76f49b9 100644 --- a/benchmark_test.go +++ b/benchmark_test.go @@ -181,7 +181,7 @@ func BenchmarkGetByPrimaryKey_RapidashWorst(b *testing.B) { panic(err) } cache, err := New( - ServerAddrs([]string{"localhost:11211"}), + Servers(ServersConfig{Type: CacheServerTypeMemcached, Addrs: []string{"localhost:11211"}}), SecondLevelCachePessimisticLock(false), SecondLevelCacheOptimisticLock(false), ) @@ -224,7 +224,7 @@ func BenchmarkGetByPrimaryKey_RapidashBest(b *testing.B) { } defer conn.Close() cache, err := New( - ServerAddrs([]string{"localhost:11211"}), + Servers(ServersConfig{Type: CacheServerTypeMemcached, Addrs: []string{"localhost:11211"}}), SecondLevelCachePessimisticLock(false), SecondLevelCacheOptimisticLock(false), ) @@ -336,7 +336,7 @@ func BenchmarkInsert_Rapidash(b *testing.B) { } defer conn.Close() cache, err := New( - ServerAddrs([]string{"localhost:11211"}), + Servers(ServersConfig{Type: CacheServerTypeMemcached, Addrs: []string{"localhost:11211"}}), SecondLevelCachePessimisticLock(false), SecondLevelCacheOptimisticLock(false), ) @@ -438,7 +438,7 @@ func BenchmarkUpdateByPrimaryKey_RapidashWorst(b *testing.B) { } defer conn.Close() cache, err := New( - ServerAddrs([]string{"localhost:11211"}), + Servers(ServersConfig{Type: CacheServerTypeMemcached, Addrs: []string{"localhost:11211"}}), SecondLevelCachePessimisticLock(false), SecondLevelCacheOptimisticLock(false), ) @@ -484,7 +484,7 @@ func BenchmarkUpdateByPrimaryKey_RapidashBest(b *testing.B) { } defer conn.Close() cache, err := New( - ServerAddrs([]string{"localhost:11211"}), + Servers(ServersConfig{Type: CacheServerTypeMemcached, Addrs: []string{"localhost:11211"}}), SecondLevelCachePessimisticLock(false), SecondLevelCacheOptimisticLock(false), ) @@ -598,7 +598,7 @@ func BenchmarkDeleteByPrimaryKey_Rapidash(b *testing.B) { } defer conn.Close() cache, err := New( - ServerAddrs([]string{"localhost:11211"}), + Servers(ServersConfig{Type: CacheServerTypeMemcached, Addrs: []string{"localhost:11211"}}), SecondLevelCachePessimisticLock(false), SecondLevelCacheOptimisticLock(false), ) diff --git a/config.go b/config.go index 3a27c1e..b64be2c 100644 --- a/config.go +++ b/config.go @@ -15,7 +15,7 @@ type Config struct { } type RuleConfig struct { - Servers *[]string `yaml:"servers"` + Servers *ServersConfig `yaml:"servers"` Logger *LoggerConfig `yaml:"logger"` Retry *RetryConfig `yaml:"retry"` CacheControl *CacheControlConfig `yaml:"cache_control"` @@ -39,7 +39,7 @@ type CacheControlConfig struct { } type SLCConfig struct { - Servers *[]string `yaml:"servers"` + Servers *ServersConfig `yaml:"servers"` Tables *map[string]*TableConfig `yaml:"tables"` Expiration *time.Duration `yaml:"expiration"` LockExpiration *time.Duration `yaml:"lock_expiration"` @@ -47,14 +47,14 @@ type SLCConfig struct { type TableConfig struct { ShardKey *string `yaml:"shard_key"` - Server *string `yaml:"server"` + Server *ServerConfig `yaml:"server"` CacheControl *CacheControlConfig `yaml:"cache_control"` Expiration *time.Duration `yaml:"expiration"` LockExpiration *time.Duration `yaml:"lock_expiration"` } type LLCConfig struct { - Servers *[]string + Servers *ServersConfig `yaml:"servers"` Tags *map[string]*TagConfig `yaml:"tags"` CacheControl *CacheControlConfig `yaml:"cache_control"` Expiration *time.Duration `yaml:"expiration"` @@ -62,11 +62,21 @@ type LLCConfig struct { } type TagConfig struct { - Server *string `yaml:"server"` + Server *ServerConfig `yaml:"server"` Expiration *time.Duration `yaml:"expiration"` LockExpiration *time.Duration `yaml:"lock_expiration"` } +type ServersConfig struct { + Type CacheServerType `yaml:"type"` + Addrs []string `yaml:"addrs"` +} + +type ServerConfig struct { + Type CacheServerType `yaml:"type"` + Addr string `yaml:"addr"` +} + func NewConfig(path string) (*Config, error) { file, err := ioutil.ReadFile(path) if err != nil { @@ -82,15 +92,19 @@ func NewConfig(path string) (*Config, error) { func (cfg *Config) Options() []OptionFunc { opts := []OptionFunc{} opts = append(opts, cfg.Rule.Options()...) - opts = append(opts, cfg.SLC.Options()...) - opts = append(opts, cfg.LLC.Options()...) + if cfg.SLC != nil { + opts = append(opts, cfg.SLC.Options()...) + } + if cfg.LLC != nil { + opts = append(opts, cfg.LLC.Options()...) + } return opts } func (cfg *RuleConfig) Options() []OptionFunc { opts := []OptionFunc{} if cfg.Servers != nil { - opts = append(opts, ServerAddrs(*cfg.Servers)) + opts = append(opts, Servers(*cfg.Servers)) } if cfg.MaxIdleConnection != nil { opts = append(opts, MaxIdleConnections(*cfg.MaxIdleConnection)) @@ -167,7 +181,7 @@ func (cfg *CacheControlConfig) TableOptions(table string) []OptionFunc { func (cfg *SLCConfig) Options() []OptionFunc { opts := []OptionFunc{} if cfg.Servers != nil { - opts = append(opts, SecondLevelCacheServerAddrs(*cfg.Servers)) + opts = append(opts, SecondLevelCacheServers(*cfg.Servers)) } if cfg.Tables != nil { for table, tableCfg := range *cfg.Tables { @@ -189,7 +203,7 @@ func (cfg *TableConfig) Options(table string) []OptionFunc { opts = append(opts, SecondLevelCacheTableShardKey(table, *cfg.ShardKey)) } if cfg.Server != nil { - opts = append(opts, SecondLevelCacheTableServerAddr(table, *cfg.Server)) + opts = append(opts, SecondLevelCacheTableServer(table, *cfg.Server)) } if cfg.Expiration != nil { opts = append(opts, SecondLevelCacheTableExpiration(table, *cfg.Expiration)) @@ -206,7 +220,7 @@ func (cfg *TableConfig) Options(table string) []OptionFunc { func (cfg *LLCConfig) Options() []OptionFunc { opts := []OptionFunc{} if cfg.Servers != nil { - opts = append(opts, LastLevelCacheServerAddrs(*cfg.Servers)) + opts = append(opts, LastLevelCacheServer(*cfg.Servers)) } if cfg.Tags != nil { for tag, tagCfg := range *cfg.Tags { @@ -228,7 +242,7 @@ func (cfg *LLCConfig) Options() []OptionFunc { func (cfg *TagConfig) Options(tag string) []OptionFunc { opts := []OptionFunc{} if cfg.Server != nil { - opts = append(opts, LastLevelCacheTagServerAddr(tag, *cfg.Server)) + opts = append(opts, LastLevelCacheTagServer(tag, *cfg.Server)) } if cfg.Expiration != nil { opts = append(opts, LastLevelCacheTagExpiration(tag, *cfg.Expiration)) diff --git a/config_test.go b/config_test.go index 993965a..6ac8697 100644 --- a/config_test.go +++ b/config_test.go @@ -10,13 +10,30 @@ import ( func TestConfig(t *testing.T) { cfg, err := NewConfig("testdata/cache.yml") NoError(t, err) + testConfig(t, cfg) +} + +func TestConfigWithSlcOptionAndLlcOption(t *testing.T) { + cfg, err := NewConfig("testdata/cache_use_slc_option_and_llc_option.yml") + NoError(t, err) + testConfig(t, cfg) +} + +func TestConfigWithSlcTableOptionAndLlcTagOption(t *testing.T) { + cfg, err := NewConfig("testdata/cache_use_slc_table_option_and_llc_tag_option.yml") + NoError(t, err) + testConfig(t, cfg) +} + +func testConfig(t *testing.T, cfg *Config) { cache, err := New(cfg.Options()...) NoError(t, err) NoError(t, cache.Flush()) conn, err := sql.Open("mysql", "root:@tcp(localhost:3306)/rapidash?parseTime=true") NoError(t, err) NoError(t, cache.WarmUp(conn, userLoginType(), false)) - t.Run("create new records", func(t *testing.T) { + NoError(t, cache.WarmUp(conn, userLogType(), false)) + t.Run("create new records use redis cache when table option specify", func(t *testing.T) { txConn, err := conn.Begin() NoError(t, err) tx, err := cache.Begin(txConn) @@ -37,7 +54,7 @@ func TestConfig(t *testing.T) { } NoError(t, tx.Commit()) }) - t.Run("get records", func(t *testing.T) { + t.Run("get records use redis cache when table option specify", func(t *testing.T) { txConn, err := conn.Begin() NoError(t, err) tx, err := cache.Begin(txConn) @@ -54,14 +71,110 @@ func TestConfig(t *testing.T) { } NoError(t, tx.Commit()) }) - t.Run("last level cache", func(t *testing.T) { + t.Run("delete records use redis cache when table option specify", func(t *testing.T) { + txConn, err := conn.Begin() + NoError(t, err) + tx, err := cache.Begin(txConn) + NoError(t, err) + for i := 1001; i <= 1005; i++ { + builder := NewQueryBuilder("user_logins"). + Eq("user_id", uint64(i)). + Eq("user_session_id", uint64(i)) + NoError(t, tx.DeleteByQueryBuilder(builder)) + } + NoError(t, tx.Commit()) + }) + t.Run("create new records use memcached cache when table option specify", func(t *testing.T) { + txConn, err := conn.Begin() + NoError(t, err) + tx, err := cache.Begin(txConn) + NoError(t, err) + now := time.Now() + for i := 1001; i <= 1005; i++ { + id := uint64(i) + if _, err := tx.CreateByTable("user_logs", &UserLog{ + ID: id, + UserID: id, + ContentType: "web", + ContentID: id, + CreatedAt: &now, + UpdatedAt: &now, + }); err != nil { + t.Fatalf("%+v", err) + } + } + NoError(t, tx.Commit()) + }) + t.Run("get records use memcached cache when table option specify", func(t *testing.T) { + txConn, err := conn.Begin() + NoError(t, err) + tx, err := cache.Begin(txConn) + NoError(t, err) + for i := 1001; i <= 1005; i++ { + builder := NewQueryBuilder("user_logs"). + Eq("id", uint64(i)) + var foundUserLog UserLog + NoError(t, tx.FindByQueryBuilder(builder, &foundUserLog)) + if foundUserLog.ID == 0 { + t.Fatal("cannot find record") + } + } + NoError(t, tx.Commit()) + }) + t.Run("delete records use memcached cache when table option specify", func(t *testing.T) { + txConn, err := conn.Begin() + NoError(t, err) + tx, err := cache.Begin(txConn) + NoError(t, err) + for i := 1001; i <= 1005; i++ { + builder := NewQueryBuilder("user_logs"). + Eq("id", uint64(i)) + NoError(t, tx.DeleteByQueryBuilder(builder)) + } + NoError(t, tx.Commit()) + }) + t.Run("last level cache with tag use redis cache when tag option specify", func(t *testing.T) { + tx, err := cache.Begin() + NoError(t, err) + tag := "a" + for i := 1; i <= 10; i++ { + key := fmt.Sprintf("key_with_tag_%s_%d", tag, i) + NoError(t, tx.CreateWithTag(tag, key, Int(1))) + var a int + NoError(t, tx.FindWithTag(tag, key, IntPtr(&a))) + if a != 1 { + t.Fatal("cannot work set/get with tag") + } + } + NoError(t, tx.Commit()) + }) + t.Run("last level cache with tag use memcached cache when tag option specify", func(t *testing.T) { tx, err := cache.Begin() NoError(t, err) - NoError(t, tx.CreateWithTag("a", "key", Int(1))) - var a int - NoError(t, tx.FindWithTag("a", "key", IntPtr(&a))) - if a != 1 { - t.Fatal("cannot work set/get with tag") + tag := "b" + for i := 1; i <= 10; i++ { + key := fmt.Sprintf("key_with_tag_%s_%d", tag, i) + NoError(t, tx.CreateWithTag(tag, key, Int(1))) + var a int + NoError(t, tx.FindWithTag(tag, key, IntPtr(&a))) + if a != 1 { + t.Fatal("cannot work set/get with tag") + } + } + NoError(t, tx.Commit()) + }) + t.Run("last level cache without tag", func(t *testing.T) { + tx, err := cache.Begin() + NoError(t, err) + for i := 1; i <= 10; i++ { + key := fmt.Sprintf("key_with_not_tag_%d", i) + NoError(t, tx.Create(key, Int(1))) + var a int + NoError(t, tx.Find(key, IntPtr(&a))) + if a != 1 { + t.Fatal("cannot work set/get with tag") + } } + NoError(t, tx.Commit()) }) } diff --git a/last_level_cache.go b/last_level_cache.go index 4239bd4..03c90d7 100644 --- a/last_level_cache.go +++ b/last_level_cache.go @@ -2,6 +2,7 @@ package rapidash import ( "fmt" + "sync" "time" "go.knocknote.io/rapidash/server" @@ -26,7 +27,7 @@ func (c *LastLevelCache) cacheKey(tag, key string) (server.CacheKey, error) { typ: server.CacheKeyTypeLLC, } if opt, exists := c.opt.tagOpt[tag]; exists { - addr, err := getAddr(opt.server) + addr, err := getAddr(opt.server.addr) if err != nil { return nil, xerrors.Errorf("cannot get addr: %w", err) } @@ -39,7 +40,7 @@ func (c *LastLevelCache) cacheKey(tag, key string) (server.CacheKey, error) { return cacheKey, nil } -func (c *LastLevelCache) lockKey(tx *Tx, key server.CacheKey, expiration time.Duration) error { +func (c *LastLevelCache) lockKey(tx *Tx, tag string, key server.CacheKey, expiration time.Duration) error { value := &TxValue{ id: tx.id, key: key.String(), @@ -51,7 +52,13 @@ func (c *LastLevelCache) lockKey(tx *Tx, key server.CacheKey, expiration time.Du } lockKey := key.LockKey() log.Add(tx.id, lockKey, value) - if err := c.cacheServer.Add(lockKey, bytes, expiration); err != nil { + var cacheServer server.CacheServer + if lastLevelCache, exists := tx.r.lastLevelCaches.get(tag); exists { + cacheServer = lastLevelCache.cacheServer + } else { + cacheServer = c.cacheServer + } + if err := cacheServer.Add(lockKey, bytes, expiration); err != nil { content, getErr := c.cacheServer.Get(lockKey) if xerrors.Is(getErr, server.ErrCacheMiss) { return xerrors.Errorf("fatal error. cannot add transaction key. but transaction key doesn't exist: %w", err) @@ -67,7 +74,15 @@ func (c *LastLevelCache) lockKey(tx *Tx, key server.CacheKey, expiration time.Du } return xerrors.Errorf("lock key (%s) is already added. value is %s: %w", lockKey, value, err) } - tx.lockKeys = append(tx.lockKeys, lockKey) + if tag == "" { + tx.lastLevelCacheLockKey.withoutTagLockKeys = append(tx.lastLevelCacheLockKey.withoutTagLockKeys, lockKey) + } else { + if _, exists := tx.lastLevelCacheLockKey.withTagLockKeys[tag]; exists { + tx.lastLevelCacheLockKey.withTagLockKeys[tag] = append(tx.lastLevelCacheLockKey.withTagLockKeys[tag], lockKey) + } else { + tx.lastLevelCacheLockKey.withTagLockKeys[tag] = []server.CacheKey{lockKey} + } + } return nil } @@ -83,7 +98,7 @@ func (c *LastLevelCache) Create(tx *Tx, tag, key string, value Type, expiration keyStr := cacheKey.String() tx.stash.lastLevelCacheKeyToBytes[keyStr] = content if _, exists := tx.pendingQueries[keyStr]; !exists { - if err := c.lockKey(tx, cacheKey, expiration); err != nil { + if err := c.lockKey(tx, tag, cacheKey, expiration); err != nil { return xerrors.Errorf("failed to lock key: %w", err) } } @@ -140,7 +155,7 @@ func (c *LastLevelCache) Update(tx *Tx, tag, key string, value Type, expiration } keyStr := cacheKey.String() if _, exists := tx.pendingQueries[keyStr]; !exists { - if err := c.lockKey(tx, cacheKey, expiration); err != nil { + if err := c.lockKey(tx, tag, cacheKey, expiration); err != nil { return xerrors.Errorf("failed to lock key: %w", err) } } @@ -196,3 +211,44 @@ func (c *LastLevelCache) Delete(tx *Tx, tag, key string) error { } return nil } + +type LastLevelCacheMap struct { + *sync.Map +} + +func (c *LastLevelCacheMap) set(tagName string, cache *LastLevelCache) { + c.Store(tagName, cache) +} + +func (c *LastLevelCacheMap) get(tagName string) (*LastLevelCache, bool) { + cache, exists := c.Load(tagName) + if !exists { + return nil, false + } + return cache.(*LastLevelCache), exists +} + +func (c *LastLevelCacheMap) keys() []string { + if c.length() != 0 { + keys := make([]string, c.length()) + c.Range(func(key, value interface{}) bool { + keys = append(keys, key.(string)) + return true + }) + return keys + } + return []string{} +} + +func (c *LastLevelCacheMap) length() uint64 { + len := 0 + c.Range(func(key, value interface{}) bool { + len++ + return true + }) + return uint64(len) +} + +func NewLastLevelCacheMap() *LastLevelCacheMap { + return &LastLevelCacheMap{&sync.Map{}} +} diff --git a/option.go b/option.go index 346cb97..4a1d63e 100644 --- a/option.go +++ b/option.go @@ -6,27 +6,30 @@ import ( type OptionFunc func(*Rapidash) -func ServerType(typ CacheServerType) OptionFunc { +func Servers(servers ServersConfig) OptionFunc { return func(r *Rapidash) { - r.opt.serverType = typ + r.opt.servers = &ServersOption{ + typ: servers.Type, + addrs: servers.Addrs, + } } } -func ServerAddrs(addrs []string) OptionFunc { +func SecondLevelCacheServers(servers ServersConfig) OptionFunc { return func(r *Rapidash) { - r.opt.serverAddrs = addrs + r.opt.slcServer = &ServersOption{ + typ: servers.Type, + addrs: servers.Addrs, + } } } -func SecondLevelCacheServerAddrs(addrs []string) OptionFunc { +func LastLevelCacheServer(servers ServersConfig) OptionFunc { return func(r *Rapidash) { - r.opt.slcServerAddrs = addrs - } -} - -func LastLevelCacheServerAddrs(addrs []string) OptionFunc { - return func(r *Rapidash) { - r.opt.llcServerAddrs = addrs + r.opt.llcServer = &ServersOption{ + typ: servers.Type, + addrs: servers.Addrs, + } } } @@ -104,10 +107,13 @@ func SecondLevelCacheTableShardKey(table string, shardKey string) OptionFunc { } } -func SecondLevelCacheTableServerAddr(table string, serverAddr string) OptionFunc { +func SecondLevelCacheTableServer(table string, server ServerConfig) OptionFunc { return func(r *Rapidash) { opt := r.opt.slcTableOpt[table] - opt.server = &serverAddr + opt.server = &ServerOption{ + typ: server.Type, + addr: server.Addr, + } r.opt.slcTableOpt[table] = opt } } @@ -168,10 +174,13 @@ func LastLevelCachePessimisticLock(enabled bool) OptionFunc { } } -func LastLevelCacheTagServerAddr(tag string, serverAddr string) OptionFunc { +func LastLevelCacheTagServer(tag string, server ServerConfig) OptionFunc { return func(r *Rapidash) { opt := r.opt.llcOpt.tagOpt[tag] - opt.server = serverAddr + opt.server = ServerOption{ + typ: server.Type, + addr: server.Addr, + } r.opt.llcOpt.tagOpt[tag] = opt } } diff --git a/rapidash.go b/rapidash.go index 2db4d4a..336d3c9 100644 --- a/rapidash.go +++ b/rapidash.go @@ -3,6 +3,7 @@ package rapidash import ( "context" "database/sql" + "fmt" "net" "sort" "strings" @@ -32,6 +33,7 @@ type Rapidash struct { firstLevelCaches *FirstLevelCacheMap secondLevelCaches *SecondLevelCacheMap lastLevelCache *LastLevelCache + lastLevelCaches *LastLevelCacheMap opt Option } @@ -64,7 +66,7 @@ const ( type TableOption struct { shardKey *string - server *string + server *ServerOption expiration *time.Duration lockExpiration *time.Duration optimisticLock *bool @@ -78,11 +80,18 @@ func (o *TableOption) ShardKey() string { return *o.shardKey } -func (o *TableOption) Server() string { +func (o *TableOption) ServerType() CacheServerType { + if o.server == nil { + return 0 + } + return o.server.typ +} + +func (o *TableOption) ServerAddr() string { if o.server == nil { return "" } - return *o.server + return o.server.addr } func (o *TableOption) Expiration() time.Duration { @@ -122,11 +131,21 @@ type LastLevelCacheOption struct { } type TagOption struct { - server string + server ServerOption expiration time.Duration lockExpiration time.Duration } +type ServersOption struct { + typ CacheServerType + addrs []string +} + +type ServerOption struct { + typ CacheServerType + addr string +} + type QueryLog struct { Key string `json:"key"` Hash uint32 `json:"hash"` @@ -135,8 +154,7 @@ type QueryLog struct { } type Option struct { - serverType CacheServerType - serverAddrs []string + servers *ServersOption timeout time.Duration maxIdleConnections int maxRetryCount int @@ -144,7 +162,7 @@ type Option struct { logMode LogModeType logEnabled bool logServerAddr string - slcServerAddrs []string + slcServer *ServersOption slcLockExpiration time.Duration slcExpiration time.Duration slcOptimisticLock bool @@ -152,7 +170,7 @@ type Option struct { slcIgnoreNewerCache bool slcTableOpt map[string]TableOption llcOpt *LastLevelCacheOption - llcServerAddrs []string + llcServer *ServersOption beforeCommitCallback func(*Tx, []*QueryLog) error afterCommitSuccessCallback func(*Tx) error afterCommitFailureCallback func(*Tx, []*QueryLog) error @@ -160,7 +178,7 @@ type Option struct { func defaultOption() Option { return Option{ - serverType: CacheServerTypeMemcached, + servers: &ServersOption{typ: CacheServerTypeMemcached}, timeout: DefaultTimeout, maxIdleConnections: DefaultMaxIdleConns, maxRetryCount: 3, @@ -195,14 +213,25 @@ type PendingQuery struct { fn func() error } +type SecondLevelCacheLockKey struct { + lockKeys map[string][]server.CacheKey +} + +type LastLevelCacheLockKey struct { + withoutTagLockKeys []server.CacheKey + withTagLockKeys map[string][]server.CacheKey +} + type Tx struct { - r *Rapidash - conn Connection - stash *Stash - id string - pendingQueries map[string]*PendingQuery - lockKeys []server.CacheKey - isCommitted bool + r *Rapidash + conn Connection + stash *Stash + id string + pendingQueries map[string]*PendingQuery + lockKeys []server.CacheKey + secondLevelCacheLockKey SecondLevelCacheLockKey + lastLevelCacheLockKey LastLevelCacheLockKey + isCommitted bool } type Stash struct { @@ -234,12 +263,14 @@ func (r *Rapidash) Begin(conns ...Connection) (*Tx, error) { conn = conns[0] } return &Tx{ - r: r, - conn: conn, - stash: NewStash(), - id: xid.New().String(), - pendingQueries: map[string]*PendingQuery{}, - lockKeys: []server.CacheKey{}, + r: r, + conn: conn, + stash: NewStash(), + id: xid.New().String(), + pendingQueries: map[string]*PendingQuery{}, + lockKeys: []server.CacheKey{}, + secondLevelCacheLockKey: SecondLevelCacheLockKey{lockKeys: map[string][]server.CacheKey{}}, + lastLevelCacheLockKey: LastLevelCacheLockKey{withoutTagLockKeys: []server.CacheKey{}, withTagLockKeys: map[string][]server.CacheKey{}}, }, nil } @@ -272,6 +303,12 @@ func (tx *Tx) CreateWithTagAndExpiration(tag, key string, value Type, expiration if tx.isCommitted { return ErrAlreadyCommittedTransaction } + if c, exists := tx.r.lastLevelCaches.get(tag); exists { + if err := c.Create(tx, tag, key, value, expiration); err != nil { + return xerrors.Errorf("failed to Create: %w", err) + } + return nil + } if err := tx.r.lastLevelCache.Create(tx, tag, key, value, expiration); err != nil { return xerrors.Errorf("failed to Create: %w", err) } @@ -289,6 +326,11 @@ func (tx *Tx) FindWithTag(tag, key string, value Type) error { if tx.isCommitted { return ErrAlreadyCommittedTransaction } + if c, exists := tx.r.lastLevelCaches.get(tag); exists { + if err := c.Find(tx, tag, key, value); err != nil { + return xerrors.Errorf("failed to Find: %w", err) + } + } if err := tx.r.lastLevelCache.Find(tx, tag, key, value); err != nil { return xerrors.Errorf("failed to Find: %w", err) } @@ -418,11 +460,11 @@ func (tx *Tx) FindByQueryBuilderContext(ctx context.Context, builder *QueryBuild } func (tx *Tx) CountByQueryBuilder(builder *QueryBuilder) (uint64, error) { - count, err := tx.CountByQueryBuilderContext(context.Background(), builder) - if err != nil { - return 0, xerrors.Errorf("failed to CountByQueryBuilderContext: %w", err) - } - return count, nil + count, err := tx.CountByQueryBuilderContext(context.Background(), builder) + if err != nil { + return 0, xerrors.Errorf("failed to CountByQueryBuilderContext: %w", err) + } + return count, nil } func (tx *Tx) CountByQueryBuilderContext(ctx context.Context, builder *QueryBuilder) (uint64, error) { @@ -534,12 +576,31 @@ func (tx *Tx) sortedPendingQueryKeys() []string { func (tx *Tx) unlockAllKeys() error { mergedErr := []string{} - for _, key := range tx.lockKeys { - log.Delete(tx.id, SLCServer, key) - if err := tx.r.cacheServer.Delete(key); err != nil { + for tableName, lockKeys := range tx.secondLevelCacheLockKey.lockKeys { + for _, lockKey := range lockKeys { + if c, exists := tx.r.secondLevelCaches.get(tableName); exists { + if err := c.cacheServer.Delete(lockKey); err != nil { + mergedErr = append(mergedErr, err.Error()) + } + } else { + mergedErr = append(mergedErr, fmt.Sprintf("unknown table name %s", tableName)) + } + } + } + for _, lockKey := range tx.lastLevelCacheLockKey.withoutTagLockKeys { + if err := tx.r.lastLevelCache.cacheServer.Delete(lockKey); err != nil { mergedErr = append(mergedErr, err.Error()) } } + for tag, lockKeys := range tx.lastLevelCacheLockKey.withTagLockKeys { + for _, lockKey := range lockKeys { + if c, exists := tx.r.lastLevelCaches.get(tag); exists { + if err := c.cacheServer.Delete(lockKey); err != nil { + mergedErr = append(mergedErr, err.Error()) + } + } + } + } if len(mergedErr) > 0 { return xerrors.Errorf("%s: %w", strings.Join(mergedErr, ","), ErrUnlockCacheKeys) } @@ -821,7 +882,28 @@ func (r *Rapidash) tableOption(tableName string) TableOption { } func (r *Rapidash) WarmUpSecondLevelCache(conn *sql.DB, typ *Struct) error { - slc := NewSecondLevelCache(typ, r.cacheServer, r.tableOption(typ.tableName)) + var cacheServer server.CacheServer + if tableOption, exists := r.opt.slcTableOpt[typ.tableName]; exists { + selectors := &Selectors{} + if err := selectors.setSelector([]string{}, []string{tableOption.ServerAddr()}, []string{}); err != nil { + return xerrors.Errorf("failed to set cache server selector: %w", err) + } + switch tableOption.ServerType() { + case CacheServerTypeMemcached: + cacheServer = server.NewMemcachedBySelectors(selectors.slcSelector, nil) + case CacheServerTypeRedis: + cacheServer = server.NewRedisBySelectors(selectors.slcSelector, nil) + } + if err := cacheServer.SetTimeout(r.opt.timeout); err != nil { + return xerrors.Errorf("failed to set timeout for cache server: %w", err) + } + if err := cacheServer.SetMaxIdleConnections(r.opt.maxIdleConnections); err != nil { + return xerrors.Errorf("failed to set max idle connections for cache server: %w", err) + } + } else { + cacheServer = r.cacheServer + } + slc := NewSecondLevelCache(typ, cacheServer, r.tableOption(typ.tableName)) if err := slc.WarmUp(conn); err != nil { return xerrors.Errorf("cannot warm up SecondLevelCache. table is %s: %w", typ.tableName, err) } @@ -884,38 +966,94 @@ func (r *Rapidash) AddLastLevelCacheServer(servers ...string) error { } func (r *Rapidash) Flush() error { - if err := r.cacheServer.Flush(); err != nil { - return xerrors.Errorf("failed to flush cache server: %w", err) + for _, key := range r.secondLevelCaches.keys() { + if c, exists := r.secondLevelCaches.get(key); exists { + if err := c.cacheServer.Flush(); err != nil { + return xerrors.Errorf("failed to flush second level cache server: %w", err) + } + } } + for _, key := range r.lastLevelCaches.keys() { + if c, exists := r.lastLevelCaches.get(key); exists { + if err := c.cacheServer.Flush(); err != nil { + return xerrors.Errorf("failed to flush last level cache server: %w", err) + } + } + } + r.lastLevelCache.cacheServer.Flush() return nil } func (r *Rapidash) setServer() error { - switch r.opt.serverType { + s := &Selectors{} + if err := s.setSelector(r.opt.servers.addrs, []string{}, []string{}); err != nil { + return xerrors.Errorf("failed to set cache server selector: %w", err) + } + switch r.opt.servers.typ { case CacheServerTypeMemcached: - s := &Selectors{} - if err := s.setSelector(r.opt.serverAddrs, r.opt.slcServerAddrs, r.opt.llcServerAddrs); err != nil { - return xerrors.Errorf("failed to set cache server selector: %w", err) - } - memcached := server.NewMemcachedBySelectors(s.slcSelector, s.llcSelector) - r.cacheServer = memcached + r.cacheServer = server.NewMemcachedBySelectors(s.slcSelector, s.llcSelector) r.lastLevelCache = NewLastLevelCache(r.cacheServer, r.opt.llcOpt) case CacheServerTypeRedis: - s := &Selectors{} - if err := s.setSelector(r.opt.serverAddrs, r.opt.slcServerAddrs, r.opt.llcServerAddrs); err != nil { - return xerrors.Errorf("failed to set cache server selector: %w", err) - } - redis := server.NewRedisBySelectors(s.slcSelector, s.llcSelector) - r.cacheServer = redis + r.cacheServer = server.NewRedisBySelectors(s.slcSelector, s.llcSelector) r.lastLevelCache = NewLastLevelCache(r.cacheServer, r.opt.llcOpt) case CacheServerTypeOnMemory: } + if r.opt.slcServer != nil { + switch r.opt.slcServer.typ { + case CacheServerTypeMemcached: + r.cacheServer = server.NewMemcachedBySelectors(s.slcSelector, nil) + case CacheServerTypeRedis: + r.cacheServer = server.NewRedisBySelectors(s.slcSelector, nil) + case CacheServerTypeOnMemory: + } + } if err := r.cacheServer.SetTimeout(r.opt.timeout); err != nil { return xerrors.Errorf("failed to set timeout for cache server: %w", err) } if err := r.cacheServer.SetMaxIdleConnections(r.opt.maxIdleConnections); err != nil { return xerrors.Errorf("failed to set max idle connections for cache server: %w", err) } + if r.opt.llcServer != nil { + switch r.opt.llcServer.typ { + case CacheServerTypeMemcached: + r.lastLevelCache = NewLastLevelCache(server.NewMemcachedBySelectors(nil, s.llcSelector), r.opt.llcOpt) + case CacheServerTypeRedis: + r.lastLevelCache = NewLastLevelCache(server.NewRedisBySelectors(nil, s.llcSelector), r.opt.llcOpt) + case CacheServerTypeOnMemory: + } + if err := r.lastLevelCache.cacheServer.SetTimeout(r.opt.timeout); err != nil { + return xerrors.Errorf("failed to set timeout for cache server: %w", err) + } + if err := r.lastLevelCache.cacheServer.SetMaxIdleConnections(r.opt.maxIdleConnections); err != nil { + return xerrors.Errorf("failed to set max idle connections for cache server: %w", err) + } + } + if r.opt.llcOpt.tagOpt != nil && len(r.opt.llcOpt.tagOpt) > 0 { + for tagName, tagOption := range r.opt.llcOpt.tagOpt { + llcSelectors := &Selectors{} + if err := llcSelectors.setSelector([]string{}, []string{}, []string{tagOption.server.addr}); err != nil { + return xerrors.Errorf("failed to set cache server selector: %w", err) + } + var cacheServer server.CacheServer + switch tagOption.server.typ { + case CacheServerTypeMemcached: + cacheServer = server.NewMemcachedBySelectors(nil, llcSelectors.llcSelector) + case CacheServerTypeRedis: + cacheServer = server.NewRedisBySelectors(nil, llcSelectors.llcSelector) + case CacheServerTypeOnMemory: + } + if err := cacheServer.SetTimeout(r.opt.timeout); err != nil { + return xerrors.Errorf("failed to set timeout for cache server: %w", err) + } + if err := cacheServer.SetMaxIdleConnections(r.opt.maxIdleConnections); err != nil { + return xerrors.Errorf("failed to set max idle connections for cache server: %w", err) + } + lastLevelCheServer := NewLastLevelCache(cacheServer, r.opt.llcOpt) + lastLevelCheServer.opt.expiration = tagOption.expiration + lastLevelCheServer.opt.lockExpiration = tagOption.lockExpiration + r.lastLevelCaches.set(tagName, lastLevelCheServer) + } + } return nil } @@ -968,6 +1106,7 @@ func New(opts ...OptionFunc) (*Rapidash, error) { ignoreCaches: map[string]struct{}{}, firstLevelCaches: NewFirstLevelCacheMap(), secondLevelCaches: NewSecondLevelCacheMap(), + lastLevelCaches: NewLastLevelCacheMap(), opt: defaultOption(), } for _, opt := range opts { diff --git a/rapidash_test.go b/rapidash_test.go index b915e2c..3475aeb 100644 --- a/rapidash_test.go +++ b/rapidash_test.go @@ -11,7 +11,7 @@ import ( func TestServerChanging(t *testing.T) { t.Run("remove and add server", func(t *testing.T) { - cache, err := New(ServerAddrs([]string{"localhost:11211"}), MaxIdleConnections(1000), Timeout(200*time.Millisecond)) + cache, err := New(Servers(ServersConfig{Type: CacheServerTypeMemcached, Addrs: []string{"localhost:11211"}}), MaxIdleConnections(1000), Timeout(200*time.Millisecond)) NoError(t, err) tx, err := cache.Begin() NoErrorf(t, err, "cannot begin cache transaction") @@ -24,7 +24,7 @@ func TestServerChanging(t *testing.T) { }) t.Run("remove and add only slc server", func(t *testing.T) { - cache, err := New(ServerAddrs([]string{"localhost:11211"}), MaxIdleConnections(1000), Timeout(200*time.Millisecond)) + cache, err := New(Servers(ServersConfig{Type: CacheServerTypeMemcached, Addrs: []string{"localhost:11211"}}), MaxIdleConnections(1000), Timeout(200*time.Millisecond)) NoError(t, err) NoError(t, cache.WarmUp(conn, userLoginType(), false)) tx, err := cache.Begin(conn) @@ -43,7 +43,7 @@ func TestServerChanging(t *testing.T) { }) t.Run("remove and add only llc server", func(t *testing.T) { - cache, err := New(ServerAddrs([]string{"localhost:11211"}), MaxIdleConnections(1000), Timeout(200000000000)) + cache, err := New(Servers(ServersConfig{Type: CacheServerTypeMemcached, Addrs: []string{"localhost:11211"}}), MaxIdleConnections(1000), Timeout(200000000000)) NoError(t, err) tx, err := cache.Begin() NoErrorf(t, err, "cannot begin cache transaction") diff --git a/second_level_cache.go b/second_level_cache.go index dfc2fc2..bb086c2 100644 --- a/second_level_cache.go +++ b/second_level_cache.go @@ -31,6 +31,27 @@ func (c *SecondLevelCacheMap) get(tableName string) (*SecondLevelCache, bool) { return cache.(*SecondLevelCache), exists } +func (c *SecondLevelCacheMap) keys() []string { + if c.length() != 0 { + keys := make([]string, c.length()) + c.Range(func(key, value interface{}) bool { + keys = append(keys, key.(string)) + return true + }) + return keys + } + return []string{} +} + +func (c *SecondLevelCacheMap) length() uint64 { + len := 0 + c.Range(func(key, value interface{}) bool { + len++ + return true + }) + return uint64(len) +} + func NewSecondLevelCacheMap() *SecondLevelCacheMap { return &SecondLevelCacheMap{&sync.Map{}} } @@ -246,7 +267,13 @@ func (c *SecondLevelCache) lockKey(tx *Tx, key server.CacheKey) error { } lockKey := key.LockKey() log.Add(tx.id, lockKey, value) - if err := c.cacheServer.Add(lockKey, bytes, c.opt.LockExpiration()); err != nil { + var cacheServer server.CacheServer + if secondLevelCache, exists := tx.r.secondLevelCaches.get(c.typ.tableName); exists { + cacheServer = secondLevelCache.cacheServer + } else { + cacheServer = c.cacheServer + } + if err := cacheServer.Add(lockKey, bytes, c.opt.LockExpiration()); err != nil { content, getErr := c.cacheServer.Get(lockKey) if IsCacheMiss(getErr) { return xerrors.Errorf("fatal error. cannot add transaction key. but transaction key doesn't exist: %w", err) @@ -262,7 +289,11 @@ func (c *SecondLevelCache) lockKey(tx *Tx, key server.CacheKey) error { } return xerrors.Errorf("lock key (%s) is already added. value is %s: %w", lockKey, value, err) } - tx.lockKeys = append(tx.lockKeys, lockKey) + if _, exists := tx.secondLevelCacheLockKey.lockKeys[c.typ.tableName]; exists { + tx.secondLevelCacheLockKey.lockKeys[c.typ.tableName] = append(tx.secondLevelCacheLockKey.lockKeys[c.typ.tableName], lockKey) + } else { + tx.secondLevelCacheLockKey.lockKeys[c.typ.tableName] = []server.CacheKey{lockKey} + } return nil } diff --git a/server/memcache.go b/server/memcache.go index 5e16293..168f72d 100644 --- a/server/memcache.go +++ b/server/memcache.go @@ -205,11 +205,15 @@ func (c *MemcachedClient) onItem(item *Item, fn func(*MemcachedClient, *bufio.Re } func (c *MemcachedClient) FlushAll() error { - if err := c.client.slcSelector.Each(c.flushAllFromAddr); err != nil { - return err + if c.client.slcSelector != nil { + if err := c.client.slcSelector.Each(c.flushAllFromAddr); err != nil { + return err + } } - if err := c.client.llcSelector.Each(c.flushAllFromAddr); err != nil { - return err + if c.client.llcSelector != nil { + if err := c.client.llcSelector.Each(c.flushAllFromAddr); err != nil { + return err + } } return nil } diff --git a/server/redis.go b/server/redis.go index e74f891..c1bcd37 100644 --- a/server/redis.go +++ b/server/redis.go @@ -128,12 +128,17 @@ func (c *RedisClient) Delete(key CacheKey) error { } func (c *RedisClient) Flush() error { - if err := c.client.slcSelector.Each(c.flushAllFromAddr); err != nil { - return xerrors.Errorf("failed to flush second level cache: %w", err) + if c.client.slcSelector != nil { + if err := c.client.slcSelector.Each(c.flushAllFromAddr); err != nil { + return xerrors.Errorf("failed to flush second level cache: %w", err) + } } - if err := c.client.llcSelector.Each(c.flushAllFromAddr); err != nil { - return xerrors.Errorf("failed to flush last level cache: %w", err) + if c.client.llcSelector != nil { + if err := c.client.llcSelector.Each(c.flushAllFromAddr); err != nil { + return xerrors.Errorf("failed to flush last level cache: %w", err) + } + } return nil diff --git a/testdata/cache.yml b/testdata/cache.yml index bcac71f..331dd0a 100644 --- a/testdata/cache.yml +++ b/testdata/cache.yml @@ -1,4 +1,8 @@ rule: + servers: + type: 0 + addrs: + - localhost:11211 logger: mode: console enabled: true @@ -11,33 +15,3 @@ rule: ignore_newer_cache: true timeout: 200000000000 max_idle_connection: 1000 -slc: - servers: - - localhost:11211 - tables: - user_logins: - shard_key: user_id - cache_control: - optimistic_lock: false - pessimistic_lock: false - user_items: - server: localhost:11211 - lock_expiration: 30 - expiration: 100 - lock_expiration: 90 - expiration: 0 -llc: - servers: - - localhost:11211 - tags: - a: - server: localhost:11211 - b: - server: localhost:11211 - lock_expiration: 90 - expiration: 0 - cache_control: - optimistic_lock: false - pessimistic_lock: false - lock_expiration: 90 - expiration: 3600 diff --git a/testdata/cache_use_slc_option_and_llc_option.yml b/testdata/cache_use_slc_option_and_llc_option.yml new file mode 100644 index 0000000..e059fac --- /dev/null +++ b/testdata/cache_use_slc_option_and_llc_option.yml @@ -0,0 +1,34 @@ +rule: + servers: + type: 0 + addrs: + - localhost:11211 + logger: + mode: console + enabled: true + retry: + limit: 3 + interval: 30000 + cache_control: + optimistic_lock: true + pessimistic_lock: true + ignore_newer_cache: true + timeout: 200000000000 + max_idle_connection: 1000 +slc: + servers: + type: 0 + addrs: + - localhost:11211 + lock_expiration: 90 + expiration: 0 +llc: + servers: + type: 0 + addrs: + - localhost:11211 + cache_control: + optimistic_lock: false + pessimistic_lock: false + lock_expiration: 90 + expiration: 3600 diff --git a/testdata/cache_use_slc_table_option_and_llc_tag_option.yml b/testdata/cache_use_slc_table_option_and_llc_tag_option.yml new file mode 100644 index 0000000..d875d7b --- /dev/null +++ b/testdata/cache_use_slc_table_option_and_llc_tag_option.yml @@ -0,0 +1,63 @@ +rule: + servers: + type: 0 + addrs: + - localhost:11211 + logger: + mode: console + enabled: true + retry: + limit: 3 + interval: 30000 + cache_control: + optimistic_lock: true + pessimistic_lock: true + ignore_newer_cache: true + timeout: 200000000000 + max_idle_connection: 1000 +slc: + servers: + type: 0 + addrs: + - localhost:11211 + tables: + user_logins: + server: + type: 1 + addr: localhost:6379 + shard_key: user_id + cache_control: + optimistic_lock: false + pessimistic_lock: true + user_logs: + server: + type: 0 + addr: localhost:11211 + lock_expiration: 30 + expiration: 100 + cache_control: + optimistic_lock: false + pessimistic_lock: true + lock_expiration: 90 + expiration: 0 +llc: + servers: + type: 0 + addrs: + - localhost:11211 + tags: + a: + server: + type: 1 + addr: localhost:6379 + b: + server: + type: 0 + addr: localhost:11211 + lock_expiration: 90 + expiration: 0 + cache_control: + optimistic_lock: false + pessimistic_lock: false + lock_expiration: 90 + expiration: 3600 diff --git a/tx_test.go b/tx_test.go index 7f6187f..faf1b43 100644 --- a/tx_test.go +++ b/tx_test.go @@ -249,8 +249,7 @@ func initCache(conn *sql.DB, typ CacheServerType) error { } var err error cache, err = New( - ServerType(typ), - ServerAddrs(serverAddrs), + Servers(ServersConfig{Type: typ, Addrs: serverAddrs}), LogMode(LogModeJSON), LogEnabled(true), )