Skip to content

Commit

Permalink
make the buffer size configurable
Browse files Browse the repository at this point in the history
  • Loading branch information
hslam committed Dec 31, 2021
1 parent d564a3e commit 88c8711
Show file tree
Hide file tree
Showing 10 changed files with 67 additions and 33 deletions.
5 changes: 5 additions & 0 deletions codec.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,11 @@ func PutBuffer(buf []byte) {
}
}

// BufferSize sets the buffer size.
type BufferSize interface {
SetBufferSize(int)
}

// GetContextBuffer gets a buffer from the context.
func GetContextBuffer(ctx context.Context) (buffer []byte) {
value := ctx.Value(BufferContextKey)
Expand Down
26 changes: 19 additions & 7 deletions codec_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,14 @@ import (
type clientCodec struct {
headerEncoder Encoder
bodyCodec Codec
pool *buffer.Pool
bufferPool *buffer.Pool
messages socket.Messages
count int64
closed uint32
}

// NewClientCodec returns a new ClientCodec.
func NewClientCodec(bodyCodec Codec, headerEncoder Encoder, messages socket.Messages) ClientCodec {
func NewClientCodec(bodyCodec Codec, headerEncoder Encoder, messages socket.Messages, writeBufSize int) ClientCodec {
if messages == nil {
return nil
}
Expand All @@ -33,10 +33,13 @@ func NewClientCodec(bodyCodec Codec, headerEncoder Encoder, messages socket.Mess
if bodyCodec == nil {
return nil
}
if writeBufSize < 1 {
writeBufSize = bufferSize
}
c := &clientCodec{
headerEncoder: headerEncoder,
bodyCodec: bodyCodec,
pool: buffer.AssignPool(bufferSize),
bufferPool: buffer.AssignPool(writeBufSize),
}
c.messages = messages
if batch, ok := c.messages.(socket.Batch); ok {
Expand All @@ -45,6 +48,15 @@ func NewClientCodec(bodyCodec Codec, headerEncoder Encoder, messages socket.Mess
return c
}

//SetBufferSize sets buffer size.
func (c *clientCodec) SetBufferSize(size int) {
if size > 0 {
c.bufferPool = buffer.AssignPool(size)
} else {
c.bufferPool = buffer.AssignPool(bufferSize)
}
}

func (c *clientCodec) Messages() socket.Messages {
return c.messages
}
Expand All @@ -62,13 +74,13 @@ func (c *clientCodec) WriteRequest(ctx *Context, param interface{}) error {
var err error
var argsBuffer []byte
if ctx.upgrade.NoRequest != noRequest {
argsBuffer = c.pool.GetBuffer(bufferSize)
argsBuffer = c.bufferPool.GetBuffer(0)
args, err = c.bodyCodec.Marshal(argsBuffer, param)
if err != nil {
return err
}
}
var requestBuffer = c.pool.GetBuffer(bufferSize)
var requestBuffer = c.bufferPool.GetBuffer(0)
if c.headerEncoder != nil {
req := c.headerEncoder.NewRequest()
req.SetSeq(ctx.Seq)
Expand All @@ -91,9 +103,9 @@ func (c *clientCodec) WriteRequest(ctx *Context, param interface{}) error {
}
defer func() {
if ctx.upgrade.NoRequest != noRequest {
c.pool.PutBuffer(argsBuffer)
c.bufferPool.PutBuffer(argsBuffer)
}
c.pool.PutBuffer(requestBuffer)
c.bufferPool.PutBuffer(requestBuffer)
}()
if err == nil {
atomic.AddInt64(&c.count, 1)
Expand Down
6 changes: 3 additions & 3 deletions codec_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,13 @@ func TestNewClientCodec(t *testing.T) {
conn, _ := sock.Dial(addr)
message := conn.Messages()
headerEncoder := NewHeaderEncoder("json")
if NewClientCodec(nil, nil, nil) != nil {
if NewClientCodec(nil, nil, nil, bufferSize) != nil {
t.Error("should be nil")
}
if NewClientCodec(nil, nil, message) != nil {
if NewClientCodec(nil, nil, message, bufferSize) != nil {
t.Error("should be nil")
}
codec := NewClientCodec(nil, headerEncoder(), message)
codec := NewClientCodec(nil, headerEncoder(), message, bufferSize)
if codec == nil {
t.Error("should not be nil")
}
Expand Down
11 changes: 7 additions & 4 deletions codec_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ type serverCodec struct {
}

// NewServerCodec returns a new ServerCodec.
func NewServerCodec(bodyCodec Codec, headerEncoder Encoder, messages socket.Messages, noBatch bool) ServerCodec {
func NewServerCodec(bodyCodec Codec, headerEncoder Encoder, messages socket.Messages, noBatch bool, writeBufSize int) ServerCodec {
if messages == nil {
return nil
}
Expand All @@ -33,10 +33,13 @@ func NewServerCodec(bodyCodec Codec, headerEncoder Encoder, messages socket.Mess
if bodyCodec == nil {
return nil
}
if writeBufSize < 1 {
writeBufSize = bufferSize
}
c := &serverCodec{
headerEncoder: headerEncoder,
bodyCodec: bodyCodec,
pool: buffer.AssignPool(bufferSize),
pool: buffer.AssignPool(writeBufSize),
}
c.messages = messages
if !noBatch {
Expand Down Expand Up @@ -105,15 +108,15 @@ func (c *serverCodec) WriteResponse(ctx *Context, x interface{}) error {
hasResponse := len(ctx.Error) == 0 && ctx.upgrade.NoResponse != noResponse
var replyBuffer []byte
if hasResponse {
replyBuffer = c.pool.GetBuffer(bufferSize)
replyBuffer = c.pool.GetBuffer(0)
reply, err = c.bodyCodec.Marshal(replyBuffer, x)
if err != nil {
ctx.Error = err.Error()
}
} else if len(ctx.value) > 0 {
reply = ctx.value
}
var responseBuffer = c.pool.GetBuffer(bufferSize)
var responseBuffer = c.pool.GetBuffer(0)
if c.headerEncoder != nil {
res := c.headerEncoder.NewResponse()
res.SetSeq(reqSeq)
Expand Down
12 changes: 6 additions & 6 deletions codec_server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,13 @@ func TestNewServerCodec(t *testing.T) {
conn, _ := lis.Accept()
message := conn.Messages()
headerEncoder := NewHeaderEncoder("json")
if NewServerCodec(nil, nil, nil, false) != nil {
if NewServerCodec(nil, nil, nil, false, bufferSize) != nil {
t.Error("should be nil")
}
if NewServerCodec(nil, nil, message, false) != nil {
if NewServerCodec(nil, nil, message, false, bufferSize) != nil {
t.Error("should be nil")
}
codec := NewServerCodec(nil, headerEncoder(), message, false)
codec := NewServerCodec(nil, headerEncoder(), message, false, bufferSize)
if codec == nil {
t.Error("should not be nil")
}
Expand Down Expand Up @@ -66,13 +66,13 @@ func TestNewServerCodecNoBatch(t *testing.T) {
conn, _ := lis.Accept()
message := conn.Messages()
headerEncoder := NewHeaderEncoder("json")
if NewServerCodec(nil, nil, nil, true) != nil {
if NewServerCodec(nil, nil, nil, true, bufferSize) != nil {
t.Error("should be nil")
}
if NewServerCodec(nil, nil, message, true) != nil {
if NewServerCodec(nil, nil, message, true, bufferSize) != nil {
t.Error("should be nil")
}
codec := NewServerCodec(nil, headerEncoder(), message, true)
codec := NewServerCodec(nil, headerEncoder(), message, true, bufferSize)
if codec == nil {
t.Error("should not be nil")
}
Expand Down
16 changes: 13 additions & 3 deletions conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,6 @@ type Conn struct {
seq uint64
pending map[uint64]*Call
streams map[uint64]*Call
bufferSize int
bufferPool *buffer.Pool
writeSched scheduler.Scheduler
readSched scheduler.Scheduler
Expand All @@ -141,7 +140,6 @@ func NewConn() *Conn {
return &Conn{
pending: make(map[uint64]*Call),
streams: make(map[uint64]*Call),
bufferSize: bufferSize,
bufferPool: buffer.AssignPool(bufferSize),
}
}
Expand Down Expand Up @@ -169,6 +167,18 @@ func NewConnWithCodec(codec ClientCodec) *Conn {
return c
}

//SetBufferSize sets buffer size.
func (conn *Conn) SetBufferSize(size int) {
if size > 0 {
conn.bufferPool = buffer.AssignPool(size)
} else {
conn.bufferPool = buffer.AssignPool(bufferSize)
}
if s, ok := conn.codec.(BufferSize); ok {
s.SetBufferSize(size)
}
}

func (conn *Conn) write(call *Call) {
if conn.writeSched != nil {
conn.writeSched.Schedule(func() {
Expand Down Expand Up @@ -244,7 +254,7 @@ func (conn *Conn) recv() {
var trans = transition.NewTransition(16, conn.codec.Concurrency)
for err == nil {
ctx := getContext()
ctx.buffer = conn.bufferPool.GetBuffer(conn.bufferSize)
ctx.buffer = conn.bufferPool.GetBuffer(0)
ctx.data, err = messages.ReadMessage(ctx.buffer)
if err != nil {
break
Expand Down
2 changes: 1 addition & 1 deletion conn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ func TestNewConnWithCodec(t *testing.T) {
if NewConnWithCodec(nil) != nil {
t.Error("should be nil")
}
clientCodec := NewClientCodec(NewJSONCodec(), DefaultEncoder(), c.Messages())
clientCodec := NewClientCodec(NewJSONCodec(), DefaultEncoder(), c.Messages(), bufferSize)
conn := NewConnWithCodec(clientCodec)
if conn == nil {
t.Error("should not be nil")
Expand Down
6 changes: 3 additions & 3 deletions dialer.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ func Dial(network, address, codec string) (*Conn, error) {
if newSocket := NewSocket(network); newSocket != nil {
if newCodec := NewCodec(codec); newCodec != nil {
return NewConn().Dial(newSocket(nil), address, func(messages socket.Messages) ClientCodec {
return NewClientCodec(newCodec(), nil, messages)
return NewClientCodec(newCodec(), nil, messages, bufferSize)
})
}
return nil, errors.New("unsupported codec: " + codec)
Expand All @@ -27,7 +27,7 @@ func DialTLS(network, address, codec string, config *tls.Config) (*Conn, error)
if newSocket := NewSocket(network); newSocket != nil {
if newCodec := NewCodec(codec); newCodec != nil {
return NewConn().Dial(newSocket(config), address, func(messages socket.Messages) ClientCodec {
return NewClientCodec(newCodec(), nil, messages)
return NewClientCodec(newCodec(), nil, messages, bufferSize)
})
}
return nil, errors.New("unsupported codec: " + codec)
Expand Down Expand Up @@ -62,6 +62,6 @@ func DialWithOptions(address string, opts *Options) (*Conn, error) {
} else if opts.NewHeaderEncoder != nil {
headerEncoder = opts.NewHeaderEncoder()
}
return NewClientCodec(bodyCodec, headerEncoder, messages)
return NewClientCodec(bodyCodec, headerEncoder, messages, opts.ClientBufferSize)
})
}
6 changes: 4 additions & 2 deletions options.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,14 @@ type Options struct {
Codec string
HeaderEncoder string
TLSConfig *tls.Config
ClientBufferSize int
}

//DefaultOptions returns a default options.
func DefaultOptions() *Options {
return &Options{
NewSocket: socket.NewTCPSocket,
NewCodec: NewJSONCodec,
NewSocket: socket.NewTCPSocket,
NewCodec: NewJSONCodec,
ClientBufferSize: bufferSize,
}
}
10 changes: 6 additions & 4 deletions server.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,9 +93,11 @@ func (server *Server) Services() []string {
//SetBufferSize sets buffer size.
func (server *Server) SetBufferSize(size int) {
if size > 0 {
server.bufferSize = size
server.bufferPool = buffer.AssignPool(size)
} else {
server.bufferPool = nil
server.bufferSize = bufferSize
server.bufferPool = buffer.AssignPool(bufferSize)
}
}

Expand Down Expand Up @@ -536,7 +538,7 @@ func (server *Server) Listen(network, address string, codec string) error {
if newSocket := NewSocket(network); newSocket != nil {
if newCodec := NewCodec(codec); newCodec != nil {
return server.listen(newSocket(nil), address, func(messages socket.Messages) ServerCodec {
return NewServerCodec(newCodec(), nil, messages, server.noBatch)
return NewServerCodec(newCodec(), nil, messages, server.noBatch, server.bufferSize)
})
}
return errors.New("unsupported codec: " + codec)
Expand All @@ -549,7 +551,7 @@ func (server *Server) ListenTLS(network, address string, codec string, config *t
if newSocket := NewSocket(network); newSocket != nil {
if newCodec := NewCodec(codec); newCodec != nil {
return server.listen(newSocket(config), address, func(messages socket.Messages) ServerCodec {
return NewServerCodec(newCodec(), nil, messages, server.noBatch)
return NewServerCodec(newCodec(), nil, messages, server.noBatch, server.bufferSize)
})
}
return errors.New("unsupported codec: " + codec)
Expand Down Expand Up @@ -584,7 +586,7 @@ func (server *Server) ListenWithOptions(address string, opts *Options) error {
} else if opts.NewHeaderEncoder != nil {
headerEncoder = opts.NewHeaderEncoder()
}
return NewServerCodec(bodyCodec, headerEncoder, messages, server.noBatch)
return NewServerCodec(bodyCodec, headerEncoder, messages, server.noBatch, server.bufferSize)
})
}

Expand Down

0 comments on commit 88c8711

Please sign in to comment.