diff --git a/.gitignore b/.gitignore index 6ee1f64..8354056 100644 --- a/.gitignore +++ b/.gitignore @@ -24,4 +24,5 @@ /deploy* /test.csv /csv/ -/build/ \ No newline at end of file +/build/ +.aider* diff --git a/README.md b/README.md index f0227ab..b1644c5 100644 --- a/README.md +++ b/README.md @@ -81,6 +81,9 @@ More example commands: ./relayscan core update-builder-stats --start 2023-06-04 # update daily stats for 2023-06-04 until today ./relayscan core update-builder-stats --backfill # update daily stats since last entry, until today +# fetches bid adjustments to imporve builder profit estimation +./relayscan core bid-adjustments-backfill + # Start the website (--dev reloads the template on every page load, for easier iteration) ./relayscan service website --dev ``` @@ -105,8 +108,11 @@ go run . core check-payload-value # Can also check a single slot only: go run . core check-payload-value --slot _N_ +# Fetch bid adjustments +go run . core bid-adjustments-backfill --min-slot -2000 + # Reset DB -dev-postgres-wipe +make dev-postgres-wipe # See the Makefile for more commands make help diff --git a/cmd/core/bid-adjustments-backfill.go b/cmd/core/bid-adjustments-backfill.go new file mode 100644 index 0000000..cea4264 --- /dev/null +++ b/cmd/core/bid-adjustments-backfill.go @@ -0,0 +1,151 @@ +package core + +import ( + "context" + "fmt" + "net/http" + "time" + + "github.com/flashbots/relayscan/common" + "github.com/flashbots/relayscan/database" + "github.com/flashbots/relayscan/vars" + "github.com/sirupsen/logrus" + "github.com/spf13/cobra" +) + +var bidAdjustmentRelay string + +func init() { + bidAdjustmentsBackfillCmd.Flags().StringVar(&bidAdjustmentRelay, "relay", "relay.ultrasound.money", "relay to fetch bid adjustments from") + bidAdjustmentsBackfillCmd.Flags().Int64Var(&minSlot, "min-slot", 0, "minimum slot (if unset, backfill until the merge, negative number for that number of slots before latest)") +} + +var bidAdjustmentsBackfillCmd = &cobra.Command{ + Use: "bid-adjustments-backfill", + Short: "Backfill bid adjustments data", + Run: func(cmd *cobra.Command, args []string) { + db := database.MustConnectPostgres(log, vars.DefaultPostgresDSN) + defer db.Close() + + relay, err := common.NewRelayEntry(bidAdjustmentRelay, false) + if err != nil { + log.WithError(err).Fatal("failed to create relay entry") + } + + log.Infof("minSlot %d", minSlot) + // If needed, get latest slot (i.e. if min-slot is negative) + if minSlot < 0 { + log.Infof("Getting latest slot from beaconcha.in for offset %d", minSlot) + latestSlotOnBeaconChain := common.MustGetLatestSlot() + log.Infof("Latest slot from beaconcha.in: %d", latestSlotOnBeaconChain) + minSlot = int64(latestSlotOnBeaconChain) + minSlot + } + + if minSlot != 0 { + log.Infof("Using min slot: %d", minSlot) + } + + backfiller := newBidAdjustmentsBackfiller(db, relay, uint64(minSlot)) + err = backfiller.backfillAdjustments() + if err != nil { + log.WithError(err).Fatal("failed to backfill adjustments") + } + }, +} + +type bidAdjustmentsBackfiller struct { + db *database.DatabaseService + relay common.RelayEntry + minSlot uint64 +} + +func newBidAdjustmentsBackfiller(db *database.DatabaseService, relay common.RelayEntry, minSlot uint64) *bidAdjustmentsBackfiller { + return &bidAdjustmentsBackfiller{ + db: db, + relay: relay, + minSlot: minSlot, + } +} + +func (bf *bidAdjustmentsBackfiller) backfillAdjustments() error { + _log := log.WithField("relay", bf.relay.Hostname()) + _log.Info("Backfilling adjustments...") + + baseURL := bf.relay.GetURI("/ultrasound/v1/data/adjustments") + latestSlot, err := bf.db.GetLatestAdjustmentSlot() + if err != nil { + return fmt.Errorf("failed to get latest adjustment slot: %w", err) + } + + if bf.minSlot < latestSlot { + bf.minSlot = latestSlot + } + + // Hardcoded ultrasoiund first slot with data see https://github.com/ultrasoundmoney/docs/blob/main/bid_adjustment.md#data-api + const ultrasoundFirstBidAdjustmentSlot = 7869470 + if bf.minSlot < ultrasoundFirstBidAdjustmentSlot { + bf.minSlot = ultrasoundFirstBidAdjustmentSlot + } + + const ultrasoundEndStatusCode = 403 + for slot := bf.minSlot; ; slot++ { + _log.WithField("slot", slot).Info("Fetching adjustments...") + url := fmt.Sprintf("%s?slot=%d", baseURL, slot) + + var response common.UltrasoundAdjustmentResponse + statusCode, err := common.SendHTTPRequest(context.Background(), *http.DefaultClient, http.MethodGet, url, nil, &response) + _log.WithField("status code", statusCode).Info("Response") + if statusCode == ultrasoundEndStatusCode { + _log.WithField("Status Code", statusCode).Info("Stopping backfill due to 403") + break + } + if err != nil { + _log.WithError(err).Error("Failed to fetch adjustments") + return nil + } + + if len(response.Data) > 0 { + adjustments := make([]*database.AdjustmentEntry, len(response.Data)) + for i, adjustment := range response.Data { + submittedReceivedAt, err := time.Parse(time.RFC3339, adjustment.SubmittedReceivedAt) + if err != nil { + _log.WithError(err).Error("Failed to parse SubmittedReceivedAt") + continue + } + adjustments[i] = &database.AdjustmentEntry{ + Slot: slot, + AdjustedBlockHash: adjustment.AdjustedBlockHash, + AdjustedValue: adjustment.AdjustedValue, + BlockNumber: adjustment.BlockNumber, + BuilderPubkey: adjustment.BuilderPubkey, + Delta: adjustment.Delta, + SubmittedBlockHash: adjustment.SubmittedBlockHash, + SubmittedReceivedAt: submittedReceivedAt, + SubmittedValue: adjustment.SubmittedValue, + } + } + + err = bf.db.SaveAdjustments(adjustments) + if err != nil { + _log.WithError(err).Error("Failed to save adjustments") + } else { + for _, entry := range adjustments { + _log.WithFields(logrus.Fields{ + "Slot": entry.Slot, + "SubmittedValue": entry.SubmittedValue, + "AdjustedValue": entry.AdjustedValue, + "Delta": entry.Delta, + }).Info("Adjustment data") + } + _log.WithField("count", len(adjustments)).Info("Saved adjustments") + } + } else { + _log.Info("No adjustments found for this slot") + // break + } + + time.Sleep(time.Duration(50) * time.Microsecond) // Rate limiting + } + + return nil +} diff --git a/cmd/core/core.go b/cmd/core/core.go index 78c7c1d..3e4cfa7 100644 --- a/cmd/core/core.go +++ b/cmd/core/core.go @@ -25,4 +25,5 @@ func init() { CoreCmd.AddCommand(checkPayloadValueCmd) CoreCmd.AddCommand(backfillDataAPICmd) CoreCmd.AddCommand(updateBuilderStatsCmd) + CoreCmd.AddCommand(bidAdjustmentsBackfillCmd) } diff --git a/cmd/core/data-api-backfill.go b/cmd/core/data-api-backfill.go index 5beba13..3d8a3e5 100644 --- a/cmd/core/data-api-backfill.go +++ b/cmd/core/data-api-backfill.go @@ -81,7 +81,7 @@ var backfillDataAPICmd = &cobra.Command{ backfiller := newBackfiller(db, relay, initCursor, uint64(minSlot)) err = backfiller.backfillPayloadsDelivered() if err != nil { - log.WithError(err).WithField("relay", relay).Error("backfill failed") + log.WithError(err).WithField("relay", relay).Error("backfill payloads failed") } } diff --git a/common/ultrasoundbid.go b/common/ultrasoundbid.go index 7a54d7c..0c40de7 100644 --- a/common/ultrasoundbid.go +++ b/common/ultrasoundbid.go @@ -26,3 +26,18 @@ type UltrasoundStreamBid struct { FeeRecipient Address `json:"fee_recipient" ssz-size:"20"` Value U256 `json:"value" ssz-size:"32"` } + +type UltrasoundAdjustmentResponse struct { + Data []UltrasoundAdjustment `json:"data"` +} + +type UltrasoundAdjustment struct { + AdjustedBlockHash string `json:"adjusted_block_hash"` + AdjustedValue string `json:"adjusted_value"` + BlockNumber uint64 `json:"block_number"` + BuilderPubkey string `json:"builder_pubkey"` + Delta string `json:"delta"` + SubmittedBlockHash string `json:"submitted_block_hash"` + SubmittedReceivedAt string `json:"submitted_received_at"` + SubmittedValue string `json:"submitted_value"` +} diff --git a/database/database.go b/database/database.go index f7a8006..aa1b029 100644 --- a/database/database.go +++ b/database/database.go @@ -163,18 +163,36 @@ func (s *DatabaseService) GetBuilderProfits(since, until time.Time) (res []*Buil startSlot := timeToSlot(since) endSlot := timeToSlot(until) - query := `SELECT - extra_data, - count(extra_data) as blocks, - count(extra_data) filter (where coinbase_diff_eth > 0) as blocks_profit, - count(extra_data) filter (where coinbase_diff_eth < 0) as blocks_sub, - round(avg(CASE WHEN coinbase_diff_eth IS NOT NULL THEN coinbase_diff_eth ELSE 0 END), 4) as avg_profit_per_block, - round(PERCENTILE_DISC(0.5) WITHIN GROUP(ORDER BY CASE WHEN coinbase_diff_eth IS NOT NULL THEN coinbase_diff_eth ELSE 0 END), 4) as median_profit_per_block, - round(sum(CASE WHEN coinbase_diff_eth IS NOT NULL THEN coinbase_diff_eth ELSE 0 END), 4) as total_profit, - round(abs(sum(CASE WHEN coinbase_diff_eth < 0 THEN coinbase_diff_eth ELSE 0 END)), 4) as total_subsidies - FROM ( - SELECT distinct(slot), extra_data, coinbase_diff_eth FROM ` + vars.TableDataAPIPayloadDelivered + ` WHERE value_check_ok IS NOT NULL AND slot >= $1 AND slot <= $2 - ) AS x + query := `WITH + payloads as ( + SELECT + distinct(slot), extra_data, coinbase_diff_eth, value_delivered_eth, block_hash + FROM ` + vars.TableDataAPIPayloadDelivered + ` WHERE value_check_ok IS NOT NULL AND slot >= $1 AND slot <= $2 + ) + + , adjusted_payloads as ( + select + p.slot, + p.extra_data, + CASE + WHEN p.coinbase_diff_eth is null THEN 0 + WHEN a.slot IS NOT NULL THEN p.coinbase_diff_eth - p.value_delivered_eth + ELSE p.coinbase_diff_eth + END as coinbase_diff_eth + FROM payloads p + LEFT JOIN ` + vars.TableAdjustments + ` a ON p.slot=a.slot AND p.block_hash = a.adjusted_block_hash + ) + + SELECT + extra_data, + count(extra_data) as blocks, + count(extra_data) filter (where coinbase_diff_eth > 0) as blocks_profit, + count(extra_data) filter (where coinbase_diff_eth < 0) as blocks_sub, + round(avg(coinbase_diff_eth), 4) as avg_profit_per_block, + round(PERCENTILE_DISC(0.5) WITHIN GROUP(ORDER BY coinbase_diff_eth), 4) as median_profit_per_block, + round(sum(coinbase_diff_eth), 4) as total_profit, + round(abs(sum(CASE WHEN coinbase_diff_eth < 0 THEN coinbase_diff_eth ELSE 0 END)), 4) as total_subsidies + FROM adjusted_payloads GROUP BY extra_data ORDER BY total_profit DESC;` err = s.DB.Select(&res, query, startSlot, endSlot) @@ -271,3 +289,22 @@ func (s *DatabaseService) GetRecentPayloadsForExtraData(extraData []string, limi err = s.DB.Select(&resp, query, args...) return resp, err } + +func (s *DatabaseService) SaveAdjustments(entries []*AdjustmentEntry) error { + if len(entries) == 0 { + return nil + } + query := `INSERT INTO ` + vars.TableAdjustments + ` + (slot, adjusted_block_hash, adjusted_value, block_number, builder_pubkey, delta, submitted_block_hash, submitted_received_at, submitted_value) VALUES + (:slot, :adjusted_block_hash, :adjusted_value, :block_number, :builder_pubkey, :delta, :submitted_block_hash, :submitted_received_at, :submitted_value) + ON CONFLICT (slot, adjusted_block_hash) DO NOTHING` + _, err := s.DB.NamedExec(query, entries) + return err +} + +func (s *DatabaseService) GetLatestAdjustmentSlot() (uint64, error) { + var slot uint64 + query := `SELECT COALESCE(MAX(slot), 0) FROM ` + vars.TableAdjustments + err := s.DB.Get(&slot, query) + return slot, err +} diff --git a/database/migrations/005_create_adjustments_table.go b/database/migrations/005_create_adjustments_table.go new file mode 100644 index 0000000..8403d8a --- /dev/null +++ b/database/migrations/005_create_adjustments_table.go @@ -0,0 +1,29 @@ +package migrations + +import ( + "github.com/flashbots/relayscan/database/vars" + migrate "github.com/rubenv/sql-migrate" +) + +var migration005SQL = `CREATE TABLE IF NOT EXISTS ` + vars.TableAdjustments + ` ( + id SERIAL PRIMARY KEY, + inserted_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(), + slot BIGINT NOT NULL, + adjusted_block_hash TEXT NOT NULL, + adjusted_value TEXT NOT NULL, + block_number BIGINT NOT NULL, + builder_pubkey TEXT NOT NULL, + delta TEXT NOT NULL, + submitted_block_hash TEXT NOT NULL, + submitted_received_at TIMESTAMP WITH TIME ZONE NOT NULL, + submitted_value TEXT NOT NULL, + UNIQUE(slot, adjusted_block_hash) + );` + +var Migration005CreateAdjustmentsTable = &migrate.Migration{ + Id: "005-create-adjustments-table", + Up: []string{migration005SQL}, + + DisableTransactionUp: false, + DisableTransactionDown: true, +} diff --git a/database/migrations/006_add_slot_block_index_to_bid_adjustments.go b/database/migrations/006_add_slot_block_index_to_bid_adjustments.go new file mode 100644 index 0000000..866c97c --- /dev/null +++ b/database/migrations/006_add_slot_block_index_to_bid_adjustments.go @@ -0,0 +1,16 @@ +package migrations + +import ( + "github.com/flashbots/relayscan/database/vars" + migrate "github.com/rubenv/sql-migrate" +) + +var migration006SQL = `CREATE INDEX IF NOT EXISTS idx_` + vars.TableAdjustments + `_slot_block ON ` + vars.TableAdjustments + ` (slot, adjusted_block_hash);` + +var Migration006AddSlotBlockIndexToAdjustments = &migrate.Migration{ + Id: "006-add-slot-block-index-to-adjustments", + Up: []string{migration006SQL}, + + DisableTransactionUp: false, + DisableTransactionDown: true, +} diff --git a/database/migrations/migration.go b/database/migrations/migration.go index 9784ac8..4a3b35d 100644 --- a/database/migrations/migration.go +++ b/database/migrations/migration.go @@ -11,5 +11,7 @@ var Migrations = migrate.MemoryMigrationSource{ Migration002AddBlobCount, Migration003AddBlobIndexes, Migration004AddBlockTimestamp, + Migration005CreateAdjustmentsTable, + Migration006AddSlotBlockIndexToAdjustments, }, } diff --git a/database/types.go b/database/types.go index a00ba9a..3837d15 100644 --- a/database/types.go +++ b/database/types.go @@ -195,3 +195,17 @@ type TmpPayloadsForExtraDataEntry struct { InsertedAt time.Time `db:"inserted_at"` BlockTimestamp sql.NullTime `db:"block_timestamp"` } + +type AdjustmentEntry struct { + ID int64 `db:"id"` + InsertedAt time.Time `db:"inserted_at"` + Slot uint64 `db:"slot"` + AdjustedBlockHash string `db:"adjusted_block_hash"` + AdjustedValue string `db:"adjusted_value"` + BlockNumber uint64 `db:"block_number"` + BuilderPubkey string `db:"builder_pubkey"` + Delta string `db:"delta"` + SubmittedBlockHash string `db:"submitted_block_hash"` + SubmittedReceivedAt time.Time `db:"submitted_received_at"` + SubmittedValue string `db:"submitted_value"` +} diff --git a/database/vars/tables.go b/database/vars/tables.go index f0b419c..1dde02e 100644 --- a/database/vars/tables.go +++ b/database/vars/tables.go @@ -15,4 +15,5 @@ var ( TableError = tableBase + "_error" TableBlockBuilder = tableBase + "_blockbuilder" TableBlockBuilderInclusionStats = tableBase + "_blockbuilder_stats_inclusion" + TableAdjustments = tableBase + "_adjustments" ) diff --git a/scripts/backfill_adjustments.sh b/scripts/backfill_adjustments.sh new file mode 100644 index 0000000..b8d778b --- /dev/null +++ b/scripts/backfill_adjustments.sh @@ -0,0 +1,7 @@ +#!/bin/bash +set -e +dir=$( dirname -- "$0"; ) +cd $dir +cd .. +source .env.prod +./relayscan core bid-adjustments-backfill 2>&1 | /usr/bin/tee -a /var/log/relayscan.log