Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion internal/integrationtests/infrastructure/setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -1023,7 +1023,10 @@ func createRPCService(containers *SharedContainers, ctx context.Context) (servic
}

// Start tracking RPC health
go rpcService.TrackRPCServiceHealth(ctx, nil)
go func() {
//nolint:errcheck // Error is expected on context cancellation during shutdown
rpcService.TrackRPCServiceHealth(ctx, nil)
}()

return rpcService, nil
}
Expand Down
1 change: 0 additions & 1 deletion internal/serve/serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,6 @@ func initHandlerDeps(ctx context.Context, cfg Configs) (handlerDeps, error) {
if err != nil {
return handlerDeps{}, fmt.Errorf("instantiating rpc service: %w", err)
}
go rpcService.TrackRPCServiceHealth(ctx, nil)

channelAccountStore := store.NewChannelAccountModel(dbConnectionPool)

Expand Down
137 changes: 78 additions & 59 deletions internal/services/channel_account_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -236,70 +236,91 @@ func (s *channelAccountService) submitChannelAccountsTxOnChain(
ops []txnbuild.Operation,
chAccSigner ChannelAccSigner,
) error {
// Wait for RPC service to become healthy by polling GetHealth directly.
// This lets the API server startup so that users can start interacting with the API
// which does not depend on RPC, instead of waiting till it becomes healthy.
log.Ctx(ctx).Infof("⏳ Waiting for RPC service to become healthy")
rpcHeartbeatChannel := s.RPCService.GetHeartbeatChannel()
select {
case <-ctx.Done():
return fmt.Errorf("context cancelled while waiting for rpc service to become healthy: %w", ctx.Err())

// The channel account creation goroutine will wait in the background for the rpc service to become healthy on startup.
// This lets the API server startup so that users can start interacting with the API which does not depend on RPC, instead of waiting till it becomes healthy.
case <-rpcHeartbeatChannel:
log.Ctx(ctx).Infof("👍 RPC service is healthy")
accountSeq, err := s.RPCService.GetAccountLedgerSequence(distributionAccountPublicKey)
if err != nil {
return fmt.Errorf("getting ledger sequence for distribution account public key=%s: %w", distributionAccountPublicKey, err)
}

tx, err := txnbuild.NewTransaction(
txnbuild.TransactionParams{
SourceAccount: &txnbuild.SimpleAccount{
AccountID: distributionAccountPublicKey,
Sequence: accountSeq,
},
IncrementSequenceNum: true,
Operations: ops,
BaseFee: s.BaseFee,
Preconditions: txnbuild.Preconditions{TimeBounds: txnbuild.NewTimeout(300)},
},
)
if err != nil {
return fmt.Errorf("building transaction: %w", err)
}
healthCheckCtx, cancel := context.WithTimeout(ctx, rpcHealthCheckTimeout)
defer cancel()

// Sign the transaction for the distribution account
tx, err = s.DistributionAccountSignatureClient.SignStellarTransaction(ctx, tx, distributionAccountPublicKey)
if err != nil {
return fmt.Errorf("signing transaction for distribution account: %w", err)
}
// Sign the transaction for the channel accounts
tx, err = chAccSigner(ctx, tx)
if err != nil {
return fmt.Errorf("signing transaction with channel account(s) keypairs: %w", err)
}
ticker := time.NewTicker(sleepDelayForChannelAccountCreation)
defer ticker.Stop()

txHash, err := tx.HashHex(s.DistributionAccountSignatureClient.NetworkPassphrase())
if err != nil {
return fmt.Errorf("getting transaction hash: %w", err)
}
txXDR, err := tx.Base64()
if err != nil {
return fmt.Errorf("getting transaction envelope: %w", err)
// Try immediately first
_, err := s.RPCService.GetHealth()
if err != nil {
log.Ctx(ctx).Debugf("Initial RPC health check failed: %v, will retry...", err)
for {
select {
case <-healthCheckCtx.Done():
return fmt.Errorf("timeout waiting for RPC service to become healthy: %w", healthCheckCtx.Err())
case <-ticker.C:
_, err = s.RPCService.GetHealth()
if err == nil {
break
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we use a label on this break? The nested breaks are easy to misinterpret.

}
log.Ctx(ctx).Debugf("RPC health check failed: %v, will retry...", err)
continue
}
break
}
}

log.Ctx(ctx).Infof("🚧 Submitting channel account transaction to RPC service")
err = s.submitTransaction(ctx, txHash, txXDR)
if err != nil {
return fmt.Errorf("submitting channel account transaction to RPC service: %w", err)
}
log.Ctx(ctx).Infof("🚧 Successfully submitted channel account transaction to RPC service, waiting for confirmation...")
err = s.waitForTransactionConfirmation(ctx, txHash)
if err != nil {
return fmt.Errorf("getting transaction status: %w", err)
}
log.Ctx(ctx).Infof("👍 RPC service is healthy")
accountSeq, err := s.RPCService.GetAccountLedgerSequence(distributionAccountPublicKey)
if err != nil {
return fmt.Errorf("getting ledger sequence for distribution account public key=%s: %w", distributionAccountPublicKey, err)
}

return nil
tx, err := txnbuild.NewTransaction(
txnbuild.TransactionParams{
SourceAccount: &txnbuild.SimpleAccount{
AccountID: distributionAccountPublicKey,
Sequence: accountSeq,
},
IncrementSequenceNum: true,
Operations: ops,
BaseFee: s.BaseFee,
Preconditions: txnbuild.Preconditions{TimeBounds: txnbuild.NewTimeout(300)},
},
)
if err != nil {
return fmt.Errorf("building transaction: %w", err)
}

// Sign the transaction for the distribution account
tx, err = s.DistributionAccountSignatureClient.SignStellarTransaction(ctx, tx, distributionAccountPublicKey)
if err != nil {
return fmt.Errorf("signing transaction for distribution account: %w", err)
}
// Sign the transaction for the channel accounts
tx, err = chAccSigner(ctx, tx)
if err != nil {
return fmt.Errorf("signing transaction with channel account(s) keypairs: %w", err)
}

txHash, err := tx.HashHex(s.DistributionAccountSignatureClient.NetworkPassphrase())
if err != nil {
return fmt.Errorf("getting transaction hash: %w", err)
}
txXDR, err := tx.Base64()
if err != nil {
return fmt.Errorf("getting transaction envelope: %w", err)
}

log.Ctx(ctx).Infof("🚧 Submitting channel account transaction to RPC service")
err = s.submitTransaction(ctx, txHash, txXDR)
if err != nil {
return fmt.Errorf("submitting channel account transaction to RPC service: %w", err)
}
log.Ctx(ctx).Infof("🚧 Successfully submitted channel account transaction to RPC service, waiting for confirmation...")
err = s.waitForTransactionConfirmation(ctx, txHash)
if err != nil {
return fmt.Errorf("getting transaction status: %w", err)
}

return nil
}

func (s *channelAccountService) submitTransaction(_ context.Context, hash string, signedTxXDR string) error {
Expand Down Expand Up @@ -399,14 +420,12 @@ func (o *ChannelAccountServiceOptions) Validate() error {
return nil
}

func NewChannelAccountService(ctx context.Context, opts ChannelAccountServiceOptions) (*channelAccountService, error) {
func NewChannelAccountService(_ context.Context, opts ChannelAccountServiceOptions) (*channelAccountService, error) {
err := opts.Validate()
if err != nil {
return nil, fmt.Errorf("validating channel account service options: %w", err)
}

go opts.RPCService.TrackRPCServiceHealth(ctx, nil)

return &channelAccountService{
DB: opts.DB,
RPCService: opts.RPCService,
Expand Down
39 changes: 13 additions & 26 deletions internal/services/channel_account_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@ package services

import (
"context"
"fmt"
"testing"
"time"

"github.com/stellar/go/keypair"
"github.com/stellar/go/network"
Expand Down Expand Up @@ -107,11 +107,9 @@ func Test_ChannelAccountService_EnsureChannelAccounts(t *testing.T) {
Return(network.TestNetworkPassphrase).
Once()

heartbeatChan := make(chan entities.RPCGetHealthResult, 1)
heartbeatChan <- entities.RPCGetHealthResult{Status: "healthy"}
mockRPCService.
On("GetHeartbeatChannel").
Return(heartbeatChan).
On("GetHealth").
Return(entities.RPCGetHealthResult{Status: "healthy"}, nil).
Once().
On("GetAccountLedgerSequence", distributionAccount.Address()).
Return(int64(123), nil).
Expand Down Expand Up @@ -195,10 +193,8 @@ func Test_ChannelAccountService_EnsureChannelAccounts(t *testing.T) {
Return(&signedTx, nil).
Once()

heartbeatChan := make(chan entities.RPCGetHealthResult, 1)
heartbeatChan <- entities.RPCGetHealthResult{Status: "healthy"}
mockRPCService.
On("GetHeartbeatChannel").Return(heartbeatChan).Once().
On("GetHealth").Return(entities.RPCGetHealthResult{Status: "healthy"}, nil).Once().
On("GetAccountLedgerSequence", distributionAccount.Address()).Return(int64(123), nil).Once().
On("GetAccountLedgerSequence", chAcc1.PublicKey).Return(int64(123), nil).Once().
On("GetAccountLedgerSequence", chAcc2.PublicKey).Return(int64(123), nil).Once().
Expand Down Expand Up @@ -256,11 +252,9 @@ func Test_ChannelAccountService_EnsureChannelAccounts(t *testing.T) {
Return(network.TestNetworkPassphrase).
Once()

heartbeatChan := make(chan entities.RPCGetHealthResult, 1)
heartbeatChan <- entities.RPCGetHealthResult{Status: "healthy"}
mockRPCService.
On("GetHeartbeatChannel").
Return(heartbeatChan).
On("GetHealth").
Return(entities.RPCGetHealthResult{Status: "healthy"}, nil).
Once().
On("GetAccountLedgerSequence", distributionAccount.Address()).
Return(int64(123), nil).
Expand Down Expand Up @@ -312,11 +306,9 @@ func Test_ChannelAccountService_EnsureChannelAccounts(t *testing.T) {
Return(network.TestNetworkPassphrase).
Once()

heartbeatChan := make(chan entities.RPCGetHealthResult, 1)
heartbeatChan <- entities.RPCGetHealthResult{Status: "healthy"}
mockRPCService.
On("GetHeartbeatChannel").
Return(heartbeatChan).
On("GetHealth").
Return(entities.RPCGetHealthResult{Status: "healthy"}, nil).
Once().
On("GetAccountLedgerSequence", distributionAccount.Address()).
Return(int64(123), nil).
Expand Down Expand Up @@ -349,10 +341,12 @@ func Test_ChannelAccountService_EnsureChannelAccounts(t *testing.T) {
Return(distributionAccount.Address(), nil).
Once()

heartbeatChan := make(chan entities.RPCGetHealthResult, 1)
mockRPCService.On("GetHeartbeatChannel").Return(heartbeatChan)
mockRPCService.
On("GetHealth").
Return(entities.RPCGetHealthResult{}, fmt.Errorf("RPC unavailable")).
Maybe()
},
expectedError: "context cancelled while waiting for rpc service to become healthy",
expectedError: "timeout waiting for RPC service to become healthy",
},
}

Expand All @@ -361,7 +355,6 @@ func Test_ChannelAccountService_EnsureChannelAccounts(t *testing.T) {
// Create fresh mocks for each test
ctx := tc.getCtx()
mockRPCService := NewRPCServiceMock(t)
mockRPCService.On("TrackRPCServiceHealth", ctx, mock.Anything).Return()
distAccSigClient := signing.NewSignatureClientMock(t)
chAccSigClient := signing.NewSignatureClientMock(t)
channelAccountStore := store.NewChannelAccountStoreMock(t)
Expand All @@ -380,7 +373,6 @@ func Test_ChannelAccountService_EnsureChannelAccounts(t *testing.T) {
PrivateKeyEncrypter: &signingutils.DefaultPrivateKeyEncrypter{},
EncryptionPassphrase: "my-encryption-passphrase",
})
time.Sleep(50 * time.Millisecond) // waiting for the goroutine to call `TrackRPCServiceHealth`
require.NoError(t, err)

// Execute test
Expand All @@ -407,7 +399,6 @@ func TestSubmitTransaction(t *testing.T) {

ctx := context.Background()
mockRPCService := RPCServiceMock{}
mockRPCService.On("TrackRPCServiceHealth", ctx, mock.Anything).Return()
defer mockRPCService.AssertExpectations(t)
signatureClient := signing.SignatureClientMock{}
channelAccountStore := store.ChannelAccountStoreMock{}
Expand All @@ -422,9 +413,7 @@ func TestSubmitTransaction(t *testing.T) {
PrivateKeyEncrypter: &privateKeyEncrypter,
EncryptionPassphrase: passphrase,
})
time.Sleep(100 * time.Millisecond) // waiting for the goroutine to call `TrackRPCServiceHealth`
require.NoError(t, err)
time.Sleep(100 * time.Millisecond) // waiting for the goroutine to call `TrackRPCServiceHealth`

hash := "test_hash"
signedTxXDR := "test_xdr"
Expand Down Expand Up @@ -465,7 +454,6 @@ func TestWaitForTransactionConfirmation(t *testing.T) {
ctx := context.Background()
mockRPCService := RPCServiceMock{}
defer mockRPCService.AssertExpectations(t)
mockRPCService.On("TrackRPCServiceHealth", ctx, mock.Anything).Return()
signatureClient := signing.SignatureClientMock{}
channelAccountStore := store.ChannelAccountStoreMock{}
privateKeyEncrypter := signingutils.DefaultPrivateKeyEncrypter{}
Expand All @@ -480,7 +468,6 @@ func TestWaitForTransactionConfirmation(t *testing.T) {
EncryptionPassphrase: passphrase,
})
require.NoError(t, err)
time.Sleep(100 * time.Millisecond) // waiting for the goroutine to call `TrackRPCServiceHealth`

hash := "test_hash"

Expand Down
18 changes: 15 additions & 3 deletions internal/services/ingest.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,11 @@

func (m *ingestService) DeprecatedRun(ctx context.Context, startLedger uint32, endLedger uint32) error {
manualTriggerChannel := make(chan any, 1)
go m.rpcService.TrackRPCServiceHealth(ctx, manualTriggerChannel)
go func() {
if err := m.rpcService.TrackRPCServiceHealth(ctx, manualTriggerChannel); err != nil {
log.Ctx(ctx).Warnf("RPC health tracking stopped: %v", err)
}
}()
ingestHeartbeatChannel := make(chan any, 1)
rpcHeartbeatChannel := m.rpcService.GetHeartbeatChannel()
go trackIngestServiceHealth(ctx, ingestHeartbeatChannel, m.appTracker)
Expand Down Expand Up @@ -190,7 +194,7 @@

// update cursor
err = db.RunInTransaction(ctx, m.models.DB, nil, func(dbTx db.Transaction) error {
if err := m.models.IngestStore.UpdateLatestLedgerSynced(ctx, dbTx, m.ledgerCursorName, ingestLedger); err != nil {

Check failure on line 197 in internal/services/ingest.go

View workflow job for this annotation

GitHub Actions / check

declaration of "err" shadows declaration at line 182
return fmt.Errorf("updating latest synced ledger: %w", err)
}
return nil
Expand Down Expand Up @@ -257,7 +261,7 @@
}
checkpointLedger := m.accountTokenService.GetCheckpointLedger()
err = db.RunInTransaction(ctx, m.models.DB, nil, func(dbTx db.Transaction) error {
if err := m.models.IngestStore.UpdateLatestLedgerSynced(ctx, dbTx, m.accountTokensCursorName, checkpointLedger); err != nil {

Check failure on line 264 in internal/services/ingest.go

View workflow job for this annotation

GitHub Actions / check

declaration of "err" shadows declaration at line 258
return fmt.Errorf("updating latest synced account-tokens ledger: %w", err)
}
return nil
Expand All @@ -269,20 +273,28 @@

// Prepare the health check:
manualTriggerChan := make(chan any, 1)
go m.rpcService.TrackRPCServiceHealth(ctx, manualTriggerChan)
go func() {
if err := m.rpcService.TrackRPCServiceHealth(ctx, manualTriggerChan); err != nil {
log.Ctx(ctx).Warnf("RPC health tracking stopped: %v", err)
}
}()
ingestHeartbeatChannel := make(chan any, 1)
rpcHeartbeatChannel := m.rpcService.GetHeartbeatChannel()
go trackIngestServiceHealth(ctx, ingestHeartbeatChannel, m.appTracker)
var rpcHealth entities.RPCGetHealthResult

log.Ctx(ctx).Info("Starting ingestion loop")
for {
var ok bool
select {
case sig := <-signalChan:
return fmt.Errorf("ingestor stopped due to signal %q", sig)
case <-ctx.Done():
return fmt.Errorf("ingestor stopped due to context cancellation: %w", ctx.Err())
case rpcHealth = <-rpcHeartbeatChannel:
case rpcHealth, ok = <-rpcHeartbeatChannel:
if !ok {
return fmt.Errorf("RPC heartbeat channel closed unexpectedly")
}
ingestHeartbeatChannel <- true // ⬅️ indicate that it's still running
// this will fallthrough to execute the code below ⬇️
}
Expand Down Expand Up @@ -312,12 +324,12 @@
// update cursors
err = db.RunInTransaction(ctx, m.models.DB, nil, func(dbTx db.Transaction) error {
lastLedger := getLedgersResponse.Ledgers[len(getLedgersResponse.Ledgers)-1].Sequence
if err := m.models.IngestStore.UpdateLatestLedgerSynced(ctx, dbTx, m.ledgerCursorName, lastLedger); err != nil {

Check failure on line 327 in internal/services/ingest.go

View workflow job for this annotation

GitHub Actions / check

declaration of "err" shadows declaration at line 304
return fmt.Errorf("updating latest synced ledger: %w", err)
}
checkpointLedger := m.accountTokenService.GetCheckpointLedger()
if lastLedger > checkpointLedger {
if err := m.models.IngestStore.UpdateLatestLedgerSynced(ctx, dbTx, m.accountTokensCursorName, lastLedger); err != nil {

Check failure on line 332 in internal/services/ingest.go

View workflow job for this annotation

GitHub Actions / check

declaration of "err" shadows declaration at line 304
return fmt.Errorf("updating latest synced account-tokens ledger: %w", err)
}
}
Expand Down
4 changes: 2 additions & 2 deletions internal/services/ingest_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,7 @@ func TestIngest_LatestSyncedLedgerBehindRPC(t *testing.T) {
mockAppTracker := apptracker.MockAppTracker{}
mockRPCService := RPCServiceMock{}
mockRPCService.
On("TrackRPCServiceHealth", ctx, mock.Anything).Once().
On("TrackRPCServiceHealth", ctx, mock.Anything).Return(nil).Once().
On("NetworkPassphrase").Return(network.TestNetworkPassphrase)
mockChAccStore := &store.ChannelAccountStoreMock{}
mockContractStore := &cache.MockTokenContractStore{}
Expand Down Expand Up @@ -328,7 +328,7 @@ func TestIngest_LatestSyncedLedgerAheadOfRPC(t *testing.T) {
mockAppTracker := apptracker.MockAppTracker{}
mockRPCService := RPCServiceMock{}
mockRPCService.
On("TrackRPCServiceHealth", ctx, mock.Anything).Once().
On("TrackRPCServiceHealth", ctx, mock.Anything).Return(nil).Once().
On("NetworkPassphrase").Return(network.TestNetworkPassphrase)
mockChAccStore := &store.ChannelAccountStoreMock{}
mockChAccStore.On("UnassignTxAndUnlockChannelAccounts", mock.Anything, mock.Anything, testInnerTxHash).Return(int64(1), nil).Twice()
Expand Down
5 changes: 3 additions & 2 deletions internal/services/mocks.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,9 @@ type RPCServiceMock struct {

var _ RPCService = (*RPCServiceMock)(nil)

func (r *RPCServiceMock) TrackRPCServiceHealth(ctx context.Context, triggerHeartbeat chan any) {
r.Called(ctx, triggerHeartbeat)
func (r *RPCServiceMock) TrackRPCServiceHealth(ctx context.Context, triggerHeartbeat <-chan any) error {
args := r.Called(ctx, triggerHeartbeat)
return args.Error(0)
}

func (r *RPCServiceMock) GetHeartbeatChannel() chan entities.RPCGetHealthResult {
Expand Down
Loading
Loading