From bac9f138dacefd3d84da85babc02d22419a34317 Mon Sep 17 00:00:00 2001
From: Sameh Abouel-saad
Date: Sun, 17 Nov 2024 23:43:41 +0200
Subject: [PATCH 01/13] feat: Implement connection manager

---
 clients/tfchain-client-go/impl.go      | 518 ++++++++++++++++++++-----
 clients/tfchain-client-go/impl_test.go | 193 +++++++++
 clients/tfchain-client-go/transfer.go  |   3 -
 3 files changed, 604 insertions(+), 110 deletions(-)
 create mode 100644 clients/tfchain-client-go/impl_test.go

diff --git a/clients/tfchain-client-go/impl.go b/clients/tfchain-client-go/impl.go
index b6911a932..1031e76ca 100644
--- a/clients/tfchain-client-go/impl.go
+++ b/clients/tfchain-client-go/impl.go
@@ -1,9 +1,12 @@
 package substrate

 import (
+	"context"
 	"fmt"
 	"math/rand"
+	"slices"
 	"sync"
+	"sync/atomic"
 	"time"

 	"github.com/cenkalti/backoff"
@@ -13,20 +16,16 @@ import (
 	"github.com/rs/zerolog/log"
 )

-const (
-	// acceptable delay is amount of blocks (in second) that a node can
-	// be behind before we don't accept it. block time is 6 seconds, so
-	// right now we only allow 2 blocks delay
-	acceptableDelay = 2 * 6 * time.Second
+var (
+	ErrInvalidVersion         = fmt.Errorf("invalid version")
+	ErrUnknownVersion         = fmt.Errorf("unknown version")
+	ErrNotFound               = fmt.Errorf("object not found")
+	ErrNoConnectionsAvailable = fmt.Errorf("no healthy connections available")
+	ErrMaxPoolSizeReached     = fmt.Errorf("max pool size reached")
 )

-var (
-	//ErrInvalidVersion is returned if version 4bytes is invalid
-	ErrInvalidVersion = fmt.Errorf("invalid version")
-	//ErrUnknownVersion is returned if version number is not supported
-	ErrUnknownVersion = fmt.Errorf("unknown version")
-	//ErrNotFound is returned if an object is not found
-	ErrNotFound = fmt.Errorf("object not found")
+const (
+	AcceptableDelay = 2 * 6 * time.Second
 )

 // Versioned base for all types
@@ -37,145 +36,445 @@ type Versioned struct {
 type Conn = *gsrpc.SubstrateAPI
 type Meta = *types.Metadata

+// Pool connection
+type poolConn struct {
+	conn     Conn
+	meta     Meta
+	url      string
+	lastUsed atomic.Int64 // Unix timestamp
+	inUse    atomic.Bool
+}
+
+func (pc *poolConn) isHealthy() bool {
+	if pc == nil || pc.conn == nil || pc.meta == nil {
+		return false
+	}
+	_, err := getTime(pc.conn, pc.meta)
+	return err == nil
+}
+
+func (pc *poolConn) close() {
+	if pc != nil && pc.conn != nil {
+		pc.conn.Client.Close()
+		pc.conn = nil
+		pc.meta = nil
+		log.Debug().Str("url", pc.url).Msg("closed connection")
+	}
+}
+
 type Manager interface {
+	GetConnection(ctx context.Context) (*Substrate, error)
+	Close() error
+
+	// Deprecated methods
 	Raw() (Conn, Meta, error)
 	Substrate() (*Substrate, error)
 }

-type mgrImpl struct {
-	urls []string
+type ManagerConfig struct {
+	// Maximum number of connections in the pool
+	MaxPoolSize int
+	// Minimum number of connections to maintain
+	MinPoolSize int
+	// Maximum time a connection can be idle before being closed (if the pool has more than MinPoolSize)
+	MaxIdleTime time.Duration
+	// Interval between health checks.
+	// Periodic health checks were dropped: they add constant overhead, so
+	// connections are checked on demand and maintenance runs only when needed.
+	// HealthCheckInterval time.Duration
+	// Timeout for creating new connections
+	ConnectionTimeout time.Duration
+}
+
+// Default configuration
+var DefaultConfig = ManagerConfig{
+	MaxPoolSize: 5,
+	MinPoolSize: 2,
+	MaxIdleTime: 30 * time.Minute,
+	// HealthCheckInterval: 120 * time.Second,
+	ConnectionTimeout: 10 * time.Second,
+}
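To make these knobs concrete, here is a minimal caller-side sketch of the API this patch introduces; the module import path and endpoint URL are placeholders, not something this patch defines:

package main

import (
	"context"
	"log"
	"time"

	// placeholder import path; use the real module path of tfchain-client-go
	substrate "github.com/threefoldtech/tfchain/clients/tfchain-client-go"
)

func main() {
	cfg := substrate.ManagerConfig{
		MaxPoolSize:       10,
		MinPoolSize:       2,
		MaxIdleTime:       15 * time.Minute,
		ConnectionTimeout: 5 * time.Second,
	}
	mgr := substrate.NewManagerWithConfig(cfg, "wss://node.example:9944")
	defer mgr.Close()

	sub, err := mgr.GetConnection(context.Background())
	if err != nil {
		log.Fatal(err)
	}
	defer sub.Release() // hand the connection back to the pool

	t, err := sub.Time()
	if err != nil {
		log.Fatal(err)
	}
	log.Printf("chain time: %s", t)
}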
+
+type manager struct {
+	urls      []string
+	pool      []*poolConn
+	mu        sync.RWMutex
+	ctx       context.Context
+	cancel    context.CancelFunc
+	wg        sync.WaitGroup
+	config    ManagerConfig
+	checkChan chan struct{}
+}

-	r int
-	m sync.Mutex
+func NewManager(urls ...string) Manager {
+	return NewManagerWithConfig(DefaultConfig, urls...)
 }

-func NewManager(url ...string) Manager {
-	if len(url) == 0 {
-		panic("at least one url is required")
+func NewManagerWithConfig(config ManagerConfig, urls ...string) Manager {
+	if len(urls) == 0 {
+		panic("at least one URL required")
 	}

-	// the shuffle is needed so if one endpoints fails, and the next one
-	// is tried, we will end up moving all connections to the "next" endpoint
-	// which will get overloaded. Instead the shuffle helps to make the "next"
-	// different for reach instace of the pool.
-	rand.Shuffle(len(url), func(i, j int) {
-		url[i], url[j] = url[j], url[i]
-	})
+	// Validate and adjust configuration
+	if config.MaxPoolSize < 1 {
+		config.MaxPoolSize = DefaultConfig.MaxPoolSize
+	}
+	if config.MinPoolSize < 1 || config.MinPoolSize > config.MaxPoolSize {
+		config.MinPoolSize = min(DefaultConfig.MinPoolSize, config.MaxPoolSize)
+	}
+	if config.MaxIdleTime <= 0 {
+		config.MaxIdleTime = DefaultConfig.MaxIdleTime
+	}
+	// if config.HealthCheckInterval <= 0 {
+	// 	config.HealthCheckInterval = DefaultConfig.HealthCheckInterval
+	// }
+	if config.ConnectionTimeout <= 0 {
+		config.ConnectionTimeout = DefaultConfig.ConnectionTimeout
+	}
+
+	ctx, cancel := context.WithCancel(context.Background())
+	m := &manager{
+		urls:      shuffle(urls),
+		pool:      make([]*poolConn, 0, config.MaxPoolSize),
+		ctx:       ctx,
+		cancel:    cancel,
+		config:    config,
+		checkChan: make(chan struct{}, 1),
+	}
+
+	m.initializePool()
+	m.wg.Add(1)
+	go m.healthChecker()
+
+	return m
+}

-	return &mgrImpl{
-		urls: url,
-		r:    rand.Intn(len(url)), // start with random url, then roundrobin
+func (m *manager) initializePool() {
+	log.Debug().Msg("initializing connection pool")
+	for i := 0; i < m.config.MinPoolSize; i++ {
+		select {
+		case m.checkChan <- struct{}{}:
+		default:
+		}
 	}
 }
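Note that initializePool does no dialing itself: it only queues one wake-up per wanted connection and leaves the work to healthChecker. The non-blocking send on a channel of capacity one is a standard Go idiom for coalescing wake-up signals; a standalone sketch of the pattern, with illustrative names:

package main

import (
	"fmt"
	"time"
)

func main() {
	wake := make(chan struct{}, 1) // capacity 1: at most one pending signal

	go func() {
		for range wake { // one maintenance pass per delivered signal
			fmt.Println("maintenance pass")
			time.Sleep(100 * time.Millisecond)
		}
	}()

	// Ten requests while the worker is busy collapse into few passes,
	// because extra signals are dropped while one is already pending.
	for i := 0; i < 10; i++ {
		select {
		case wake <- struct{}{}:
		default:
		}
	}
	time.Sleep(time.Second)
}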
-// endpoint return the next endpoint to use
-// in roundrobin fashion. need to be called
-// while lock is acquired.
-func (p *mgrImpl) endpoint() string {
-	defer func() {
-		p.r = (p.r + 1) % len(p.urls)
-	}()
+func (m *manager) createConnection(ctx context.Context, url string) (*poolConn, error) {
+	log.Debug().Str("url", url).Msg("attempting to create a new connection")
+	ctx, cancel := context.WithTimeout(ctx, m.config.ConnectionTimeout)
+	defer cancel()
+
+	select {
+	case <-ctx.Done():
+		log.Error().Str("url", url).Msg("context done while creating connection")
+		return nil, ctx.Err()
+	default:
+		if conn, meta, err := createSubstrateConn(url); err == nil {
+			log.Debug().Str("url", url).Msg("created new connection")
+			return &poolConn{
+				conn:     conn,
+				meta:     meta,
+				url:      url,
+				lastUsed: atomic.Int64{},
+				inUse:    atomic.Bool{},
+			}, nil
+		} else {
+			log.Error().Str("url", url).Err(err).Msg("failed to create connection")
+		}
+	}
+	return nil, fmt.Errorf("failed to create connection to %s", url)
+}

-	return p.urls[p.r]
+func (m *manager) GetConnection(ctx context.Context) (*Substrate, error) {
+	log.Debug().Msg("getting a connection")
+	conn, err := m.getHealthyConn()
+	if err != nil {
+		log.Error().Err(err).Msg("failed to get connection")
+		return nil, fmt.Errorf("failed to get connection: %w", err)
+	}
+	log.Debug().Str("url", conn.url).Msg("successfully obtained connection")
+	return newSubstrate(conn, m), nil
 }

-// Substrate return a new wrapped substrate connection
-// the connection must be closed after you are done using it
-func (p *mgrImpl) Substrate() (*Substrate, error) {
-	cl, meta, err := p.Raw()
+func (m *manager) getHealthyConn() (*poolConn, error) {
+	log.Debug().Int("pool_size", len(m.pool)).Int("acquired_count", m.acquiredConnCount()).
+		Msg("checking for healthy connections")
+
+	// Try getting existing connection first
+	if conn := m.getExistingConn(); conn != nil {
+		return conn, nil
+	}
+
+	b := backoff.NewExponentialBackOff()
+	b.MaxInterval = 2 * time.Second
+	b.InitialInterval = 500 * time.Millisecond
+	b.Multiplier = 2
+
+	var conn *poolConn
+	err := backoff.Retry(func() error {
+		// Check if we can get an existing connection
+		if c := m.getExistingConn(); c != nil {
+			conn = c
+			return nil
+		}
+
+		m.mu.RLock()
+		poolSize := len(m.pool)
+		m.mu.RUnlock()
+
+		if poolSize >= m.config.MaxPoolSize {
+			return backoff.Permanent(ErrMaxPoolSizeReached)
+		}
+
+		select {
+		case m.checkChan <- struct{}{}:
+			log.Debug().Msg("triggered connection check")
+		default:
+			log.Debug().Msg("connection check already pending")
+		}
+
+		// time.Sleep(50 * time.Millisecond)
+		return ErrNoConnectionsAvailable
+	}, b)
 	if err != nil {
 		return nil, err
 	}
-	return newSubstrate(cl, meta, p.put)
-}
-
-// Raw returns a RPC substrate client. plus meta. The returned connection
-// is not tracked by the pool, nor reusable. It's the caller responsibility
-// to close the connection when done
-func (p *mgrImpl) Raw() (Conn, Meta, error) {
-	// right now this pool implementation just tests the connection
-	// makes sure that it is still active, otherwise, tries again
-	// until the connection is restored.
-	// A better pool implementation can be done later were multiple connections
-	// can be handled
-	// TODO: thread safety!
-	p.m.Lock()
-	defer p.m.Unlock()
-
-	boff := backoff.WithMaxRetries(
-		backoff.NewConstantBackOff(200*time.Millisecond),
-		2*uint64(len(p.urls)),
-	)
-
-	var (
-		cl   *gsrpc.SubstrateAPI
-		meta *types.Metadata
-		err  error
-	)
-
-	err = backoff.RetryNotify(func() error {
-		endpoint := p.endpoint()
-		log.Debug().Str("url", endpoint).Msg("connecting")
-		cl, err = newSubstrateAPI(endpoint)
-		if err != nil {
-			return errors.Wrapf(err, "error connecting to substrate at '%s'", endpoint)
+	return conn, nil
+}
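getHealthyConn combines two behaviours of the cenkalti/backoff package: an ordinary error from the callback is retried with exponentially growing sleeps, while wrapping an error in backoff.Permanent aborts the loop at once (here, when the pool is already at MaxPoolSize). A standalone sketch of that control flow, with illustrative values:

package main

import (
	"errors"
	"fmt"
	"time"

	"github.com/cenkalti/backoff"
)

var errPoolFull = errors.New("max pool size reached")

func main() {
	b := backoff.NewExponentialBackOff()
	b.InitialInterval = 500 * time.Millisecond
	b.MaxInterval = 2 * time.Second
	b.Multiplier = 2

	attempt := 0
	err := backoff.Retry(func() error {
		attempt++
		if attempt == 3 {
			// Permanent errors stop the retries immediately.
			return backoff.Permanent(errPoolFull)
		}
		// Ordinary errors trigger another, longer wait.
		return fmt.Errorf("no connection yet (attempt %d)", attempt)
	}, b)

	fmt.Println(err) // "max pool size reached", after three attempts
}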
+func (m *manager) healthChecker() {
+	defer m.wg.Done()
+	// ticker := time.NewTicker(m.config.HealthCheckInterval)
+	// defer ticker.Stop()
+
+	for {
+		select {
+		case <-m.ctx.Done():
+			return
+		// case <-ticker.C:
+		// 	m.checkConnections()
+		case <-m.checkChan:
+			m.checkConnections()
 		}
+	}
+}

-		meta, err = cl.RPC.State.GetMetadataLatest()
-		if err != nil {
-			return errors.Wrapf(err, "error getting latest metadata at '%s'", endpoint)
+func (m *manager) checkConnections() {
+	m.mu.Lock()
+	healthy := make([]*poolConn, 0, len(m.pool))
+	for _, conn := range m.pool {
+		if conn == nil {
+			continue
 		}

-		t, err := getTime(cl, meta)
-		if err != nil {
-			return errors.Wrapf(err, "error getting node time at '%s'", endpoint)
+		if !conn.isHealthy() {
+			log.Debug().Str("url", conn.url).Msg("closing unhealthy connection")
+			conn.close()
+			continue
 		}

-		if time.Since(t) > acceptableDelay {
-			return fmt.Errorf("node '%s' is behind acceptable delay with timestamp '%s'", endpoint, t)
+		// Check if connection is idle for too long if we have more than min pool size
+		if !conn.inUse.Load() && time.Since(time.Unix(conn.lastUsed.Load(), 0)) > m.config.MaxIdleTime && len(m.pool) > m.config.MinPoolSize {
+			log.Debug().Str("url", conn.url).Msg("closing idle connection")
+			conn.close()
+			continue
 		}

-		return nil
+		healthy = append(healthy, conn)
 	}

-	}, boff, func(err error, _ time.Duration) {
-		log.Error().Err(err).Msg("failed to connect to endpoint, retrying")
-	})
-
-	return cl, meta, err
+	m.pool = healthy
+	m.mu.Unlock()
+	m.ensureMinConnections()
 }

-// TODO: implement reusable connections instead of
-// closing the connection.
-func (p *mgrImpl) put(cl *Substrate) {
-	// naive put implementation for now
-	// we just immediately kill the connection
-	if cl.cl != nil {
-		cl.cl.Client.Close()
+func (m *manager) ensureMinConnections() {
+	log.Debug().Msg("ensuring minimum connections in the pool")
+	inUseCount := m.acquiredConnCount()
+	urls := shuffle(m.unusedURLs())
+	urls = append(urls, m.urls...)
+
+	for _, url := range urls {
+		poolSize := len(m.pool)
+
+		if poolSize < m.config.MinPoolSize || (poolSize < m.config.MaxPoolSize && poolSize == inUseCount) {
+			if conn, err := m.createConnection(m.ctx, url); err == nil {
+				m.mu.Lock()
+				m.pool = append(m.pool, conn)
+				m.mu.Unlock()
+				log.Debug().Str("url", url).Msg("added new connection to pool")
+			}
+		} else {
+			break
+		}
 	}
-	cl.cl = nil
-	cl.meta = nil
+}
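The idle check in checkConnections compares an atomically stored Unix timestamp against MaxIdleTime, so the sweep never needs a per-connection lock; the arithmetic in isolation, as a minimal sketch:

package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

func main() {
	var lastUsed atomic.Int64
	lastUsed.Store(time.Now().Add(-45 * time.Minute).Unix()) // last used 45m ago

	maxIdle := 30 * time.Minute
	idleFor := time.Since(time.Unix(lastUsed.Load(), 0))
	fmt.Println(idleFor > maxIdle) // true: eviction candidate
}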
+func (m *manager) Close() error {
+	m.cancel()
+	m.wg.Wait()
+
+	m.mu.Lock()
+	defer m.mu.Unlock()
+
+	for _, conn := range m.pool {
+		conn.close()
+	}
+	m.pool = nil
+	return nil
+}

-// Substrate client
-type Substrate struct {
-	cl   Conn
-	meta Meta
+// Helper methods
+func (m *manager) unusedURLs() []string {
+	m.mu.RLock()
+	defer m.mu.RUnlock()
+
+	// get all urls that are not in the pool
+	used := make([]string, 0, len(m.pool))
+	for _, conn := range m.pool {
+		used = append(used, conn.url)
+	}
+	unused := make([]string, 0, len(m.urls))
+	for _, url := range m.urls {
+		if !slices.Contains(used, url) {
+			unused = append(unused, url)
+		}
+	}
+	return unused
+}

-	close func(s *Substrate)
+func (m *manager) acquiredConnCount() int {
+	m.mu.RLock()
+	defer m.mu.RUnlock()
+
+	count := 0
+	for _, conn := range m.pool {
+		if conn.inUse.Load() {
+			count++
+		}
+	}
+	return count
+}

-// NewSubstrate creates a substrate client
-func newSubstrate(cl Conn, meta Meta, close func(*Substrate)) (*Substrate, error) {
-	return &Substrate{cl: cl, meta: meta, close: close}, nil
+func (m *manager) getExistingConn() *poolConn {
+	m.mu.RLock()
+	defer m.mu.RUnlock()
+
+	for _, conn := range m.pool {
+		if conn.isHealthy() && !conn.inUse.Load() {
+			if conn.inUse.CompareAndSwap(false, true) {
+				conn.lastUsed.Store(time.Now().Unix())
+				return conn
+			}
+		}
+	}
+	return nil
+}

-func (s *Substrate) Close() {
-	s.close(s)
+func shuffle(urls []string) []string {
+	result := make([]string, len(urls))
+	copy(result, urls)
+	rand.Shuffle(len(result), func(i, j int) {
+		result[i], result[j] = result[j], result[i]
+	})
+	return result
+}
+
+// Deprecated methods implementation
+func (m *manager) Raw() (Conn, Meta, error) {
+	conn, err := m.GetConnection(context.Background())
+	if err != nil {
+		return nil, nil, err
+	}
+	return conn.conn.conn, conn.conn.meta, nil
+}
+
+func (m *manager) Substrate() (*Substrate, error) {
+	return m.GetConnection(context.Background())
+}
+
+type Substrate struct {
+	conn   *poolConn
+	mgr    *manager
+	mu     sync.Mutex
+	closed bool
+}
+
+func newSubstrate(conn *poolConn, mgr *manager) *Substrate {
+	return &Substrate{
+		conn: conn,
+		mgr:  mgr,
+	}
 }

+func createSubstrateConn(url string) (Conn, Meta, error) {
+	cl, err := newSubstrateAPI(url)
+	if err != nil {
+		return nil, nil, err
+	}
+
+	meta, err := cl.RPC.State.GetMetadataLatest()
+	if err != nil {
+		cl.Client.Close()
+		return nil, nil, err
+	}
+
+	t, err := getTime(cl, meta)
+	if err != nil || time.Since(t) > AcceptableDelay {
+		cl.Client.Close()
+		return nil, nil, fmt.Errorf("node health check failed")
+	}
+
+	return cl, meta, nil
+}
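getExistingConn relies on atomic.Bool's CompareAndSwap to make acquisition race-free: many goroutines may observe the same idle connection, but only one CAS from false to true can succeed. The guarantee in isolation, as a standalone sketch with illustrative names:

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

type slot struct{ inUse atomic.Bool }

func main() {
	var s slot
	var wins atomic.Int32
	var wg sync.WaitGroup

	for i := 0; i < 100; i++ { // 100 goroutines race for one idle slot
		wg.Add(1)
		go func() {
			defer wg.Done()
			if s.inUse.CompareAndSwap(false, true) {
				wins.Add(1) // only the first successful CAS gets here
			}
		}()
	}
	wg.Wait()
	fmt.Println(wins.Load()) // always 1
}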
 func (s *Substrate) GetClient() (Conn, Meta, error) {
-	return s.cl, s.meta, nil
+	if s.closed {
+		log.Error().Msg("attempted to get client from closed substrate")
+		return nil, nil, fmt.Errorf("substrate connection closed")
+	}
+
+	if s.conn.isHealthy() {
+		conn := s.conn.conn
+		meta := s.conn.meta
+		s.conn.lastUsed.Store(time.Now().Unix())
+		return conn, meta, nil
+	}
+	s.conn.inUse.Store(false)
+
+	conn, err := s.mgr.getHealthyConn()
+	if err != nil {
+		log.Error().Err(err).Msg("failed to get healthy connection for client")
+		return nil, nil, err
+	}
+
+	s.mu.Lock()
+
+	s.conn = conn
+	s.mu.Unlock()
+
+	log.Debug().Str("url", conn.url).Msg("swapped connection")
+	return conn.conn, conn.meta, nil
+}
+
+func (s *Substrate) Release() {
+	s.mu.Lock()
+	defer s.mu.Unlock()
+
+	if s.closed {
+		return
+	}
+	s.closed = true
+
+	if s.conn != nil {
+		s.conn.inUse.Store(false)
+		log.Debug().Str("url", s.conn.url).Msg("releasing connection to pool")
+		s.conn = nil
+	}
 }

 func (s *Substrate) getVersion(b types.StorageDataRaw) (uint32, error) {
@@ -196,6 +495,11 @@ func (s *Substrate) Time() (t time.Time, err error) {
 	return getTime(cl, meta)
 }

+// Deprecated: use Release instead
+func (s *Substrate) Close() {
+	s.Release()
+}
+
 func getTime(cl Conn, meta Meta) (t time.Time, err error) {
 	key, err := types.CreateStorageKey(meta, "Timestamp", "Now", nil)
 	if err != nil {
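Seen from a call site, GetClient makes reconnection invisible: the pooled connection is re-validated and, if the node died, swapped for a healthy one before the raw gsrpc handles are returned. A usage sketch; the import path and endpoint are placeholders:

package main

import (
	"context"
	"fmt"
	"log"

	substrate "github.com/threefoldtech/tfchain/clients/tfchain-client-go" // placeholder path
)

func main() {
	mgr := substrate.NewManager("wss://node.example:9944")
	defer mgr.Close()

	sub, err := mgr.GetConnection(context.Background())
	if err != nil {
		log.Fatal(err)
	}
	defer sub.Release()

	// GetClient re-checks health and may transparently swap the
	// underlying connection before handing out the raw client.
	cl, _, err := sub.GetClient()
	if err != nil {
		log.Fatal(err)
	}

	block, err := cl.RPC.Chain.GetBlockLatest()
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println("latest block:", block.Block.Header.Number)
}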
diff --git a/clients/tfchain-client-go/impl_test.go b/clients/tfchain-client-go/impl_test.go
new file mode 100644
index 000000000..f798fb6d6
--- /dev/null
+++ b/clients/tfchain-client-go/impl_test.go
@@ -0,0 +1,193 @@
+package substrate
+
+import (
+	"context"
+	"fmt"
+	"math/rand"
+	"sync"
+	"testing"
+	"time"
+
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
+)
+
+func TestPoolInitialization(t *testing.T) {
+	urls := []string{"ws://127.0.0.1:9944", "ws://127.0.0.1:9945", "ws://127.0.0.1:9946"}
+	mgr := NewManager(urls...)
+	defer mgr.Close()
+
+	time.Sleep(100 * time.Millisecond)
+
+	mgrImpl := mgr.(*manager)
+	mgrImpl.mu.RLock()
+	defer mgrImpl.mu.RUnlock()
+
+	assert.LessOrEqual(t, len(mgrImpl.pool), mgrImpl.config.MinPoolSize)
+	assert.Greater(t, len(mgrImpl.pool), 0)
+}
+
+func TestConnectionReuse(t *testing.T) {
+	mgr := NewManager("ws://127.0.0.1:9944")
+	defer mgr.Close()
+
+	// Wait for pool initialization
+	time.Sleep(100 * time.Millisecond)
+
+	// Get first connection
+	sub1, err := mgr.GetConnection(context.Background())
+	require.NoError(t, err)
+
+	// Store connection details for comparison
+	conn1 := sub1.conn
+	url1 := sub1.conn.url
+
+	// Release it back to pool properly
+	sub1.Release()
+
+	// Small delay to ensure connection is properly released
+	time.Sleep(10 * time.Millisecond)
+
+	// Get another connection
+	sub2, err := mgr.GetConnection(context.Background())
+	require.NoError(t, err)
+	defer sub2.Release()
+
+	// Should be the same underlying connection
+	assert.Equal(t, conn1, sub2.conn)
+	assert.Equal(t, url1, sub2.conn.url)
+}
+
+func TestConcurrentAccess(t *testing.T) {
+	mgr := NewManager("ws://127.0.0.1:9944", "ws://127.0.0.1:9945")
+	defer mgr.Close()
+
+	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+	defer cancel()
+
+	var wg sync.WaitGroup
+
+	for i := 0; i < 10; i++ {
+		wg.Add(1)
+		go func() {
+			defer wg.Done()
+
+			sub, err := mgr.GetConnection(ctx)
+			if err != nil {
+				return
+			}
+			defer sub.Release()
+			sub.Time()
+			time.Sleep(10 * time.Millisecond)
+		}()
+	}
+
+	wg.Wait()
+}
+
+func TestFailover(t *testing.T) {
+	mgr := NewManager("ws://fail1", "ws://127.0.0.1:9944")
+	defer mgr.Close()
+
+	sub1, err := mgr.GetConnection(context.Background())
+	require.NoError(t, err)
+	defer sub1.Release()
+	sub2, err := mgr.GetConnection(context.Background())
+	require.NoError(t, err)
+	defer sub2.Release()
+	assert.Equal(t, sub1.conn.url, "ws://127.0.0.1:9944")
+	assert.Equal(t, sub2.conn.url, "ws://127.0.0.1:9944")
+}
+
+func TestHealthChecking(t *testing.T) {
+	mgr := NewManager("ws://127.0.0.1:9944")
+	defer mgr.Close()
+
+	sub, err := mgr.GetConnection(context.Background())
+	require.NoError(t, err)
+	defer sub.Release()
+
+	// Simulate connection failure
+	old := sub.conn.conn
+	old.Client.Close()
+	// simulate usage of the client
+	sub.Time()
+	assert.NotEqual(t, old, sub.conn.conn)
+}
+
+func TestStressWithFailures(t *testing.T) {
+	if testing.Short() {
+		t.Skip("Skipping stress test in short mode")
+	}
+
+	// Use test-specific configuration
+	config := ManagerConfig{
+		MaxPoolSize: 30,
+		MinPoolSize: 3,
+		MaxIdleTime: time.Minute,
+		// HealthCheckInterval: time.Second,
+		ConnectionTimeout: time.Second,
+	}
+
+	mgr := NewManagerWithConfig(config, "ws://127.0.0.1:9944", "ws://127.0.0.1:9945")
+	defer mgr.Close()
+
+	ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
+	defer cancel()
+
+	var (
+		wg     sync.WaitGroup
+		mu     sync.Mutex
+		errors []error
+	)
+
+	for i := 0; i < 30; i++ {
+		wg.Add(1)
+		go func(id int) {
+			defer wg.Done()
+
+			retryBackoff := time.Millisecond * 100
+			maxBackoff := time.Second
+
+			for ctx.Err() == nil {
+				sub, err := mgr.GetConnection(ctx)
+				if err != nil {
+					mu.Lock()
+					errors = append(errors, fmt.Errorf("goroutine %d: %w", id, err))
+					mu.Unlock()
+
+					jitter := time.Duration(rand.Int63n(int64(retryBackoff)))
+					time.Sleep(retryBackoff + jitter)
+					retryBackoff *= 2
+					if retryBackoff > maxBackoff {
+						retryBackoff = maxBackoff
+					}
+					continue
+				}
+
+				// Reset backoff on success
+				retryBackoff = time.Millisecond * 100
+
+				// Simulate work
+				sub.Time()
+				time.Sleep(time.Duration(rand.Intn(250)+50) * time.Millisecond)
+
+				if id%2 == 0 && rand.Float32() < 0.1 {
+					sub.conn.conn.Client.Close()
+				}
+
+				sub.Release()
+			}
+		}(i)
+	}
+
+	wg.Wait()
+
+	// Log and check errors
+	for _, err := range errors {
+		t.Logf("Error: %v", err)
+	}
+
+	assert.Less(t, len(errors), 10,
+		"Too many errors occurred during stress test: %d", len(errors))
+}
diff --git a/clients/tfchain-client-go/transfer.go b/clients/tfchain-client-go/transfer.go
index 5648a851e..c739a5fef 100644
--- a/clients/tfchain-client-go/transfer.go
+++ b/clients/tfchain-client-go/transfer.go
@@ -20,9 +20,6 @@ func (s *Substrate) Transfer(identity Identity, amount uint64, destination Accou
 	bal := big.NewInt(int64(amount))

 	c, err := types.NewCall(meta, "Balances.transfer", dest, types.NewUCompact(bal))
-	if err != nil {
-		panic(err)
-	}

 	if err != nil {
 		return errors.Wrap(err, "failed to create call")
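One detail of the stress test worth isolating: each goroutine doubles its retry delay and adds random jitter so that the thirty workers do not retry in lockstep. The pattern on its own, with illustrative constants:

package main

import (
	"fmt"
	"math/rand"
	"time"
)

func main() {
	retryBackoff := 100 * time.Millisecond
	const maxBackoff = time.Second

	for attempt := 1; attempt <= 5; attempt++ {
		// jitter spreads retries out so concurrent workers desynchronize
		jitter := time.Duration(rand.Int63n(int64(retryBackoff)))
		fmt.Printf("attempt %d: sleeping %v\n", attempt, retryBackoff+jitter)
		time.Sleep(retryBackoff + jitter)

		retryBackoff *= 2
		if retryBackoff > maxBackoff {
			retryBackoff = maxBackoff
		}
	}
}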
From bb36e80a7ca2ca9396a14bb79873609fa78babf6 Mon Sep 17 00:00:00 2001
From: Sameh Abouel-saad
Date: Mon, 18 Nov 2024 00:20:28 +0200
Subject: [PATCH 02/13] fix go linter

---
 clients/tfchain-client-go/impl_test.go | 9 ++++++---
 1 file changed, 6 insertions(+), 3 deletions(-)

diff --git a/clients/tfchain-client-go/impl_test.go b/clients/tfchain-client-go/impl_test.go
index f798fb6d6..43c5e4ff0 100644
--- a/clients/tfchain-client-go/impl_test.go
+++ b/clients/tfchain-client-go/impl_test.go
@@ -77,7 +77,8 @@ func TestConcurrentAccess(t *testing.T) {
 				return
 			}
 			defer sub.Release()
-			sub.Time()
+			_, err = sub.Time()
+			assert.NoError(t, err)
 			time.Sleep(10 * time.Millisecond)
 		}()
 	}
@@ -111,7 +112,8 @@ func TestHealthChecking(t *testing.T) {
 	old := sub.conn.conn
 	old.Client.Close()
 	// simulate usage of the client
-	sub.Time()
+	_, err = sub.Time()
+	assert.NoError(t, err)
 	assert.NotEqual(t, old, sub.conn.conn)
 }

@@ -169,7 +171,8 @@
 			retryBackoff = time.Millisecond * 100

 			// Simulate work
-			sub.Time()
+			_, err = sub.Time()
+			assert.NoError(t, err)
 			time.Sleep(time.Duration(rand.Intn(250)+50) * time.Millisecond)

 			if id%2 == 0 && rand.Float32() < 0.1 {

From 19211f41b243f9fe84955730bf92786c44d39cad Mon Sep 17 00:00:00 2001
From: Sameh Abouel-saad
Date: Mon, 18 Nov 2024 00:33:14 +0200
Subject: [PATCH 03/13] Adjust tests to use one url to work with substrate
 integration tests for now

---
 clients/tfchain-client-go/impl_test.go | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/clients/tfchain-client-go/impl_test.go b/clients/tfchain-client-go/impl_test.go
index 43c5e4ff0..b3c158e14 100644
--- a/clients/tfchain-client-go/impl_test.go
+++ b/clients/tfchain-client-go/impl_test.go
@@ -13,7 +13,7 @@ import (
 )

 func TestPoolInitialization(t *testing.T) {
-	urls := []string{"ws://127.0.0.1:9944", "ws://127.0.0.1:9945", "ws://127.0.0.1:9946"}
+	urls := []string{"ws://127.0.0.1:9944"}
 	mgr := NewManager(urls...)
 	defer mgr.Close()

@@ -59,7 +59,7 @@ func TestConnectionReuse(t *testing.T) {
 }

 func TestConcurrentAccess(t *testing.T) {
-	mgr := NewManager("ws://127.0.0.1:9944", "ws://127.0.0.1:9945")
+	mgr := NewManager("ws://127.0.0.1:9944")
 	defer mgr.Close()

 	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
@@ -131,7 +131,7 @@ func TestStressWithFailures(t *testing.T) {
 		ConnectionTimeout: time.Second,
 	}

-	mgr := NewManagerWithConfig(config, "ws://127.0.0.1:9944", "ws://127.0.0.1:9945")
+	mgr := NewManagerWithConfig(config, "ws://127.0.0.1:9944")
 	defer mgr.Close()

 	ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
From 2ffd8332b83c298d377097110129f3309be76ca4 Mon Sep 17 00:00:00 2001
From: Sameh Abouel-saad
Date: Mon, 18 Nov 2024 00:45:22 +0200
Subject: [PATCH 04/13] Respect CI env like in other tests

---
 clients/tfchain-client-go/impl_test.go | 21 +++++++++++++--------
 1 file changed, 13 insertions(+), 8 deletions(-)

diff --git a/clients/tfchain-client-go/impl_test.go b/clients/tfchain-client-go/impl_test.go
index b3c158e14..cc751c581 100644
--- a/clients/tfchain-client-go/impl_test.go
+++ b/clients/tfchain-client-go/impl_test.go
@@ -13,7 +13,7 @@ import (
 )

 func TestPoolInitialization(t *testing.T) {
-	urls := []string{"ws://127.0.0.1:9944"}
+	urls := []string{getUrlBasedOnEnv()}
 	mgr := NewManager(urls...)
 	defer mgr.Close()

@@ -28,7 +28,8 @@ func TestPoolInitialization(t *testing.T) {
 }

 func TestConnectionReuse(t *testing.T) {
-	mgr := NewManager("ws://127.0.0.1:9944")
+	urls := []string{getUrlBasedOnEnv()}
+	mgr := NewManager(urls...)
 	defer mgr.Close()

 	// Wait for pool initialization
@@ -59,7 +60,8 @@ func TestConnectionReuse(t *testing.T) {
 }

 func TestConcurrentAccess(t *testing.T) {
-	mgr := NewManager("ws://127.0.0.1:9944")
+	urls := []string{getUrlBasedOnEnv()}
+	mgr := NewManager(urls...)
 	defer mgr.Close()

 	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
@@ -89,7 +89,8 @@ func TestConcurrentAccess(t *testing.T) {
 }

 func TestFailover(t *testing.T) {
-	mgr := NewManager("ws://fail1", "ws://127.0.0.1:9944")
+	urls := []string{"ws://fail1", getUrlBasedOnEnv()}
+	mgr := NewManager(urls...)
 	defer mgr.Close()

 	sub1, err := mgr.GetConnection(context.Background())
@@ -96,12 +99,13 @@ func TestFailover(t *testing.T) {
 	sub2, err := mgr.GetConnection(context.Background())
 	require.NoError(t, err)
 	defer sub2.Release()
-	assert.Equal(t, sub1.conn.url, "ws://127.0.0.1:9944")
-	assert.Equal(t, sub2.conn.url, "ws://127.0.0.1:9944")
+	assert.Equal(t, sub1.conn.url, urls[1])
+	assert.Equal(t, sub2.conn.url, urls[1])
 }

 func TestHealthChecking(t *testing.T) {
-	mgr := NewManager("ws://127.0.0.1:9944")
+	urls := []string{getUrlBasedOnEnv()}
+	mgr := NewManager(urls...)
 	defer mgr.Close()

 	sub, err := mgr.GetConnection(context.Background())
@@ -131,7 +135,8 @@ func TestStressWithFailures(t *testing.T) {
 		ConnectionTimeout: time.Second,
 	}

-	mgr := NewManagerWithConfig(config, "ws://127.0.0.1:9944")
+	urls := []string{getUrlBasedOnEnv()}
+	mgr := NewManagerWithConfig(config, urls...)
 	defer mgr.Close()

 	ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)

From a851f2a9e1a246b64823679e576930d1940d2b40 Mon Sep 17 00:00:00 2001
From: Sameh Abouel-saad
Date: Mon, 18 Nov 2024 01:03:39 +0200
Subject: [PATCH 05/13] set MaxElapsedTime for the backoff

---
 clients/tfchain-client-go/impl.go | 1 +
 1 file changed, 1 insertion(+)

diff --git a/clients/tfchain-client-go/impl.go b/clients/tfchain-client-go/impl.go
index 1031e76ca..ab610e199 100644
--- a/clients/tfchain-client-go/impl.go
+++ b/clients/tfchain-client-go/impl.go
@@ -208,6 +208,7 @@ func (m *manager) getHealthyConn() (*poolConn, error) {

 	b := backoff.NewExponentialBackOff()
 	b.MaxInterval = 2 * time.Second
+	b.MaxElapsedTime = 4 * time.Second
 	b.InitialInterval = 500 * time.Millisecond
 	b.Multiplier = 2

From e4cc60220b27f86a80d48aef6f427164c5ce5eb6 Mon Sep 17 00:00:00 2001
From: Sameh Abouel-saad
Date: Mon, 18 Nov 2024 01:16:05 +0200
Subject: [PATCH 06/13] update utils_test.go

---
 clients/tfchain-client-go/utils_test.go | 8 ++++++++
 1 file changed, 8 insertions(+)

diff --git a/clients/tfchain-client-go/utils_test.go b/clients/tfchain-client-go/utils_test.go
index a42343f99..530f227f8 100644
--- a/clients/tfchain-client-go/utils_test.go
+++ b/clients/tfchain-client-go/utils_test.go
@@ -63,6 +63,14 @@ func startLocalConnection(t *testing.T) *Substrate {
 	return cl
 }

+func getUrlBasedOnEnv() string {
+	if _, ok := os.LookupEnv("CI"); ok {
+		return "ws://127.0.0.1:9944"
+	} else {
+		return "wss://tfchain.dev.grid.tf"
+	}
+}
+
 func assertCreateTwin(t *testing.T, cl *Substrate, user AccountUser) uint32 {
 	u := Accounts[user]

From 91f8e879e210b9b59fffd44c23f8b1128d962144 Mon Sep 17 00:00:00 2001
From: Sameh Abouel-saad
Date: Mon, 18 Nov 2024 01:27:15 +0200
Subject: [PATCH 07/13] update go version in lint workflow

---
 clients/tfchain-client-go/.github/workflows/lint.yml | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/clients/tfchain-client-go/.github/workflows/lint.yml b/clients/tfchain-client-go/.github/workflows/lint.yml
index d8d8c7fff..81bcef5d4 100644
--- a/clients/tfchain-client-go/.github/workflows/lint.yml
+++ b/clients/tfchain-client-go/.github/workflows/lint.yml
@@ -16,7 +16,7 @@ jobs:
       - name: Set up Go
         uses: actions/setup-go@v3
         with:
-          go-version: "1.19"
+          go-version: "1.21"
         id: go

       - name: Check out code into the Go module directory
From db1111b5339eceec0560a9a2cbefdba81e227928 Mon Sep 17 00:00:00 2001
From: Sameh Abouel-saad
Date: Mon, 18 Nov 2024 01:32:44 +0200
Subject: [PATCH 08/13] update go version and golangci-lint-action in lint
 workflow

---
 .github/workflows/020_lint_and_test_go_client.yaml | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/.github/workflows/020_lint_and_test_go_client.yaml b/.github/workflows/020_lint_and_test_go_client.yaml
index 5694ef912..9998dc8d7 100644
--- a/.github/workflows/020_lint_and_test_go_client.yaml
+++ b/.github/workflows/020_lint_and_test_go_client.yaml
@@ -21,13 +21,13 @@ jobs:
       - name: Set up Go
         uses: actions/setup-go@v5
         with:
-          go-version: "1.20"
+          go-version: "1.21"
           cache: false
           # cache-dependency-path: clients/tfchain-client-go/go.sum
         id: go

       - name: golangci-lint
-        uses: golangci/golangci-lint-action@v3.7.0
+        uses: golangci/golangci-lint-action@v6
         with:
           args: --timeout 3m --verbose
           working-directory: clients/tfchain-client-go

From f949e1f5da471f882964ae70128399431c1419fe Mon Sep 17 00:00:00 2001
From: Sameh Abouel-saad
Date: Mon, 18 Nov 2024 01:48:29 +0200
Subject: [PATCH 09/13] update staticcheck-action in go lint workflow

---
 .github/workflows/020_lint_and_test_go_client.yaml | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)

diff --git a/.github/workflows/020_lint_and_test_go_client.yaml b/.github/workflows/020_lint_and_test_go_client.yaml
index 9998dc8d7..d2a7243d1 100644
--- a/.github/workflows/020_lint_and_test_go_client.yaml
+++ b/.github/workflows/020_lint_and_test_go_client.yaml
@@ -33,9 +33,10 @@ jobs:
           working-directory: clients/tfchain-client-go

       - name: staticcheck
-        uses: dominikh/staticcheck-action@v1.3.0
+        uses: dominikh/staticcheck-action@v1
         with:
-          version: "2022.1.3"
+          version: "latest"
+          install-go: false
           working-directory: clients/tfchain-client-go
         env:
           GO111MODULE: on

From b1aea28e0bab9fca8beffd7da9a42a22e5dec77f Mon Sep 17 00:00:00 2001
From: Sameh Abouel-saad
Date: Mon, 18 Nov 2024 01:53:57 +0200
Subject: [PATCH 10/13] remove obsolete client workflows

---
 .../.github/workflows/lint.yml | 35 ------------------
 .../.github/workflows/test.yml | 24 -------------
 2 files changed, 59 deletions(-)
 delete mode 100644 clients/tfchain-client-go/.github/workflows/lint.yml
 delete mode 100644 clients/tfchain-client-go/.github/workflows/test.yml

diff --git a/clients/tfchain-client-go/.github/workflows/lint.yml b/clients/tfchain-client-go/.github/workflows/lint.yml
deleted file mode 100644
index 81bcef5d4..000000000
--- a/clients/tfchain-client-go/.github/workflows/lint.yml
+++ /dev/null
@@ -1,35 +0,0 @@
-name: Lint
-on:
-  pull_request:
-    paths-ignore:
-      - "readme.md"
-  push:
-    paths-ignore:
-      - "readme.md"
-
-jobs:
-  lint:
-    name: lint
-    runs-on: ubuntu-latest
-    timeout-minutes: 5
-    steps:
-      - name: Set up Go
-        uses: actions/setup-go@v3
-        with:
-          go-version: "1.21"
-        id: go
-
-      - name: Check out code into the Go module directory
-        uses: actions/checkout@v3.2.0
-        with:
-          submodules: "true"
-
-      - name: golangci-lint
-        uses: golangci/golangci-lint-action@v3
-        with:
-          args: --timeout 3m --verbose
-
-      - name: gofmt
-        uses: Jerome1337/gofmt-action@v1.0.5
-        with:
-          gofmt-flags: "-l -d"
\ No newline at end of file
diff --git a/clients/tfchain-client-go/.github/workflows/test.yml b/clients/tfchain-client-go/.github/workflows/test.yml
deleted file mode 100644
index 951e7a782..000000000
--- a/clients/tfchain-client-go/.github/workflows/test.yml
+++ /dev/null
@@ -1,24 +0,0 @@
-name: test-substrate-client
-
-on:
-  push:
-
-jobs:
-  build:
-    runs-on: ubuntu-latest
-    steps:
-      - uses: actions/checkout@v3
-
-      - name: install go
-        uses: actions/setup-go@v3
-        with:
-          go-version: 1.18
-
-      - name: run docker image
-        run: docker run -d -p 9944:9944 threefolddev/tfchain:2.3.0-rc2 --dev --ws-external
-
-      - name: wait for node initialization
-        run: sleep 3
-
-      - name: run test
-        run: go test -v ./...
From a812cd878c6e56d809b30b14786182282cec0459 Mon Sep 17 00:00:00 2001
From: Sameh Abouel-saad
Date: Mon, 18 Nov 2024 02:57:05 +0200
Subject: [PATCH 11/13] Update go version in build_test docker image

---
 .github/workflows/build_test.Dockerfile | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/.github/workflows/build_test.Dockerfile b/.github/workflows/build_test.Dockerfile
index e8b37454a..43fdb2e54 100644
--- a/.github/workflows/build_test.Dockerfile
+++ b/.github/workflows/build_test.Dockerfile
@@ -1,4 +1,4 @@
-FROM ubuntu:20.04
+FROM ubuntu:22.04
 ENV DEBIAN_FRONTEND=noninteractive
 RUN apt update && \
     apt install -y \
@@ -16,8 +16,8 @@ RUN apt update && \
         zstd \
         wget \
         protobuf-compiler && \
-    wget https://go.dev/dl/go1.20.2.linux-amd64.tar.gz && \
-    tar -xvf go1.20.2.linux-amd64.tar.gz && \
+    wget https://go.dev/dl/go1.21.13.linux-amd64.tar.gz && \
+    tar -xvf go1.21.13.linux-amd64.tar.gz && \
     mv go /usr/local && \
     echo "GOPATH=/usr/local/go" >> ~/.bashrc && \
     echo "PATH=\$PATH:\$GOPATH/bin" >> ~/.bashrc && \

From 3b6ec40760d91f22f3cdc784605325ab6f10e297 Mon Sep 17 00:00:00 2001
From: Sameh Abouel-saad
Date: Mon, 18 Nov 2024 03:13:37 +0200
Subject: [PATCH 12/13] bump build_test image version to 5

---
 .github/workflows/010_build_and_test.yaml | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/.github/workflows/010_build_and_test.yaml b/.github/workflows/010_build_and_test.yaml
index 7f150a3df..2c96f8af1 100644
--- a/.github/workflows/010_build_and_test.yaml
+++ b/.github/workflows/010_build_and_test.yaml
@@ -8,7 +8,7 @@ jobs:
   build-and-test:
     runs-on: [self-hosted, tfchainrunner01]
     container:
-      image: threefolddev/tfchain:4
+      image: threefolddev/tfchain:5
       env:
         DEBIAN_FRONTEND: noninteractive
         PATH: /root/.cargo/bin:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin:/usr/local/go/bin
From 2cfa156bca76738d7d05bd5eae0962b3d4d2fd24 Mon Sep 17 00:00:00 2001
From: Sameh Abouel-saad
Date: Mon, 18 Nov 2024 04:20:21 +0200
Subject: [PATCH 13/13] use atomic.Bool for closed bool

---
 clients/tfchain-client-go/impl.go | 65 +++++++++++++------------------
 1 file changed, 28 insertions(+), 37 deletions(-)

diff --git a/clients/tfchain-client-go/impl.go b/clients/tfchain-client-go/impl.go
index ab610e199..369f5cee5 100644
--- a/clients/tfchain-client-go/impl.go
+++ b/clients/tfchain-client-go/impl.go
@@ -160,28 +160,19 @@ func (m *manager) initializePool() {
 	}
 }

-func (m *manager) createConnection(ctx context.Context, url string) (*poolConn, error) {
+func (m *manager) createConnection(url string) (*poolConn, error) {
 	log.Debug().Str("url", url).Msg("attempting to create a new connection")
-	ctx, cancel := context.WithTimeout(ctx, m.config.ConnectionTimeout)
-	defer cancel()
-
-	select {
-	case <-ctx.Done():
-		log.Error().Str("url", url).Msg("context done while creating connection")
-		return nil, ctx.Err()
-	default:
-		if conn, meta, err := createSubstrateConn(url); err == nil {
-			log.Debug().Str("url", url).Msg("created new connection")
-			return &poolConn{
-				conn:     conn,
-				meta:     meta,
-				url:      url,
-				lastUsed: atomic.Int64{},
-				inUse:    atomic.Bool{},
-			}, nil
-		} else {
-			log.Error().Str("url", url).Err(err).Msg("failed to create connection")
-		}
+	if conn, meta, err := createSubstrateConn(url); err == nil {
+		log.Debug().Str("url", url).Msg("created new connection")
+		return &poolConn{
+			conn:     conn,
+			meta:     meta,
+			url:      url,
+			lastUsed: atomic.Int64{},
+			inUse:    atomic.Bool{},
+		}, nil
+	} else {
+		log.Error().Str("url", url).Err(err).Msg("failed to create connection")
 	}
 	return nil, fmt.Errorf("failed to create connection to %s", url)
 }
@@ -239,11 +230,7 @@ func (m *manager) getHealthyConn() (*poolConn, error) {
 		return ErrNoConnectionsAvailable
 	}, b)

-	if err != nil {
-		return nil, err
-	}
-
-	return conn, nil
+	return conn, err
 }

 func (m *manager) healthChecker() {
@@ -303,7 +290,7 @@ func (m *manager) ensureMinConnections() {
 		poolSize := len(m.pool)

 		if poolSize < m.config.MinPoolSize || (poolSize < m.config.MaxPoolSize && poolSize == inUseCount) {
-			if conn, err := m.createConnection(m.ctx, url); err == nil {
+			if conn, err := m.createConnection(url); err == nil {
 				m.mu.Lock()
 				m.pool = append(m.pool, conn)
 				m.mu.Unlock()
@@ -402,7 +389,7 @@
 type Substrate struct {
 	conn   *poolConn
 	mgr    *manager
 	mu     sync.Mutex
-	closed bool
+	closed atomic.Bool
 }

 func newSubstrate(conn *poolConn, mgr *manager) *Substrate {
@@ -434,18 +421,23 @@ func createSubstrateConn(url string) (Conn, Meta, error) {
 }

 func (s *Substrate) GetClient() (Conn, Meta, error) {
-	if s.closed {
+	if s.closed.Load() {
 		log.Error().Msg("attempted to get client from closed substrate")
 		return nil, nil, fmt.Errorf("substrate connection closed")
 	}

-	if s.conn.isHealthy() {
+	s.mu.Lock()
+	if s.conn != nil && s.conn.isHealthy() {
 		conn := s.conn.conn
 		meta := s.conn.meta
 		s.conn.lastUsed.Store(time.Now().Unix())
+		s.mu.Unlock()
 		return conn, meta, nil
 	}
-	s.conn.inUse.Store(false)
+	if s.conn != nil {
+		s.conn.inUse.Store(false)
+	}
+	s.mu.Unlock()

 	conn, err := s.mgr.getHealthyConn()
 	if err != nil {
@@ -454,22 +446,21 @@ func (s *Substrate) GetClient() (Conn, Meta, error) {
 	}

 	s.mu.Lock()
+	defer s.mu.Unlock()

 	s.conn = conn
-	s.mu.Unlock()

 	log.Debug().Str("url", conn.url).Msg("swapped connection")
 	return conn.conn, conn.meta, nil
 }

 func (s *Substrate) Release() {
-	s.mu.Lock()
-	defer s.mu.Unlock()
-
-	if s.closed {
+	if !s.closed.CompareAndSwap(false, true) {
 		return
 	}
-	s.closed = true
+
+	s.mu.Lock()
+	defer s.mu.Unlock()

 	if s.conn != nil {
 		s.conn.inUse.Store(false)