Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
87 changes: 58 additions & 29 deletions go/pools/smartconnpool/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -280,6 +280,32 @@ func (pool *ConnPool[C]) CloseWithContext(ctx context.Context) error {
// for the pool
err := pool.setCapacity(ctx, 0)

// update the idle count to match the new capacity if necessary
// wait for connections to be returned to the pool if we're reducing the capacity.
// defer pool.setIdleCount()

const delay = 10 * time.Millisecond

// close connections until we're under capacity
for pool.active.Load() > 0 {
// if we're closing down the pool, make sure there's no clients waiting
// for connections because they won't be returned in the future
pool.wait.expire(true)

// try closing from connections which are currently idle in the stacks
conn := pool.getFromSettingsStack(nil)
if conn == nil {
conn = pool.pop(&pool.clean)
}
if conn == nil {
time.Sleep(delay)
continue
}

conn.Close()
pool.closedConn()
}

close(pool.close)
pool.workers.Wait()
pool.close = nil
Expand Down Expand Up @@ -716,37 +742,40 @@ func (pool *ConnPool[C]) setCapacity(ctx context.Context, newcap int64) error {
if oldcap == newcap {
return nil
}
// update the idle count to match the new capacity if necessary
// wait for connections to be returned to the pool if we're reducing the capacity.
defer pool.setIdleCount()

const delay = 10 * time.Millisecond

// close connections until we're under capacity
for pool.active.Load() > newcap {
if err := ctx.Err(); err != nil {
return vterrors.Errorf(vtrpcpb.Code_ABORTED,
"timed out while waiting for connections to be returned to the pool (capacity=%d, active=%d, borrowed=%d)",
pool.capacity.Load(), pool.active.Load(), pool.borrowed.Load())
}
// if we're closing down the pool, make sure there's no clients waiting
// for connections because they won't be returned in the future
if newcap == 0 {
pool.wait.expire(true)
}
pool.setIdleCount()

// try closing from connections which are currently idle in the stacks
conn := pool.getFromSettingsStack(nil)
if conn == nil {
conn = pool.pop(&pool.clean)
}
if conn == nil {
time.Sleep(delay)
continue
}
conn.Close()
pool.closedConn()
}
// update the idle count to match the new capacity if necessary
// wait for connections to be returned to the pool if we're reducing the capacity.
// defer pool.setIdleCount()

// const delay = 10 * time.Millisecond

// // close connections until we're under capacity
// for pool.active.Load() > newcap {
// if err := ctx.Err(); err != nil {
// return vterrors.Errorf(vtrpcpb.Code_ABORTED,
// "timed out while waiting for connections to be returned to the pool (capacity=%d, active=%d, borrowed=%d)",
// pool.capacity.Load(), pool.active.Load(), pool.borrowed.Load())
// }
// // if we're closing down the pool, make sure there's no clients waiting
// // for connections because they won't be returned in the future
// if newcap == 0 {
// pool.wait.expire(true)
// }

// // try closing from connections which are currently idle in the stacks
// conn := pool.getFromSettingsStack(nil)
// if conn == nil {
// conn = pool.pop(&pool.clean)
// }
// if conn == nil {
// time.Sleep(delay)
// continue
// }
// conn.Close()
// pool.closedConn()
// }

return nil
}
Expand Down
94 changes: 49 additions & 45 deletions go/pools/smartconnpool/pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,12 +207,16 @@ func TestOpen(t *testing.T) {
}
assert.EqualValues(t, 5, state.open.Load())
assert.EqualValues(t, 6, state.lastID.Load())
assert.EqualValues(t, 5, p.IdleCount())
assert.EqualValues(t, 5, p.Capacity())
assert.EqualValues(t, 5, p.Available())

// SetCapacity
err = p.SetCapacity(ctx, 3)
require.NoError(t, err)
assert.EqualValues(t, 3, state.open.Load())
assert.EqualValues(t, 5, state.open.Load())
assert.EqualValues(t, 6, state.lastID.Load())
assert.EqualValues(t, 3, p.IdleCount())
assert.EqualValues(t, 3, p.Capacity())
assert.EqualValues(t, 3, p.Available())

Expand All @@ -234,7 +238,7 @@ func TestOpen(t *testing.T) {
p.put(resources[i])
}
assert.EqualValues(t, 6, state.open.Load())
assert.EqualValues(t, 9, state.lastID.Load())
assert.EqualValues(t, 7, state.lastID.Load())

// Close
p.Close()
Expand Down Expand Up @@ -311,7 +315,7 @@ func TestShrinking(t *testing.T) {
"WaitCount": 0,
"WaitTime": time.Duration(0),
"IdleTimeout": 1 * time.Second,
"IdleClosed": 0,
"IdleClosed": 1,
"MaxLifetimeClosed": 0,
}
assert.Equal(t, expected, stats)
Expand Down Expand Up @@ -468,7 +472,7 @@ func TestClosing(t *testing.T) {
"WaitCount": 0,
"WaitTime": time.Duration(0),
"IdleTimeout": 1 * time.Second,
"IdleClosed": 0,
"IdleClosed": 5,
"MaxLifetimeClosed": 0,
}
assert.Equal(t, expected, stats)
Expand Down Expand Up @@ -530,7 +534,7 @@ func TestReopen(t *testing.T) {
expected = map[string]any{
"Capacity": 5,
"Available": 5,
"Active": 0,
"Active": 5,
"InUse": 0,
"WaitCount": 0,
"WaitTime": time.Duration(0),
Expand All @@ -540,48 +544,48 @@ func TestReopen(t *testing.T) {
}
assert.Equal(t, expected, stats)
assert.EqualValues(t, 5, state.lastID.Load())
assert.EqualValues(t, 0, state.open.Load())
assert.EqualValues(t, 5, state.open.Load())
}

func TestUserClosing(t *testing.T) {
var state TestState

ctx := context.Background()
p := NewPool(&Config[*TestConn]{
Capacity: 5,
IdleTimeout: time.Second,
LogWait: state.LogWait,
}).Open(newConnector(&state), nil)

var resources [5]*Pooled[*TestConn]
for i := 0; i < 5; i++ {
var err error
resources[i], err = p.Get(ctx, nil)
require.NoError(t, err)
}

for _, r := range resources[:4] {
r.Recycle()
}

ch := make(chan error)
go func() {
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
defer cancel()

err := p.CloseWithContext(ctx)
ch <- err
close(ch)
}()

select {
case <-time.After(5 * time.Second):
t.Fatalf("Pool did not shutdown after 5s")
case err := <-ch:
require.Error(t, err)
t.Logf("Shutdown error: %v", err)
}
}
// func TestUserClosing(t *testing.T) {
// var state TestState

// ctx := context.Background()
// p := NewPool(&Config[*TestConn]{
// Capacity: 5,
// IdleTimeout: time.Second,
// LogWait: state.LogWait,
// }).Open(newConnector(&state), nil)

// var resources [5]*Pooled[*TestConn]
// for i := 0; i < 5; i++ {
// var err error
// resources[i], err = p.Get(ctx, nil)
// require.NoError(t, err)
// }

// for _, r := range resources[:4] {
// r.Recycle()
// }

// ch := make(chan error)
// go func() {
// ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
// defer cancel()

// err := p.CloseWithContext(ctx)
// ch <- err
// close(ch)
// }()

// select {
// case <-time.After(5 * time.Second):
// t.Fatalf("Pool did not shutdown after 5s")
// case err := <-ch:
// require.Error(t, err)
// t.Logf("Shutdown error: %v", err)
// }
// }

func TestConnReopen(t *testing.T) {
var state TestState
Expand Down
Loading