Skip to content

Commit

Permalink
opt: make the mixed-buffer more flexible
Browse files Browse the repository at this point in the history
  • Loading branch information
panjf2000 committed Dec 5, 2021
1 parent 4b3679d commit 4ac906c
Show file tree
Hide file tree
Showing 3 changed files with 26 additions and 10 deletions.
2 changes: 1 addition & 1 deletion connection_unix.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func newTCPConn(fd int, el *eventloop, sa unix.Sockaddr, codec ICodec, localAddr
localAddr: localAddr,
remoteAddr: remoteAddr,
inboundBuffer: rbPool.GetWithSize(ringbuffer.TCPReadBufferSize),
outboundBuffer: mixedbuffer.New(),
outboundBuffer: mixedbuffer.New(mixedbuffer.MaxStackingBytes),
}
c.pollAttachment = netpoll.GetPollAttachment()
c.pollAttachment.FD, c.pollAttachment.Callback = fd, c.handleEvents
Expand Down
10 changes: 9 additions & 1 deletion eventloop_unix.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,15 @@ func (el *eventloop) loopWrite(c *conn) error {
el.eventHandler.PreWrite(c)

iov := c.outboundBuffer.Peek()
n, err := io.Writev(c.fd, iov)
var (
n int
err error
)
if len(iov) > 1 {
n, err = io.Writev(c.fd, iov)
} else {
n, err = unix.Write(c.fd, iov[0])
}
c.outboundBuffer.Discard(n)
switch err {
case nil:
Expand Down
24 changes: 16 additions & 8 deletions pkg/mixedbuffer/mixed_ring_list_buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,14 @@ const MaxStackingBytes = 32 * 1024 // 32KB
// list-buffer if the data size of ring-buffer reaches the maximum(MaxStackingBytes), list-buffer is more
// flexible and scalable, which helps the application reduce memory footprint.
type Buffer struct {
ringBuffer *ringbuffer.RingBuffer
listBuffer listbuffer.ListBuffer
maxStackingBytes int
ringBuffer *ringbuffer.RingBuffer
listBuffer listbuffer.ListBuffer
}

// New instantiates a mixedbuffer.Buffer and returns it.
func New() *Buffer {
return &Buffer{ringBuffer: rbPool.Get()}
func New(maxTopBufCap int) *Buffer {
return &Buffer{maxStackingBytes: maxTopBufCap, ringBuffer: rbPool.Get()}
}

// Peek returns all bytes as [][]byte, these bytes won't be discarded until Buffer.Discard() is called.
Expand All @@ -55,18 +56,22 @@ func (mb *Buffer) Discard(n int) {
}

// Write appends data to this buffer.
func (mb *Buffer) Write(p []byte) (int, error) {
if !mb.listBuffer.IsEmpty() || mb.ringBuffer.Length() >= MaxStackingBytes {
func (mb *Buffer) Write(p []byte) (n int, err error) {
if !mb.listBuffer.IsEmpty() || mb.ringBuffer.Length() >= mb.maxStackingBytes {
mb.listBuffer.PushBytesBack(p)
return len(p), nil
}
return mb.ringBuffer.Write(p)
n, err = mb.ringBuffer.Write(p)
if n > mb.maxStackingBytes {
mb.maxStackingBytes = n
}
return
}

// Writev appends multiple byte slices to this buffer.
func (mb *Buffer) Writev(bs [][]byte) (int, error) {
var n int
if !mb.listBuffer.IsEmpty() || mb.ringBuffer.Length() >= MaxStackingBytes {
if !mb.listBuffer.IsEmpty() || mb.ringBuffer.Length() >= mb.maxStackingBytes {
for _, b := range bs {
mb.listBuffer.PushBytesBack(b)
n += len(b)
Expand All @@ -77,6 +82,9 @@ func (mb *Buffer) Writev(bs [][]byte) (int, error) {
_, _ = mb.ringBuffer.Write(b)
n += len(b)
}
if n > mb.maxStackingBytes {
mb.maxStackingBytes = n
}
return n, nil
}

Expand Down

0 comments on commit 4ac906c

Please sign in to comment.