Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
100 changes: 46 additions & 54 deletions pkg/database/alerts.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,33 +159,24 @@ func (c *Client) CreateOrUpdateAlert(ctx context.Context, machineID string, aler
SetScope(*decisionItem.Scope).
SetOrigin(*decisionItem.Origin).
SetSimulated(*alertItem.Simulated).
SetUUID(decisionItem.UUID)
SetUUID(decisionItem.UUID).
SetOwnerID(foundAlert.ID)

decisionBuilders = append(decisionBuilders, decisionBuilder)
}

decisions := []*ent.Decision{}

builderChunks := slicetools.Chunks(decisionBuilders, c.decisionBulkSize)

for _, builderChunk := range builderChunks {
decisionsCreateRet, err := c.Ent.Decision.CreateBulk(builderChunk...).Save(ctx)
if err != nil {
return "", fmt.Errorf("creating alert decisions: %w", err)
}

decisions = append(decisions, decisionsCreateRet...)
}

// now that we bulk created missing decisions, let's update the alert

decisionChunks := slicetools.Chunks(decisions, c.decisionBulkSize)
// create missing decisions in batches

for _, decisionChunk := range decisionChunks {
err = c.Ent.Alert.Update().Where(alert.UUID(alertItem.UUID)).AddDecisions(decisionChunk...).Exec(ctx)
decisions := make([]*ent.Decision, 0, len(decisionBuilders))
if err := slicetools.Batch(ctx, decisionBuilders, c.decisionBulkSize, func(ctx context.Context, b []*ent.DecisionCreate) error {
ret, err := c.Ent.Decision.CreateBulk(b...).Save(ctx)
if err != nil {
return "", fmt.Errorf("updating alert %s: %w", alertItem.UUID, err)
return fmt.Errorf("creating alert decisions: %w", err)
}
decisions = append(decisions, ret...)
return nil
}); err != nil {
return "", err
}

return "", nil
Expand Down Expand Up @@ -329,32 +320,35 @@ func (c *Client) UpdateCommunityBlocklist(ctx context.Context, alertItem *models
valueList = append(valueList, *decisionItem.Value)
}

deleteChunks := slicetools.Chunks(valueList, c.decisionBulkSize)
// Delete older decisions from capi

for _, deleteChunk := range deleteChunks {
// Deleting older decisions from capi
if err := slicetools.Batch(ctx, valueList, c.decisionBulkSize, func(ctx context.Context, vals []string) error {
deletedDecisions, err := txClient.Decision.Delete().
Where(decision.And(
decision.OriginEQ(decOrigin),
decision.Not(decision.HasOwnerWith(alert.IDEQ(alertRef.ID))),
decision.ValueIn(deleteChunk...),
)).Exec(ctx)
decision.ValueIn(vals...),
)).Exec(ctx)
if err != nil {
return 0, 0, 0, rollbackOnError(txClient, err, "deleting older community blocklist decisions")
return err
}

deleted += deletedDecisions
return nil
}); err != nil {
return 0, 0, 0, rollbackOnError(txClient, err, "deleting older community blocklist decisions")
}

builderChunks := slicetools.Chunks(decisionBuilders, c.decisionBulkSize)
// Insert new decisions

