Skip to content

Commit 46fc1de

Browse files
authored
Merge branch 'master' into friendbot-instrumentation-03
2 parents 77cfaf4 + fd25f1d commit 46fc1de

File tree

4 files changed

+90
-69
lines changed

4 files changed

+90
-69
lines changed

.github/workflows/horizon.yml

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ jobs:
1313
os: [ubuntu-22.04]
1414
go: ["1.22", "1.23"]
1515
pg: [14, 16]
16-
protocol-version: [22,23]
16+
protocol-version: [23]
1717
runs-on: ${{ matrix.os }}
1818
services:
1919
postgres:
@@ -32,9 +32,6 @@ jobs:
3232
env:
3333
HORIZON_INTEGRATION_TESTS_ENABLED: true
3434
HORIZON_INTEGRATION_TESTS_CORE_MAX_SUPPORTED_PROTOCOL: ${{ matrix.protocol-version }}
35-
PROTOCOL_22_CORE_DEBIAN_PKG_VERSION: 23.0.0-2634.d5cbc0793.focal
36-
PROTOCOL_22_CORE_DOCKER_IMG: stellar/unsafe-stellar-core:23.0.0-2634.d5cbc0793.focal
37-
PROTOCOL_22_STELLAR_RPC_DOCKER_IMG: stellar/stellar-rpc:23.0.1-132
3835
PROTOCOL_23_CORE_DEBIAN_PKG_VERSION: 23.0.0-2634.d5cbc0793.focal
3936
PROTOCOL_23_CORE_DOCKER_IMG: stellar/unsafe-stellar-core:23.0.0-2634.d5cbc0793.focal
4037
PROTOCOL_23_STELLAR_RPC_DOCKER_IMG: stellar/stellar-rpc:23.0.1-132

services/horizon/cmd/db.go

Lines changed: 11 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -211,21 +211,23 @@ func runDBReingestRange(ledgerRanges []history.LedgerRange, reingestForce bool,
211211
}
212212
defer system.Shutdown()
213213

