Skip to content

Commit

Permalink
opt: improve the management logic of the mixed-buffer
Browse files Browse the repository at this point in the history
  • Loading branch information
panjf2000 committed Dec 5, 2021
1 parent 4ac906c commit b8d571d
Show file tree
Hide file tree
Showing 7 changed files with 60 additions and 86 deletions.
51 changes: 0 additions & 51 deletions client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -471,57 +471,6 @@ func TestServeWithGnetClient(t *testing.T) {
})
})
})

t.Run("poll-reuseaddr", func(t *testing.T) {
t.Run("tcp", func(t *testing.T) {
t.Run("1-loop", func(t *testing.T) {
testServeWithGnetClient(t, "tcp", ":9991", false, true, false, false, 10, RoundRobin)
})
t.Run("N-loop", func(t *testing.T) {
testServeWithGnetClient(t, "tcp", ":9992", false, true, true, false, 10, LeastConnections)
})
})
t.Run("tcp-async", func(t *testing.T) {
t.Run("1-loop", func(t *testing.T) {
testServeWithGnetClient(t, "tcp", ":9991", false, true, false, true, 10, RoundRobin)
})
t.Run("N-loop", func(t *testing.T) {
testServeWithGnetClient(t, "tcp", ":9992", false, true, true, false, 10, LeastConnections)
})
})
t.Run("udp", func(t *testing.T) {
t.Run("1-loop", func(t *testing.T) {
testServeWithGnetClient(t, "udp", ":9991", false, true, false, false, 10, RoundRobin)
})
t.Run("N-loop", func(t *testing.T) {
testServeWithGnetClient(t, "udp", ":9992", false, true, true, false, 10, LeastConnections)
})
})
t.Run("udp-async", func(t *testing.T) {
t.Run("1-loop", func(t *testing.T) {
testServeWithGnetClient(t, "udp", ":9991", false, true, false, false, 10, RoundRobin)
})
t.Run("N-loop", func(t *testing.T) {
testServeWithGnetClient(t, "udp", ":9992", false, true, true, true, 10, LeastConnections)
})
})
t.Run("unix", func(t *testing.T) {
t.Run("1-loop", func(t *testing.T) {
testServeWithGnetClient(t, "unix", "gnet1.sock", false, true, false, false, 10, RoundRobin)
})
t.Run("N-loop", func(t *testing.T) {
testServeWithGnetClient(t, "unix", "gnet2.sock", false, true, true, false, 10, LeastConnections)
})
})
t.Run("unix-async", func(t *testing.T) {
t.Run("1-loop", func(t *testing.T) {
testServeWithGnetClient(t, "unix", "gnet1.sock", false, true, false, true, 10, RoundRobin)
})
t.Run("N-loop", func(t *testing.T) {
testServeWithGnetClient(t, "unix", "gnet2.sock", false, true, true, true, 10, LeastConnections)
})
})
})
}

