diff --git a/client/v3/concurrency/session.go b/client/v3/concurrency/session.go
index 2275e96c972c..1a5775d9fa5b 100644
--- a/client/v3/concurrency/session.go
+++ b/client/v3/concurrency/session.go
@@ -23,7 +23,10 @@ import (
 	v3 "go.etcd.io/etcd/client/v3"
 )
 
-const defaultSessionTTL = 60
+const (
+	defaultSessionTTL   = 60
+	defaultLeaseTimeout = 5 * time.Second
+)
 
 // Session represents a lease kept alive for the lifetime of a client.
 // Fault-tolerant applications may use sessions to reason about liveness.
@@ -47,7 +50,9 @@ func NewSession(client *v3.Client, opts ...SessionOption) (*Session, error) {
 
 	id := ops.leaseID
 	if id == v3.NoLease {
-		resp, err := client.Grant(ops.ctx, int64(ops.ttl))
+		ctx, cancel := context.WithTimeout(ops.ctx, defaultLeaseTimeout)
+		defer cancel()
+		resp, err := client.Grant(ctx, int64(ops.ttl))
 		if err != nil {
 			return nil, err
 		}
diff --git a/tests/integration/clientv3/concurrency/session_test.go b/tests/integration/clientv3/concurrency/session_test.go
index 646044726a28..aae7dfca3837 100644
--- a/tests/integration/clientv3/concurrency/session_test.go
+++ b/tests/integration/clientv3/concurrency/session_test.go
@@ -95,3 +95,96 @@ func TestSessionCtx(t *testing.T) {
 	}
 	assert.Equal(t, childCtx.Err(), context.Canceled)
 }
+
+// TestNewSessionLeaseGrantTimeout tests that NewSession respects timeout when creating a lease
+func TestNewSessionLeaseGrantTimeout(t *testing.T) {
+	integration.BeforeTest(t)
+
+	clus := integration.NewCluster(t, &integration.ClusterConfig{Size: 1})
+	defer clus.Terminate(t)
+
+	cli := clus.RandClient()
+
+	// Test case 1: Very short timeout should fail
+	t.Run("ShortTimeoutShouldFail", func(t *testing.T) {
+		ctx, cancel := context.WithTimeout(context.Background(), 1*time.Millisecond)
+		defer cancel()
+
+		start := time.Now()
+		session, err := concurrency.NewSession(cli, concurrency.WithContext(ctx), concurrency.WithTTL(5))
+		elapsed := time.Since(start)
+
+		if err == nil {
+			if session != nil {
+				session.Close()
+			}
+			t.Fatal("expected timeout error, but got nil")
+		}
+
+		if err != context.DeadlineExceeded {
+			t.Fatalf("expected context.DeadlineExceeded, got %v", err)
+		}
+
+		// Should fail quickly (within reasonable time)
+		if elapsed > 100*time.Millisecond {
+			t.Fatalf("timeout took too long: %v", elapsed)
+		}
+	})
+
+	// Test case 2: Adequate timeout should succeed
+	t.Run("AdequateTimeoutShouldSucceed", func(t *testing.T) {
+		ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+		defer cancel()
+
+		session, err := concurrency.NewSession(cli, concurrency.WithContext(ctx), concurrency.WithTTL(5))
+		if err != nil {
+			t.Fatalf("expected success, got error: %v", err)
+		}
+		if session == nil {
+			t.Fatal("expected valid session, got nil")
+		}
+		defer session.Close()
+
+		// Verify session has a valid lease
+		if session.Lease() == 0 {
+			t.Fatal("session should have a valid lease ID")
+		}
+	})
+}
+
+// TestNewSessionNormalOperationAfterFix tests that normal session creation still works
+func TestNewSessionNormalOperationAfterFix(t *testing.T) {
+	integration.BeforeTest(t)
+
+	clus := integration.NewCluster(t, &integration.ClusterConfig{Size: 1})
+	defer clus.Terminate(t)
+
+	cli := clus.RandClient()
+
+	// Verify normal session operations work correctly after the timeout fix
+	session, err := concurrency.NewSession(cli, concurrency.WithTTL(60))
+	if err != nil {
+		t.Fatalf("failed to create session: %v", err)
+	}
+	defer session.Close()
+
+	// Session should be immediately usable
+	if session.Lease() == 0 {
+		t.Fatal("session should have a valid lease")
+	}
+
+	// Keep-alive should be working
+	select {
+	case <-session.Done():
+		t.Fatal("session should not be done immediately")
+	case <-time.After(100 * time.Millisecond):
+		// Expected - session should remain alive
+	}
+
+	// Should be able to use session for mutex
+	mutex := concurrency.NewMutex(session, "test-mutex")
+	if mutex == nil {
+		t.Fatal("should be able to create mutex with session")
+	}
+}
+