From b1d45cce89a69dbf5704bf0f94f0b923725f7621 Mon Sep 17 00:00:00 2001 From: Weizhen Wang Date: Thu, 8 Feb 2024 14:55:11 +0800 Subject: [PATCH 1/6] *: new SharedBufferPool to replace the old Signed-off-by: Weizhen Wang --- internal/client/client.go | 2 +- internal/client/pool.go | 117 +++++++++++++++++++++++++++++++++++ internal/client/pool_test.go | 42 +++++++++++++ 3 files changed, 160 insertions(+), 1 deletion(-) create mode 100644 internal/client/pool.go create mode 100644 internal/client/pool_test.go diff --git a/internal/client/client.go b/internal/client/client.go index 0212081cb7..8dbb249247 100644 --- a/internal/client/client.go +++ b/internal/client/client.go @@ -297,7 +297,7 @@ func (a *connArray) Init(addr string, security config.Security, idleNotify *uint }), }, opts...) if cfg.TiKVClient.GrpcSharedBufferPool { - opts = append(opts, experimental.WithRecvBufferPool(grpc.NewSharedBufferPool())) + opts = append(opts, experimental.WithRecvBufferPool(NewSharedBufferPool())) } conn, err := a.monitoredDial( ctx, diff --git a/internal/client/pool.go b/internal/client/pool.go new file mode 100644 index 0000000000..6ad8f5591d --- /dev/null +++ b/internal/client/pool.go @@ -0,0 +1,117 @@ +// Copyright 2024 TiKV Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package client + +func NewSharedBufferPool() SharedBufferPool { + return &simpleSharedBufferPool{ + pools: [poolArraySize]simpleSharedBufferChildPool{ + newBytesPool(level0PoolMaxSize), + newBytesPool(level1PoolMaxSize), + newBytesPool(level2PoolMaxSize), + newBytesPool(level3PoolMaxSize), + newBytesPool(level4PoolMaxSize), + newBytesPool(0), + }, + } +} + +// simpleSharedBufferPool is a simple implementation of SharedBufferPool. +type simpleSharedBufferPool struct { + pools [poolArraySize]simpleSharedBufferChildPool +} + +func (p *simpleSharedBufferPool) Get(size int) []byte { + return p.pools[p.poolIdx(size)].Get(size) +} + +func (p *simpleSharedBufferPool) Put(bs *[]byte) { + p.pools[p.poolIdx(cap(*bs))].Put(bs) +} + +func (p *simpleSharedBufferPool) poolIdx(size int) int { + switch { + case size <= level0PoolMaxSize: + return level0PoolIdx + case size <= level1PoolMaxSize: + return level1PoolIdx + case size <= level2PoolMaxSize: + return level2PoolIdx + case size <= level3PoolMaxSize: + return level3PoolIdx + case size <= level4PoolMaxSize: + return level4PoolIdx + default: + return levelMaxPoolIdx + } +} + +const ( + level0PoolMaxSize = 16 // 16 B + level1PoolMaxSize = level0PoolMaxSize * 16 // 256 B + level2PoolMaxSize = level1PoolMaxSize * 16 // 4 KB + level3PoolMaxSize = level2PoolMaxSize * 16 // 64 KB + level4PoolMaxSize = level3PoolMaxSize * 16 // 1 MB +) + +const ( + level0PoolIdx = iota + level1PoolIdx + level2PoolIdx + level3PoolIdx + level4PoolIdx + levelMaxPoolIdx + poolArraySize +) + +type simpleSharedBufferChildPool interface { + Get(size int) []byte + Put(any) +} + +type bufferPool struct { + sync.Pool + + defaultSize int +} + +func (p *bufferPool) Get(size int) []byte { + bs := p.Pool.Get().(*[]byte) + + if cap(*bs) < size { + p.Pool.Put(bs) + + return make([]byte, size) + } + + return (*bs)[:size] +} + +func (p *bufferPool) Put(i any) { + runtime.SetFinalizer(bs, func() { + bs := bs[:0] + p.Pool.Put(bs) + }) +} +func newBytesPool(size int) simpleSharedBufferChildPool { + return &bufferPool{ + Pool: sync.Pool{ + New: func() any { + bs := make([]byte, size) + return &bs + }, + }, + defaultSize: size, + } +} \ No newline at end of file diff --git a/internal/client/pool_test.go b/internal/client/pool_test.go new file mode 100644 index 0000000000..c53d8807d0 --- /dev/null +++ b/internal/client/pool_test.go @@ -0,0 +1,42 @@ +// Copyright 2024 TiKV Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package client + +func (s) TestSharedBufferPool(t *testing.T) { + pools := []SharedBufferPool{ + nopBufferPool{}, + NewSharedBufferPool(), + } + + lengths := []int{ + level4PoolMaxSize + 1, + level4PoolMaxSize, + level3PoolMaxSize, + level2PoolMaxSize, + level1PoolMaxSize, + level0PoolMaxSize, + } + + for _, p := range pools { + for _, l := range lengths { + bs := p.Get(l) + if len(bs) != l { + t.Fatalf("Expected buffer of length %d, got %d", l, len(bs)) + } + + p.Put(&bs) + } + } +} \ No newline at end of file From e34ea0f91b82c8fe9d0466830a8512e729a0f116 Mon Sep 17 00:00:00 2001 From: Weizhen Wang Date: Thu, 8 Feb 2024 15:39:25 +0800 Subject: [PATCH 2/6] test Signed-off-by: Weizhen Wang --- internal/client/pool.go | 28 ++++++++++++++++++---------- internal/client/pool_test.go | 25 ++++++++++--------------- 2 files changed, 28 insertions(+), 25 deletions(-) diff --git a/internal/client/pool.go b/internal/client/pool.go index 6ad8f5591d..4b6bc59d76 100644 --- a/internal/client/pool.go +++ b/internal/client/pool.go @@ -14,22 +14,29 @@ package client -func NewSharedBufferPool() SharedBufferPool { +import ( + "runtime" + "sync" + + "google.golang.org/grpc" +) + +func NewSharedBufferPool() grpc.SharedBufferPool { return &simpleSharedBufferPool{ - pools: [poolArraySize]simpleSharedBufferChildPool{ + pools: [poolArraySize]*bufferPool{ newBytesPool(level0PoolMaxSize), newBytesPool(level1PoolMaxSize), newBytesPool(level2PoolMaxSize), newBytesPool(level3PoolMaxSize), newBytesPool(level4PoolMaxSize), - newBytesPool(0), + newBytesPool(level4PoolMaxSize), }, } } // simpleSharedBufferPool is a simple implementation of SharedBufferPool. type simpleSharedBufferPool struct { - pools [poolArraySize]simpleSharedBufferChildPool + pools [poolArraySize]*bufferPool } func (p *simpleSharedBufferPool) Get(size int) []byte { @@ -98,13 +105,14 @@ func (p *bufferPool) Get(size int) []byte { return (*bs)[:size] } -func (p *bufferPool) Put(i any) { - runtime.SetFinalizer(bs, func() { - bs := bs[:0] - p.Pool.Put(bs) +func (p *bufferPool) Put(i *[]byte) { + (*i) = (*i)[:0] + runtime.SetFinalizer(i, func(i *[]byte) { + p.Pool.Put(i) }) } -func newBytesPool(size int) simpleSharedBufferChildPool { + +func newBytesPool(size int) *bufferPool { return &bufferPool{ Pool: sync.Pool{ New: func() any { @@ -114,4 +122,4 @@ func newBytesPool(size int) simpleSharedBufferChildPool { }, defaultSize: size, } -} \ No newline at end of file +} diff --git a/internal/client/pool_test.go b/internal/client/pool_test.go index c53d8807d0..566c751e47 100644 --- a/internal/client/pool_test.go +++ b/internal/client/pool_test.go @@ -14,12 +14,9 @@ package client -func (s) TestSharedBufferPool(t *testing.T) { - pools := []SharedBufferPool{ - nopBufferPool{}, - NewSharedBufferPool(), - } +import "testing" +func TestSharedBufferPool(t *testing.T) { lengths := []int{ level4PoolMaxSize + 1, level4PoolMaxSize, @@ -28,15 +25,13 @@ func (s) TestSharedBufferPool(t *testing.T) { level1PoolMaxSize, level0PoolMaxSize, } - - for _, p := range pools { - for _, l := range lengths { - bs := p.Get(l) - if len(bs) != l { - t.Fatalf("Expected buffer of length %d, got %d", l, len(bs)) - } - - p.Put(&bs) + p := NewSharedBufferPool() + for _, l := range lengths { + bs := p.Get(l) + if len(bs) != l { + t.Fatalf("Expected buffer of length %d, got %d", l, len(bs)) } + + p.Put(&bs) } -} \ No newline at end of file +} From c8028cd3d628093537d43b77de3b5521e697ba0d Mon Sep 17 00:00:00 2001 From: Weizhen Wang Date: Thu, 8 Feb 2024 15:43:28 +0800 Subject: [PATCH 3/6] test Signed-off-by: Weizhen Wang --- internal/client/pool.go | 5 ----- 1 file changed, 5 deletions(-) diff --git a/internal/client/pool.go b/internal/client/pool.go index 4b6bc59d76..b3093dd0a5 100644 --- a/internal/client/pool.go +++ b/internal/client/pool.go @@ -82,11 +82,6 @@ const ( poolArraySize ) -type simpleSharedBufferChildPool interface { - Get(size int) []byte - Put(any) -} - type bufferPool struct { sync.Pool From 73992e8f190f14bde13e15b2583a52581c80c814 Mon Sep 17 00:00:00 2001 From: Weizhen Wang Date: Thu, 8 Feb 2024 16:26:54 +0800 Subject: [PATCH 4/6] test Signed-off-by: Weizhen Wang --- internal/client/pool.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/client/pool.go b/internal/client/pool.go index b3093dd0a5..25ed617d26 100644 --- a/internal/client/pool.go +++ b/internal/client/pool.go @@ -101,8 +101,8 @@ func (p *bufferPool) Get(size int) []byte { } func (p *bufferPool) Put(i *[]byte) { - (*i) = (*i)[:0] runtime.SetFinalizer(i, func(i *[]byte) { + (*i) = (*i)[:0] p.Pool.Put(i) }) } From b7f5eaad5725719afa67b4fc0c706f82f8af47ef Mon Sep 17 00:00:00 2001 From: Weizhen Wang Date: Thu, 8 Feb 2024 16:30:36 +0800 Subject: [PATCH 5/6] test Signed-off-by: Weizhen Wang --- internal/client/pool.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/client/pool.go b/internal/client/pool.go index 25ed617d26..06327759f8 100644 --- a/internal/client/pool.go +++ b/internal/client/pool.go @@ -102,7 +102,7 @@ func (p *bufferPool) Get(size int) []byte { func (p *bufferPool) Put(i *[]byte) { runtime.SetFinalizer(i, func(i *[]byte) { - (*i) = (*i)[:0] + clear(*i) p.Pool.Put(i) }) } From e9033904cc8ce7c14e04db7ab54e9aef457c82dd Mon Sep 17 00:00:00 2001 From: Weizhen Wang Date: Tue, 13 Feb 2024 13:05:49 +0800 Subject: [PATCH 6/6] test Signed-off-by: Weizhen Wang --- internal/client/pool.go | 14 +++++++++++--- 1 file changed, 11 insertions(+), 3 deletions(-) diff --git a/internal/client/pool.go b/internal/client/pool.go index 06327759f8..ce9e4564cc 100644 --- a/internal/client/pool.go +++ b/internal/client/pool.go @@ -40,11 +40,19 @@ type simpleSharedBufferPool struct { } func (p *simpleSharedBufferPool) Get(size int) []byte { - return p.pools[p.poolIdx(size)].Get(size) + idx := p.poolIdx(size) + if idx == levelMaxPoolIdx { + return make([]byte, size) + } + return p.pools[idx].Get(size) } func (p *simpleSharedBufferPool) Put(bs *[]byte) { - p.pools[p.poolIdx(cap(*bs))].Put(bs) + idx := p.poolIdx(cap(*bs)) + if idx == levelMaxPoolIdx { + return + } + p.pools[idx].Put(bs) } func (p *simpleSharedBufferPool) poolIdx(size int) int { @@ -69,7 +77,7 @@ const ( level1PoolMaxSize = level0PoolMaxSize * 16 // 256 B level2PoolMaxSize = level1PoolMaxSize * 16 // 4 KB level3PoolMaxSize = level2PoolMaxSize * 16 // 64 KB - level4PoolMaxSize = level3PoolMaxSize * 16 // 1 MB + level4PoolMaxSize = level3PoolMaxSize * 2 // 128 MB ) const (