Skip to content
This repository has been archived by the owner on Oct 9, 2023. It is now read-only.

Mark webapi task failure as retry limit exceeded #392

Merged
merged 6 commits into from
Sep 14, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 15 additions & 8 deletions go/tasks/pluginmachinery/internal/webapi/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,31 +78,38 @@ func (q *ResourceCache) SyncResource(ctx context.Context, batch cache.Batch) (
logger.Debugf(ctx, "Sync loop - processing resource with cache key [%s]",
resource.GetID())

if cacheItem.SyncFailureCount > q.cfg.MaxSystemFailures {
logger.Infof(ctx, "Sync loop - Item with key [%v] has failed to sync [%v] time(s). More than the allowed [%v] time(s). Marking as failure.",
cacheItem.SyncFailureCount, q.cfg.MaxSystemFailures)
cacheItem.State.Phase = PhaseSystemFailure
}

if cacheItem.State.Phase.IsTerminal() {
logger.Debugf(ctx, "Sync loop - resource cache key [%v] in terminal state [%s]",
resource.GetID())

resp = append(resp, cache.ItemSyncResponse{
ID: resource.GetID(),
Item: resource.GetItem(),
Item: cacheItem,
Action: cache.Unchanged,
})

continue
}

if cacheItem.SyncFailureCount > q.cfg.MaxSystemFailures {
logger.Debugf(ctx, "Sync loop - Item with key [%v] has failed to sync [%v] time(s). More than the allowed [%v] time(s). Marking as failure.",
cacheItem.SyncFailureCount, q.cfg.MaxSystemFailures)
cacheItem.State.Phase = PhaseSystemFailure
resp = append(resp, cache.ItemSyncResponse{
ID: resource.GetID(),
Item: cacheItem,
Action: cache.Update,
})

continue
}

// Get an updated status
logger.Debugf(ctx, "Querying AsyncPlugin for %s", resource.GetID())
newResource, err := q.client.Get(ctx, newPluginContext(cacheItem.ResourceMeta, cacheItem.Resource, "", nil))
if err != nil {
logger.Infof(ctx, "Error retrieving resource [%s]. Error: %v", resource.GetID(), err)
cacheItem.SyncFailureCount++
cacheItem.ErrorMessage = err.Error()

// Make sure we don't return nil for the first argument, because that deletes it from the cache.
resp = append(resp, cache.ItemSyncResponse{
Expand Down
30 changes: 30 additions & 0 deletions go/tasks/pluginmachinery/internal/webapi/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,36 @@ func TestResourceCache_SyncResource(t *testing.T) {
assert.Equal(t, cacheItem, newCacheItem[0].Item)
})

t.Run("Retry limit exceeded", func(t *testing.T) {
mockCache := &cacheMocks.AutoRefresh{}
mockClient := &mocks.Client{}

q := ResourceCache{
AutoRefresh: mockCache,
client: mockClient,
cfg: webapi.CachingConfig{
MaxSystemFailures: 2,
},
}

cacheItem := CacheItem{
State: State{
SyncFailureCount: 5,
ErrorMessage: "some error",
},
}

iw := &cacheMocks.ItemWrapper{}
iw.OnGetItem().Return(cacheItem)
iw.OnGetID().Return("some-id")

newCacheItem, err := q.SyncResource(ctx, []cache.ItemWrapper{iw})
assert.NoError(t, err)
assert.Equal(t, cache.Update, newCacheItem[0].Action)
cacheItem.State.Phase = PhaseSystemFailure
assert.Equal(t, cacheItem, newCacheItem[0].Item)
})

t.Run("move to success", func(t *testing.T) {
mockCache := &cacheMocks.AutoRefresh{}
mockClient := &mocks.Client{}
Expand Down
12 changes: 10 additions & 2 deletions go/tasks/pluginmachinery/internal/webapi/monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,16 @@
errors.CacheFailed, "Failed to cast [%v]", cacheItem)
}

// If the cache has not syncd yet, just return
// If the cache has not synced yet, just return
if cacheItem.Resource == nil {
if cacheItem.Phase.IsTerminal() {
err = cache.DeleteDelayed(cacheItemID)
if err != nil {
logger.Errorf(ctx, "Failed to queue item for deletion in the cache with Item Id: [%v]. Error: %v",
cacheItemID, err)
}
return state, core.PhaseInfoFailure(errors.CacheFailed, cacheItem.ErrorMessage, nil), nil

Check warning on line 40 in go/tasks/pluginmachinery/internal/webapi/monitor.go

View check run for this annotation

Codecov / codecov/patch

go/tasks/pluginmachinery/internal/webapi/monitor.go#L34-L40

Added lines #L34 - L40 were not covered by tests
}
return state, core.PhaseInfoRunning(0, nil), nil
}

Expand All @@ -54,7 +62,7 @@
// Queue item for deletion in the cache.
err = cache.DeleteDelayed(cacheItemID)
if err != nil {
logger.Warnf(ctx, "Failed to queue item for deletion in the cache with Item Id: [%v]. Error: %v",
logger.Errorf(ctx, "Failed to queue item for deletion in the cache with Item Id: [%v]. Error: %v",

Check warning on line 65 in go/tasks/pluginmachinery/internal/webapi/monitor.go

View check run for this annotation

Codecov / codecov/patch

go/tasks/pluginmachinery/internal/webapi/monitor.go#L65

Added line #L65 was not covered by tests
cacheItemID, err)
}
}
Expand Down
3 changes: 3 additions & 0 deletions go/tasks/pluginmachinery/internal/webapi/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,4 +56,7 @@ type State struct {

// The time the execution first requests for an allocation token
AllocationTokenRequestStartTime time.Time `json:"allocationTokenRequestStartTime,omitempty"`

// ErrorMessage generated during cache synchronization.
ErrorMessage string `json:"error_message,omitempty"`
}
Loading