Skip to content

Commit

Permalink
optimize byte buffer
Browse files Browse the repository at this point in the history
  • Loading branch information
mobus committed Jan 19, 2022
1 parent f8803fe commit 543503a
Show file tree
Hide file tree
Showing 6 changed files with 38 additions and 93 deletions.
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ require (
github.com/hslam/scheduler v0.0.0-20211028175315-641598104976
github.com/hslam/sendfile v1.0.1
github.com/hslam/splice v1.0.3
github.com/oxtoacart/bpool v0.0.0-20190530202638-03653db5a59c
github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475
github.com/stretchr/testify v1.7.0
gopkg.in/eapache/queue.v1 v1.1.0
Expand Down
3 changes: 3 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,14 @@ github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e h1:fD57ERR4JtEqsWbfPhv4DMiApHyliiK5xCTNVSPiaAs=
github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno=
github.com/oxtoacart/bpool v0.0.0-20190530202638-03653db5a59c h1:rp5dCmg/yLR3mgFuSOe4oEnDDmGLROTvMragMUXpTQw=
github.com/oxtoacart/bpool v0.0.0-20190530202638-03653db5a59c/go.mod h1:X07ZCGwUbLaax7L0S3Tw4hpejzu63ZrrQiUe6W0hcy0=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 h1:N/ElC8H3+5XpJzTSTfLsJV/mx9Q9g7kxmchpfZyxgzM=
github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
golang.org/x/sys v0.0.0-20200302150141-5c8b2ff67527/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
Expand Down
45 changes: 9 additions & 36 deletions netpoll/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,10 @@ package netpoll

import (
"errors"
"github.com/hslam/buffer"
"net"
"sync"

"github.com/oxtoacart/bpool"
)

// ErrHandlerFunc is the error when the HandlerFunc is nil
Expand Down Expand Up @@ -71,16 +72,8 @@ func (h *ConnHandler) Serve(ctx Context) error {

// DataHandler implements the Handler interface.
type DataHandler struct {
// NoShared disables the DataHandler to use the buffer pool for high performance.
// Default NoShared is false to use the buffer pool for low memory usage.
NoShared bool
// NoCopy returns the bytes underlying buffer when NoCopy is true,
// The bytes returned is shared by all invocations of Read, so do not modify it.
// Default NoCopy is false to make a copy of data for every invocations of Read.
NoCopy bool
// BufferSize represents the buffer size.
BufferSize int
upgrade func(net.Conn) (net.Conn, error)
Pool *bpool.BytePool
upgrade func(net.Conn) (net.Conn, error)
// HandlerFunc is the data Serve function.
HandlerFunc func(req []byte) (res []byte)
}
Expand All @@ -90,7 +83,7 @@ type context struct {
writing sync.Mutex
upgrade bool
conn net.Conn
pool *buffer.Pool
pool *bpool.BytePool
buffer []byte
}

Expand All @@ -101,9 +94,6 @@ func (h *DataHandler) SetUpgrade(upgrade func(net.Conn) (net.Conn, error)) {

// Upgrade sets the net.Conn to a Context.
func (h *DataHandler) Upgrade(conn net.Conn) (Context, error) {
if h.BufferSize < 1 {
h.BufferSize = bufferSize
}
if h.HandlerFunc == nil {
return nil, ErrHandlerFunc
}
Expand All @@ -117,12 +107,7 @@ func (h *DataHandler) Upgrade(conn net.Conn) (Context, error) {
conn = c
}
}
var ctx = &context{upgrade: upgrade, conn: conn}
if h.NoShared {
ctx.buffer = make([]byte, h.BufferSize)
} else {
ctx.pool = buffer.AssignPool(h.BufferSize)
}
var ctx = &context{upgrade: upgrade, conn: conn, pool: h.Pool}
return ctx, nil
}

Expand All @@ -134,11 +119,8 @@ func (h *DataHandler) Serve(ctx Context) error {
var err error
var buf []byte
var req []byte
if h.NoShared {
buf = c.buffer
} else {
buf = c.pool.GetBuffer(h.BufferSize)
}
buf = c.pool.Get()
defer c.pool.Put(buf)
if c.upgrade {
c.reading.Lock()
}
Expand All @@ -147,16 +129,9 @@ func (h *DataHandler) Serve(ctx Context) error {
c.reading.Unlock()
}
if err != nil {
if !h.NoShared {
c.pool.PutBuffer(buf)
}
return err
}
req = buf[:n]
if !h.NoCopy {
req = make([]byte, n)
copy(req, buf[:n])
}
res := h.HandlerFunc(req)
if c.upgrade {
c.writing.Lock()
Expand All @@ -165,8 +140,6 @@ func (h *DataHandler) Serve(ctx Context) error {
if c.upgrade {
c.writing.Unlock()
}
if !h.NoShared {
c.pool.PutBuffer(buf)
}
c.pool.Put(res)
return err
}
6 changes: 3 additions & 3 deletions netpoll/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import (
"net"
"testing"
"time"

"github.com/oxtoacart/bpool"
)

func TestNewHandler(t *testing.T) {
Expand Down Expand Up @@ -68,9 +70,7 @@ func TestConnHandler(t *testing.T) {

func TestDataHandler(t *testing.T) {
var handler = &DataHandler{
NoShared: true,
NoCopy: false,
BufferSize: 0,
Pool: bpool.NewBytePool(1024, 12*1024),
}
_, err := handler.Upgrade(&conn{})
if err != ErrHandlerFunc {
Expand Down
14 changes: 5 additions & 9 deletions netpoll/net_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,13 @@ import (
"sync"
"testing"
"time"

"github.com/oxtoacart/bpool"
)

func TestListenAndServe(t *testing.T) {
var handler = &DataHandler{
NoShared: false,
NoCopy: false,
BufferSize: 1024,
Pool: bpool.NewBytePool(1024, 12*1024),
HandlerFunc: func(req []byte) (res []byte) {
res = req
return
Expand All @@ -33,9 +33,7 @@ func TestListenAndServe(t *testing.T) {

func TestServe(t *testing.T) {
var handler = &DataHandler{
NoShared: false,
NoCopy: false,
BufferSize: 1024,
Pool: bpool.NewBytePool(1024, 12*1024),
HandlerFunc: func(req []byte) (res []byte) {
res = req
return
Expand All @@ -54,9 +52,7 @@ func TestServe(t *testing.T) {

func TestNetServer(t *testing.T) {
var handler = &DataHandler{
NoShared: true,
NoCopy: true,
BufferSize: 1024,
Pool: bpool.NewBytePool(1024, 12*1024),
HandlerFunc: func(req []byte) (res []byte) {
res = req
return
Expand Down
62 changes: 17 additions & 45 deletions netpoll/net_unix_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ import (
"sync"
"testing"
"time"

"github.com/oxtoacart/bpool"
)

func TestServerListenAndServe(t *testing.T) {
Expand Down Expand Up @@ -55,9 +57,7 @@ func TestServerServe(t *testing.T) {

func TestServerPoll(t *testing.T) {
var handler = &DataHandler{
NoShared: false,
NoCopy: false,
BufferSize: 1024,
Pool: bpool.NewBytePool(1024, 12*1024),
HandlerFunc: func(req []byte) (res []byte) {
res = req
return
Expand Down Expand Up @@ -88,9 +88,7 @@ func TestServerPoll(t *testing.T) {

func TestServerClose(t *testing.T) {
var handler = &DataHandler{
NoShared: false,
NoCopy: false,
BufferSize: 1024,
Pool: bpool.NewBytePool(1024, 12*1024),
HandlerFunc: func(req []byte) (res []byte) {
res = req
return
Expand All @@ -117,9 +115,7 @@ func TestServerClose(t *testing.T) {

func TestServerNumCPU(t *testing.T) {
var handler = &DataHandler{
NoShared: false,
NoCopy: false,
BufferSize: 1024,
Pool: bpool.NewBytePool(1024, 12*1024),
HandlerFunc: func(req []byte) (res []byte) {
res = req
return
Expand Down Expand Up @@ -148,9 +144,7 @@ func TestServerNumCPU(t *testing.T) {

func TestServerTCPListener(t *testing.T) {
var handler = &DataHandler{
NoShared: false,
NoCopy: false,
BufferSize: 1024,
Pool: bpool.NewBytePool(1024, 12*1024),
HandlerFunc: func(req []byte) (res []byte) {
res = req
return
Expand All @@ -171,9 +165,7 @@ func TestServerTCPListener(t *testing.T) {

func TestServerUNIXListener(t *testing.T) {
var handler = &DataHandler{
NoShared: false,
NoCopy: false,
BufferSize: 1024,
Pool: bpool.NewBytePool(1024, 12*1024),
HandlerFunc: func(req []byte) (res []byte) {
res = req
return
Expand All @@ -194,9 +186,7 @@ func TestServerUNIXListener(t *testing.T) {

func TestTCPServer(t *testing.T) {
var handler = &DataHandler{
NoShared: false,
NoCopy: false,
BufferSize: 1024,
Pool: bpool.NewBytePool(1024, 12*1024),
HandlerFunc: func(req []byte) (res []byte) {
res = req
return
Expand Down Expand Up @@ -245,9 +235,7 @@ func TestTCPServer(t *testing.T) {

func TestUNIXServer(t *testing.T) {
var handler = &DataHandler{
NoShared: false,
NoCopy: false,
BufferSize: 1024,
Pool: bpool.NewBytePool(1024, 12*1024),
HandlerFunc: func(req []byte) (res []byte) {
res = req
return
Expand Down Expand Up @@ -296,9 +284,7 @@ func TestOtherServer(t *testing.T) {
net.Listener
}
var handler = &DataHandler{
NoShared: false,
NoCopy: false,
BufferSize: 1024,
Pool: bpool.NewBytePool(1024, 12*1024),
HandlerFunc: func(req []byte) (res []byte) {
res = req
return
Expand Down Expand Up @@ -342,9 +328,7 @@ func TestOtherServer(t *testing.T) {

func TestShared(t *testing.T) {
var handler = &DataHandler{
NoShared: false,
NoCopy: false,
BufferSize: 1024,
Pool: bpool.NewBytePool(1024, 12*1024),
HandlerFunc: func(req []byte) (res []byte) {
res = req
return
Expand Down Expand Up @@ -388,9 +372,7 @@ func TestShared(t *testing.T) {

func TestNoCopy(t *testing.T) {
var handler = &DataHandler{
NoShared: true,
NoCopy: true,
BufferSize: 1024,
Pool: bpool.NewBytePool(1024, 12*1024),
HandlerFunc: func(req []byte) (res []byte) {
res = req
return
Expand Down Expand Up @@ -483,9 +465,7 @@ func TestWorker(t *testing.T) {

func TestNoAsync(t *testing.T) {
var handler = &DataHandler{
NoShared: true,
NoCopy: true,
BufferSize: 1024,
Pool: bpool.NewBytePool(1024, 12*1024),
HandlerFunc: func(req []byte) (res []byte) {
res = req
return
Expand Down Expand Up @@ -530,9 +510,7 @@ func TestNoAsync(t *testing.T) {

func TestSharedWorkers(t *testing.T) {
var handler = &DataHandler{
NoShared: true,
NoCopy: true,
BufferSize: 1024,
Pool: bpool.NewBytePool(1024, 12*1024),
HandlerFunc: func(req []byte) (res []byte) {
res = req
return
Expand Down Expand Up @@ -579,9 +557,7 @@ func TestSharedWorkers(t *testing.T) {

func TestSharedWorkersPanic(t *testing.T) {
var handler = &DataHandler{
NoShared: true,
NoCopy: true,
BufferSize: 1024,
Pool: bpool.NewBytePool(1024, 12*1024),
HandlerFunc: func(req []byte) (res []byte) {
res = req
return
Expand All @@ -606,9 +582,7 @@ func TestSharedWorkersPanic(t *testing.T) {

func TestReschedule(t *testing.T) {
var handler = &DataHandler{
NoShared: false,
NoCopy: false,
BufferSize: 1024,
Pool: bpool.NewBytePool(1024, 12*1024),
HandlerFunc: func(req []byte) (res []byte) {
res = req
return
Expand Down Expand Up @@ -669,9 +643,7 @@ func TestReschedule(t *testing.T) {

func TestRescheduleDone(t *testing.T) {
var handler = &DataHandler{
NoShared: false,
NoCopy: false,
BufferSize: 1024,
Pool: bpool.NewBytePool(1024, 12*1024),
HandlerFunc: func(req []byte) (res []byte) {
res = req
return
Expand Down

0 comments on commit 543503a

Please sign in to comment.