214-
err = system.ReingestRange(ledgerRanges, reingestForce, true)
215-
if err != nil {
216-
if _, ok := errors.Cause(err).(ingest.ErrReingestRangeConflict); ok {
217-
return fmt.Errorf(`The range you have provided overlaps with Horizon's most recently ingested ledger.
214+
return runWithMetrics(config.AdminPort, system, func() error {
215+
err = system.ReingestRange(ledgerRanges, reingestForce, true)
216+
if err != nil {
217+
if _, ok := errors.Cause(err).(ingest.ErrReingestRangeConflict); ok {
218+
return fmt.Errorf(`The range you have provided overlaps with Horizon's most recently ingested ledger.
218219
It is not possible to run the reingest command on this range in parallel with
219220
Horizon's ingestion system.
220221
Either reduce the range so that it doesn't overlap with Horizon's ingestion system,
221222
or, use the force flag to ensure that Horizon's ingestion system is blocked until
222223
the reingest command completes.`)
223-
}
224+
}
224225

225-
return err
226-
}
227-
hlog.Info("Range run successfully!")
228-
return nil
226+
return err
227+
}
228+
hlog.Info("Range run successfully!")
229+
return nil
230+
})
229231
}
230232

231233
func runDBDetectGaps(config horizon.Config) ([]history.LedgerRange, error) {

services/horizon/cmd/ingest.go

Lines changed: 71 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -8,12 +8,16 @@ import (
88
_ "net/http/pprof"
99
"time"
1010

11+
"github.com/go-chi/chi"
12+
chimiddleware "github.com/go-chi/chi/middleware"
13+
"github.com/prometheus/client_golang/prometheus"
1114
"github.com/spf13/cobra"
1215
"github.com/spf13/viper"
1316

1417
"github.com/stellar/go/historyarchive"
1518
horizon "github.com/stellar/go/services/horizon/internal"
1619
"github.com/stellar/go/services/horizon/internal/db2/history"
20+
"github.com/stellar/go/services/horizon/internal/httpx"
1721
"github.com/stellar/go/services/horizon/internal/ingest"
1822
"github.com/stellar/go/support/config"
1923
support "github.com/stellar/go/support/config"
@@ -23,7 +27,7 @@ import (
2327

2428
var ingestBuildStateSequence uint32
2529
var ingestBuildStateSkipChecks bool
26-
var ingestVerifyFrom, ingestVerifyTo, ingestVerifyDebugServerPort uint32
30+
var ingestVerifyFrom, ingestVerifyTo uint32
2731
var ingestVerifyState bool
2832
var ingestVerifyStorageBackendConfigPath string
2933
var ingestVerifyLedgerBackendType ingest.LedgerBackendType
@@ -73,14 +77,6 @@ var ingestVerifyRangeCmdOpts = support.ConfigOptions{
7377
FlagDefault: false,
7478
Usage: "[optional] verifies state at the last ledger of the range when true",
7579
},
76-
{
77-
Name: "debug-server-port",
78-
ConfigKey: &ingestVerifyDebugServerPort,
79-
OptType: types.Uint32,
80-
Required: false,
81-
FlagDefault: uint32(0),
82-
Usage: "[optional] opens a net/http/pprof server at given port",
83-
},
8480
generateLedgerBackendOpt(&ingestVerifyLedgerBackendType),
8581
generateDatastoreConfigOpt(&ingestVerifyStorageBackendConfigPath),
8682
}
@@ -159,19 +155,6 @@ func DefineIngestCommands(rootCmd *cobra.Command, horizonConfig *horizon.Config,
159155
return err
160156
}
161157

162-
if ingestVerifyDebugServerPort != 0 {
163-
go func() {
164-
log.Infof("Starting debug server at: %d", ingestVerifyDebugServerPort)
165-
err := http.ListenAndServe(
166-
fmt.Sprintf("localhost:%d", ingestVerifyDebugServerPort),
167-
nil,
168-
)
169-
if err != nil {
170-
log.Error(err)
171-
}
172-
}()
173-
}
174-
175158
mngr := historyarchive.NewCheckpointManager(horizonConfig.CheckpointFrequency)
176159
if !mngr.IsCheckpoint(ingestVerifyFrom) && ingestVerifyFrom != 1 {
177160
return fmt.Errorf("`--from` must be a checkpoint ledger")
@@ -249,17 +232,12 @@ func DefineIngestCommands(rootCmd *cobra.Command, horizonConfig *horizon.Config,
249232
if err != nil {
250233
return err
251234
}
252-
253-
err = system.StressTest(
254-
stressTestNumTransactions,
255-
stressTestChangesPerTransaction,
256-
)
257-
if err != nil {
258-
return err
259-
}
260-
261-
log.Info("Stress test completed successfully!")
262-
return nil
235+
return runWithMetrics(horizonConfig.AdminPort, system, func() error {
236+
return system.StressTest(
237+
stressTestNumTransactions,
238+
stressTestChangesPerTransaction,
239+
)
240+
})
263241
},
264242
}
265243

@@ -365,16 +343,12 @@ func DefineIngestCommands(rootCmd *cobra.Command, horizonConfig *horizon.Config,
365343
return err
366344
}
367345

368-
err = system.BuildState(
369-
ingestBuildStateSequence,
370-
ingestBuildStateSkipChecks,
371-
)
372-
if err != nil {
373-
return err
374-
}
375-
376-
log.Info("State built successfully!")
377-
return nil
346+
return runWithMetrics(horizonConfig.AdminPort, system, func() error {
347+
return system.BuildState(
348+
ingestBuildStateSequence,
349+
ingestBuildStateSkipChecks,
350+
)
351+
})
378352
},
379353
}
380354

@@ -444,11 +418,13 @@ func DefineIngestCommands(rootCmd *cobra.Command, horizonConfig *horizon.Config,
444418
return err
445419
}
446420

447-
return system.LoadTest(
448-
ingestionLoadTestLedgersPath,
449-
ingestionLoadTestCloseDuration,
450-
ingestionLoadTestFixturesPath,
451-
)
421+
return runWithMetrics(horizonConfig.AdminPort, system, func() error {
422+
return system.LoadTest(
423+
ingestionLoadTestLedgersPath,
424+
ingestionLoadTestCloseDuration,
425+
ingestionLoadTestFixturesPath,
426+
)
427+
})
452428
},
453429
}
454430

@@ -496,6 +472,46 @@ func DefineIngestCommands(rootCmd *cobra.Command, horizonConfig *horizon.Config,
496472
)
497473
}
498474

475+
func runWithMetrics(metricsPort uint, system ingest.System, f func() error) error {
476+
if metricsPort != 0 {
477+
log.Infof("Starting metrics server at: %d", metricsPort)
478+
mux := chi.NewMux()
479+
mux.Use(chimiddleware.StripSlashes)
480+
mux.Use(chimiddleware.RequestID)
481+
mux.Use(chimiddleware.RequestLogger(&chimiddleware.DefaultLogFormatter{
482+
Logger: log.DefaultLogger,
483+
NoColor: true,
484+
}))
485+
registry := prometheus.NewRegistry()
486+
system.RegisterMetrics(registry)
487+
httpx.AddMetricRoutes(mux, registry)
488+
metricsServer := &http.Server{
489+
Addr: fmt.Sprintf(":%d", metricsPort),
490+
Handler: mux,
491+
ReadTimeout: 5 * time.Second,
492+
}
493+
go func() {
494+
if err := metricsServer.ListenAndServe(); err != nil && err != http.ErrServerClosed {
495+
log.Fatalf("error running metrics server: %v", err)
496+
}
497+
}()
498+
defer func() {
499+
log.Info("Waiting for metrics to be flushed")
500+
// by default, the scrape_interval for prometheus is 1 minute
501+
// so if we sleep for 1.5 minutes we ensure that all remaining metrics
502+
// will be picked up by the prometheus scraper
503+
time.Sleep(time.Minute + time.Second*30)
504+
log.Info("Shutting down metrics server...")
505+
if err := metricsServer.Shutdown(context.Background()); err != nil {
506+
log.Warnf("error shutting down metrics server: %v", err)
507+
}
508+
}()
509+
} else {
510+
log.Info("Metrics server disabled")
511+
}
512+
return f()
513+
}
514+
499515
func init() {
500516
DefineIngestCommands(RootCmd, globalConfig, globalFlags)
501517
}
@@ -525,11 +541,13 @@ func processVerifyRange(horizonConfig *horizon.Config, horizonFlags config.Confi
525541
return err
526542
}
527543

528-
return system.VerifyRange(
529-
ingestVerifyFrom,
530-
ingestVerifyTo,
531-
ingestVerifyState,
532-
)
544+
return runWithMetrics(horizonConfig.AdminPort, system, func() error {
545+
return system.VerifyRange(
546+
ingestVerifyFrom,
547+
ingestVerifyTo,
548+
ingestVerifyState,
549+
)
550+
})
533551
}
534552

535553
// generateDatastoreConfigOpt returns a *support.ConfigOption for the datastore-config flag

services/horizon/internal/httpx/router.go

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -394,9 +394,7 @@ func (r *Router) addRoutes(config *RouterConfig, rateLimiter *throttled.HTTPRate
394394
w.Header().Set("Content-Type", "application/openapi+yaml")
395395
w.Write(p)
396396
})
397-
r.Internal.Get("/metrics", promhttp.HandlerFor(config.PrometheusRegistry, promhttp.HandlerOpts{}).ServeHTTP)
398-
r.Internal.Get("/debug/pprof/heap", pprof.Index)
399-
r.Internal.Get("/debug/pprof/profile", pprof.Profile)
397+
AddMetricRoutes(r.Internal, config.PrometheusRegistry)
400398
r.Internal.Route("/ingestion/filters", func(r chi.Router) {
401399
handler := actions.FilterConfigHandler{}
402400
r.With(historyMiddleware).Put("/asset", handler.UpdateAssetConfig)
@@ -405,3 +403,9 @@ func (r *Router) addRoutes(config *RouterConfig, rateLimiter *throttled.HTTPRate
405403
r.With(historyMiddleware).Get("/account", handler.GetAccountConfig)
406404
})
407405
}
406+
407+
func AddMetricRoutes(mux *chi.Mux, metrics *prometheus.Registry) {
408+
mux.Get("/metrics", promhttp.HandlerFor(metrics, promhttp.HandlerOpts{}).ServeHTTP)
409+
mux.Get("/debug/pprof/heap", pprof.Index)
410+
mux.Get("/debug/pprof/profile", pprof.Profile)
411+
}

0 commit comments

Comments
 (0)