Skip to content

Commit

Permalink
services/horizon/ingest: Use buffered storage backend for reingest co…
Browse files Browse the repository at this point in the history
…mmand (#5374)
  • Loading branch information
sreuland committed Jul 18, 2024
1 parent 0e83215 commit d5767df
Show file tree
Hide file tree
Showing 25 changed files with 1,327 additions and 687 deletions.
5 changes: 3 additions & 2 deletions .github/workflows/ledgerexporter.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,10 @@ jobs:
CAPTIVE_CORE_DEBIAN_PKG_VERSION: 21.1.0-1921.b3aeb14cc.focal
LEDGEREXPORTER_INTEGRATION_TESTS_ENABLED: "true"
LEDGEREXPORTER_INTEGRATION_TESTS_CAPTIVE_CORE_BIN: /usr/bin/stellar-core
# this pins to a version of quickstart:testing that has the same version as LEDGEREXPORTER_INTEGRATION_TESTS_CAPTIVE_CORE_BIN
# this pins to a version of quickstart:testing that has the same version of core
# as specified on LEDGEREXPORTER_INTEGRATION_TESTS_CAPTIVE_CORE_BIN
# this is the multi-arch index sha, get it by 'docker buildx imagetools inspect stellar/quickstart:testing'
LEDGEREXPORTER_INTEGRATION_TESTS_QUICKSTART_IMAGE: docker.io/stellar/quickstart:testing@sha256:03c6679f838a92b1eda4cd3a9e2bdee4c3586e278a138a0acf36a9bc99a0041f
LEDGEREXPORTER_INTEGRATION_TESTS_QUICKSTART_IMAGE: docker.io/stellar/quickstart:testing@sha256:5c8186f53cc98571749054dd782dce33b0aca2d1a622a7610362f7c15b79b1bf
LEDGEREXPORTER_INTEGRATION_TESTS_QUICKSTART_IMAGE_PULL: "false"
steps:
- name: Install captive core
Expand Down
17 changes: 11 additions & 6 deletions exp/services/ledgerexporter/internal/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ const (
// tests then refer to ledger sequences only up to this, therefore
// don't have to do complex waiting within test for a sequence to exist.
waitForCoreLedgerSequence = 16
configTemplate = "test/integration_config_template.toml"
)

func TestLedgerExporterTestSuite(t *testing.T) {
Expand All @@ -54,6 +55,7 @@ type LedgerExporterTestSuite struct {
dockerCli *client.Client
gcsServer *fakestorage.Server
finishedSetup bool
config Config
}

func (s *LedgerExporterTestSuite) TestScanAndFill() {
Expand All @@ -74,7 +76,7 @@ func (s *LedgerExporterTestSuite) TestScanAndFill() {
s.T().Log(output)
s.T().Log(errOutput)

datastore, err := datastore.NewGCSDataStore(s.ctx, "integration-test/standalone")
datastore, err := datastore.NewDataStore(s.ctx, s.config.DataStoreConfig)
require.NoError(err)

_, err = datastore.GetFile(s.ctx, "FFFFFFFF--0-9/FFFFFFFA--5.xdr.zstd")
Expand Down Expand Up @@ -104,7 +106,7 @@ func (s *LedgerExporterTestSuite) TestAppend() {
s.T().Log(output)
s.T().Log(errOutput)

datastore, err := datastore.NewGCSDataStore(s.ctx, "integration-test/standalone")
datastore, err := datastore.NewDataStore(s.ctx, s.config.DataStoreConfig)
require.NoError(err)

_, err = datastore.GetFile(s.ctx, "FFFFFFFF--0-9/FFFFFFF6--9.xdr.zstd")
Expand Down Expand Up @@ -134,7 +136,7 @@ func (s *LedgerExporterTestSuite) TestAppendUnbounded() {
s.T().Log(errOutput)
}()

datastore, err := datastore.NewGCSDataStore(s.ctx, "integration-test/standalone")
datastore, err := datastore.NewDataStore(s.ctx, s.config.DataStoreConfig)
require.NoError(err)

require.EventuallyWithT(func(c *assert.CollectT) {
Expand All @@ -158,9 +160,9 @@ func (s *LedgerExporterTestSuite) SetupSuite() {
}()
testTempDir := t.TempDir()

ledgerExporterConfigTemplate, err := toml.LoadFile("test/integration_config_template.toml")
ledgerExporterConfigTemplate, err := toml.LoadFile(configTemplate)
if err != nil {
t.Fatalf("unable to load config template file %v", err)
t.Fatalf("unable to load config template file %v, %v", configTemplate, err)
}

// if LEDGEREXPORTER_INTEGRATION_TESTS_CAPTIVE_CORE_BIN not specified,
Expand All @@ -172,7 +174,10 @@ func (s *LedgerExporterTestSuite) SetupSuite() {

tomlBytes, err := toml.Marshal(ledgerExporterConfigTemplate)
if err != nil {
t.Fatalf("unable to load config file %v", err)
t.Fatalf("unable to parse config file toml %v, %v", configTemplate, err)
}
if err = toml.Unmarshal(tomlBytes, &s.config); err != nil {
t.Fatalf("unable to marshal config file toml into struct, %v", err)
}

tempSeedDataPath := filepath.Join(testTempDir, "data")
Expand Down
20 changes: 7 additions & 13 deletions ingest/ledgerbackend/buffered_storage_backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,10 @@ import (
var _ LedgerBackend = (*BufferedStorageBackend)(nil)

type BufferedStorageBackendConfig struct {
LedgerBatchConfig datastore.DataStoreSchema
DataStore datastore.DataStore
BufferSize uint32
NumWorkers uint32
RetryLimit uint32
RetryWait time.Duration
BufferSize uint32 `toml:"buffer_size"`
NumWorkers uint32 `toml:"num_workers"`
RetryLimit uint32 `toml:"retry_limit"`
RetryWait time.Duration `toml:"retry_wait"`
}

// BufferedStorageBackend is a ledger backend that reads from a storage service.
Expand All @@ -45,7 +43,7 @@ type BufferedStorageBackend struct {
}

// NewBufferedStorageBackend returns a new BufferedStorageBackend instance.
func NewBufferedStorageBackend(ctx context.Context, config BufferedStorageBackendConfig) (*BufferedStorageBackend, error) {
func NewBufferedStorageBackend(config BufferedStorageBackendConfig, dataStore datastore.DataStore) (*BufferedStorageBackend, error) {
if config.BufferSize == 0 {
return nil, errors.New("buffer size must be > 0")
}
Expand All @@ -54,17 +52,13 @@ func NewBufferedStorageBackend(ctx context.Context, config BufferedStorageBacken
return nil, errors.New("number of workers must be <= BufferSize")
}

if config.DataStore == nil {
return nil, errors.New("no DataStore provided")
}

if config.LedgerBatchConfig.LedgersPerFile <= 0 {
if dataStore.GetSchema().LedgersPerFile <= 0 {
return nil, errors.New("ledgersPerFile must be > 0")
}

bsBackend := &BufferedStorageBackend{
config: config,
dataStore: config.DataStore,
dataStore: dataStore,
}

return bsBackend, nil
Expand Down
64 changes: 39 additions & 25 deletions ingest/ledgerbackend/buffered_storage_backend_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,29 +44,21 @@ func createBufferedStorageBackendConfigForTesting() BufferedStorageBackendConfig
param := make(map[string]string)
param["destination_bucket_path"] = "testURL"

ledgerBatchConfig := datastore.DataStoreSchema{
LedgersPerFile: 1,
FilesPerPartition: 64000,
}

dataStore := new(datastore.MockDataStore)

return BufferedStorageBackendConfig{
LedgerBatchConfig: ledgerBatchConfig,
DataStore: dataStore,
BufferSize: 100,
NumWorkers: 5,
RetryLimit: 3,
RetryWait: time.Microsecond,
BufferSize: 100,
NumWorkers: 5,
RetryLimit: 3,
RetryWait: time.Microsecond,
}
}

func createBufferedStorageBackendForTesting() BufferedStorageBackend {
config := createBufferedStorageBackendConfigForTesting()

dataStore := new(datastore.MockDataStore)
return BufferedStorageBackend{
config: config,
dataStore: config.DataStore,
dataStore: dataStore,
}
}

Expand All @@ -86,6 +78,10 @@ func createMockdataStore(t *testing.T, start, end, partitionSize, count uint32)
}
mockDataStore.On("GetFile", mock.Anything, objectName).Return(readCloser, nil)
}
mockDataStore.On("GetSchema").Return(datastore.DataStoreSchema{
LedgersPerFile: count,
FilesPerPartition: partitionSize,
})

t.Cleanup(func() {
mockDataStore.AssertExpectations(t)
Expand Down Expand Up @@ -126,15 +122,18 @@ func createLCMBatchReader(start, end, count uint32) io.ReadCloser {
}

func TestNewBufferedStorageBackend(t *testing.T) {
ctx := context.Background()
config := createBufferedStorageBackendConfigForTesting()

bsb, err := NewBufferedStorageBackend(ctx, config)
mockDataStore := new(datastore.MockDataStore)
mockDataStore.On("GetSchema").Return(datastore.DataStoreSchema{
LedgersPerFile: uint32(1),
FilesPerPartition: partitionSize,
})
bsb, err := NewBufferedStorageBackend(config, mockDataStore)
assert.NoError(t, err)

assert.Equal(t, bsb.dataStore, config.DataStore)
assert.Equal(t, uint32(1), bsb.config.LedgerBatchConfig.LedgersPerFile)
assert.Equal(t, uint32(64000), bsb.config.LedgerBatchConfig.FilesPerPartition)
assert.Equal(t, bsb.dataStore, mockDataStore)
assert.Equal(t, uint32(1), bsb.dataStore.GetSchema().LedgersPerFile)
assert.Equal(t, uint32(64000), bsb.dataStore.GetSchema().FilesPerPartition)
assert.Equal(t, uint32(100), bsb.config.BufferSize)
assert.Equal(t, uint32(5), bsb.config.NumWorkers)
assert.Equal(t, uint32(3), bsb.config.RetryLimit)
Expand Down Expand Up @@ -210,12 +209,14 @@ func TestCloudStorageGetLedger_MultipleLedgerPerFile(t *testing.T) {
lcmArray := createLCMForTesting(startLedger, endLedger)
bsb := createBufferedStorageBackendForTesting()
ctx := context.Background()
bsb.config.LedgerBatchConfig.LedgersPerFile = uint32(2)
ledgerRange := BoundedRange(startLedger, endLedger)

mockDataStore := createMockdataStore(t, startLedger, endLedger, partitionSize, 2)
bsb.dataStore = mockDataStore

mockDataStore.On("GetSchema").Return(datastore.DataStoreSchema{
LedgersPerFile: uint32(2),
FilesPerPartition: partitionSize,
})
assert.NoError(t, bsb.PrepareRange(ctx, ledgerRange))
assert.Eventually(t, func() bool { return len(bsb.ledgerBuffer.ledgerQueue) == 2 }, time.Second*5, time.Millisecond*50)

Expand Down Expand Up @@ -451,6 +452,10 @@ func TestLedgerBufferClose(t *testing.T) {

mockDataStore := new(datastore.MockDataStore)
partition := ledgerPerFileCount*partitionSize - 1
mockDataStore.On("GetSchema").Return(datastore.DataStoreSchema{
LedgersPerFile: ledgerPerFileCount,
FilesPerPartition: partitionSize,
})

objectName := fmt.Sprintf("FFFFFFFF--0-%d/%08X--%d.xdr.zstd", partition, math.MaxUint32-3, 3)
afterPrepareRange := make(chan struct{})
Expand Down Expand Up @@ -483,7 +488,10 @@ func TestLedgerBufferBoundedObjectNotFound(t *testing.T) {

mockDataStore := new(datastore.MockDataStore)
partition := ledgerPerFileCount*partitionSize - 1

mockDataStore.On("GetSchema").Return(datastore.DataStoreSchema{
LedgersPerFile: ledgerPerFileCount,
FilesPerPartition: partitionSize,
})
objectName := fmt.Sprintf("FFFFFFFF--0-%d/%08X--%d.xdr.zstd", partition, math.MaxUint32-3, 3)
mockDataStore.On("GetFile", mock.Anything, objectName).Return(io.NopCloser(&bytes.Buffer{}), os.ErrNotExist).Once()
t.Cleanup(func() {
Expand All @@ -509,7 +517,10 @@ func TestLedgerBufferUnboundedObjectNotFound(t *testing.T) {

mockDataStore := new(datastore.MockDataStore)
partition := ledgerPerFileCount*partitionSize - 1

mockDataStore.On("GetSchema").Return(datastore.DataStoreSchema{
LedgersPerFile: ledgerPerFileCount,
FilesPerPartition: partitionSize,
})
objectName := fmt.Sprintf("FFFFFFFF--0-%d/%08X--%d.xdr.zstd", partition, math.MaxUint32-3, 3)
iteration := &atomic.Int32{}
cancelAfter := int32(bsb.config.RetryLimit) + 2
Expand Down Expand Up @@ -551,7 +562,10 @@ func TestLedgerBufferRetryLimit(t *testing.T) {
})

bsb.dataStore = mockDataStore

mockDataStore.On("GetSchema").Return(datastore.DataStoreSchema{
LedgersPerFile: ledgerPerFileCount,
FilesPerPartition: partitionSize,
})
assert.NoError(t, bsb.PrepareRange(context.Background(), ledgerRange))

bsb.ledgerBuffer.wg.Wait()
Expand Down
6 changes: 3 additions & 3 deletions ingest/ledgerbackend/ledger_buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ func (lb *ledgerBuffer) pushTaskQueue() {
return
}
lb.taskQueue <- lb.nextTaskLedger
lb.nextTaskLedger += lb.config.LedgerBatchConfig.LedgersPerFile
lb.nextTaskLedger += lb.dataStore.GetSchema().LedgersPerFile
}

// sleepWithContext returns true upon sleeping without interruption from the context
Expand Down Expand Up @@ -163,7 +163,7 @@ func (lb *ledgerBuffer) worker(ctx context.Context) {
}

func (lb *ledgerBuffer) downloadLedgerObject(ctx context.Context, sequence uint32) ([]byte, error) {
objectKey := lb.config.LedgerBatchConfig.GetObjectKeyFromSequenceNumber(sequence)
objectKey := lb.dataStore.GetSchema().GetObjectKeyFromSequenceNumber(sequence)

reader, err := lb.dataStore.GetFile(ctx, objectKey)
if err != nil {
Expand Down Expand Up @@ -198,7 +198,7 @@ func (lb *ledgerBuffer) storeObject(ledgerObject []byte, sequence uint32) {
for lb.ledgerPriorityQueue.Len() > 0 && lb.currentLedger == uint32(lb.ledgerPriorityQueue.Peek().startLedger) {
item := lb.ledgerPriorityQueue.Pop()
lb.ledgerQueue <- item.payload
lb.currentLedger += lb.config.LedgerBatchConfig.LedgersPerFile
lb.currentLedger += lb.dataStore.GetSchema().LedgersPerFile
}
}

Expand Down
11 changes: 11 additions & 0 deletions services/horizon/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,17 @@
All notable changes to this project will be documented in this
file. This project adheres to [Semantic Versioning](http://semver.org/).

## Pending

### Added

- Reingest from pre-computed tx meta on remote cloud storage. ([4911](https://github.com/stellar/go/issues/4911)), ([5374](https://github.com/stellar/go/pull/5374))
- Configure horizon reingestion to obtain ledger tx meta in pre-computed files from a Google Cloud Storage(GCS) location.
- Using this option will no longer require a captive core binary be present and it no longer runs a captive core sub-process, instead obtaining the tx meta from the GCS backend.
- Horizon supports this new feature with two new parameters `ledgerbackend` and `datastore-config` on the `reingest` command. Refer to [Reingestion README](./internal/ingest/README.md#reingestion).



## 2.31.0

### Breaking Changes
Expand Down
Loading

0 comments on commit d5767df

Please sign in to comment.