diff --git a/internal/app/query/cache.go b/internal/app/query/cache.go index c2ab8fae7..9843b9deb 100644 --- a/internal/app/query/cache.go +++ b/internal/app/query/cache.go @@ -176,6 +176,25 @@ func updateTicketCache(store statestore.Service, value interface{}) error { tickets[t.Id] = t } + // do not block current run + go func() { + // thought: after cache update, the num of tickets in cache should match the currentAll + // therefore anything in currentAll but not in tickets is likely a ticket + // who's ttl has expired and has been deleted but remains in index + expiredTicketIds := []string{} + for id := range currentAll { + if _, ok := tickets[id]; !ok { + expiredTicketIds = append(expiredTicketIds, id) + } + } + + // delete from index cache + err = store.DeindexTickets(context.Background(), expiredTicketIds) + if err != nil { + logger.Errorf("Error deindexing orphaned tickets: %v", err) + } + }() + stats.Record(context.Background(), cacheTotalItems.M(int64(previousCount))) stats.Record(context.Background(), totalActiveTickets.M(int64(len(currentAll)))) stats.Record(context.Background(), cacheFetchedItems.M(int64(len(toFetch)))) diff --git a/internal/statestore/backfill_test.go b/internal/statestore/backfill_test.go index 91aaad1f5..d27ea0dd6 100644 --- a/internal/statestore/backfill_test.go +++ b/internal/statestore/backfill_test.go @@ -34,7 +34,7 @@ import ( ) func TestCreateBackfillLastAckTime(t *testing.T) { - cfg, closer := createRedis(t, false, "") + cfg, closer, _ := createRedis(t, false, "") defer closer() service := New(cfg) require.NotNil(t, service) @@ -56,7 +56,7 @@ func TestCreateBackfillLastAckTime(t *testing.T) { } func TestCreateBackfill(t *testing.T) { - cfg, closer := createRedis(t, false, "") + cfg, closer, _ := createRedis(t, false, "") defer closer() service := New(cfg) require.NotNil(t, service) @@ -118,7 +118,7 @@ func TestCreateBackfill(t *testing.T) { } func TestUpdateExistingBackfillNoError(t *testing.T) { - cfg, closer := createRedis(t, false, "") + cfg, closer, _ := createRedis(t, false, "") defer closer() service := New(cfg) require.NotNil(t, service) @@ -178,7 +178,7 @@ func TestUpdateExistingBackfillNoError(t *testing.T) { } func TestUpdateBackfillDoNotExistCanNotUpdate(t *testing.T) { - cfg, closer := createRedis(t, false, "") + cfg, closer, _ := createRedis(t, false, "") defer closer() service := New(cfg) require.NotNil(t, service) @@ -208,7 +208,7 @@ func TestUpdateBackfillDoNotExistCanNotUpdate(t *testing.T) { } func TestUpdateBackfillExpiredBackfillErrExpected(t *testing.T) { - cfg, closer := createRedis(t, false, "") + cfg, closer, _ := createRedis(t, false, "") defer closer() service := New(cfg) require.NotNil(t, service) @@ -236,7 +236,7 @@ func TestUpdateBackfillExpiredBackfillErrExpected(t *testing.T) { } func TestUpdateBackfillExpiredContextErrExpected(t *testing.T) { - cfg, closer := createRedis(t, false, "") + cfg, closer, _ := createRedis(t, false, "") defer closer() service := New(cfg) require.NotNil(t, service) @@ -254,7 +254,7 @@ func TestUpdateBackfillExpiredContextErrExpected(t *testing.T) { } func TestGetBackfill(t *testing.T) { - cfg, closer := createRedis(t, false, "") + cfg, closer, _ := createRedis(t, false, "") defer closer() service := New(cfg) require.NotNil(t, service) @@ -341,7 +341,7 @@ func TestGetBackfill(t *testing.T) { } func TestDeleteBackfill(t *testing.T) { - cfg, closer := createRedis(t, false, "") + cfg, closer, _ := createRedis(t, false, "") defer closer() service := New(cfg) require.NotNil(t, service) @@ -424,7 +424,7 @@ func TestDeleteBackfill(t *testing.T) { // TestUpdateAcknowledgmentTimestampLifecycle test statestore functions - UpdateAcknowledgmentTimestamp, GetExpiredBackfillIDs // and deleteExpiredBackfillID func TestUpdateAcknowledgmentTimestampLifecycle(t *testing.T) { - cfg, closer := createRedis(t, false, "") + cfg, closer, _ := createRedis(t, false, "") defer closer() service := New(cfg) @@ -480,7 +480,7 @@ func TestUpdateAcknowledgmentTimestampLifecycle(t *testing.T) { } func TestUpdateAcknowledgmentTimestamp(t *testing.T) { - cfg, closer := createRedis(t, false, "") + cfg, closer, _ := createRedis(t, false, "") defer closer() startTime := time.Now() @@ -511,7 +511,7 @@ func TestUpdateAcknowledgmentTimestamp(t *testing.T) { } func TestUpdateAcknowledgmentTimestamptExpiredBackfillErrExpected(t *testing.T) { - cfg, closer := createRedis(t, false, "") + cfg, closer, _ := createRedis(t, false, "") defer closer() service := New(cfg) require.NotNil(t, service) @@ -535,7 +535,7 @@ func TestUpdateAcknowledgmentTimestamptExpiredBackfillErrExpected(t *testing.T) } func TestUpdateAcknowledgmentTimestampConnectionError(t *testing.T) { - cfg, closer := createRedis(t, false, "") + cfg, closer, _ := createRedis(t, false, "") defer closer() service := New(cfg) require.NotNil(t, service) @@ -560,7 +560,7 @@ func createInvalidRedisConfig() config.View { // TestGetExpiredBackfillIDs test statestore function GetExpiredBackfillIDs func TestGetExpiredBackfillIDs(t *testing.T) { // Prepare expired and normal BackfillIds in a Redis Sorted Set - cfg, closer := createRedis(t, false, "") + cfg, closer, _ := createRedis(t, false, "") defer closer() expID := "expired" @@ -584,7 +584,7 @@ func TestGetExpiredBackfillIDs(t *testing.T) { } func TestIndexBackfill(t *testing.T) { - cfg, closer := createRedis(t, false, "") + cfg, closer, _ := createRedis(t, false, "") defer closer() service := New(cfg) require.NotNil(t, service) @@ -617,7 +617,7 @@ func TestIndexBackfill(t *testing.T) { } func TestDeindexBackfill(t *testing.T) { - cfg, closer := createRedis(t, false, "") + cfg, closer, _ := createRedis(t, false, "") defer closer() service := New(cfg) require.NotNil(t, service) @@ -654,7 +654,7 @@ func TestDeindexBackfill(t *testing.T) { } func TestGetIndexedBackfills(t *testing.T) { - cfg, closer := createRedis(t, false, "") + cfg, closer, _ := createRedis(t, false, "") defer closer() service := New(cfg) require.NotNil(t, service) @@ -705,7 +705,7 @@ func generateBackfills(ctx context.Context, t *testing.T, service Service, amoun func BenchmarkCleanupBackfills(b *testing.B) { t := &testing.T{} - cfg, closer := createRedis(t, false, "") + cfg, closer, _ := createRedis(t, false, "") defer closer() service := New(cfg) require.NotNil(t, service) @@ -743,7 +743,7 @@ func BenchmarkCleanupBackfills(b *testing.B) { } func TestCleanupBackfills(t *testing.T) { - cfg, closer := createRedis(t, false, "") + cfg, closer, _ := createRedis(t, false, "") defer closer() service := New(cfg) require.NotNil(t, service) diff --git a/internal/statestore/redis_test.go b/internal/statestore/redis_test.go index 5fc7979f8..3830706ec 100644 --- a/internal/statestore/redis_test.go +++ b/internal/statestore/redis_test.go @@ -9,7 +9,7 @@ import ( ) func TestNewMutex(t *testing.T) { - cfg, closer := createRedis(t, false, "") + cfg, closer, _ := createRedis(t, false, "") defer closer() service := New(cfg) require.NotNil(t, service) diff --git a/internal/statestore/ticket.go b/internal/statestore/ticket.go index dcdd86b84..7de7bbf1d 100644 --- a/internal/statestore/ticket.go +++ b/internal/statestore/ticket.go @@ -53,7 +53,8 @@ func (rb *redisBackend) CreateTicket(ctx context.Context, ticket *pb.Ticket) err return status.Errorf(codes.Internal, "failed to marshal the ticket proto, id: %s: proto: Marshal called with nil", ticket.GetId()) } - _, err = redisConn.Do("SET", ticket.GetId(), value) + ticketDeletionTTL := getTicketDeleteTimeout(rb.cfg) / time.Millisecond + _, err = redisConn.Do("SET", ticket.GetId(), value, "PX", int64(ticketDeletionTTL)) if err != nil { err = errors.Wrapf(err, "failed to set the value for ticket, id: %s", ticket.GetId()) return status.Errorf(codes.Internal, "%v", err) @@ -533,3 +534,18 @@ func getAssignedDeleteTimeout(cfg config.View) time.Duration { return cfg.GetDuration(name) } + +func getTicketDeleteTimeout(cfg config.View) time.Duration { + const ( + name = "ticketDeleteTimeout" + // Default timeout to delete tickets after creation. This value + // will be used if ticketDeleteTimeout is not configured. + defaultTicketDeleteTimeout = 1 * time.Hour + ) + + if !cfg.IsSet(name) { + return defaultTicketDeleteTimeout + } + + return cfg.GetDuration(name) +} diff --git a/internal/statestore/ticket_test.go b/internal/statestore/ticket_test.go index 89a93e844..8115b041b 100644 --- a/internal/statestore/ticket_test.go +++ b/internal/statestore/ticket_test.go @@ -28,6 +28,7 @@ import ( "github.com/gomodule/redigo/redis" "github.com/rs/xid" "github.com/spf13/viper" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" @@ -38,7 +39,7 @@ import ( ) func TestStatestoreSetup(t *testing.T) { - cfg, closer := createRedis(t, true, "") + cfg, closer, _ := createRedis(t, true, "") defer closer() service := New(cfg) require.NotNil(t, service) @@ -46,7 +47,7 @@ func TestStatestoreSetup(t *testing.T) { } func TestTicketLifecycle(t *testing.T) { - cfg, closer := createRedis(t, true, "") + cfg, closer, _ := createRedis(t, true, "") defer closer() service := New(cfg) require.NotNil(t, service) @@ -104,7 +105,7 @@ func TestTicketLifecycle(t *testing.T) { } func TestGetAssignmentBeforeSet(t *testing.T) { - cfg, closer := createRedis(t, true, "") + cfg, closer, _ := createRedis(t, true, "") defer closer() service := New(cfg) require.NotNil(t, service) @@ -123,7 +124,7 @@ func TestGetAssignmentBeforeSet(t *testing.T) { } func TestGetAssignmentNormal(t *testing.T) { - cfg, closer := createRedis(t, true, "") + cfg, closer, _ := createRedis(t, true, "") defer closer() service := New(cfg) require.NotNil(t, service) @@ -171,7 +172,7 @@ func TestGetAssignmentNormal(t *testing.T) { } func TestUpdateAssignments(t *testing.T) { - cfg, closer := createRedis(t, false, "") + cfg, closer, _ := createRedis(t, false, "") defer closer() service := New(cfg) require.NotNil(t, service) @@ -376,7 +377,7 @@ func TestConnect(t *testing.T) { } func TestHealthCheck(t *testing.T) { - cfg, closer := createRedis(t, true, "") + cfg, closer, _ := createRedis(t, true, "") defer closer() service := New(cfg) require.NotNil(t, service) @@ -395,7 +396,7 @@ func TestHealthCheck(t *testing.T) { } func TestCreateTicket(t *testing.T) { - cfg, closer := createRedis(t, true, "") + cfg, closer, _ := createRedis(t, true, "") defer closer() service := New(cfg) require.NotNil(t, service) @@ -453,7 +454,7 @@ func TestCreateTicket(t *testing.T) { } func TestGetTicket(t *testing.T) { - cfg, closer := createRedis(t, false, "") + cfg, closer, _ := createRedis(t, false, "") defer closer() service := New(cfg) require.NotNil(t, service) @@ -530,8 +531,51 @@ func TestGetTicket(t *testing.T) { require.Contains(t, status.Convert(err).Message(), "GetTicket, id: 12345, failed to connect to redis:") } +func TestGetTicketWithTTL(t *testing.T) { + cfg, closer, fastForward := createRedis(t, false, "") + defer closer() + service := New(cfg) + + require.NotNil(t, service) + defer service.Close() + ctx := utilTesting.NewContext(t) + ticketId := "mockTicketID" + + // Given + err := service.CreateTicket(ctx, &pb.Ticket{ + Id: ticketId, + Assignment: &pb.Assignment{Connection: "2"}, + }) + require.NoError(t, err) + + // When + ticketActual, errActual := service.GetTicket(ctx, ticketId) + + // Then + require.NoError(t, errActual) + require.NotNil(t, ticketActual) + + // And on TTL Expiry - default TTL for tests is a second + fastForward(2 * time.Second) + ticket, err := service.GetTicket(ctx, ticketId) + + require.Error(t, err) + require.Equal(t, codes.NotFound.String(), status.Convert(err).Code().String()) + assert.Nil(t, ticket) + + // pass an expired context, err expected + ctx, cancel := context.WithCancel(context.Background()) + cancel() + service = New(cfg) + res, err := service.GetTicket(ctx, ticketId) + require.Error(t, err) + require.Nil(t, res) + require.Equal(t, codes.Unavailable.String(), status.Convert(err).Code().String()) + require.Contains(t, status.Convert(err).Message(), "GetTicket, id: mockTicketID, failed to connect to redis:") +} + func TestDeleteTicket(t *testing.T) { - cfg, closer := createRedis(t, false, "") + cfg, closer, _ := createRedis(t, false, "") defer closer() service := New(cfg) require.NotNil(t, service) @@ -600,7 +644,7 @@ func TestDeleteTicket(t *testing.T) { } func TestDeleteTickets(t *testing.T) { - cfg, closer := createRedis(t, false, "") + cfg, closer, _ := createRedis(t, false, "") defer closer() service := New(cfg) require.NotNil(t, service) @@ -669,7 +713,7 @@ func TestDeleteTickets(t *testing.T) { } func TestIndexTicket(t *testing.T) { - cfg, closer := createRedis(t, false, "") + cfg, closer, _ := createRedis(t, false, "") defer closer() service := New(cfg) require.NotNil(t, service) @@ -701,7 +745,7 @@ func TestIndexTicket(t *testing.T) { } func TestDeindexTicket(t *testing.T) { - cfg, closer := createRedis(t, false, "") + cfg, closer, _ := createRedis(t, false, "") defer closer() service := New(cfg) require.NotNil(t, service) @@ -738,7 +782,7 @@ func TestDeindexTicket(t *testing.T) { } func TestDeindexTickets(t *testing.T) { - cfg, closer := createRedis(t, false, "") + cfg, closer, _ := createRedis(t, false, "") defer closer() service := New(cfg) require.NotNil(t, service) @@ -775,7 +819,7 @@ func TestDeindexTickets(t *testing.T) { } func TestGetIndexedIDSet(t *testing.T) { - cfg, closer := createRedis(t, false, "") + cfg, closer, _ := createRedis(t, false, "") defer closer() service := New(cfg) require.NotNil(t, service) @@ -820,7 +864,7 @@ func TestGetIndexedIDSet(t *testing.T) { } func TestGetTickets(t *testing.T) { - cfg, closer := createRedis(t, false, "") + cfg, closer, _ := createRedis(t, false, "") defer closer() service := New(cfg) require.NotNil(t, service) @@ -853,7 +897,7 @@ func TestGetTickets(t *testing.T) { } func TestDeleteTicketsFromPendingRelease(t *testing.T) { - cfg, closer := createRedis(t, false, "") + cfg, closer, _ := createRedis(t, false, "") defer closer() service := New(cfg) require.NotNil(t, service) @@ -904,7 +948,7 @@ func TestDeleteTicketsFromPendingRelease(t *testing.T) { } func TestReleaseAllTickets(t *testing.T) { - cfg, closer := createRedis(t, false, "") + cfg, closer, _ := createRedis(t, false, "") defer closer() service := New(cfg) require.NotNil(t, service) @@ -951,7 +995,7 @@ func TestReleaseAllTickets(t *testing.T) { } func TestAddTicketsToPendingRelease(t *testing.T) { - cfg, closer := createRedis(t, false, "") + cfg, closer, _ := createRedis(t, false, "") defer closer() service := New(cfg) require.NotNil(t, service) @@ -995,7 +1039,7 @@ func TestAddTicketsToPendingRelease(t *testing.T) { } func TestGetIndexedTicketCount(t *testing.T) { - cfg, closer := createRedis(t, false, "") + cfg, closer, _ := createRedis(t, false, "") defer closer() service := New(cfg) require.NotNil(t, service) @@ -1028,7 +1072,7 @@ func TestGetIndexedTicketCount(t *testing.T) { } func testConnect(t *testing.T, withSentinel bool, withPassword string) { - cfg, closer := createRedis(t, withSentinel, withPassword) + cfg, closer, _ := createRedis(t, withSentinel, withPassword) defer closer() store := New(cfg) defer store.Close() @@ -1048,7 +1092,7 @@ func testConnect(t *testing.T, withSentinel bool, withPassword string) { require.Equal(t, "PONG", rply) } -func createRedis(t *testing.T, withSentinel bool, withPassword string) (config.View, func()) { +func createRedis(t *testing.T, withSentinel bool, withPassword string) (config.View, func(), func(time.Duration)) { cfg := viper.New() closerFuncs := []func(){} mredis := miniredis.NewMiniRedis() @@ -1071,6 +1115,7 @@ func createRedis(t *testing.T, withSentinel bool, withPassword string) (config.V cfg.Set("backoff.maxElapsedTime", 100*time.Millisecond) cfg.Set(telemetry.ConfigNameEnableMetrics, true) cfg.Set("assignedDeleteTimeout", 1000*time.Millisecond) + cfg.Set("ticketDeleteTimeout", time.Second) if withSentinel { s := minisentinel.NewSentinel(mredis) @@ -1110,7 +1155,7 @@ func createRedis(t *testing.T, withSentinel bool, withPassword string) (config.V for _, closer := range closerFuncs { closer() } - } + }, mredis.FastForward } // nolint: unparam