diff --git a/services/horizon/db.go b/services/horizon/db.go index 6f2f726b9d..8cfb265a29 100644 --- a/services/horizon/db.go +++ b/services/horizon/db.go @@ -28,7 +28,7 @@ var dbBackfillCmd = &cobra.Command{ initConfig() hlog.DefaultLogger.Logger.Level = config.LogLevel - i := ingestSystem() + i := ingestSystem(ingest.Config{}) i.SkipCursorUpdate = true parsed, err := strconv.ParseUint(args[0], 10, 32) if err != nil { @@ -49,7 +49,7 @@ var dbClearCmd = &cobra.Command{ initConfig() hlog.DefaultLogger.Logger.Level = config.LogLevel - i := ingestSystem() + i := ingestSystem(ingest.Config{}) err := i.ClearAll() if err != nil { hlog.Error(err) @@ -138,7 +138,7 @@ var dbRebaseCmd = &cobra.Command{ initConfig() hlog.DefaultLogger.Logger.Level = config.LogLevel - i := ingestSystem() + i := ingestSystem(ingest.Config{}) i.SkipCursorUpdate = true err := i.RebaseHistory() @@ -156,7 +156,7 @@ var dbReingestCmd = &cobra.Command{ initConfig() hlog.DefaultLogger.Logger.Level = config.LogLevel - i := ingestSystem() + i := ingestSystem(ingest.Config{}) i.SkipCursorUpdate = true logStatus := func(stage string) { count := i.Metrics.IngestLedgerTimer.Count() @@ -207,7 +207,7 @@ func init() { dbCmd.AddCommand(dbRebaseCmd) } -func ingestSystem() *ingest.System { +func ingestSystem(ingestConfig ingest.Config) *ingest.System { hdb, err := db.Open("postgres", config.DatabaseURL) if err != nil { log.Fatal(err) @@ -223,7 +223,7 @@ func ingestSystem() *ingest.System { log.Fatal("network-passphrase is blank: reingestion requires manually setting passphrase") } - i := ingest.New(passphrase, config.StellarCoreURL, cdb, hdb) + i := ingest.New(passphrase, config.StellarCoreURL, cdb, hdb, ingestConfig) return i } diff --git a/services/horizon/internal/actions_assets_test.go b/services/horizon/internal/actions_assets_test.go index 2da813799c..244ecbb84c 100644 --- a/services/horizon/internal/actions_assets_test.go +++ b/services/horizon/internal/actions_assets_test.go @@ -3,8 +3,9 @@ package horizon import ( "testing" - "github.com/stellar/go/protocols/horizon/base" "github.com/stellar/go/protocols/horizon" + "github.com/stellar/go/protocols/horizon/base" + "github.com/stellar/go/services/horizon/internal/test" "github.com/stellar/go/support/render/hal" ) @@ -271,3 +272,20 @@ func TestInvalidAssetIssuer(t *testing.T) { w = ht.Get("/assets?asset_issuer=invalid") ht.Assert.Equal(400, w.Code) } + +func TestAssetStatsDisabled(t *testing.T) { + ht := StartHTTPTest(t, "ingest_asset_stats") + defer ht.Finish() + + // Ugly but saves us time needed to change each `StartHTTPTest` occurence. + appConfig := NewTestConfig() + appConfig.DisableAssetStats = true + + var err error + ht.App, err = NewApp(appConfig) + ht.Assert.Nil(err) + ht.RH = test.NewRequestHelper(ht.App.web.router) + + w := ht.Get("/assets?asset_issuer=GC23QF2HUE52AMXUFUH3AYJAXXGXXV2VHXYYR6EYXETPKDXZSAW67XO4") + ht.Assert.Equal(404, w.Code) +} diff --git a/services/horizon/internal/config.go b/services/horizon/internal/config.go index 1a49747ab7..0a1f069136 100644 --- a/services/horizon/internal/config.go +++ b/services/horizon/internal/config.go @@ -33,13 +33,16 @@ type Config struct { // determining a "retention duration", each ledger roughly corresponds to 10 // seconds of real time. HistoryRetentionCount uint - // StaleThreshold represents the number of ledgers a history database may be // out-of-date by before horizon begins to respond with an error to history // requests. StaleThreshold uint - // SkipCursorUpdate causes the ingestor to skip reporting the "last imported // ledger" state to stellar-core. SkipCursorUpdate bool + // DisableAssetStats is a feature flag that determines whether to calculate + // asset stats during the ingestion and expose `/assets` endpoint. + // Disabling it will save CPU when ingesting ledgers full of many different + // assets related operations. + DisableAssetStats bool } diff --git a/services/horizon/internal/ingest/ingestion_test.go b/services/horizon/internal/ingest/ingestion_test.go index 912295aaf7..a8fbfb03bd 100644 --- a/services/horizon/internal/ingest/ingestion_test.go +++ b/services/horizon/internal/ingest/ingestion_test.go @@ -55,7 +55,7 @@ func TestAssetIngest(t *testing.T) { tt := test.Start(t).ScenarioWithoutHorizon("kahuna") defer tt.Finish() - s := ingest(tt) + s := ingest(tt, false) tt.Require.NoError(s.Err) q := history.Q{Session: s.Ingestion.DB} @@ -75,7 +75,7 @@ func TestAssetIngest(t *testing.T) { func TestAssetStatsIngest(t *testing.T) { tt := test.Start(t).ScenarioWithoutHorizon("ingest_asset_stats") defer tt.Finish() - s := ingest(tt) + s := ingest(tt, false) tt.Require.NoError(s.Err) q := history.Q{Session: s.Ingestion.DB} @@ -139,12 +139,49 @@ func TestAssetStatsIngest(t *testing.T) { }, assetStats[2]) } +func TestAssetStatsDisabledIngest(t *testing.T) { + tt := test.Start(t).ScenarioWithoutHorizon("ingest_asset_stats") + defer tt.Finish() + s := ingest(tt, true) + tt.Require.NoError(s.Err) + q := history.Q{Session: s.Ingestion.DB} + + type AssetStatResult struct { + Type string `db:"asset_type"` + Code string `db:"asset_code"` + Issuer string `db:"asset_issuer"` + Amount int64 `db:"amount"` + NumAccounts int32 `db:"num_accounts"` + Flags int8 `db:"flags"` + Toml string `db:"toml"` + } + assetStats := []AssetStatResult{} + err := q.Select( + &assetStats, + sq. + Select( + "hist.asset_type", + "hist.asset_code", + "hist.asset_issuer", + "stats.amount", + "stats.num_accounts", + "stats.flags", + "stats.toml", + ). + From("history_assets hist"). + Join("asset_stats stats ON hist.id = stats.id"). + OrderBy("hist.asset_code ASC", "hist.asset_issuer ASC"), + ) + tt.Require.NoError(err) + tt.Assert.Equal(0, len(assetStats)) +} + func TestTradeIngestTimestamp(t *testing.T) { //ingest trade scenario and verify that the trade timestamp //matches the appropriate ledger's timestamp tt := test.Start(t).ScenarioWithoutHorizon("trades") defer tt.Finish() - s := ingest(tt) + s := ingest(tt, false) q := history.Q{Session: s.Ingestion.DB} var ledgers []history.Ledger diff --git a/services/horizon/internal/ingest/main.go b/services/horizon/internal/ingest/main.go index 03cffd4999..64bb4adbd7 100644 --- a/services/horizon/internal/ingest/main.go +++ b/services/horizon/internal/ingest/main.go @@ -66,6 +66,13 @@ type Cursor struct { data *LedgerBundle } +// Config allows passing some configuration values to System and Session. +type Config struct { + // DisableAssetStats is a feature flag that determines whether to calculate + // asset stats in this ingestion system. + DisableAssetStats bool +} + // EffectIngestion is a helper struct to smooth the ingestion of effects. this // struct will track what the correct operation to use and order to use when // adding effects into an ingestion. @@ -88,27 +95,23 @@ type LedgerBundle struct { // System represents the data ingestion subsystem of horizon. type System struct { + // Config allows passing some configuration values to System. + Config Config // HorizonDB is the connection to the horizon database that ingested data will // be written to. HorizonDB *db.Session - // CoreDB is the stellar-core db that data is ingested from. - CoreDB *db.Session - + CoreDB *db.Session Metrics IngesterMetrics - // Network is the passphrase for the network being imported Network string - // StellarCoreURL is the http endpoint of the stellar-core that data is being // ingested from. StellarCoreURL string - // SkipCursorUpdate causes the ingestor to skip // reporting the "last imported ledger" cursor to // stellar-core SkipCursorUpdate bool - // HistoryRetentionCount is the desired minimum number of ledgers to // keep in the history database, working backwards from the latest core // ledger. 0 represents "all ledgers". @@ -150,24 +153,22 @@ type Ingestion struct { // Session represents a single attempt at ingesting data into the history // database. type Session struct { + // Config allows passing some configuration values to System. + Config Config Cursor *Cursor Ingestion *Ingestion // Network is the passphrase for the network being imported Network string - // StellarCoreURL is the http endpoint of the stellar-core that data is being // ingested from. StellarCoreURL string - // ClearExisting causes the session to clear existing data from the horizon db // when the session is run. ClearExisting bool - // SkipCursorUpdate causes the session to skip // reporting the "last imported ledger" cursor to // stellar-core SkipCursorUpdate bool - // Metrics is a reference to where the session should record its metric information Metrics *IngesterMetrics @@ -177,7 +178,6 @@ type Session struct { // Err is the error that caused this session to fail, if any. Err error - // Ingested is the number of ledgers that were successfully ingested during // this session. Ingested int @@ -185,8 +185,9 @@ type Session struct { // New initializes the ingester, causing it to begin polling the stellar-core // database for now ledgers and ingesting data into the horizon database. -func New(network string, coreURL string, core, horizon *db.Session) *System { +func New(network string, coreURL string, core, horizon *db.Session, config Config) *System { i := &System{ + Config: config, Network: network, StellarCoreURL: coreURL, HorizonDB: horizon, @@ -215,6 +216,7 @@ func NewSession(i *System) *Session { hdb := i.HorizonDB.Clone() return &Session{ + Config: i.Config, Ingestion: &Ingestion{ DB: hdb, }, diff --git a/services/horizon/internal/ingest/main_test.go b/services/horizon/internal/ingest/main_test.go index 63c34f3efa..c8d62be5a1 100644 --- a/services/horizon/internal/ingest/main_test.go +++ b/services/horizon/internal/ingest/main_test.go @@ -13,7 +13,7 @@ func TestIngest_Kahuna1(t *testing.T) { tt := test.Start(t).ScenarioWithoutHorizon("kahuna") defer tt.Finish() - s := ingest(tt) + s := ingest(tt, false) tt.Require.NoError(s.Err) tt.Assert.Equal(62, s.Ingested) @@ -34,7 +34,7 @@ func TestIngest_Kahuna2(t *testing.T) { tt := test.Start(t).ScenarioWithoutHorizon("kahuna-2") defer tt.Finish() - s := ingest(tt) + s := ingest(tt, false) tt.Require.NoError(s.Err) tt.Assert.Equal(6, s.Ingested) @@ -52,7 +52,7 @@ func TestIngest_Kahuna2(t *testing.T) { func TestTick(t *testing.T) { tt := test.Start(t).ScenarioWithoutHorizon("base") defer tt.Finish() - sys := sys(tt) + sys := sys(tt, false) // ingest by tick s := sys.Tick() @@ -65,8 +65,8 @@ func TestTick(t *testing.T) { tt.Require.NoError(s.Err) } -func ingest(tt *test.T) *Session { - sys := sys(tt) +func ingest(tt *test.T, disableAssetStats bool) *Session { + sys := sys(tt, disableAssetStats) s := NewSession(sys) s.Cursor = NewCursor(1, ledger.CurrentState().CoreLatest, sys) s.Run() @@ -74,11 +74,12 @@ func ingest(tt *test.T) *Session { return s } -func sys(tt *test.T) *System { +func sys(tt *test.T, disableAssetStats bool) *System { return New( network.TestNetworkPassphrase, "", tt.CoreSession(), tt.HorizonSession(), + Config{DisableAssetStats: disableAssetStats}, ) } diff --git a/services/horizon/internal/ingest/session.go b/services/horizon/internal/ingest/session.go index 952fb6943c..93e9ccf402 100644 --- a/services/horizon/internal/ingest/session.go +++ b/services/horizon/internal/ingest/session.go @@ -46,7 +46,10 @@ func (is *Session) Run() { break } } - is.Cursor.AssetsModified.UpdateAssetStats(is) + + if !is.Config.DisableAssetStats { + is.Cursor.AssetsModified.UpdateAssetStats(is) + } if is.Err != nil { is.Ingestion.Rollback() diff --git a/services/horizon/internal/ingest/session_test.go b/services/horizon/internal/ingest/session_test.go index 192a89fd42..5684d9a668 100644 --- a/services/horizon/internal/ingest/session_test.go +++ b/services/horizon/internal/ingest/session_test.go @@ -13,7 +13,7 @@ func Test_ingestSignerEffects(t *testing.T) { tt := test.Start(t).ScenarioWithoutHorizon("set_options") defer tt.Finish() - s := ingest(tt) + s := ingest(tt, false) tt.Require.NoError(s.Err) q := &history.Q{Session: tt.HorizonSession()} @@ -33,7 +33,7 @@ func Test_ingestOperationEffects(t *testing.T) { tt := test.Start(t).ScenarioWithoutHorizon("set_options") defer tt.Finish() - s := ingest(tt) + s := ingest(tt, false) tt.Require.NoError(s.Err) q := &history.Q{Session: tt.HorizonSession()} @@ -49,7 +49,7 @@ func Test_ingestOperationEffects(t *testing.T) { // HACK(scott): switch to kahuna recipe mid-stream. We need to integrate our test scenario loader to be compatible with go subtests/ tt.ScenarioWithoutHorizon("kahuna") - s = ingest(tt) + s = ingest(tt, false) tt.Require.NoError(s.Err) pq, err := db2.NewPageQuery("", "asc", 200) tt.Require.NoError(err) @@ -80,7 +80,7 @@ func Test_ingestBumpSeq(t *testing.T) { tt := test.Start(t).ScenarioWithoutHorizon("kahuna") defer tt.Finish() - s := ingest(tt) + s := ingest(tt, false) tt.Require.NoError(s.Err) q := &history.Q{Session: tt.HorizonSession()} diff --git a/services/horizon/internal/ingest/system_test.go b/services/horizon/internal/ingest/system_test.go index e4db198a17..96765621ba 100644 --- a/services/horizon/internal/ingest/system_test.go +++ b/services/horizon/internal/ingest/system_test.go @@ -10,7 +10,7 @@ import ( func TestBackfill(t *testing.T) { tt := test.Start(t).ScenarioWithoutHorizon("kahuna") defer tt.Finish() - is := sys(tt) + is := sys(tt, false) err := is.ReingestSingle(10) tt.Require.NoError(err) @@ -34,7 +34,7 @@ func TestBackfill(t *testing.T) { func TestClearAll(t *testing.T) { tt := test.Start(t).Scenario("kahuna") defer tt.Finish() - is := sys(tt) + is := sys(tt, false) err := is.ClearAll() @@ -51,7 +51,7 @@ func TestValidation(t *testing.T) { tt := test.Start(t).Scenario("kahuna") defer tt.Finish() - sys := New(network.TestNetworkPassphrase, "", tt.CoreSession(), tt.HorizonSession()) + sys := New(network.TestNetworkPassphrase, "", tt.CoreSession(), tt.HorizonSession(), Config{}) // intact chain for i := int32(2); i <= 57; i++ { diff --git a/services/horizon/internal/init_ingester.go b/services/horizon/internal/init_ingester.go index 4ad16d8137..75ccf6bbb0 100644 --- a/services/horizon/internal/init_ingester.go +++ b/services/horizon/internal/init_ingester.go @@ -20,6 +20,9 @@ func initIngester(app *App) { app.config.StellarCoreURL, app.CoreSession(nil), app.HorizonSession(nil), + ingest.Config{ + DisableAssetStats: app.config.DisableAssetStats, + }, ) app.ingester.SkipCursorUpdate = app.config.SkipCursorUpdate diff --git a/services/horizon/internal/init_web.go b/services/horizon/internal/init_web.go index 6fdce8115f..0eea741859 100644 --- a/services/horizon/internal/init_web.go +++ b/services/horizon/internal/init_web.go @@ -143,8 +143,11 @@ func initWebActions(app *App) { r.Post("/transactions", TransactionCreateAction{}.Handle) r.Get("/paths", PathIndexAction{}.Handle) - // Asset related endpoints - r.Get("/assets", AssetsAction{}.Handle) + if !app.config.DisableAssetStats { + // Asset related endpoints + r.Get("/assets", AssetsAction{}.Handle) + } + // friendbot if app.config.FriendbotURL != nil { redirectFriendbot := func(w http.ResponseWriter, r *http.Request) { diff --git a/services/horizon/main.go b/services/horizon/main.go index fdbc6e4adc..4297ed9cde 100644 --- a/services/horizon/main.go +++ b/services/horizon/main.go @@ -44,6 +44,7 @@ func init() { viper.BindEnv("history-retention-count", "HISTORY_RETENTION_COUNT") viper.BindEnv("history-stale-threshold", "HISTORY_STALE_THRESHOLD") viper.BindEnv("skip-cursor-update", "SKIP_CURSOR_UPDATE") + viper.BindEnv("disable-asset-stats", "DISABLE_ASSET_STATS") rootCmd = &cobra.Command{ Use: "horizon", @@ -230,5 +231,6 @@ func initConfig() { HistoryRetentionCount: uint(viper.GetInt("history-retention-count")), StaleThreshold: uint(viper.GetInt("history-stale-threshold")), SkipCursorUpdate: viper.GetBool("skip-cursor-update"), + DisableAssetStats: viper.GetBool("disable-asset-stats"), } }