-
Notifications
You must be signed in to change notification settings - Fork 17
/
Copy pathtconfig.go
150 lines (129 loc) · 3.3 KB
/
tconfig.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
package turbo
import (
"context"
"sync"
"sync/atomic"
"time"
)
const (
CONCURRENT_LEVEL = 8
)
//-----------响应的future
type Future struct {
timeout time.Duration
opaque uint32
once sync.Once
ch chan interface{}
response interface{}
TargetHost string
Err error
ctx context.Context
}
func NewFuture(opaque uint32, timeout time.Duration, targetHost string, ctx context.Context) *Future {
return &Future{
timeout: timeout,
opaque: opaque,
ch: make(chan interface{}, 1),
TargetHost: targetHost,
ctx: ctx,
Err: nil}
}
//创建有错误的future
func NewErrFuture(opaque uint32, targetHost string, err error, ctx context.Context) *Future {
f := &Future{
timeout: 0,
opaque: opaque,
ch: make(chan interface{}, 1),
TargetHost: targetHost,
ctx: ctx}
f.Error(err)
return f
}
func (f *Future) Error(err error) {
f.once.Do(func() {
f.Err = err
close(f.ch)
})
}
func (f *Future) SetResponse(resp interface{}) {
f.once.Do(func() {
f.response = resp
close(f.ch)
})
}
func (f *Future) Get(timeout <-chan time.Time) (interface{}, error) {
select {
case <-timeout:
//如果是已经超时了但是当前还是没有响应也认为超时
f.Error(ERR_TIMEOUT)
return f.response, f.Err
case <-f.ctx.Done():
f.Error(ERR_CONNECTION_BROKEN)
return f.response, f.Err
case <-f.ch:
return f.response, f.Err
}
}
//网络层参数
type TConfig struct {
FlowStat *RemotingFlow //网络层流量
dispool *GPool // 最大分发处理协程数
ReadBufferSize int //读取缓冲大小
WriteBufferSize int //写入缓冲大小
WriteChannelSize int //写异步channel长度
ReadChannelSize int //读异步channel长度
IdleTime time.Duration //连接空闲时间
RequestHolder *ReqHolder
TW *TimerWheel // timewheel
cancel context.CancelFunc
}
func NewTConfig(name string,
maxdispatcherNum,
readbuffersize,
writebuffersize,
writechannlesize,
readchannelsize int,
idletime time.Duration,
maxOpaque int) *TConfig {
tw := NewTimerWheel(100 * time.Millisecond)
ctx, cancel := context.WithCancel(context.Background())
rh := &ReqHolder{
opaque: 0,
holder: NewLRUCache(ctx, maxOpaque, tw, nil),
tw: tw,
idleTime: idletime}
dispool := NewLimitPool(ctx, maxdispatcherNum)
//初始化
rc := &TConfig{
FlowStat: NewRemotingFlow(name, dispool),
dispool: dispool,
ReadBufferSize: readbuffersize,
WriteBufferSize: writebuffersize,
WriteChannelSize: writechannlesize,
ReadChannelSize: readchannelsize,
IdleTime: idletime,
RequestHolder: rh,
TW: tw,
cancel: cancel,
}
return rc
}
type ReqHolder struct {
opaque uint32
tw *TimerWheel
holder *LRUCache
idleTime time.Duration //连接空闲时间
}
func (self *ReqHolder) CurrentOpaque() uint32 {
return uint32(atomic.AddUint32(&self.opaque, 1))
}
//从requesthold中移除
func (self *ReqHolder) Detach(opaque uint32, obj interface{}) {
future := self.holder.Remove(opaque)
if nil != future {
future.(*Future).SetResponse(obj)
}
}
func (self *ReqHolder) Attach(opaque uint32, future *Future) chan time.Time {
return self.holder.Put(opaque, future, future.timeout)
}