Skip to content

Commit

Permalink
services/horizon/internal/ingest: Add more fine-grained reap metrics (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
tamirms committed Jul 16, 2024
1 parent 8e86d00 commit 0e83215
Show file tree
Hide file tree
Showing 4 changed files with 60 additions and 50 deletions.
38 changes: 21 additions & 17 deletions services/horizon/internal/db2/history/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -282,7 +282,7 @@ type IngestionQ interface {
NewTradeBatchInsertBuilder() TradeBatchInsertBuilder
RebuildTradeAggregationTimes(ctx context.Context, from, to strtime.Millis, roundingSlippageFilter int) error
RebuildTradeAggregationBuckets(ctx context.Context, fromLedger, toLedger uint32, roundingSlippageFilter int) error
ReapLookupTables(ctx context.Context, offsets map[string]int64) (map[string]int64, map[string]int64, error)
ReapLookupTables(ctx context.Context, offsets map[string]int64) (map[string]LookupTableReapResult, error)
CreateAssets(ctx context.Context, assets []xdr.Asset, batchSize int) (map[string]Asset, error)
QTransactions
QTrustLines
Expand Down Expand Up @@ -971,27 +971,27 @@ type tableObjectFieldPair struct {
objectField string
}

type LookupTableReapResult struct {
Offset int64
RowsDeleted int64
Duration time.Duration
}

// ReapLookupTables removes rows from lookup tables like history_claimable_balances
// which aren't used (orphaned), i.e. history entries for them were reaped.
// This method must be executed inside ingestion transaction. Otherwise it may
// create invalid state in lookup and history tables.
func (q Q) ReapLookupTables(ctx context.Context, offsets map[string]int64) (
map[string]int64, // deleted rows count
map[string]int64, // new offsets
map[string]LookupTableReapResult,
error,
) {
if q.GetTx() == nil {
return nil, nil, errors.New("cannot be called outside of an ingestion transaction")
return nil, errors.New("cannot be called outside of an ingestion transaction")
}

const batchSize = 1000

deletedCount := make(map[string]int64)

if offsets == nil {
offsets = make(map[string]int64)
}

results := map[string]LookupTableReapResult{}
for table, historyTables := range map[string][]tableObjectFieldPair{
"history_accounts": {
{
Expand Down Expand Up @@ -1054,9 +1054,10 @@ func (q Q) ReapLookupTables(ctx context.Context, offsets map[string]int64) (
},
},
} {
startTime := time.Now()
query, err := constructReapLookupTablesQuery(table, historyTables, batchSize, offsets[table])
if err != nil {
return nil, nil, errors.Wrap(err, "error constructing a query")
return nil, errors.Wrap(err, "error constructing a query")
}

// Find new offset before removing the rows
Expand All @@ -1066,7 +1067,7 @@ func (q Q) ReapLookupTables(ctx context.Context, offsets map[string]int64) (
if q.NoRows(err) {
newOffset = 0
} else {
return nil, nil, err
return nil, err
}
}

Expand All @@ -1075,18 +1076,21 @@ func (q Q) ReapLookupTables(ctx context.Context, offsets map[string]int64) (
query,
)
if err != nil {
return nil, nil, errors.Wrapf(err, "error running query: %s", query)
return nil, errors.Wrapf(err, "error running query: %s", query)
}

rows, err := res.RowsAffected()
if err != nil {
return nil, nil, errors.Wrapf(err, "error running RowsAffected after query: %s", query)
return nil, errors.Wrapf(err, "error running RowsAffected after query: %s", query)
}

deletedCount[table] = rows
offsets[table] = newOffset
results[table] = LookupTableReapResult{
Offset: newOffset,
RowsDeleted: rows,
Duration: time.Since(startTime),
}
}
return deletedCount, offsets, nil
return results, nil
}

// constructReapLookupTablesQuery creates a query like (using history_claimable_balances
Expand Down
20 changes: 10 additions & 10 deletions services/horizon/internal/db2/history/reap_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ func TestReapLookupTables(t *testing.T) {
err = q.Begin(tt.Ctx)
tt.Require.NoError(err)

deletedCount, newOffsets, err := q.ReapLookupTables(tt.Ctx, nil)
results, err := q.ReapLookupTables(tt.Ctx, nil)
tt.Require.NoError(err)

err = q.Commit()
Expand All @@ -77,23 +77,23 @@ func TestReapLookupTables(t *testing.T) {

tt.Assert.Equal(25, prevAccounts, "prevAccounts")
tt.Assert.Equal(1, curAccounts, "curAccounts")
tt.Assert.Equal(int64(24), deletedCount["history_accounts"], `deletedCount["history_accounts"]`)
tt.Assert.Equal(int64(24), results["history_accounts"].RowsDeleted, `deletedCount["history_accounts"]`)

tt.Assert.Equal(7, prevAssets, "prevAssets")
tt.Assert.Equal(0, curAssets, "curAssets")
tt.Assert.Equal(int64(7), deletedCount["history_assets"], `deletedCount["history_assets"]`)
tt.Assert.Equal(int64(7), results["history_assets"].RowsDeleted, `deletedCount["history_assets"]`)

tt.Assert.Equal(1, prevClaimableBalances, "prevClaimableBalances")
tt.Assert.Equal(0, curClaimableBalances, "curClaimableBalances")
tt.Assert.Equal(int64(1), deletedCount["history_claimable_balances"], `deletedCount["history_claimable_balances"]`)
tt.Assert.Equal(int64(1), results["history_claimable_balances"].RowsDeleted, `deletedCount["history_claimable_balances"]`)

tt.Assert.Equal(1, prevLiquidityPools, "prevLiquidityPools")
tt.Assert.Equal(0, curLiquidityPools, "curLiquidityPools")
tt.Assert.Equal(int64(1), deletedCount["history_liquidity_pools"], `deletedCount["history_liquidity_pools"]`)
tt.Assert.Equal(int64(1), results["history_liquidity_pools"].RowsDeleted, `deletedCount["history_liquidity_pools"]`)

tt.Assert.Len(newOffsets, 4)
tt.Assert.Equal(int64(0), newOffsets["history_accounts"])
tt.Assert.Equal(int64(0), newOffsets["history_assets"])
tt.Assert.Equal(int64(0), newOffsets["history_claimable_balances"])
tt.Assert.Equal(int64(0), newOffsets["history_liquidity_pools"])
tt.Assert.Len(results, 4)
tt.Assert.Equal(int64(0), results["history_accounts"].Offset)
tt.Assert.Equal(int64(0), results["history_assets"].Offset)
tt.Assert.Equal(int64(0), results["history_claimable_balances"].Offset)
tt.Assert.Equal(int64(0), results["history_liquidity_pools"].Offset)
}
41 changes: 25 additions & 16 deletions services/horizon/internal/ingest/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,9 +145,8 @@ type Metrics struct {
// duration of rebuilding trade aggregation buckets.
LedgerIngestionTradeAggregationDuration prometheus.Summary

// LedgerIngestionReapLookupTablesDuration exposes timing metrics about the rate and
// duration of reaping lookup tables.
LedgerIngestionReapLookupTablesDuration prometheus.Summary
ReapDurationByLookupTable *prometheus.SummaryVec
RowsReapedByLookupTable *prometheus.SummaryVec

// StateVerifyDuration exposes timing metrics about the rate and
// duration of state verification.
Expand Down Expand Up @@ -228,7 +227,7 @@ type system struct {

runStateVerificationOnLedger func(uint32) bool

reapOffsets map[string]int64
reapOffsetByTable map[string]int64
maxLedgerPerFlush uint32

reaper *Reaper
Expand Down Expand Up @@ -327,6 +326,7 @@ func NewSystem(config Config) (System, error) {
config.ReapConfig,
config.HistorySession,
),
reapOffsetByTable: map[string]int64{},
}

system.initMetrics()
Expand Down Expand Up @@ -367,11 +367,17 @@ func (s *system) initMetrics() {
Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.99: 0.001},
})

s.metrics.LedgerIngestionReapLookupTablesDuration = prometheus.NewSummary(prometheus.SummaryOpts{
Namespace: "horizon", Subsystem: "ingest", Name: "ledger_ingestion_reap_lookup_tables_duration_seconds",
Help: "ledger ingestion reap lookup tables durations, sliding window = 10m",
s.metrics.ReapDurationByLookupTable = prometheus.NewSummaryVec(prometheus.SummaryOpts{
Namespace: "horizon", Subsystem: "ingest", Name: "reap_lookup_tables_duration_seconds",
Help: "reap lookup tables durations, sliding window = 10m",
Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.99: 0.001},
})
}, []string{"table"})

s.metrics.RowsReapedByLookupTable = prometheus.NewSummaryVec(prometheus.SummaryOpts{
Namespace: "horizon", Subsystem: "ingest", Name: "reap_lookup_tables_rows_reaped",
Help: "rows deleted during lookup tables reap, sliding window = 10m",
Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.99: 0.001},
}, []string{"table"})

s.metrics.StateVerifyDuration = prometheus.NewSummary(prometheus.SummaryOpts{
Namespace: "horizon", Subsystem: "ingest", Name: "state_verify_duration_seconds",
Expand Down Expand Up @@ -490,7 +496,8 @@ func (s *system) RegisterMetrics(registry *prometheus.Registry) {
registry.MustRegister(s.metrics.LocalLatestLedger)
registry.MustRegister(s.metrics.LedgerIngestionDuration)
registry.MustRegister(s.metrics.LedgerIngestionTradeAggregationDuration)
registry.MustRegister(s.metrics.LedgerIngestionReapLookupTablesDuration)
registry.MustRegister(s.metrics.ReapDurationByLookupTable)
registry.MustRegister(s.metrics.RowsReapedByLookupTable)
registry.MustRegister(s.metrics.StateVerifyDuration)
registry.MustRegister(s.metrics.StateInvalidGauge)
registry.MustRegister(s.metrics.LedgerStatsCounter)
Expand Down Expand Up @@ -793,7 +800,7 @@ func (s *system) maybeReapLookupTables(lastIngestedLedger uint32) {
defer cancel()

reapStart := time.Now()
deletedCount, newOffsets, err := s.historyQ.ReapLookupTables(ctx, s.reapOffsets)
results, err := s.historyQ.ReapLookupTables(ctx, s.reapOffsetByTable)
if err != nil {
log.WithError(err).Warn("Error reaping lookup tables")
return
Expand All @@ -807,18 +814,20 @@ func (s *system) maybeReapLookupTables(lastIngestedLedger uint32) {

totalDeleted := int64(0)
reapLog := log
for table, c := range deletedCount {
totalDeleted += c
reapLog = reapLog.WithField(table, c)
for table, result := range results {
totalDeleted += result.RowsDeleted
reapLog = reapLog.WithField(table, result)
s.reapOffsetByTable[table] = result.Offset
s.Metrics().RowsReapedByLookupTable.With(prometheus.Labels{"table": table}).Observe(float64(result.RowsDeleted))
s.Metrics().ReapDurationByLookupTable.With(prometheus.Labels{"table": table}).Observe(result.Duration.Seconds())
}

if totalDeleted > 0 {
reapLog.Info("Reaper deleted rows from lookup tables")
}

s.reapOffsets = newOffsets
reapDuration := time.Since(reapStart).Seconds()
s.Metrics().LedgerIngestionReapLookupTablesDuration.Observe(float64(reapDuration))
s.Metrics().RowsReapedByLookupTable.With(prometheus.Labels{"table": "total"}).Observe(float64(totalDeleted))
s.Metrics().ReapDurationByLookupTable.With(prometheus.Labels{"table": "total"}).Observe(time.Since(reapStart).Seconds())
}

func (s *system) incrementStateVerificationErrors() int {
Expand Down
11 changes: 4 additions & 7 deletions services/horizon/internal/ingest/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -562,16 +562,13 @@ func (m *mockDBQ) NewTradeBatchInsertBuilder() history.TradeBatchInsertBuilder {
return args.Get(0).(history.TradeBatchInsertBuilder)
}

func (m *mockDBQ) ReapLookupTables(ctx context.Context, offsets map[string]int64) (map[string]int64, map[string]int64, error) {
func (m *mockDBQ) ReapLookupTables(ctx context.Context, offsets map[string]int64) (map[string]history.LookupTableReapResult, error) {
args := m.Called(ctx, offsets)
var r1, r2 map[string]int64
var r1 map[string]history.LookupTableReapResult
if args.Get(0) != nil {
r1 = args.Get(0).(map[string]int64)
r1 = args.Get(0).(map[string]history.LookupTableReapResult)
}
if args.Get(1) != nil {
r1 = args.Get(1).(map[string]int64)
}
return r1, r2, args.Error(2)
return r1, args.Error(2)
}

func (m *mockDBQ) RebuildTradeAggregationTimes(ctx context.Context, from, to strtime.Millis, roundingSlippageFilter int) error {
Expand Down

0 comments on commit 0e83215

Please sign in to comment.