-
Notifications
You must be signed in to change notification settings - Fork 1
/
stream.go
124 lines (109 loc) · 2.35 KB
/
stream.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
// Copyright (c) 2019 Meng Huang ([email protected])
// This package is licensed under a MIT license that can be found in the LICENSE file.
package rpc
import (
"errors"
"sync"
"sync/atomic"
)
// ErrStreamShutdown is the sentinel error returned by the stream's
// ReadMessage and WriteMessage once the stream has been marked closed.
var ErrStreamShutdown = errors.New("The stream is shut down")
// eventPool recycles event values so that they do not have to be allocated
// for every message; getEvent takes from it and freeEvent gives back to it.
var eventPool = sync.Pool{
	New: func() interface{} {
		return new(event)
	},
}
// event is one queued item waiting to be consumed by stream.ReadMessage.
type event struct {
// Value is the encoded payload. ReadMessage hands it to the stream's
// unmarshal function and then releases it with PutBuffer.
Value []byte
// Error is the error ReadMessage returns for this event (nil if none).
Error error
}
// getEvent fetches an event from eventPool. The pool's New function supplies
// a fresh zero-valued event when the pool has nothing to reuse.
func getEvent() *event {
	pooled := eventPool.Get()
	return pooled.(*event)
}
// freeEvent resets e to its zero value and hands it back to eventPool.
// The caller must not use e after this call.
func freeEvent(e *event) {
	// Overwrite every field so the pooled object does not keep the old
	// payload or error reachable.
	var zero event
	*e = zero
	eventPool.Put(e)
}
// SetStream is implemented by types that can be handed an rpc Stream
// through their Connect method.
type SetStream interface {
// Connect receives the stream to use and reports an error on failure.
Connect(stream Stream) error
}
// Stream defines the message stream interface: a message-oriented channel
// that can be written to, read from and closed.
type Stream interface {
// WriteMessage writes the message m to the stream.
WriteMessage(m interface{}) error
// ReadMessage reads a single message from the stream and decodes it into m.
// In the stream implementation in this file, b serves as a scratch buffer
// for the payload when it has enough capacity.
ReadMessage(b []byte, m interface{}) error
// Close closes the stream.
Close() error
}
// stream is the Stream implementation in this file. Incoming events are
// queued by trigger and consumed by ReadMessage; outgoing messages are passed
// to the write callback.
type stream struct {
// close is invoked by Close after the stream has been marked closed.
close func() error
// unmarshal decodes an event payload into the value given to ReadMessage.
unmarshal func(data []byte, v interface{}) error
// write is invoked by WriteMessage to send a message.
write func(m interface{}) (err error)
// mut guards events.
mut sync.Mutex
// cond is signalled by trigger and broadcast by stop; ReadMessage waits on it.
// NOTE(review): its L field must be &mut for Wait to work; that is set up
// outside the visible code — confirm.
cond sync.Cond
// seq is not referenced in the visible part of this file.
seq uint64
// events is the FIFO queue of events not yet consumed by ReadMessage.
events []*event
// noCopy, when false, makes ReadMessage copy the payload into the caller's
// buffer (or a new slice) before unmarshalling it.
noCopy bool
// done is not referenced in the visible part of this file.
done bool
// closed is non-zero once stop has run; it is accessed with sync/atomic.
closed int32
}
// trigger appends e to the queue of pending events and wakes one goroutine
// waiting on the stream's condition variable.
func (w *stream) trigger(e *event) {
	// Hold the mutex only while touching the queue; the signal is sent after
	// the lock has been released.
	func() {
		w.mut.Lock()
		defer w.mut.Unlock()
		w.events = append(w.events, e)
	}()
	w.cond.Signal()
}
// ReadMessage blocks until an event is queued on the stream, decodes its
// payload into m and returns the event's error.
//
// b is an optional scratch buffer: when the stream copies payloads (noCopy is
// false) and b has enough capacity, the payload is copied into b rather than
// into a freshly allocated slice before it is unmarshalled.
//
// It returns ErrStreamShutdown if the stream is closed on entry or is found
// closed after being woken up; a closed stream takes precedence over events
// that are still queued. Otherwise it returns the error carried by the event,
// and if the event carries none, any error reported by unmarshal (previously
// that error was silently dropped).
//
// NOTE(review): relies on w.cond.L having been set to &w.mut by the code that
// constructs the stream (not visible here) — confirm.
func (w *stream) ReadMessage(b []byte, m interface{}) (err error) {
	w.mut.Lock()
	for {
		if atomic.LoadInt32(&w.closed) > 0 {
			w.mut.Unlock()
			return ErrStreamShutdown
		}
		if len(w.events) > 0 {
			break
		}
		// Wait releases the mutex while blocked and re-acquires it before
		// returning, so the checks above always run under the lock.
		w.cond.Wait()
	}

	// Pop the oldest event. Clear the slot first so the backing array does
	// not keep a stale pointer to an event that is about to be recycled.
	e := w.events[0]
	w.events[0] = nil
	w.events = w.events[1:]
	w.mut.Unlock()

	payload := e.Value
	if w.noCopy {
		err = w.unmarshal(payload, m)
		PutBuffer(payload)
	} else {
		// Reuse the caller's buffer whenever it is large enough, including
		// the exact-fit case; allocate only when it is too small.
		if cap(b) >= len(payload) {
			b = b[:len(payload)]
		} else {
			b = make([]byte, len(payload))
		}
		copy(b, payload)
		PutBuffer(payload)
		err = w.unmarshal(b, m)
	}

	// The error carried by the event wins over a decode error, which keeps
	// the previous result for events that carry an error.
	if e.Error != nil {
		err = e.Error
	}
	freeEvent(e)
	return err
}
// WriteMessage sends m through the stream's write callback. It returns
// ErrStreamShutdown without calling write if the stream has been closed.
func (w *stream) WriteMessage(m interface{}) (err error) {
	if shutdown := atomic.LoadInt32(&w.closed) > 0; shutdown {
		return ErrStreamShutdown
	}
	err = w.write(m)
	return err
}
// stop marks the stream as closed and wakes every goroutine waiting on the
// stream's condition variable so that it can observe the closed flag.
func (w *stream) stop() {
	// The flag is set while holding the mutex; the broadcast is sent after
	// the lock has been released.
	func() {
		w.mut.Lock()
		defer w.mut.Unlock()
		atomic.StoreInt32(&w.closed, 1)
	}()
	w.cond.Broadcast()
}
// Close marks the stream as closed, wakes any waiting readers via stop, and
// then returns the result of the stream's close callback.
func (w *stream) Close() error {
	w.stop()
	err := w.close()
	return err
}