Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 27 additions & 0 deletions rapidash.go
Original file line number Diff line number Diff line change
Expand Up @@ -528,6 +528,33 @@ func (tx *Tx) DeleteByQueryBuilderContext(ctx context.Context, builder *QueryBui
return xerrors.Errorf("unknown table name %s", builder.tableName)
}

func (tx *Tx) DeleteCacheByQueryBuilder(builder *QueryBuilder) error {
if err := tx.DeleteCacheByQueryBuilderContext(context.Background(), builder); err != nil {
return xerrors.Errorf("failed to DeleteCacheByQueryBuilderContext: %w", err)
}
return nil
}

func (tx *Tx) DeleteCacheByQueryBuilderContext(ctx context.Context, builder *QueryBuilder) error {
if tx.IsCommitted() {
return ErrAlreadyCommittedTransaction
}
tx.enabledIgnoreCacheIfExistsTable(builder)
if _, exists := tx.r.firstLevelCaches.get(builder.tableName); exists {
return xerrors.Errorf("%s is read only table. it doesn't support write query", builder.tableName)
}
if c, exists := tx.r.secondLevelCaches.get(builder.tableName); exists {
if tx.conn == nil {
return ErrConnectionOfTransaction
}
if err := c.DeleteCacheByQueryBuilder(ctx, tx, builder); err != nil {
return xerrors.Errorf("failed to DeleteCacheByQueryBuilder: %w", err)
}
return nil
}
return xerrors.Errorf("unknown table name %s", builder.tableName)
}

func (tx *Tx) IsCommitted() bool {
return tx.isDBCommitted || tx.isCacheCommitted
}
Expand Down
31 changes: 31 additions & 0 deletions second_level_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -1363,6 +1363,37 @@ func (c *SecondLevelCache) DeleteByQueryBuilder(ctx context.Context, tx *Tx, bui
return nil
}

func (c *SecondLevelCache) DeleteCacheByQueryBuilder(ctx context.Context, tx *Tx, builder *QueryBuilder) error {
defer builder.Release()
if !builder.AvailableCache() {
if !builder.isIgnoreCache {
if err := c.deleteCacheFromSQL(ctx, tx, builder); err != nil {
return xerrors.Errorf("failed to delete cache by SQL: %w", err)
}
}
return nil
}

queries, err := builder.BuildWithIndex(c.valueFactory, c.indexes, c.typ)
if err != nil {
return xerrors.Errorf("failed to build query: %w", err)
}

if !c.isUsedPrimaryKeyBuilder(queries) {
if err := c.deleteCacheFromSQL(ctx, tx, builder); err != nil {
return xerrors.Errorf("failed to delete cache by SQL: %w", err)
}
} else {
for i := 0; i < queries.Len(); i++ {
cacheKey := queries.At(i).cacheKey
if err := c.deletePrimaryKey(tx, cacheKey); err != nil {
return xerrors.Errorf("failed to delete primary key: %w", err)
}
}
}
return nil
}