type testClientServer struct {
Expand Down
6 changes: 3 additions & 3 deletions connection_unix.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,8 @@ func newTCPConn(fd int, el *eventloop, sa unix.Sockaddr, codec ICodec, localAddr
codec: codec,
localAddr: localAddr,
remoteAddr: remoteAddr,
inboundBuffer: rbPool.GetWithSize(ringbuffer.TCPReadBufferSize),
outboundBuffer: mixedbuffer.New(mixedbuffer.MaxStackingBytes),
inboundBuffer: rbPool.GetWithSize(ringbuffer.MaxStreamBufferCap),
outboundBuffer: mixedbuffer.New(ringbuffer.MaxStreamBufferCap),
}
c.pollAttachment = netpoll.GetPollAttachment()
c.pollAttachment.FD, c.pollAttachment.Callback = fd, c.handleEvents
Expand Down Expand Up @@ -201,8 +201,8 @@ func (c *conn) writev(bs [][]byte) (err error) {
for i := range c.packets {
np := len(c.packets[i])
if n < np {
pos = i
c.packets[i] = c.packets[i][n:]
pos = i
break
}
n -= np
Expand Down
4 changes: 2 additions & 2 deletions gnet.go
Original file line number Diff line number Diff line change
Expand Up @@ -290,12 +290,12 @@ func Serve(eventHandler EventHandler, protoAddr string, opts ...Option) (err err
rbc := options.ReadBufferCap
switch {
case rbc <= 0:
options.ReadBufferCap = ringbuffer.TCPReadBufferSize
options.ReadBufferCap = ringbuffer.MaxStreamBufferCap
case rbc <= ringbuffer.DefaultBufferSize:
options.ReadBufferCap = ringbuffer.DefaultBufferSize
default:
options.ReadBufferCap = toolkit.CeilToPowerOfTwo(rbc)
ringbuffer.TCPReadBufferSize = options.ReadBufferCap
ringbuffer.MaxStreamBufferCap = options.ReadBufferCap
}

network, addr := parseProtoAddr(protoAddr)
Expand Down
40 changes: 28 additions & 12 deletions gnet_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -390,50 +390,66 @@ func TestServe(t *testing.T) {
t.Run("poll-reuseport", func(t *testing.T) {
t.Run("tcp", func(t *testing.T) {
t.Run("1-loop", func(t *testing.T) {
testServe(t, "tcp", ":9991", true, true, false, false, false, 10, RoundRobin)
testServe(t, "tcp", ":9991", true, false, false, false, false, 10, RoundRobin)
})
t.Run("N-loop", func(t *testing.T) {
testServe(t, "tcp", ":9992", true, true, true, false, false, 10, LeastConnections)
testServe(t, "tcp", ":9992", true, false, true, false, false, 10, LeastConnections)
})
})
t.Run("tcp-async", func(t *testing.T) {
t.Run("1-loop", func(t *testing.T) {
testServe(t, "tcp", ":9991", true, true, false, true, false, 10, RoundRobin)
testServe(t, "tcp", ":9991", true, false, false, true, false, 10, RoundRobin)
})
t.Run("N-loop", func(t *testing.T) {
testServe(t, "tcp", ":9992", true, true, true, false, false, 10, LeastConnections)
testServe(t, "tcp", ":9992", true, false, true, true, false, 10, LeastConnections)
})
})
t.Run("tcp-async-writev", func(t *testing.T) {
t.Run("1-loop", func(t *testing.T) {
testServe(t, "tcp", ":9991", true, false, false, true, true, 10, RoundRobin)
})
t.Run("N-loop", func(t *testing.T) {
testServe(t, "tcp", ":9992", true, false, true, true, true, 10, LeastConnections)
})
})
t.Run("udp", func(t *testing.T) {
t.Run("1-loop", func(t *testing.T) {
testServe(t, "udp", ":9991", true, true, false, false, false, 10, RoundRobin)
testServe(t, "udp", ":9991", true, false, false, false, false, 10, RoundRobin)
})
t.Run("N-loop", func(t *testing.T) {
testServe(t, "udp", ":9992", true, true, true, false, false, 10, LeastConnections)
testServe(t, "udp", ":9992", true, false, true, false, false, 10, LeastConnections)
})
})
t.Run("udp-async", func(t *testing.T) {
t.Run("1-loop", func(t *testing.T) {
testServe(t, "udp", ":9991", true, true, false, false, false, 10, RoundRobin)
testServe(t, "udp", ":9991", true, false, false, true, false, 10, RoundRobin)
})
t.Run("N-loop", func(t *testing.T) {
testServe(t, "udp", ":9992", true, true, true, true, false, 10, LeastConnections)
testServe(t, "udp", ":9992", true, false, true, true, false, 10, LeastConnections)
})
})
t.Run("unix", func(t *testing.T) {
t.Run("1-loop", func(t *testing.T) {
testServe(t, "unix", "gnet1.sock", true, true, false, false, false, 10, RoundRobin)
testServe(t, "unix", "gnet1.sock", true, false, false, false, false, 10, RoundRobin)
})
t.Run("N-loop", func(t *testing.T) {
testServe(t, "unix", "gnet2.sock", true, true, true, false, false, 10, LeastConnections)
testServe(t, "unix", "gnet2.sock", true, false, true, false, false, 10, LeastConnections)
})
})
t.Run("unix-async", func(t *testing.T) {
t.Run("1-loop", func(t *testing.T) {
testServe(t, "unix", "gnet1.sock", true, true, false, true, false, 10, RoundRobin)
testServe(t, "unix", "gnet1.sock", true, false, false, true, false, 10, RoundRobin)
})
t.Run("N-loop", func(t *testing.T) {
testServe(t, "unix", "gnet2.sock", true, false, true, true, false, 10, LeastConnections)
})
})
t.Run("unix-async-writev", func(t *testing.T) {
t.Run("1-loop", func(t *testing.T) {
testServe(t, "unix", "gnet1.sock", true, false, false, true, true, 10, RoundRobin)
})
t.Run("N-loop", func(t *testing.T) {
testServe(t, "unix", "gnet2.sock", true, true, true, true, false, 10, LeastConnections)
testServe(t, "unix", "gnet2.sock", true, false, true, true, true, 10, LeastConnections)
})
})
})
Expand Down
39 changes: 24 additions & 15 deletions pkg/mixedbuffer/mixed_ring_list_buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,6 @@ import (
"github.com/panjf2000/gnet/pkg/ringbuffer"
)

// MaxStackingBytes is the maximum amount which is allowed to be piled up in the ring-buffer.
const MaxStackingBytes = 32 * 1024 // 32KB

// Buffer combines ring-buffer and list-buffer.
// Ring-buffer is the top-priority buffer to store response data, gnet will only switch to
// list-buffer if the data size of ring-buffer reaches the maximum(MaxStackingBytes), list-buffer is more
Expand All @@ -35,7 +32,7 @@ type Buffer struct {

// New instantiates a mixedbuffer.Buffer and returns it.
func New(maxTopBufCap int) *Buffer {
return &Buffer{maxStackingBytes: maxTopBufCap, ringBuffer: rbPool.Get()}
return &Buffer{maxStackingBytes: maxTopBufCap, ringBuffer: rbPool.GetWithSize(maxTopBufCap)}
}

// Peek returns all bytes as [][]byte, these bytes won't be discarded until Buffer.Discard() is called.
Expand All @@ -61,31 +58,43 @@ func (mb *Buffer) Write(p []byte) (n int, err error) {
mb.listBuffer.PushBytesBack(p)
return len(p), nil
}
n, err = mb.ringBuffer.Write(p)
if n > mb.maxStackingBytes {
mb.maxStackingBytes = n
freeSize := mb.ringBuffer.Free()
if len(p) > freeSize {
n, err = mb.ringBuffer.Write(p[:freeSize])
mb.listBuffer.PushBytesBack(p[n:])
return
}
return
return mb.ringBuffer.Write(p)
}

// Writev appends multiple byte slices to this buffer.
func (mb *Buffer) Writev(bs [][]byte) (int, error) {
var n int
if !mb.listBuffer.IsEmpty() || mb.ringBuffer.Length() >= mb.maxStackingBytes {
var n int
for _, b := range bs {
mb.listBuffer.PushBytesBack(b)
n += len(b)
}
return n, nil
}
for _, b := range bs {
_, _ = mb.ringBuffer.Write(b)
n += len(b)
var pos, sum int
freeSize := mb.ringBuffer.Free()
for i, b := range bs {
pos = i
sum += len(b)
if len(b) > freeSize {
n, _ := mb.ringBuffer.Write(b[:freeSize])
mb.listBuffer.PushBytesBack(b[n:])
break
}
n, _ := mb.ringBuffer.Write(b)
freeSize -= n
}
if n > mb.maxStackingBytes {
mb.maxStackingBytes = n
for pos++; pos < len(bs); pos++ {
sum += len(bs[pos])
mb.listBuffer.PushBytesBack(bs[pos])
}
return n, nil
return sum, nil
}

// IsEmpty indicates whether this buffer is empty.
Expand Down
2 changes: 1 addition & 1 deletion pkg/pool/ringbuffer/ringbuffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ func (p *Pool) GetWithSize(size int) *RingBuffer {
v := p.pool.Get()
if v != nil {
rb := v.(*RingBuffer)
if rb.Cap() >= size {
if rb.Len() >= size {
return rb
}
p.pool.Put(v)
Expand Down
4 changes: 2 additions & 2 deletions pkg/ringbuffer/ring_buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@ const (
bufferGrowThreshold = 4 * 1024 // 4KB
)

// TCPReadBufferSize is the default read buffer size for each TCP socket.
var TCPReadBufferSize = 64 * 1024 // 64KB
// MaxStreamBufferCap is the default buffer size for each stream-oriented connection(TCP/Unix).
var MaxStreamBufferCap = 64 * 1024 // 64KB

// ErrIsEmpty will be returned when trying to read an empty ring-buffer.
var ErrIsEmpty = errors.New("ring-buffer is empty")
Expand Down

0 comments on commit b8d571d

Please sign in to comment.