for _, builderChunk := range builderChunks {
insertedDecisions, err := txClient.Decision.CreateBulk(builderChunk...).Save(ctx)
if err := slicetools.Batch(ctx, decisionBuilders, c.decisionBulkSize, func(ctx context.Context, b []*ent.DecisionCreate) error {
insertedDecisions, err := txClient.Decision.CreateBulk(b...).Save(ctx)
if err != nil {
return 0, 0, 0, rollbackOnError(txClient, err, "bulk creating decisions")
return err
}

inserted += len(insertedDecisions)
return nil
}); err != nil {
return 0, 0, 0, rollbackOnError(txClient, err, "bulk creating decisions")
}

log.Debugf("deleted %d decisions for %s vs %s", deleted, decOrigin, *alertItem.Decisions[0].Origin)
Expand All @@ -367,7 +361,7 @@ func (c *Client) UpdateCommunityBlocklist(ctx context.Context, alertItem *models
return alertRef.ID, inserted, deleted, nil
}

func (c *Client) createDecisionChunk(ctx context.Context, simulated bool, stopAtTime time.Time, decisions []*models.Decision) ([]*ent.Decision, error) {
func (c *Client) createDecisionBatch(ctx context.Context, simulated bool, stopAtTime time.Time, decisions []*models.Decision) ([]*ent.Decision, error) {
decisionCreate := []*ent.DecisionCreate{}

for _, decisionItem := range decisions {
Expand Down Expand Up @@ -544,15 +538,15 @@ func buildMetaCreates(ctx context.Context, logger log.FieldLogger, client *ent.C

func buildDecisions(ctx context.Context, logger log.FieldLogger, client *Client, alertItem *models.Alert, stopAtTime time.Time) ([]*ent.Decision, int, error) {
decisions := []*ent.Decision{}

decisionChunks := slicetools.Chunks(alertItem.Decisions, client.decisionBulkSize)
for _, decisionChunk := range decisionChunks {
decisionRet, err := client.createDecisionChunk(ctx, *alertItem.Simulated, stopAtTime, decisionChunk)
if err := slicetools.Batch(ctx, alertItem.Decisions, client.decisionBulkSize, func(ctx context.Context, part []*models.Decision) error {
ret, err := client.createDecisionBatch(ctx, *alertItem.Simulated, stopAtTime, part)
if err != nil {
return nil, 0, fmt.Errorf("creating alert decisions: %w", err)
return fmt.Errorf("creating alert decisions: %w", err)
}

decisions = append(decisions, decisionRet...)
decisions = append(decisions, ret...)
return nil
}); err != nil {
return nil, 0, err
}

discarded := len(alertItem.Decisions) - len(decisions)
Expand Down Expand Up @@ -620,15 +614,13 @@ func saveAlerts(ctx context.Context, c *Client, batch []alertCreatePlan) ([]stri
continue
}

decisionsChunk := slicetools.Chunks(d, c.decisionBulkSize)

for _, d2 := range decisionsChunk {
if err := retryOnBusy(func() error {
if err := slicetools.Batch(ctx, d, c.decisionBulkSize, func(ctx context.Context, d2 []*ent.Decision) error {
return retryOnBusy(func() error {
_, err := c.Ent.Alert.Update().Where(alert.IDEQ(a.ID)).AddDecisions(d2...).Save(ctx)
return err
}); err != nil {
return nil, fmt.Errorf("attach decisions to alert %d: %w", a.ID, err)
}
})
}); err != nil {
return nil, fmt.Errorf("attach decisions to alert %d: %w", a.ID, err)
}
}

Expand All @@ -640,7 +632,7 @@ type alertCreatePlan struct {
decisions []*ent.Decision
}

func (c *Client) createAlertChunk(ctx context.Context, machineID string, owner *ent.Machine, alerts []*models.Alert) ([]string, error) {
func (c *Client) createAlertBatch(ctx context.Context, machineID string, owner *ent.Machine, alerts []*models.Alert) ([]string, error) {
batch := make([]alertCreatePlan, 0, len(alerts))

for _, alertItem := range alerts {
Expand Down Expand Up @@ -740,16 +732,16 @@ func (c *Client) CreateAlert(ctx context.Context, machineID string, alertList []

c.Log.Debugf("writing %d items", len(alertList))

alertChunks := slicetools.Chunks(alertList, alertCreateBulkSize)
alertIDs := []string{}

for _, alertChunk := range alertChunks {
ids, err := c.createAlertChunk(ctx, machineID, owner, alertChunk)
if err := slicetools.Batch(ctx, alertList, alertCreateBulkSize, func(ctx context.Context, part []*models.Alert) error {
ids, err := c.createAlertBatch(ctx, machineID, owner, part)
if err != nil {
return nil, fmt.Errorf("machine '%s': %w", machineID, err)
return fmt.Errorf("machine %q: %w", machineID, err)
}

alertIDs = append(alertIDs, ids...)
return nil
}); err != nil {
return nil, err
}

if owner != nil {
Expand Down
95 changes: 53 additions & 42 deletions pkg/database/decisions.go
Original file line number Diff line number Diff line change
Expand Up @@ -298,68 +298,79 @@ func decisionIDs(decisions []*ent.Decision) []int {
return ids
}

// ExpireDecisions sets the expiration of a list of decisions to now()
// It returns the number of impacted decisions for the CAPI/PAPI
func (c *Client) ExpireDecisions(ctx context.Context, decisions []*ent.Decision) (int, error) {
if len(decisions) <= decisionDeleteBulkSize {
ids := decisionIDs(decisions)
// expireDecisionBatch expires the decisions as a single operation.
func (c *Client) expireDecisionBatch(ctx context.Context, batch []*ent.Decision, now time.Time) (int, error) {
ids := decisionIDs(batch)

rows, err := c.Ent.Decision.
Update().
Where(decision.IDIn(ids...)).
SetUntil(now).
Save(ctx)
if err != nil {
return 0, fmt.Errorf("expire decisions with provided filter: %w", err)
}

rows, err := c.Ent.Decision.Update().Where(
decision.IDIn(ids...),
).SetUntil(time.Now().UTC()).Save(ctx)
if err != nil {
return 0, fmt.Errorf("expire decisions with provided filter: %w", err)
}
return rows, nil
}

return rows, nil
// ExpireDecisions sets the expiration of a list of decisions to now(),
// in multiple operations if len(decisions) > decisionDeleteBulkSize.
// It returns the number of impacted decisions for the CAPI/PAPI, even in case of error.
func (c *Client) ExpireDecisions(ctx context.Context, decisions []*ent.Decision) (int, error) {
if len(decisions) == 0 {
return 0, nil
}

// big batch, let's split it and recurse
now := time.Now().UTC()

total := 0

for _, chunk := range slicetools.Chunks(decisions, decisionDeleteBulkSize) {
rows, err := c.ExpireDecisions(ctx, chunk)
err := slicetools.Batch(ctx, decisions, decisionDeleteBulkSize, func(ctx context.Context, batch []*ent.Decision) error {
rows, err := c.expireDecisionBatch(ctx, batch, now)
if err != nil {
return total, err
return err
}

total += rows
}
return nil
})

return total, nil
return total, err
}

// DeleteDecisions removes a list of decisions from the database
// It returns the number of impacted decisions for the CAPI/PAPI
func (c *Client) DeleteDecisions(ctx context.Context, decisions []*ent.Decision) (int, error) {
if len(decisions) < decisionDeleteBulkSize {
ids := decisionIDs(decisions)

rows, err := c.Ent.Decision.Delete().Where(
decision.IDIn(ids...),
).Exec(ctx)
if err != nil {
return 0, fmt.Errorf("hard delete decisions with provided filter: %w", err)
}
// deleteDecisionBatch removes the decisions as a single operation.
func (c *Client) deleteDecisionBatch(ctx context.Context, batch []*ent.Decision) (int, error) {
ids := decisionIDs(batch)

return rows, nil
rows, err := c.Ent.Decision.
Delete().
Where(decision.IDIn(ids...)).
Exec(ctx)
if err != nil {
return 0, fmt.Errorf("hard delete decisions with provided filter: %w", err)
}

// big batch, let's split it and recurse
return rows, nil
}

tot := 0
// DeleteDecisions removes a list of decisions from the database,
// in multiple operations if len(decisions) > decisionDeleteBulkSize.
// It returns the number of impacted decisions for the CAPI/PAPI, even in case of error.
func (c *Client) DeleteDecisions(ctx context.Context, decisions []*ent.Decision) (int, error) {
if len(decisions) == 0 {
return 0, nil
}

for _, chunk := range slicetools.Chunks(decisions, decisionDeleteBulkSize) {
rows, err := c.DeleteDecisions(ctx, chunk)
total := 0
err := slicetools.Batch(ctx, decisions, decisionDeleteBulkSize, func(ctx context.Context, batch []*ent.Decision) error {
rows, err := c.deleteDecisionBatch(ctx, batch)
if err != nil {
return tot, err
return err
}
total += rows
return nil
})

tot += rows
}

return tot, nil
return total, err
}

// ExpireDecisionByID set the expiration of a decision to now()
Expand Down