Skip to content

Commit

Permalink
optimize buffer pool
Browse files Browse the repository at this point in the history
  • Loading branch information
hslam committed Sep 26, 2021
1 parent 729ad10 commit d1e7ca1
Show file tree
Hide file tree
Showing 6 changed files with 28 additions and 42 deletions.
5 changes: 3 additions & 2 deletions conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
package websocket

import (
"github.com/hslam/buffer"
"github.com/hslam/writer"
"io"
"math/rand"
Expand Down Expand Up @@ -45,8 +46,8 @@ type Conn struct {
writeBuffer []byte
buffer []byte
connBuffer []byte
readPool *sync.Pool
writePool *sync.Pool
readPool *buffer.Pool
writePool *buffer.Pool
closed int32
}

Expand Down
31 changes: 7 additions & 24 deletions frame.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,11 @@
package websocket

import (
"github.com/hslam/buffer"
"io"
"math/rand"
"strings"
"sync"
"sync/atomic"
)

const (
Expand All @@ -32,27 +32,10 @@ const (
)

var (
buffers = sync.Map{}
assign int32
buffers = buffer.NewBuffers(1024)
framePool = &sync.Pool{New: func() interface{} { return &frame{} }}
)

func assignPool(size int) *sync.Pool {
for {
if p, ok := buffers.Load(size); ok {
return p.(*sync.Pool)
}
if atomic.CompareAndSwapInt32(&assign, 0, 1) {
var pool = &sync.Pool{New: func() interface{} {
return make([]byte, size)
}}
buffers.Store(size, pool)
atomic.StoreInt32(&assign, 0)
return pool
}
}
}

func (c *Conn) getFrame() *frame {
return framePool.Get().(*frame)
}
Expand Down Expand Up @@ -87,7 +70,7 @@ func (c *Conn) readFrame(buf []byte) (f *frame, err error) {
}
var readBuffer []byte
if c.shared {
readBuffer = c.readPool.Get().([]byte)
readBuffer = c.readPool.GetBuffer(c.readBufferSize)
readBuffer = readBuffer[:cap(readBuffer)]
} else {
readBuffer = c.readBuffer
Expand All @@ -96,7 +79,7 @@ func (c *Conn) readFrame(buf []byte) (f *frame, err error) {
n, err = c.read(readBuffer)
if err != nil {
if c.shared {
c.readPool.Put(readBuffer)
c.readPool.PutBuffer(readBuffer)
}
errMsg := err.Error()
if strings.Contains(errMsg, "use of closed network connection") || strings.Contains(errMsg, "connection reset by peer") {
Expand All @@ -116,7 +99,7 @@ func (c *Conn) readFrame(buf []byte) (f *frame, err error) {
c.buffer = append(c.buffer, readBuffer[:n]...)
}
if c.shared {
c.readPool.Put(readBuffer)
c.readPool.PutBuffer(readBuffer)
}
}
}
Expand All @@ -129,7 +112,7 @@ func (c *Conn) writeFrame(f *frame) error {
}
var writeBuffer []byte
if c.shared {
writeBuffer = c.writePool.Get().([]byte)
writeBuffer = c.writePool.GetBuffer(c.writeBufferSize)
writeBuffer = writeBuffer[:cap(writeBuffer)]
} else {
writeBuffer = c.writeBuffer
Expand All @@ -152,7 +135,7 @@ func (c *Conn) writeFrame(f *frame) error {
}
c.putFrame(f)
if c.shared {
c.writePool.Put(writeBuffer)
c.writePool.PutBuffer(writeBuffer)
} else {
c.writeBuffer = writeBuffer
}
Expand Down
5 changes: 4 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,7 @@ module github.com/hslam/websocket

go 1.15

require github.com/hslam/writer v1.0.1-0.20210613145035-7fe9abcc6a5c
require (
github.com/hslam/buffer v0.0.0-20210926124055-86667d87033c
github.com/hslam/writer v1.0.1-0.20210926213639-2676a03cb4b8
)
6 changes: 4 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,2 +1,4 @@
github.com/hslam/writer v1.0.1-0.20210613145035-7fe9abcc6a5c h1:hJXEO+/q53/fP3oO9ETbQBOkyCRPCNlvngYdKDxs9HA=
github.com/hslam/writer v1.0.1-0.20210613145035-7fe9abcc6a5c/go.mod h1:DJTidm2vEiuE6ZOOpqc/PnW6C9XipodkRPE/b2hxXA0=
github.com/hslam/buffer v0.0.0-20210926124055-86667d87033c h1:wxwQJEvKW1MuUxTtoxsZsDHxS9CObyx4Z0TjfTgLbrU=
github.com/hslam/buffer v0.0.0-20210926124055-86667d87033c/go.mod h1:Gvbj40hnzR54zoUOuDZqDi7aziar8UlkHXk6NVYLg2U=
github.com/hslam/writer v1.0.1-0.20210926213639-2676a03cb4b8 h1:qus7VOaTXD2WSuqL8xoVTXhsnPp39TjF2IRMFfGxKT8=
github.com/hslam/writer v1.0.1-0.20210926213639-2676a03cb4b8/go.mod h1:s4A43dNjDZfFoRKu7nk46S4j4eXOOQOqjwHRy0aRsdc=
18 changes: 9 additions & 9 deletions handshake.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,10 @@ import (
"crypto/sha1"
"encoding/base64"
"errors"
"github.com/hslam/buffer"
"math/rand"
"net"
"net/http"
"sync"
"time"
)

Expand All @@ -28,11 +28,11 @@ func server(conn net.Conn, shared bool, key string) *Conn {
writeBufferSize += 14
var readBuffer []byte
var writeBuffer []byte
var readPool *sync.Pool
var writePool *sync.Pool
var readPool *buffer.Pool
var writePool *buffer.Pool
if shared {
readPool = assignPool(readBufferSize)
writePool = assignPool(writeBufferSize)
readPool = buffers.AssignPool(readBufferSize)
writePool = buffers.AssignPool(writeBufferSize)
} else {
readBuffer = make([]byte, readBufferSize)
writeBuffer = make([]byte, writeBufferSize)
Expand Down Expand Up @@ -60,11 +60,11 @@ func client(conn net.Conn, shared bool, address, path string) *Conn {
writeBufferSize += 14
var readBuffer []byte
var writeBuffer []byte
var readPool *sync.Pool
var writePool *sync.Pool
var readPool *buffer.Pool
var writePool *buffer.Pool
if shared {
readPool = assignPool(readBufferSize)
writePool = assignPool(writeBufferSize)
readPool = buffers.AssignPool(readBufferSize)
writePool = buffers.AssignPool(writeBufferSize)
} else {
readBuffer = make([]byte, readBufferSize)
writeBuffer = make([]byte, writeBufferSize)
Expand Down
5 changes: 1 addition & 4 deletions messages.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,9 @@ import (

// SetConcurrency sets a callback func concurrency for writer.
func (c *Conn) SetConcurrency(concurrency func() int) {
if concurrency == nil {
return
}
c.writing.Lock()
defer c.writing.Unlock()
c.writer = writer.NewWriter(c.writer, concurrency, 65536, false)
c.writing.Unlock()
}

// ReceiveMessage receives single frame from ws, unmarshaled and stores in v.
Expand Down

0 comments on commit d1e7ca1

Please sign in to comment.