From 044e4df48132691116c67f8ea82afc8a76710b62 Mon Sep 17 00:00:00 2001 From: Aditya Vyas Date: Tue, 18 Nov 2025 16:50:51 -0500 Subject: [PATCH 1/3] Goroutine refactor - 1 --- .../integrationtests/infrastructure/setup.go | 5 +++- internal/serve/serve.go | 6 ++++- internal/services/channel_account_service.go | 6 ++++- .../services/channel_account_service_test.go | 2 +- internal/services/ingest.go | 18 ++++++++++--- internal/services/ingest_test.go | 4 +-- internal/services/mocks.go | 5 ++-- internal/services/rpc_service.go | 27 ++++++++++--------- internal/services/rpc_service_test.go | 14 +++++++--- 9 files changed, 59 insertions(+), 28 deletions(-) diff --git a/internal/integrationtests/infrastructure/setup.go b/internal/integrationtests/infrastructure/setup.go index 12dec9d2..d4d73e17 100644 --- a/internal/integrationtests/infrastructure/setup.go +++ b/internal/integrationtests/infrastructure/setup.go @@ -1023,7 +1023,10 @@ func createRPCService(containers *SharedContainers, ctx context.Context) (servic } // Start tracking RPC health - go rpcService.TrackRPCServiceHealth(ctx, nil) + go func() { + //nolint:errcheck // Error is expected on context cancellation during shutdown + rpcService.TrackRPCServiceHealth(ctx, nil) + }() return rpcService, nil } diff --git a/internal/serve/serve.go b/internal/serve/serve.go index ea5f4965..2efca7af 100644 --- a/internal/serve/serve.go +++ b/internal/serve/serve.go @@ -136,7 +136,11 @@ func initHandlerDeps(ctx context.Context, cfg Configs) (handlerDeps, error) { if err != nil { return handlerDeps{}, fmt.Errorf("instantiating rpc service: %w", err) } - go rpcService.TrackRPCServiceHealth(ctx, nil) + go func() { + if err := rpcService.TrackRPCServiceHealth(ctx, nil); err != nil { + log.Ctx(ctx).Warnf("RPC health tracking stopped: %v", err) + } + }() channelAccountStore := store.NewChannelAccountModel(dbConnectionPool) diff --git a/internal/services/channel_account_service.go b/internal/services/channel_account_service.go index 62bb351e..d4f5c88a 100644 --- a/internal/services/channel_account_service.go +++ b/internal/services/channel_account_service.go @@ -405,7 +405,11 @@ func NewChannelAccountService(ctx context.Context, opts ChannelAccountServiceOpt return nil, fmt.Errorf("validating channel account service options: %w", err) } - go opts.RPCService.TrackRPCServiceHealth(ctx, nil) + go func() { + if err := opts.RPCService.TrackRPCServiceHealth(ctx, nil); err != nil { + log.Ctx(ctx).Warnf("RPC health tracking stopped: %v", err) + } + }() return &channelAccountService{ DB: opts.DB, diff --git a/internal/services/channel_account_service_test.go b/internal/services/channel_account_service_test.go index 3acd07be..bc0a3fc0 100644 --- a/internal/services/channel_account_service_test.go +++ b/internal/services/channel_account_service_test.go @@ -361,7 +361,7 @@ func Test_ChannelAccountService_EnsureChannelAccounts(t *testing.T) { // Create fresh mocks for each test ctx := tc.getCtx() mockRPCService := NewRPCServiceMock(t) - mockRPCService.On("TrackRPCServiceHealth", ctx, mock.Anything).Return() + mockRPCService.On("TrackRPCServiceHealth", ctx, mock.Anything).Return(nil) distAccSigClient := signing.NewSignatureClientMock(t) chAccSigClient := signing.NewSignatureClientMock(t) channelAccountStore := store.NewChannelAccountStoreMock(t) diff --git a/internal/services/ingest.go b/internal/services/ingest.go index b0c39f05..0cc09c13 100644 --- a/internal/services/ingest.go +++ b/internal/services/ingest.go @@ -140,7 +140,11 @@ func NewIngestService( func (m *ingestService) DeprecatedRun(ctx context.Context, startLedger uint32, endLedger uint32) error { manualTriggerChannel := make(chan any, 1) - go m.rpcService.TrackRPCServiceHealth(ctx, manualTriggerChannel) + go func() { + if err := m.rpcService.TrackRPCServiceHealth(ctx, manualTriggerChannel); err != nil { + log.Ctx(ctx).Warnf("RPC health tracking stopped: %v", err) + } + }() ingestHeartbeatChannel := make(chan any, 1) rpcHeartbeatChannel := m.rpcService.GetHeartbeatChannel() go trackIngestServiceHealth(ctx, ingestHeartbeatChannel, m.appTracker) @@ -269,7 +273,11 @@ func (m *ingestService) Run(ctx context.Context, startLedger uint32, endLedger u // Prepare the health check: manualTriggerChan := make(chan any, 1) - go m.rpcService.TrackRPCServiceHealth(ctx, manualTriggerChan) + go func() { + if err := m.rpcService.TrackRPCServiceHealth(ctx, manualTriggerChan); err != nil { + log.Ctx(ctx).Warnf("RPC health tracking stopped: %v", err) + } + }() ingestHeartbeatChannel := make(chan any, 1) rpcHeartbeatChannel := m.rpcService.GetHeartbeatChannel() go trackIngestServiceHealth(ctx, ingestHeartbeatChannel, m.appTracker) @@ -277,12 +285,16 @@ func (m *ingestService) Run(ctx context.Context, startLedger uint32, endLedger u log.Ctx(ctx).Info("Starting ingestion loop") for { + var ok bool select { case sig := <-signalChan: return fmt.Errorf("ingestor stopped due to signal %q", sig) case <-ctx.Done(): return fmt.Errorf("ingestor stopped due to context cancellation: %w", ctx.Err()) - case rpcHealth = <-rpcHeartbeatChannel: + case rpcHealth, ok = <-rpcHeartbeatChannel: + if !ok { + return fmt.Errorf("RPC heartbeat channel closed unexpectedly") + } ingestHeartbeatChannel <- true // ⬅️ indicate that it's still running // this will fallthrough to execute the code below ⬇️ } diff --git a/internal/services/ingest_test.go b/internal/services/ingest_test.go index 0bb2ff4b..cd8caf7b 100644 --- a/internal/services/ingest_test.go +++ b/internal/services/ingest_test.go @@ -235,7 +235,7 @@ func TestIngest_LatestSyncedLedgerBehindRPC(t *testing.T) { mockAppTracker := apptracker.MockAppTracker{} mockRPCService := RPCServiceMock{} mockRPCService. - On("TrackRPCServiceHealth", ctx, mock.Anything).Once(). + On("TrackRPCServiceHealth", ctx, mock.Anything).Return(nil).Once(). On("NetworkPassphrase").Return(network.TestNetworkPassphrase) mockChAccStore := &store.ChannelAccountStoreMock{} mockContractStore := &cache.MockTokenContractStore{} @@ -328,7 +328,7 @@ func TestIngest_LatestSyncedLedgerAheadOfRPC(t *testing.T) { mockAppTracker := apptracker.MockAppTracker{} mockRPCService := RPCServiceMock{} mockRPCService. - On("TrackRPCServiceHealth", ctx, mock.Anything).Once(). + On("TrackRPCServiceHealth", ctx, mock.Anything).Return(nil).Once(). On("NetworkPassphrase").Return(network.TestNetworkPassphrase) mockChAccStore := &store.ChannelAccountStoreMock{} mockChAccStore.On("UnassignTxAndUnlockChannelAccounts", mock.Anything, mock.Anything, testInnerTxHash).Return(int64(1), nil).Twice() diff --git a/internal/services/mocks.go b/internal/services/mocks.go index 611c84c8..472628d6 100644 --- a/internal/services/mocks.go +++ b/internal/services/mocks.go @@ -14,8 +14,9 @@ type RPCServiceMock struct { var _ RPCService = (*RPCServiceMock)(nil) -func (r *RPCServiceMock) TrackRPCServiceHealth(ctx context.Context, triggerHeartbeat chan any) { - r.Called(ctx, triggerHeartbeat) +func (r *RPCServiceMock) TrackRPCServiceHealth(ctx context.Context, triggerHeartbeat <-chan any) error { + args := r.Called(ctx, triggerHeartbeat) + return args.Error(0) } func (r *RPCServiceMock) GetHeartbeatChannel() chan entities.RPCGetHealthResult { diff --git a/internal/services/rpc_service.go b/internal/services/rpc_service.go index ff9417f1..d276dd0d 100644 --- a/internal/services/rpc_service.go +++ b/internal/services/rpc_service.go @@ -8,9 +8,6 @@ import ( "fmt" "io" "net/http" - "os" - "os/signal" - "syscall" "time" "github.com/stellar/go/support/log" @@ -42,7 +39,9 @@ type RPCService interface { // // The immediateHealthCheckTrigger channel allows external components to request an immediate health check, // which is particularly useful when the ingestor needs to catch up with the RPC service. - TrackRPCServiceHealth(ctx context.Context, immediateHealthCheckTrigger chan any) + // + // Returns an error if the context is cancelled. The caller is responsible for handling shutdown signals. + TrackRPCServiceHealth(ctx context.Context, immediateHealthCheckTrigger <-chan any) error SimulateTransaction(transactionXDR string, resourceConfig entities.RPCResourceConfig) (entities.RPCSimulateTransactionResult, error) NetworkPassphrase() string } @@ -333,14 +332,17 @@ func (r *rpcService) HealthCheckTickInterval() time.Duration { // // The immediateHealthCheckTrigger channel allows external components to request an immediate health check, // which is particularly useful when the ingestor needs to catch up with the RPC service. -func (r *rpcService) TrackRPCServiceHealth(ctx context.Context, immediateHealthCheckTrigger chan any) { - signalChan := make(chan os.Signal, 1) - signal.Notify(signalChan, syscall.SIGTERM, syscall.SIGINT, syscall.SIGQUIT) +// +// Returns an error if the context is cancelled. The caller is responsible for handling shutdown signals. +func (r *rpcService) TrackRPCServiceHealth(ctx context.Context, immediateHealthCheckTrigger <-chan any) error { + // Handle nil channel by creating a never-firing channel + if immediateHealthCheckTrigger == nil { + immediateHealthCheckTrigger = make(chan any) + } healthCheckTicker := time.NewTicker(r.HealthCheckTickInterval()) unhealthyWarningTicker := time.NewTicker(r.HealthCheckWarningInterval()) defer func() { - signal.Stop(signalChan) healthCheckTicker.Stop() unhealthyWarningTicker.Stop() close(r.heartbeatChannel) @@ -368,15 +370,14 @@ func (r *rpcService) TrackRPCServiceHealth(ctx context.Context, immediateHealthC r.metricsService.SetRPCLatestLedger(int64(health.LatestLedger)) } + // Perform immediate health check at startup to avoid 5-second delay + performHealthCheck() + for { select { case <-ctx.Done(): log.Ctx(ctx).Infof("RPC health tracking stopped due to context cancellation: %v", ctx.Err()) - return - - case sig := <-signalChan: - log.Ctx(ctx).Warnf("RPC health tracking stopped due to signal %s", sig) - return + return fmt.Errorf("context cancelled: %w", ctx.Err()) case <-unhealthyWarningTicker.C: log.Ctx(ctx).Warnf("RPC service unhealthy for over %s", r.HealthCheckWarningInterval()) diff --git a/internal/services/rpc_service_test.go b/internal/services/rpc_service_test.go index 01f079fc..7b8f3c5b 100644 --- a/internal/services/rpc_service_test.go +++ b/internal/services/rpc_service_test.go @@ -926,7 +926,8 @@ func TestTrackRPCServiceHealth_HealthyService(t *testing.T) { mockHTTPClient.On("Post", rpcURL, "application/json", mock.Anything).Return(mockResponse, nil).Run(func(args mock.Arguments) { cancel() }) - rpcService.TrackRPCServiceHealth(ctx, nil) + err = rpcService.TrackRPCServiceHealth(ctx, nil) + require.Error(t, err) // Get result from heartbeat channel select { @@ -998,7 +999,8 @@ func TestTrackRPCServiceHealth_UnhealthyService(t *testing.T) { Return(mockResponse, nil) // The ctx will timeout after {contextTimeout}, which is enough for the warning to trigger - rpcService.TrackRPCServiceHealth(ctx, nil) + err = rpcService.TrackRPCServiceHealth(ctx, nil) + require.Error(t, err) entries := getLogs() testSucceeded := false @@ -1029,7 +1031,8 @@ func TestTrackRPCService_ContextCancelled(t *testing.T) { rpcService, err := NewRPCService(rpcURL, network.TestNetworkPassphrase, mockHTTPClient, mockMetricsService) require.NoError(t, err) - rpcService.TrackRPCServiceHealth(ctx, nil) + err = rpcService.TrackRPCServiceHealth(ctx, nil) + require.Error(t, err) // Verify channel is closed after context cancellation time.Sleep(100 * time.Millisecond) @@ -1086,7 +1089,10 @@ func TestTrackRPCService_DeadlockPrevention(t *testing.T) { defer cancel() manualTriggerChan := make(chan any, 1) - go rpcService.TrackRPCServiceHealth(ctx, manualTriggerChan) + go func() { + //nolint:errcheck // Error is expected on context cancellation + rpcService.TrackRPCServiceHealth(ctx, manualTriggerChan) + }() time.Sleep(20 * time.Millisecond) manualTriggerChan <- nil From 4f804aeaad3bcc3088df4e6d1d69c7b42d9783da Mon Sep 17 00:00:00 2001 From: Aditya Vyas Date: Tue, 18 Nov 2025 16:58:02 -0500 Subject: [PATCH 2/3] remove goroutine from serve --- internal/serve/serve.go | 5 - internal/services/channel_account_service.go | 141 ++++++++++-------- .../services/channel_account_service_test.go | 39 ++--- 3 files changed, 91 insertions(+), 94 deletions(-) diff --git a/internal/serve/serve.go b/internal/serve/serve.go index 2efca7af..1982616e 100644 --- a/internal/serve/serve.go +++ b/internal/serve/serve.go @@ -136,11 +136,6 @@ func initHandlerDeps(ctx context.Context, cfg Configs) (handlerDeps, error) { if err != nil { return handlerDeps{}, fmt.Errorf("instantiating rpc service: %w", err) } - go func() { - if err := rpcService.TrackRPCServiceHealth(ctx, nil); err != nil { - log.Ctx(ctx).Warnf("RPC health tracking stopped: %v", err) - } - }() channelAccountStore := store.NewChannelAccountModel(dbConnectionPool) diff --git a/internal/services/channel_account_service.go b/internal/services/channel_account_service.go index d4f5c88a..90c6d17b 100644 --- a/internal/services/channel_account_service.go +++ b/internal/services/channel_account_service.go @@ -236,70 +236,91 @@ func (s *channelAccountService) submitChannelAccountsTxOnChain( ops []txnbuild.Operation, chAccSigner ChannelAccSigner, ) error { + // Wait for RPC service to become healthy by polling GetHealth directly. + // This lets the API server startup so that users can start interacting with the API + // which does not depend on RPC, instead of waiting till it becomes healthy. log.Ctx(ctx).Infof("⏳ Waiting for RPC service to become healthy") - rpcHeartbeatChannel := s.RPCService.GetHeartbeatChannel() - select { - case <-ctx.Done(): - return fmt.Errorf("context cancelled while waiting for rpc service to become healthy: %w", ctx.Err()) - - // The channel account creation goroutine will wait in the background for the rpc service to become healthy on startup. - // This lets the API server startup so that users can start interacting with the API which does not depend on RPC, instead of waiting till it becomes healthy. - case <-rpcHeartbeatChannel: - log.Ctx(ctx).Infof("👍 RPC service is healthy") - accountSeq, err := s.RPCService.GetAccountLedgerSequence(distributionAccountPublicKey) - if err != nil { - return fmt.Errorf("getting ledger sequence for distribution account public key=%s: %w", distributionAccountPublicKey, err) - } - tx, err := txnbuild.NewTransaction( - txnbuild.TransactionParams{ - SourceAccount: &txnbuild.SimpleAccount{ - AccountID: distributionAccountPublicKey, - Sequence: accountSeq, - }, - IncrementSequenceNum: true, - Operations: ops, - BaseFee: s.BaseFee, - Preconditions: txnbuild.Preconditions{TimeBounds: txnbuild.NewTimeout(300)}, - }, - ) - if err != nil { - return fmt.Errorf("building transaction: %w", err) - } + healthCheckCtx, cancel := context.WithTimeout(ctx, rpcHealthCheckTimeout) + defer cancel() - // Sign the transaction for the distribution account - tx, err = s.DistributionAccountSignatureClient.SignStellarTransaction(ctx, tx, distributionAccountPublicKey) - if err != nil { - return fmt.Errorf("signing transaction for distribution account: %w", err) - } - // Sign the transaction for the channel accounts - tx, err = chAccSigner(ctx, tx) - if err != nil { - return fmt.Errorf("signing transaction with channel account(s) keypairs: %w", err) - } + ticker := time.NewTicker(sleepDelayForChannelAccountCreation) + defer ticker.Stop() - txHash, err := tx.HashHex(s.DistributionAccountSignatureClient.NetworkPassphrase()) - if err != nil { - return fmt.Errorf("getting transaction hash: %w", err) - } - txXDR, err := tx.Base64() - if err != nil { - return fmt.Errorf("getting transaction envelope: %w", err) + // Try immediately first + _, err := s.RPCService.GetHealth() + if err != nil { + log.Ctx(ctx).Debugf("Initial RPC health check failed: %v, will retry...", err) + for { + select { + case <-healthCheckCtx.Done(): + return fmt.Errorf("timeout waiting for RPC service to become healthy: %w", healthCheckCtx.Err()) + case <-ticker.C: + _, err = s.RPCService.GetHealth() + if err == nil { + break + } + log.Ctx(ctx).Debugf("RPC health check failed: %v, will retry...", err) + continue + } + break } + } - log.Ctx(ctx).Infof("🚧 Submitting channel account transaction to RPC service") - err = s.submitTransaction(ctx, txHash, txXDR) - if err != nil { - return fmt.Errorf("submitting channel account transaction to RPC service: %w", err) - } - log.Ctx(ctx).Infof("🚧 Successfully submitted channel account transaction to RPC service, waiting for confirmation...") - err = s.waitForTransactionConfirmation(ctx, txHash) - if err != nil { - return fmt.Errorf("getting transaction status: %w", err) - } + log.Ctx(ctx).Infof("👍 RPC service is healthy") + accountSeq, err := s.RPCService.GetAccountLedgerSequence(distributionAccountPublicKey) + if err != nil { + return fmt.Errorf("getting ledger sequence for distribution account public key=%s: %w", distributionAccountPublicKey, err) + } - return nil + tx, err := txnbuild.NewTransaction( + txnbuild.TransactionParams{ + SourceAccount: &txnbuild.SimpleAccount{ + AccountID: distributionAccountPublicKey, + Sequence: accountSeq, + }, + IncrementSequenceNum: true, + Operations: ops, + BaseFee: s.BaseFee, + Preconditions: txnbuild.Preconditions{TimeBounds: txnbuild.NewTimeout(300)}, + }, + ) + if err != nil { + return fmt.Errorf("building transaction: %w", err) + } + + // Sign the transaction for the distribution account + tx, err = s.DistributionAccountSignatureClient.SignStellarTransaction(ctx, tx, distributionAccountPublicKey) + if err != nil { + return fmt.Errorf("signing transaction for distribution account: %w", err) + } + // Sign the transaction for the channel accounts + tx, err = chAccSigner(ctx, tx) + if err != nil { + return fmt.Errorf("signing transaction with channel account(s) keypairs: %w", err) + } + + txHash, err := tx.HashHex(s.DistributionAccountSignatureClient.NetworkPassphrase()) + if err != nil { + return fmt.Errorf("getting transaction hash: %w", err) } + txXDR, err := tx.Base64() + if err != nil { + return fmt.Errorf("getting transaction envelope: %w", err) + } + + log.Ctx(ctx).Infof("🚧 Submitting channel account transaction to RPC service") + err = s.submitTransaction(ctx, txHash, txXDR) + if err != nil { + return fmt.Errorf("submitting channel account transaction to RPC service: %w", err) + } + log.Ctx(ctx).Infof("🚧 Successfully submitted channel account transaction to RPC service, waiting for confirmation...") + err = s.waitForTransactionConfirmation(ctx, txHash) + if err != nil { + return fmt.Errorf("getting transaction status: %w", err) + } + + return nil } func (s *channelAccountService) submitTransaction(_ context.Context, hash string, signedTxXDR string) error { @@ -399,18 +420,12 @@ func (o *ChannelAccountServiceOptions) Validate() error { return nil } -func NewChannelAccountService(ctx context.Context, opts ChannelAccountServiceOptions) (*channelAccountService, error) { +func NewChannelAccountService(_ context.Context, opts ChannelAccountServiceOptions) (*channelAccountService, error) { err := opts.Validate() if err != nil { return nil, fmt.Errorf("validating channel account service options: %w", err) } - go func() { - if err := opts.RPCService.TrackRPCServiceHealth(ctx, nil); err != nil { - log.Ctx(ctx).Warnf("RPC health tracking stopped: %v", err) - } - }() - return &channelAccountService{ DB: opts.DB, RPCService: opts.RPCService, diff --git a/internal/services/channel_account_service_test.go b/internal/services/channel_account_service_test.go index bc0a3fc0..ade45932 100644 --- a/internal/services/channel_account_service_test.go +++ b/internal/services/channel_account_service_test.go @@ -2,8 +2,8 @@ package services import ( "context" + "fmt" "testing" - "time" "github.com/stellar/go/keypair" "github.com/stellar/go/network" @@ -107,11 +107,9 @@ func Test_ChannelAccountService_EnsureChannelAccounts(t *testing.T) { Return(network.TestNetworkPassphrase). Once() - heartbeatChan := make(chan entities.RPCGetHealthResult, 1) - heartbeatChan <- entities.RPCGetHealthResult{Status: "healthy"} mockRPCService. - On("GetHeartbeatChannel"). - Return(heartbeatChan). + On("GetHealth"). + Return(entities.RPCGetHealthResult{Status: "healthy"}, nil). Once(). On("GetAccountLedgerSequence", distributionAccount.Address()). Return(int64(123), nil). @@ -195,10 +193,8 @@ func Test_ChannelAccountService_EnsureChannelAccounts(t *testing.T) { Return(&signedTx, nil). Once() - heartbeatChan := make(chan entities.RPCGetHealthResult, 1) - heartbeatChan <- entities.RPCGetHealthResult{Status: "healthy"} mockRPCService. - On("GetHeartbeatChannel").Return(heartbeatChan).Once(). + On("GetHealth").Return(entities.RPCGetHealthResult{Status: "healthy"}, nil).Once(). On("GetAccountLedgerSequence", distributionAccount.Address()).Return(int64(123), nil).Once(). On("GetAccountLedgerSequence", chAcc1.PublicKey).Return(int64(123), nil).Once(). On("GetAccountLedgerSequence", chAcc2.PublicKey).Return(int64(123), nil).Once(). @@ -256,11 +252,9 @@ func Test_ChannelAccountService_EnsureChannelAccounts(t *testing.T) { Return(network.TestNetworkPassphrase). Once() - heartbeatChan := make(chan entities.RPCGetHealthResult, 1) - heartbeatChan <- entities.RPCGetHealthResult{Status: "healthy"} mockRPCService. - On("GetHeartbeatChannel"). - Return(heartbeatChan). + On("GetHealth"). + Return(entities.RPCGetHealthResult{Status: "healthy"}, nil). Once(). On("GetAccountLedgerSequence", distributionAccount.Address()). Return(int64(123), nil). @@ -312,11 +306,9 @@ func Test_ChannelAccountService_EnsureChannelAccounts(t *testing.T) { Return(network.TestNetworkPassphrase). Once() - heartbeatChan := make(chan entities.RPCGetHealthResult, 1) - heartbeatChan <- entities.RPCGetHealthResult{Status: "healthy"} mockRPCService. - On("GetHeartbeatChannel"). - Return(heartbeatChan). + On("GetHealth"). + Return(entities.RPCGetHealthResult{Status: "healthy"}, nil). Once(). On("GetAccountLedgerSequence", distributionAccount.Address()). Return(int64(123), nil). @@ -349,10 +341,12 @@ func Test_ChannelAccountService_EnsureChannelAccounts(t *testing.T) { Return(distributionAccount.Address(), nil). Once() - heartbeatChan := make(chan entities.RPCGetHealthResult, 1) - mockRPCService.On("GetHeartbeatChannel").Return(heartbeatChan) + mockRPCService. + On("GetHealth"). + Return(entities.RPCGetHealthResult{}, fmt.Errorf("RPC unavailable")). + Maybe() }, - expectedError: "context cancelled while waiting for rpc service to become healthy", + expectedError: "timeout waiting for RPC service to become healthy", }, } @@ -361,7 +355,6 @@ func Test_ChannelAccountService_EnsureChannelAccounts(t *testing.T) { // Create fresh mocks for each test ctx := tc.getCtx() mockRPCService := NewRPCServiceMock(t) - mockRPCService.On("TrackRPCServiceHealth", ctx, mock.Anything).Return(nil) distAccSigClient := signing.NewSignatureClientMock(t) chAccSigClient := signing.NewSignatureClientMock(t) channelAccountStore := store.NewChannelAccountStoreMock(t) @@ -380,7 +373,6 @@ func Test_ChannelAccountService_EnsureChannelAccounts(t *testing.T) { PrivateKeyEncrypter: &signingutils.DefaultPrivateKeyEncrypter{}, EncryptionPassphrase: "my-encryption-passphrase", }) - time.Sleep(50 * time.Millisecond) // waiting for the goroutine to call `TrackRPCServiceHealth` require.NoError(t, err) // Execute test @@ -407,7 +399,6 @@ func TestSubmitTransaction(t *testing.T) { ctx := context.Background() mockRPCService := RPCServiceMock{} - mockRPCService.On("TrackRPCServiceHealth", ctx, mock.Anything).Return() defer mockRPCService.AssertExpectations(t) signatureClient := signing.SignatureClientMock{} channelAccountStore := store.ChannelAccountStoreMock{} @@ -422,9 +413,7 @@ func TestSubmitTransaction(t *testing.T) { PrivateKeyEncrypter: &privateKeyEncrypter, EncryptionPassphrase: passphrase, }) - time.Sleep(100 * time.Millisecond) // waiting for the goroutine to call `TrackRPCServiceHealth` require.NoError(t, err) - time.Sleep(100 * time.Millisecond) // waiting for the goroutine to call `TrackRPCServiceHealth` hash := "test_hash" signedTxXDR := "test_xdr" @@ -465,7 +454,6 @@ func TestWaitForTransactionConfirmation(t *testing.T) { ctx := context.Background() mockRPCService := RPCServiceMock{} defer mockRPCService.AssertExpectations(t) - mockRPCService.On("TrackRPCServiceHealth", ctx, mock.Anything).Return() signatureClient := signing.SignatureClientMock{} channelAccountStore := store.ChannelAccountStoreMock{} privateKeyEncrypter := signingutils.DefaultPrivateKeyEncrypter{} @@ -480,7 +468,6 @@ func TestWaitForTransactionConfirmation(t *testing.T) { EncryptionPassphrase: passphrase, }) require.NoError(t, err) - time.Sleep(100 * time.Millisecond) // waiting for the goroutine to call `TrackRPCServiceHealth` hash := "test_hash" From f48f6c07a4e6fb5771a1a5d721d5b0ff55b79e15 Mon Sep 17 00:00:00 2001 From: Aditya Vyas Date: Tue, 18 Nov 2025 17:09:57 -0500 Subject: [PATCH 3/3] Update rpc_service_test.go --- internal/services/rpc_service_test.go | 22 +++++++++++++++++----- 1 file changed, 17 insertions(+), 5 deletions(-) diff --git a/internal/services/rpc_service_test.go b/internal/services/rpc_service_test.go index 7b8f3c5b..97d1a6f7 100644 --- a/internal/services/rpc_service_test.go +++ b/internal/services/rpc_service_test.go @@ -1016,8 +1016,9 @@ func TestTrackRPCServiceHealth_UnhealthyService(t *testing.T) { } func TestTrackRPCService_ContextCancelled(t *testing.T) { - ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) - defer cancel() + // Create and immediately cancel context to test cancellation handling + ctx, cancel := context.WithCancel(context.Background()) + cancel() dbt := dbtest.Open(t) defer dbt.Close() @@ -1031,15 +1032,26 @@ func TestTrackRPCService_ContextCancelled(t *testing.T) { rpcService, err := NewRPCService(rpcURL, network.TestNetworkPassphrase, mockHTTPClient, mockMetricsService) require.NoError(t, err) + // Mock metrics for the initial health check that happens before context check + mockMetricsService.On("IncRPCMethodCalls", "GetHealth").Maybe() + mockMetricsService.On("ObserveRPCMethodDuration", "GetHealth", mock.AnythingOfType("float64")).Maybe() + mockMetricsService.On("IncRPCRequests", "getHealth").Maybe() + mockMetricsService.On("IncRPCEndpointFailure", "getHealth").Maybe() + mockMetricsService.On("IncRPCMethodErrors", "GetHealth", "rpc_error").Maybe() + mockMetricsService.On("ObserveRPCRequestDuration", "getHealth", mock.AnythingOfType("float64")).Maybe() + mockMetricsService.On("SetRPCServiceHealth", false).Maybe() + + // Mock HTTP client to return error (simulating cancelled context) + mockHTTPClient.On("Post", rpcURL, "application/json", mock.Anything). + Return(&http.Response{}, context.Canceled).Maybe() + err = rpcService.TrackRPCServiceHealth(ctx, nil) require.Error(t, err) + assert.Contains(t, err.Error(), "context") // Verify channel is closed after context cancellation - time.Sleep(100 * time.Millisecond) _, ok := <-rpcService.GetHeartbeatChannel() assert.False(t, ok, "channel should be closed") - - mockHTTPClient.AssertNotCalled(t, "Post") } func TestTrackRPCService_DeadlockPrevention(t *testing.T) {