Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

services/horizon: Make reaping batch sizes configurable via --history-retention-reap-count. #5272

Merged
merged 10 commits into from
Apr 17, 2024
6 changes: 5 additions & 1 deletion services/horizon/internal/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -506,7 +506,11 @@ func (a *App) init() error {
initSubmissionSystem(a)

// reaper
a.reaper = reap.New(a.config.HistoryRetentionCount, a.HorizonSession(), a.ledgerState)
a.reaper = reap.New(
a.config.HistoryRetentionCount,
a.config.HistoryRetentionReapCount,
a.HorizonSession(),
a.ledgerState)

// go metrics
initGoMetrics(a)
Expand Down
6 changes: 6 additions & 0 deletions services/horizon/internal/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,12 @@ type Config struct {
// determining a "retention duration", each ledger roughly corresponds to 10
// seconds of real time.
HistoryRetentionCount uint
// HistoryRetentionReapCount is the number of ledgers worth of history data
// to remove per second from the Horizon database. It is intended to allow
// control over the amount of CPU and database load caused by reaping,
// especially if enabling reaping for the first time or in times of
// increased ledger load.
HistoryRetentionReapCount uint
// StaleThreshold represents the number of ledgers a history database may be
// out-of-date by before horizon begins to respond with an error to history
// requests.
Expand Down
2 changes: 1 addition & 1 deletion services/horizon/internal/db2/history/reap_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ func TestReapLookupTables(t *testing.T) {

db := tt.HorizonSession()

sys := reap.New(0, db, ledgerState)
sys := reap.New(0, 0, db, ledgerState)

var (
prevLedgers, curLedgers int
Expand Down
16 changes: 15 additions & 1 deletion services/horizon/internal/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -654,9 +654,23 @@ func Flags() (*Config, support.ConfigOptions) {
ConfigKey: &config.HistoryRetentionCount,
OptType: types.Uint,
FlagDefault: uint(0),
Usage: "the minimum number of ledgers to maintain within horizon's history tables. 0 signifies an unlimited number of ledgers will be retained",
Usage: "the minimum number of ledgers to maintain within Horizon's history tables (0 = retain an unlimited number of ledgers)",
UsedInCommands: IngestionCommands,
},
&support.ConfigOption{
Name: "history-retention-reap-count",
ConfigKey: &config.HistoryRetentionReapCount,
OptType: types.Uint,
FlagDefault: uint(50_000),
Usage: "the batch size (in ledgers) to remove per reap from the Horizon database",
UsedInCommands: IngestionCommands,
CustomSetValue: func(opt *support.ConfigOption) error {
if val := viper.GetUint(opt.Name); val <= 0 || val > 500_000 {
return fmt.Errorf("flag --history-retention-reap-count must be in range [1, 500,000]")
}
return nil
},
},
&support.ConfigOption{
Name: "history-stale-threshold",
ConfigKey: &config.StaleThreshold,
Expand Down
11 changes: 7 additions & 4 deletions services/horizon/internal/reap/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,19 +16,22 @@ import (
type System struct {
HistoryQ *history.Q
RetentionCount uint
ledgerState *ledger.State
ctx context.Context
cancel context.CancelFunc
RetentionBatch uint

ledgerState *ledger.State
ctx context.Context
cancel context.CancelFunc
}

// New initializes the reaper, causing it to begin polling the stellar-core
// database for now ledgers and ingesting data into the horizon database.
func New(retention uint, dbSession db.SessionInterface, ledgerState *ledger.State) *System {
func New(retention, retentionBatchSize uint, dbSession db.SessionInterface, ledgerState *ledger.State) *System {
ctx, cancel := context.WithCancel(context.Background())

r := &System{
HistoryQ: &history.Q{dbSession.Clone()},
RetentionCount: retention,
RetentionBatch: retentionBatchSize,
ledgerState: ledgerState,
ctx: ctx,
cancel: cancel,
Expand Down
25 changes: 16 additions & 9 deletions services/horizon/internal/reap/system.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package reap

import (
"context"
"fmt"
"time"

herrors "github.com/stellar/go/services/horizon/internal/errors"
Expand Down Expand Up @@ -70,25 +71,31 @@ func (r *System) runOnce(ctx context.Context) {
}
}

// Work backwards in 100k ledger blocks to prevent using all the CPU.
// Work backwards in 50k (by default, otherwise configurable via the CLI) ledger
// blocks to prevent using all the CPU.
//
// This runs every hour, so we need to make sure it doesn't
// run for longer than an hour.
// This runs every hour, so we need to make sure it doesn't run for longer than
// an hour.
//
// Current ledger at 2021-08-12 is 36,827,497, so 100k means 368 batches. At 1
// batch/second, that seems like a reasonable balance between running well
// under an hour, and slowing it down enough to leave some CPU for other
// processes.
var batchSize = int32(100_000)
// Current ledger at 2024-04-04s is 51,092,283, so 50k means 1021 batches. At 1
// batch/second, that seems like a reasonable balance between running under an
// hour, and slowing it down enough to leave some CPU for other processes.
var sleep = 1 * time.Second

func (r *System) clearBefore(ctx context.Context, startSeq, endSeq int32) error {
batchSize := int32(r.RetentionBatch)
if batchSize <= 0 {
return fmt.Errorf("invalid batch size for reaping (%d)", batchSize)
}

for batchEndSeq := endSeq - 1; batchEndSeq >= startSeq; batchEndSeq -= batchSize {
batchStartSeq := batchEndSeq - batchSize
if batchStartSeq < startSeq {
batchStartSeq = startSeq
}
log.WithField("start_ledger", batchStartSeq).WithField("end_ledger", batchEndSeq).Info("reaper: clearing")
log.WithField("start_ledger", batchStartSeq).
WithField("end_ledger", batchEndSeq).
Info("reaper: clearing")

batchStart, batchEnd, err := toid.LedgerRangeInclusive(batchStartSeq, batchEndSeq)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion services/horizon/internal/reap/system_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ func TestDeleteUnretainedHistory(t *testing.T) {

db := tt.HorizonSession()

sys := New(0, db, ledgerState)
sys := New(0, 50_000, db, ledgerState)

// Disable sleeps for this.
sleep = 0
Expand Down
Loading