Skip to content

Commit a0dded9

Browse files
authored
Merge branch 'master' into friendbot-instrumentation-03
2 parents 292880c + 474a86a commit a0dded9

24 files changed

+1625
-636
lines changed

.github/workflows/horizon.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -112,7 +112,7 @@ jobs:
112112
key: ${{ env.COMBINED_SOURCE_HASH }}
113113

114114
- if: ${{ steps.horizon_binary_tests_hash.outputs.cache-hit != 'true' }}
115-
run: go test -race -timeout 65m -v ./services/horizon/internal/integration/...
115+
run: go test -race -timeout 75m -v ./services/horizon/internal/integration/...
116116

117117
- name: Save Horizon binary and integration tests source hash to cache
118118
if: ${{ success() && steps.horizon_binary_tests_hash.outputs.cache-hit != 'true' }}

ingest/CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ All notable changes to this project will be documented in this file. This projec
1010
- `DefaultBufferedStorageBackendConfig`
1111
- `PublisherConfig`
1212
These are now relocated to `ingest` package. Will need to change references in existing code.
13+
* Moved the `ingest/verify` package to be internally located at `services/horizon/internal/ingest` package in `verify.go` for overall go repo restructuring. [5670](https://github.com/stellar/go/issues/5670)
1314

1415

1516
## v23.0.0

ingest/loadtest/ledger_backend.go

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66
"io"
77
"math"
88
"os"
9+
"sync"
910
"time"
1011

1112
"github.com/klauspost/compress/zstd"
@@ -16,6 +17,9 @@ import (
1617
"github.com/stellar/go/xdr"
1718
)
1819

20+
// ErrLoadTestDone indicates that the load test has run to completion.
21+
var ErrLoadTestDone = fmt.Errorf("the load test is done")
22+
1923
// LedgerBackend is used to load test ingestion.
2024
// LedgerBackend will take a file of synthetically generated ledgers (see
2125
// services/horizon/internal/integration/generate_ledgers_test.go) and merge those ledgers
@@ -31,6 +35,8 @@ type LedgerBackend struct {
3135
latestLedgerSeq uint32
3236
preparedRange ledgerbackend.Range
3337
cachedLedger xdr.LedgerCloseMeta
38+
done bool
39+
lock sync.RWMutex
3440
}
3541

3642
// LedgerBackendConfig configures LedgerBackend
@@ -57,6 +63,9 @@ func NewLedgerBackend(config LedgerBackendConfig) *LedgerBackend {
5763
}
5864

5965
func (r *LedgerBackend) GetLatestLedgerSequence(ctx context.Context) (uint32, error) {
66+
r.lock.RLock()
67+
defer r.lock.RUnlock()
68+
6069
if r.nextLedgerSeq == 0 {
6170
return 0, fmt.Errorf("PrepareRange() must be called before GetLatestLedgerSequence()")
6271
}
@@ -94,6 +103,12 @@ func readLedgerEntries(path string) ([]xdr.LedgerEntry, error) {
94103
}
95104

96105
func (r *LedgerBackend) PrepareRange(ctx context.Context, ledgerRange ledgerbackend.Range) error {
106+
r.lock.Lock()
107+
defer r.lock.Unlock()
108+
109+
if r.done {
110+
return ErrLoadTestDone
111+
}
97112
if r.nextLedgerSeq != 0 {
98113
if r.isPrepared(ledgerRange) {
99114
return nil
@@ -282,6 +297,9 @@ func validateNetworkPassphrase(networkPassphrase string, ledger xdr.LedgerCloseM
282297
}
283298

284299
func (r *LedgerBackend) IsPrepared(ctx context.Context, ledgerRange ledgerbackend.Range) (bool, error) {
300+
r.lock.RLock()
301+
defer r.lock.RUnlock()
302+
285303
return r.isPrepared(ledgerRange), nil
286304
}
287305

@@ -302,6 +320,15 @@ func (r *LedgerBackend) isPrepared(ledgerRange ledgerbackend.Range) bool {
302320
}
303321

304322
func (r *LedgerBackend) GetLedger(ctx context.Context, sequence uint32) (xdr.LedgerCloseMeta, error) {
323+
r.lock.RLock()
324+
closeLedgerBackend := false
325+
defer func() {
326+
r.lock.RUnlock()
327+
if closeLedgerBackend {
328+
r.Close()
329+
}
330+
}()
331+
305332
if r.nextLedgerSeq == 0 {
306333
return xdr.LedgerCloseMeta{}, fmt.Errorf("PrepareRange() must be called before GetLedger()")
307334
}
@@ -312,6 +339,13 @@ func (r *LedgerBackend) GetLedger(ctx context.Context, sequence uint32) (xdr.Led
312339
r.cachedLedger.LedgerSequence(),
313340
)
314341
}
342+
if r.done {
343+
return xdr.LedgerCloseMeta{}, ErrLoadTestDone
344+
}
345+
if sequence > r.latestLedgerSeq {
346+
closeLedgerBackend = true
347+
return xdr.LedgerCloseMeta{}, ErrLoadTestDone
348+
}
315349
for ; r.nextLedgerSeq <= sequence; r.nextLedgerSeq++ {
316350
var ledger xdr.LedgerCloseMeta
317351
if err := r.mergedLedgersStream.ReadOne(&ledger); err == io.EOF {
@@ -339,6 +373,10 @@ func (r *LedgerBackend) GetLedger(ctx context.Context, sequence uint32) (xdr.Led
339373
}
340374

341375
func (r *LedgerBackend) Close() error {
376+
r.lock.Lock()
377+
defer r.lock.Unlock()
378+
379+
r.done = true
342380
if err := r.config.LedgerBackend.Close(); err != nil {
343381
return fmt.Errorf("could not close real ledger backend: %w", err)
344382
}
@@ -347,9 +385,13 @@ func (r *LedgerBackend) Close() error {
347385
if err := r.mergedLedgersStream.Close(); err != nil {
348386
return fmt.Errorf("could not close merged ledgers xdr stream: %w", err)
349387
}
388+
r.mergedLedgersStream = nil
389+
}
390+
if r.mergedLedgersFilePath != "" {
350391
if err := os.Remove(r.mergedLedgersFilePath); err != nil {
351392
return fmt.Errorf("could not remove merged ledgers file: %w", err)
352393
}
394+
r.mergedLedgersFilePath = ""
353395
}
354396
return nil
355397
}

ingest/verify/main.go

Lines changed: 0 additions & 226 deletions
This file was deleted.

0 commit comments

Comments
 (0)