diff --git a/pkg/database/alerts.go b/pkg/database/alerts.go index 9a94c8f5536..e48951419d8 100644 --- a/pkg/database/alerts.go +++ b/pkg/database/alerts.go @@ -159,33 +159,24 @@ func (c *Client) CreateOrUpdateAlert(ctx context.Context, machineID string, aler SetScope(*decisionItem.Scope). SetOrigin(*decisionItem.Origin). SetSimulated(*alertItem.Simulated). - SetUUID(decisionItem.UUID) + SetUUID(decisionItem.UUID). + SetOwnerID(foundAlert.ID) decisionBuilders = append(decisionBuilders, decisionBuilder) } - decisions := []*ent.Decision{} - - builderChunks := slicetools.Chunks(decisionBuilders, c.decisionBulkSize) - - for _, builderChunk := range builderChunks { - decisionsCreateRet, err := c.Ent.Decision.CreateBulk(builderChunk...).Save(ctx) - if err != nil { - return "", fmt.Errorf("creating alert decisions: %w", err) - } - - decisions = append(decisions, decisionsCreateRet...) - } - - // now that we bulk created missing decisions, let's update the alert - - decisionChunks := slicetools.Chunks(decisions, c.decisionBulkSize) + // create missing decisions in batches - for _, decisionChunk := range decisionChunks { - err = c.Ent.Alert.Update().Where(alert.UUID(alertItem.UUID)).AddDecisions(decisionChunk...).Exec(ctx) + decisions := make([]*ent.Decision, 0, len(decisionBuilders)) + if err := slicetools.Batch(ctx, decisionBuilders, c.decisionBulkSize, func(ctx context.Context, b []*ent.DecisionCreate) error { + ret, err := c.Ent.Decision.CreateBulk(b...).Save(ctx) if err != nil { - return "", fmt.Errorf("updating alert %s: %w", alertItem.UUID, err) + return fmt.Errorf("creating alert decisions: %w", err) } + decisions = append(decisions, ret...) + return nil + }); err != nil { + return "", err } return "", nil @@ -329,32 +320,35 @@ func (c *Client) UpdateCommunityBlocklist(ctx context.Context, alertItem *models valueList = append(valueList, *decisionItem.Value) } - deleteChunks := slicetools.Chunks(valueList, c.decisionBulkSize) + // Delete older decisions from capi - for _, deleteChunk := range deleteChunks { - // Deleting older decisions from capi + if err := slicetools.Batch(ctx, valueList, c.decisionBulkSize, func(ctx context.Context, vals []string) error { deletedDecisions, err := txClient.Decision.Delete(). Where(decision.And( decision.OriginEQ(decOrigin), decision.Not(decision.HasOwnerWith(alert.IDEQ(alertRef.ID))), - decision.ValueIn(deleteChunk...), - )).Exec(ctx) + decision.ValueIn(vals...), + )).Exec(ctx) if err != nil { - return 0, 0, 0, rollbackOnError(txClient, err, "deleting older community blocklist decisions") + return err } - deleted += deletedDecisions + return nil + }); err != nil { + return 0, 0, 0, rollbackOnError(txClient, err, "deleting older community blocklist decisions") } - builderChunks := slicetools.Chunks(decisionBuilders, c.decisionBulkSize) + // Insert new decisions - for _, builderChunk := range builderChunks { - insertedDecisions, err := txClient.Decision.CreateBulk(builderChunk...).Save(ctx) + if err := slicetools.Batch(ctx, decisionBuilders, c.decisionBulkSize, func(ctx context.Context, b []*ent.DecisionCreate) error { + insertedDecisions, err := txClient.Decision.CreateBulk(b...).Save(ctx) if err != nil { - return 0, 0, 0, rollbackOnError(txClient, err, "bulk creating decisions") + return err } - inserted += len(insertedDecisions) + return nil + }); err != nil { + return 0, 0, 0, rollbackOnError(txClient, err, "bulk creating decisions") } log.Debugf("deleted %d decisions for %s vs %s", deleted, decOrigin, *alertItem.Decisions[0].Origin) @@ -367,7 +361,7 @@ func (c *Client) UpdateCommunityBlocklist(ctx context.Context, alertItem *models return alertRef.ID, inserted, deleted, nil } -func (c *Client) createDecisionChunk(ctx context.Context, simulated bool, stopAtTime time.Time, decisions []*models.Decision) ([]*ent.Decision, error) { +func (c *Client) createDecisionBatch(ctx context.Context, simulated bool, stopAtTime time.Time, decisions []*models.Decision) ([]*ent.Decision, error) { decisionCreate := []*ent.DecisionCreate{} for _, decisionItem := range decisions { @@ -544,15 +538,15 @@ func buildMetaCreates(ctx context.Context, logger log.FieldLogger, client *ent.C func buildDecisions(ctx context.Context, logger log.FieldLogger, client *Client, alertItem *models.Alert, stopAtTime time.Time) ([]*ent.Decision, int, error) { decisions := []*ent.Decision{} - - decisionChunks := slicetools.Chunks(alertItem.Decisions, client.decisionBulkSize) - for _, decisionChunk := range decisionChunks { - decisionRet, err := client.createDecisionChunk(ctx, *alertItem.Simulated, stopAtTime, decisionChunk) + if err := slicetools.Batch(ctx, alertItem.Decisions, client.decisionBulkSize, func(ctx context.Context, part []*models.Decision) error { + ret, err := client.createDecisionBatch(ctx, *alertItem.Simulated, stopAtTime, part) if err != nil { - return nil, 0, fmt.Errorf("creating alert decisions: %w", err) + return fmt.Errorf("creating alert decisions: %w", err) } - - decisions = append(decisions, decisionRet...) + decisions = append(decisions, ret...) + return nil + }); err != nil { + return nil, 0, err } discarded := len(alertItem.Decisions) - len(decisions) @@ -620,15 +614,13 @@ func saveAlerts(ctx context.Context, c *Client, batch []alertCreatePlan) ([]stri continue } - decisionsChunk := slicetools.Chunks(d, c.decisionBulkSize) - - for _, d2 := range decisionsChunk { - if err := retryOnBusy(func() error { + if err := slicetools.Batch(ctx, d, c.decisionBulkSize, func(ctx context.Context, d2 []*ent.Decision) error { + return retryOnBusy(func() error { _, err := c.Ent.Alert.Update().Where(alert.IDEQ(a.ID)).AddDecisions(d2...).Save(ctx) return err - }); err != nil { - return nil, fmt.Errorf("attach decisions to alert %d: %w", a.ID, err) - } + }) + }); err != nil { + return nil, fmt.Errorf("attach decisions to alert %d: %w", a.ID, err) } } @@ -640,7 +632,7 @@ type alertCreatePlan struct { decisions []*ent.Decision } -func (c *Client) createAlertChunk(ctx context.Context, machineID string, owner *ent.Machine, alerts []*models.Alert) ([]string, error) { +func (c *Client) createAlertBatch(ctx context.Context, machineID string, owner *ent.Machine, alerts []*models.Alert) ([]string, error) { batch := make([]alertCreatePlan, 0, len(alerts)) for _, alertItem := range alerts { @@ -740,16 +732,16 @@ func (c *Client) CreateAlert(ctx context.Context, machineID string, alertList [] c.Log.Debugf("writing %d items", len(alertList)) - alertChunks := slicetools.Chunks(alertList, alertCreateBulkSize) alertIDs := []string{} - - for _, alertChunk := range alertChunks { - ids, err := c.createAlertChunk(ctx, machineID, owner, alertChunk) + if err := slicetools.Batch(ctx, alertList, alertCreateBulkSize, func(ctx context.Context, part []*models.Alert) error { + ids, err := c.createAlertBatch(ctx, machineID, owner, part) if err != nil { - return nil, fmt.Errorf("machine '%s': %w", machineID, err) + return fmt.Errorf("machine %q: %w", machineID, err) } - alertIDs = append(alertIDs, ids...) + return nil + }); err != nil { + return nil, err } if owner != nil { diff --git a/pkg/database/decisions.go b/pkg/database/decisions.go index a9591d4b4a8..afbe92c9f27 100644 --- a/pkg/database/decisions.go +++ b/pkg/database/decisions.go @@ -298,68 +298,79 @@ func decisionIDs(decisions []*ent.Decision) []int { return ids } -// ExpireDecisions sets the expiration of a list of decisions to now() -// It returns the number of impacted decisions for the CAPI/PAPI -func (c *Client) ExpireDecisions(ctx context.Context, decisions []*ent.Decision) (int, error) { - if len(decisions) <= decisionDeleteBulkSize { - ids := decisionIDs(decisions) +// expireDecisionBatch expires the decisions as a single operation. +func (c *Client) expireDecisionBatch(ctx context.Context, batch []*ent.Decision, now time.Time) (int, error) { + ids := decisionIDs(batch) + + rows, err := c.Ent.Decision. + Update(). + Where(decision.IDIn(ids...)). + SetUntil(now). + Save(ctx) + if err != nil { + return 0, fmt.Errorf("expire decisions with provided filter: %w", err) + } - rows, err := c.Ent.Decision.Update().Where( - decision.IDIn(ids...), - ).SetUntil(time.Now().UTC()).Save(ctx) - if err != nil { - return 0, fmt.Errorf("expire decisions with provided filter: %w", err) - } + return rows, nil +} - return rows, nil +// ExpireDecisions sets the expiration of a list of decisions to now(), +// in multiple operations if len(decisions) > decisionDeleteBulkSize. +// It returns the number of impacted decisions for the CAPI/PAPI, even in case of error. +func (c *Client) ExpireDecisions(ctx context.Context, decisions []*ent.Decision) (int, error) { + if len(decisions) == 0 { + return 0, nil } - // big batch, let's split it and recurse + now := time.Now().UTC() total := 0 - - for _, chunk := range slicetools.Chunks(decisions, decisionDeleteBulkSize) { - rows, err := c.ExpireDecisions(ctx, chunk) + err := slicetools.Batch(ctx, decisions, decisionDeleteBulkSize, func(ctx context.Context, batch []*ent.Decision) error { + rows, err := c.expireDecisionBatch(ctx, batch, now) if err != nil { - return total, err + return err } - total += rows - } + return nil + }) - return total, nil + return total, err } -// DeleteDecisions removes a list of decisions from the database -// It returns the number of impacted decisions for the CAPI/PAPI -func (c *Client) DeleteDecisions(ctx context.Context, decisions []*ent.Decision) (int, error) { - if len(decisions) < decisionDeleteBulkSize { - ids := decisionIDs(decisions) - - rows, err := c.Ent.Decision.Delete().Where( - decision.IDIn(ids...), - ).Exec(ctx) - if err != nil { - return 0, fmt.Errorf("hard delete decisions with provided filter: %w", err) - } +// deleteDecisionBatch removes the decisions as a single operation. +func (c *Client) deleteDecisionBatch(ctx context.Context, batch []*ent.Decision) (int, error) { + ids := decisionIDs(batch) - return rows, nil + rows, err := c.Ent.Decision. + Delete(). + Where(decision.IDIn(ids...)). + Exec(ctx) + if err != nil { + return 0, fmt.Errorf("hard delete decisions with provided filter: %w", err) } - // big batch, let's split it and recurse + return rows, nil +} - tot := 0 +// DeleteDecisions removes a list of decisions from the database, +// in multiple operations if len(decisions) > decisionDeleteBulkSize. +// It returns the number of impacted decisions for the CAPI/PAPI, even in case of error. +func (c *Client) DeleteDecisions(ctx context.Context, decisions []*ent.Decision) (int, error) { + if len(decisions) == 0 { + return 0, nil + } - for _, chunk := range slicetools.Chunks(decisions, decisionDeleteBulkSize) { - rows, err := c.DeleteDecisions(ctx, chunk) + total := 0 + err := slicetools.Batch(ctx, decisions, decisionDeleteBulkSize, func(ctx context.Context, batch []*ent.Decision) error { + rows, err := c.deleteDecisionBatch(ctx, batch) if err != nil { - return tot, err + return err } + total += rows + return nil + }) - tot += rows - } - - return tot, nil + return total, err } // ExpireDecisionByID set the expiration of a decision to now()