Skip to content

Commit d8b9e5f

Browse files
authored
Use bytebuff pool to reduce alloc memory. (#74)
1 parent 12c8ef4 commit d8b9e5f

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

77 files changed

+1497
-874
lines changed

.travis.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ install:
1616

1717
script:
1818
- golangci-lint run ./...
19-
- go test -race -coverprofile=coverage.txt -covermode=atomic ./...
19+
- go test -coverprofile=coverage.txt -covermode=atomic ./...
2020

2121
after_success:
2222
- bash <(curl -s https://codecov.io/bash)

client.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ type ClientBuilder interface {
6262
// OnClose register handler when client socket closed.
6363
OnClose(func(error)) ClientBuilder
6464
// OnConnect register handler when client socket connected.
65-
OnConnect(func(Client,error)) ClientBuilder
65+
OnConnect(func(Client, error)) ClientBuilder
6666
// Acceptor set acceptor for RSocket client.
6767
Acceptor(acceptor ClientSocketAcceptor) ToClientStarter
6868
}
@@ -91,7 +91,7 @@ type clientBuilder struct {
9191
setup *socket.SetupInfo
9292
acceptor ClientSocketAcceptor
9393
onCloses []func(error)
94-
onConnects []func(Client,error)
94+
onConnects []func(Client, error)
9595
connectTimeout time.Duration
9696
}
9797

@@ -119,7 +119,7 @@ func (cb *clientBuilder) Fragment(mtu int) ClientBuilder {
119119
return cb
120120
}
121121

122-
func (cb *clientBuilder) OnConnect(fn func(Client,error)) ClientBuilder {
122+
func (cb *clientBuilder) OnConnect(fn func(Client, error)) ClientBuilder {
123123
cb.onConnects = append(cb.onConnects, fn)
124124
return cb
125125
}
@@ -215,11 +215,11 @@ func (cb *clientBuilder) Start(ctx context.Context) (client Client, err error) {
215215

216216
// trigger OnConnect
217217
if len(cb.onConnects) > 0 {
218-
var onConnects []func(Client,error)
218+
var onConnects []func(Client, error)
219219
onConnects, cb.onConnects = cb.onConnects, nil
220220
go func() {
221221
for _, onConnect := range onConnects {
222-
onConnect(client,err)
222+
onConnect(client, err)
223223
}
224224
}()
225225
}

cmd/rsocket-cli/rsocket-cli.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ func main() {
4141
app.UsageText = "rsocket-cli [global options] [URI]"
4242
app.Name = "rsocket-cli"
4343
app.Usage = "CLI for RSocket."
44-
app.Version = "v0.6"
44+
app.Version = "v0.7"
4545
app.Flags = newFlags(conf)
4646
app.ArgsUsage = "[URI]"
4747
app.Action = func(c *cli.Context) (err error) {

cmd/rsocket-cli/runner.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -188,7 +188,7 @@ func (r *Runner) runServerMode(ctx context.Context) error {
188188
r.showPayload(message)
189189
return sendingPayloads
190190
}))
191-
options = append(options, rsocket.RequestChannel(func(messages rx.Publisher) flux.Flux {
191+
options = append(options, rsocket.RequestChannel(func(messages flux.Flux) flux.Flux {
192192
messages.Subscribe(ctx, rx.OnNext(func(input payload.Payload) error {
193193
r.showPayload(input)
194194
return nil

core/framing/frame.go

Lines changed: 54 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -10,52 +10,83 @@ import (
1010

1111
var errIncompleteFrame = errors.New("incomplete frame")
1212

13-
type tinyFrame struct {
13+
type baseWriteableFrame struct {
1414
header core.FrameHeader
1515
done chan struct{}
1616
}
1717

18-
func (t *tinyFrame) Header() core.FrameHeader {
18+
// FromBytes creates frame from a byte slice.
19+
func FromBytes(b []byte) (f core.BufferedFrame, err error) {
20+
if len(b) < core.FrameHeaderLen {
21+
err = errIncompleteFrame
22+
return
23+
}
24+
header := core.ParseFrameHeader(b[:core.FrameHeaderLen])
25+
bb := common.BorrowByteBuff()
26+
_, err = bb.Write(b[core.FrameHeaderLen:])
27+
if err != nil {
28+
common.ReturnByteBuff(bb)
29+
return
30+
}
31+
raw := newBaseDefaultFrame(header, bb)
32+
f, err = FromRawFrame(raw)
33+
if err != nil {
34+
common.ReturnByteBuff(bb)
35+
}
36+
return
37+
}
38+
39+
func (t baseWriteableFrame) Header() core.FrameHeader {
1940
return t.header
2041
}
2142

2243
// Done can be invoked when a frame has been been processed.
23-
func (t *tinyFrame) Done() (closed bool) {
44+
func (t baseWriteableFrame) Done() (closed bool) {
2445
defer func() {
25-
if e := recover(); e != nil {
26-
closed = true
27-
}
46+
closed = recover() != nil
2847
}()
2948
close(t.done)
3049
return
3150
}
3251

3352
// DoneNotify notify when frame has been done.
34-
func (t *tinyFrame) DoneNotify() <-chan struct{} {
53+
func (t baseWriteableFrame) DoneNotify() <-chan struct{} {
3554
return t.done
3655
}
3756

38-
// RawFrame is basic frame implementation.
39-
type RawFrame struct {
40-
*tinyFrame
41-
body *common.ByteBuff
57+
// baseDefaultFrame is basic frame implementation.
58+
type baseDefaultFrame struct {
59+
header core.FrameHeader
60+
body *common.ByteBuff
61+
}
62+
63+
func (f *baseDefaultFrame) Header() core.FrameHeader {
64+
return f.header
65+
}
66+
67+
// Release releases resource.
68+
func (f *baseDefaultFrame) Release() {
69+
if f != nil && f.body != nil {
70+
common.ReturnByteBuff(f.body)
71+
f.body = nil
72+
}
4273
}
4374

4475
// Body returns frame body.
45-
func (f *RawFrame) Body() *common.ByteBuff {
76+
func (f *baseDefaultFrame) Body() *common.ByteBuff {
4677
return f.body
4778
}
4879

4980
// Len returns length of frame.
50-
func (f *RawFrame) Len() int {
81+
func (f *baseDefaultFrame) Len() int {
5182
if f.body == nil {
5283
return core.FrameHeaderLen
5384
}
5485
return core.FrameHeaderLen + f.body.Len()
5586
}
5687

5788
// WriteTo write frame to writer.
58-
func (f *RawFrame) WriteTo(w io.Writer) (n int64, err error) {
89+
func (f *baseDefaultFrame) WriteTo(w io.Writer) (n int64, err error) {
5990
var wrote int64
6091
wrote, err = f.header.WriteTo(w)
6192
if err != nil {
@@ -72,7 +103,7 @@ func (f *RawFrame) WriteTo(w io.Writer) (n int64, err error) {
72103
return
73104
}
74105

75-
func (f *RawFrame) trySeekMetadataLen(offset int) (n int, hasMetadata bool) {
106+
func (f *baseDefaultFrame) trySeekMetadataLen(offset int) (n int, hasMetadata bool) {
76107
raw := f.body.Bytes()
77108
if offset > 0 {
78109
raw = raw[offset:]
@@ -89,15 +120,15 @@ func (f *RawFrame) trySeekMetadataLen(offset int) (n int, hasMetadata bool) {
89120
return
90121
}
91122

92-
func (f *RawFrame) trySliceMetadata(offset int) ([]byte, bool) {
123+
func (f *baseDefaultFrame) trySliceMetadata(offset int) ([]byte, bool) {
93124
n, ok := f.trySeekMetadataLen(offset)
94125
if !ok || n < 0 {
95126
return nil, false
96127
}
97128
return f.body.Bytes()[offset+3 : offset+3+n], true
98129
}
99130

100-
func (f *RawFrame) trySliceData(offset int) []byte {
131+
func (f *baseDefaultFrame) trySliceData(offset int) []byte {
101132
n, ok := f.trySeekMetadataLen(offset)
102133
if !ok {
103134
return f.body.Bytes()[offset:]
@@ -108,32 +139,16 @@ func (f *RawFrame) trySliceData(offset int) []byte {
108139
return f.body.Bytes()[offset+n+3:]
109140
}
110141

111-
func newTinyFrame(header core.FrameHeader) *tinyFrame {
112-
return &tinyFrame{
142+
func newBaseWriteableFrame(header core.FrameHeader) baseWriteableFrame {
143+
return baseWriteableFrame{
113144
header: header,
114145
done: make(chan struct{}),
115146
}
116147
}
117148

118-
// NewRawFrame returns a new RawFrame.
119-
func NewRawFrame(header core.FrameHeader, body *common.ByteBuff) *RawFrame {
120-
return &RawFrame{
121-
tinyFrame: newTinyFrame(header),
122-
body: body,
123-
}
124-
}
125-
126-
// FromBytes creates frame from a byte slice.
127-
func FromBytes(b []byte) (core.Frame, error) {
128-
if len(b) < core.FrameHeaderLen {
129-
return nil, errIncompleteFrame
130-
}
131-
header := core.ParseFrameHeader(b[:core.FrameHeaderLen])
132-
bb := common.NewByteBuff()
133-
_, err := bb.Write(b[core.FrameHeaderLen:])
134-
if err != nil {
135-
return nil, err
149+
func newBaseDefaultFrame(header core.FrameHeader, body *common.ByteBuff) *baseDefaultFrame {
150+
return &baseDefaultFrame{
151+
header: header,
152+
body: body,
136153
}
137-
raw := NewRawFrame(header, bb)
138-
return FromRawFrame(raw)
139154
}

core/framing/frame_cancel.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,12 +8,12 @@ import (
88

99
// CancelFrame is frame of Cancel.
1010
type CancelFrame struct {
11-
*RawFrame
11+
*baseDefaultFrame
1212
}
1313

1414
// WriteableCancelFrame is writeable frame of Cancel.
1515
type WriteableCancelFrame struct {
16-
*tinyFrame
16+
baseWriteableFrame
1717
}
1818

1919
// WriteTo writes current frame to given writer.
@@ -45,13 +45,13 @@ func (f *CancelFrame) Validate() (err error) {
4545
func NewWriteableCancelFrame(id uint32) *WriteableCancelFrame {
4646
h := core.NewFrameHeader(id, core.FrameTypeCancel, 0)
4747
return &WriteableCancelFrame{
48-
tinyFrame: newTinyFrame(h),
48+
baseWriteableFrame: newBaseWriteableFrame(h),
4949
}
5050
}
5151

5252
// NewCancelFrame creates cancel frame.
5353
func NewCancelFrame(sid uint32) *CancelFrame {
5454
return &CancelFrame{
55-
NewRawFrame(core.NewFrameHeader(sid, core.FrameTypeCancel, 0), nil),
55+
newBaseDefaultFrame(core.NewFrameHeader(sid, core.FrameTypeCancel, 0), nil),
5656
}
5757
}

core/framing/frame_error.go

Lines changed: 54 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -17,19 +17,59 @@ const (
1717

1818
// ErrorFrame is error frame.
1919
type ErrorFrame struct {
20-
*RawFrame
20+
*baseDefaultFrame
2121
}
2222

23-
// WriteableErrorFrame is writeable error frame.
24-
type WriteableErrorFrame struct {
25-
*tinyFrame
23+
type frozenError struct {
2624
code core.ErrorCode
2725
data []byte
2826
}
2927

30-
// Error returns error string.
31-
func (e WriteableErrorFrame) Error() string {
32-
return makeErrorString(e.code, e.data)
28+
// WriteableErrorFrame is writeable error frame.
29+
type WriteableErrorFrame struct {
30+
baseWriteableFrame
31+
frozenError
32+
}
33+
34+
// NewWriteableErrorFrame creates WriteableErrorFrame.
35+
func NewWriteableErrorFrame(id uint32, code core.ErrorCode, data []byte) *WriteableErrorFrame {
36+
h := core.NewFrameHeader(id, core.FrameTypeError, 0)
37+
t := newBaseWriteableFrame(h)
38+
return &WriteableErrorFrame{
39+
baseWriteableFrame: t,
40+
frozenError: frozenError{
41+
code: code,
42+
data: data,
43+
},
44+
}
45+
}
46+
47+
// NewErrorFrame returns a new error frame.
48+
func NewErrorFrame(streamID uint32, code core.ErrorCode, data []byte) *ErrorFrame {
49+
b := common.BorrowByteBuff()
50+
if err := binary.Write(b, binary.BigEndian, uint32(code)); err != nil {
51+
common.ReturnByteBuff(b)
52+
panic(err)
53+
}
54+
if _, err := b.Write(data); err != nil {
55+
common.ReturnByteBuff(b)
56+
panic(err)
57+
}
58+
return &ErrorFrame{
59+
newBaseDefaultFrame(core.NewFrameHeader(streamID, core.FrameTypeError, 0), b),
60+
}
61+
}
62+
63+
func (c frozenError) Error() string {
64+
return makeErrorString(c.ErrorCode(), c.ErrorData())
65+
}
66+
67+
func (c frozenError) ErrorCode() core.ErrorCode {
68+
return c.code
69+
}
70+
71+
func (c frozenError) ErrorData() []byte {
72+
return c.data
3373
}
3474

3575
// WriteTo writes frame to writer.
@@ -60,6 +100,13 @@ func (e WriteableErrorFrame) Len() int {
60100
return core.FrameHeaderLen + 4 + len(e.data)
61101
}
62102

103+
func (p *ErrorFrame) ToError() error {
104+
return frozenError{
105+
code: p.ErrorCode(),
106+
data: common.CloneBytes(p.ErrorData()),
107+
}
108+
}
109+
63110
// Validate returns error if frame is invalid.
64111
func (p *ErrorFrame) Validate() (err error) {
65112
if p.body.Len() < minErrorFrameLen {
@@ -84,33 +131,6 @@ func (p *ErrorFrame) ErrorData() []byte {
84131
return p.body.Bytes()[errDataOff:]
85132
}
86133

87-
// NewWriteableErrorFrame creates WriteableErrorFrame.
88-
func NewWriteableErrorFrame(id uint32, code core.ErrorCode, data []byte) *WriteableErrorFrame {
89-
h := core.NewFrameHeader(id, core.FrameTypeError, 0)
90-
t := newTinyFrame(h)
91-
return &WriteableErrorFrame{
92-
tinyFrame: t,
93-
code: code,
94-
data: data,
95-
}
96-
}
97-
98-
// NewErrorFrame returns a new error frame.
99-
func NewErrorFrame(streamID uint32, code core.ErrorCode, data []byte) *ErrorFrame {
100-
bf := common.NewByteBuff()
101-
var b4 [4]byte
102-
binary.BigEndian.PutUint32(b4[:], uint32(code))
103-
if _, err := bf.Write(b4[:]); err != nil {
104-
panic(err)
105-
}
106-
if _, err := bf.Write(data); err != nil {
107-
panic(err)
108-
}
109-
return &ErrorFrame{
110-
NewRawFrame(core.NewFrameHeader(streamID, core.FrameTypeError, 0), bf),
111-
}
112-
}
113-
114134
func makeErrorString(code core.ErrorCode, data []byte) string {
115135
bu := strings.Builder{}
116136
bu.WriteString(code.String())

0 commit comments

Comments
 (0)