func (c *SecondLevelCache) builderByValue(value *StructValue, index *Index) *QueryBuilder {
builder := NewQueryBuilder(c.typ.tableName)
for _, column := range index.Columns {
Expand Down
56 changes: 56 additions & 0 deletions second_level_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1348,6 +1348,62 @@ func testDeleteByQueryBuilder(t *testing.T, typ CacheServerType) {

}

func TestDeleteCacheByQueryBuilder(t *testing.T) {
for cacheServerType := range []CacheServerType{CacheServerTypeMemcached, CacheServerTypeRedis} {
testDeleteCascheByQueryBuilder(t, CacheServerType(cacheServerType))
}
}

func testDeleteCascheByQueryBuilder(t *testing.T, typ CacheServerType) {
NoError(t, initCache(conn, typ))
slc := NewSecondLevelCache(userLoginType(), cache.cacheServer, TableOption{})
NoError(t, slc.WarmUp(conn))
t.Run("cache is available", func(t *testing.T) {
NoError(t, initUserLoginTable(conn))
builder := NewQueryBuilder("user_logins").
In("user_id", []uint64{1, 2, 3, 4, 5}).
Eq("user_session_id", uint64(1))
txConn, err := conn.Begin()
NoError(t, err)
tx, err := cache.Begin(txConn)
NoError(t, err)
NoError(t, slc.DeleteCacheByQueryBuilder(context.Background(), tx, builder))
NoError(t, tx.Commit())
})

t.Run("not available cache", func(t *testing.T) {
NoError(t, initUserLoginTable(conn))
builder := NewQueryBuilder("user_logins").
Gte("user_session_id", uint64(1)).
Lte("user_session_id", uint64(3))
txConn, err := conn.Begin()
NoError(t, err)
tx, err := cache.Begin(txConn)
NoError(t, err)
NoError(t, slc.DeleteCacheByQueryBuilder(context.Background(), tx, builder))
NoError(t, tx.Commit())
})

t.Run("delete by primary keys", func(t *testing.T) {
NoError(t, initUserLoginTable(conn))
builder := NewQueryBuilder("user_logins").
In("id", []uint64{1, 2, 3, 4, 5})
txConn, err := conn.Begin()
NoError(t, err)
tx, err := cache.Begin(txConn)
NoError(t, err)
NoError(t, slc.DeleteCacheByQueryBuilder(context.Background(), tx, builder))

var userLogins UserLogins
NoError(t, slc.FindByQueryBuilder(context.Background(), tx, builder, &userLogins))
if len(userLogins) != 0 {
t.Fatal("fail to delete")
}
NoError(t, tx.Commit())
})

}

func TestRawQuery(t *testing.T) {
for cacheServerType := range []CacheServerType{CacheServerTypeMemcached, CacheServerTypeRedis} {
testRawQuery(t, CacheServerType(cacheServerType))
Expand Down
99 changes: 99 additions & 0 deletions tx_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -767,3 +767,102 @@ func TestTx_DeleteByQueryBuilderContext(t *testing.T) {
}
})
}

func TestTx_DeleteCacheByQueryBuilder(t *testing.T) {
txConn, err := conn.Begin()
NoError(t, err)
tx, err := cache.Begin(txConn)
NoError(t, err)
defer func() { NoError(t, tx.RollbackUnlessCommitted()) }()

findBuilder := NewQueryBuilder("user_logins").
Eq("user_id", uint64(1)).
Eq("user_session_id", uint64(1))
var userLogin UserLogin
NoError(t, tx.FindByQueryBuilder(findBuilder, &userLogin))
NotEqualf(t, userLogin.ID, 0, "cannot find userLogin")

builder := NewQueryBuilder("user_logins").Eq("id", userLogin.ID)
NoError(t, tx.DeleteCacheByQueryBuilder(builder))
NoError(t, tx.Commit())
}

func TestTx_DeleteCacheByQueryBuilderContext(t *testing.T) {
t.Run("already committed", func(t *testing.T) {
txConn, err := conn.Begin()
NoError(t, err)
tx, err := cache.Begin(txConn)
NoError(t, err)
defer func() { NoError(t, tx.RollbackUnlessCommitted()) }()
NoError(t, tx.Commit())

builder := NewQueryBuilder("user_logins").Eq("id", uint64(1))
if err := tx.DeleteByQueryBuilderContext(context.Background(), builder); err != nil {
if !xerrors.Is(err, ErrAlreadyCommittedTransaction) {
t.Fatalf("unexpected type err: %+v", err)
}
} else {
t.Fatal("required not nil error")
}
})
t.Run("update flc table", func(t *testing.T) {
txConn, err := conn.Begin()
NoError(t, err)
tx, err := cache.Begin(txConn)
NoError(t, err)
defer func() { NoError(t, tx.RollbackUnlessCommitted()) }()

builder := NewQueryBuilder("events").Eq("id", uint64(1))
var event Event
NoError(t, tx.FindByQueryBuilder(builder, &event))
NotEqualf(t, event.ID, 0, "cannot find event")

if err := tx.DeleteCacheByQueryBuilderContext(context.Background(), builder); err == nil {
t.Fatalf("err is nil")
}
})
t.Run("conn is nil", func(t *testing.T) {
tx, err := cache.Begin(nil)
NoError(t, err)
defer func() { NoError(t, tx.RollbackUnlessCommitted()) }()

builder := NewQueryBuilder("user_logins").Eq("id", uint64(1))
if err := tx.DeleteCacheByQueryBuilderContext(context.Background(), builder); err != nil {
if !xerrors.Is(err, ErrConnectionOfTransaction) {
t.Fatalf("unexpected type err: %+v", err)
}
} else {
t.Fatal("required not nil error")
}
})
t.Run("update slc table", func(t *testing.T) {
txConn, err := conn.Begin()
NoError(t, err)
tx, err := cache.Begin(txConn)
NoError(t, err)
defer func() { NoError(t, tx.RollbackUnlessCommitted()) }()

findBuilder := NewQueryBuilder("user_logins").
Eq("user_id", uint64(1)).
Eq("user_session_id", uint64(1))
var userLogin UserLogin
NoError(t, tx.FindByQueryBuilder(findBuilder, &userLogin))
NotEqualf(t, userLogin.ID, 0, "cannot find userLogin")

builder := NewQueryBuilder("user_logins").Eq("id", userLogin.ID)
NoError(t, tx.DeleteCacheByQueryBuilderContext(context.Background(), builder))
NoError(t, tx.Commit())
})
t.Run("update unknown table", func(t *testing.T) {
txConn, err := conn.Begin()
NoError(t, err)
tx, err := cache.Begin(txConn)
NoError(t, err)
defer func() { NoError(t, tx.RollbackUnlessCommitted()) }()

builder := NewQueryBuilder("rapidash").Eq("id", uint64(1))
if err := tx.DeleteCacheByQueryBuilderContext(context.Background(), builder); err == nil {
t.Fatalf("err is nil")
}
})
}