-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathrouter.go
126 lines (117 loc) · 2.93 KB
/
router.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
package xcache
import (
"google.golang.org/protobuf/proto"
"xcache/protocol"
)
// CmdInfo pairs a protocol command type with a constructor for the protobuf
// message that carries that command's payload.
//
// NOTE(review): nothing in the visible part of this file references CmdInfo;
// the cmdInfos registry stores bare constructor functions instead.
type CmdInfo struct {
	// New returns a fresh, empty message for Cmd.
	New func() proto.Message
	// Cmd is the command type the message belongs to.
	Cmd protocol.CMDType
}
// cmdInfos maps every known command type to a constructor that returns a
// fresh, empty message of the type used to decode that command's payload.
// HandleRequest looks commands up here; a command that is absent from the
// map is ignored.
var cmdInfos = map[protocol.CMDType]func() proto.Message{
	// Cluster liveness.
	protocol.CMDType_TPing:              func() proto.Message { return new(protocol.Ping) },
	protocol.CMDType_TPong:              func() proto.Message { return new(protocol.Pong) },
	// Cache operations and their replies.
	protocol.CMDType_TGet:               func() proto.Message { return new(protocol.GetArgs) },
	protocol.CMDType_TGetReply:          func() proto.Message { return new(protocol.GetReply) },
	protocol.CMDType_TSet:               func() proto.Message { return new(protocol.SetArgs) },
	protocol.CMDType_TSetReply:          func() proto.Message { return new(protocol.SetReply) },
	protocol.CMDType_TDel:               func() proto.Message { return new(protocol.DelArgs) },
	protocol.CMDType_TDelReply:          func() proto.Message { return new(protocol.DelReply) },
	// Cluster membership query and its reply.
	protocol.CMDType_TClusterNodes:      func() proto.Message { return new(protocol.ClusterNodesArgs) },
	protocol.CMDType_TClusterNodesReply: func() proto.Message { return new(protocol.ClusterNodeReply) },
}
// router dispatches decoded protocol frames to the component that handles
// them: key/value commands go to cache, ping/pong and node-listing commands
// go to cluster.
type router struct {
	// cache serves the Get and Set commands.
	cache *cache
	// cluster serves Ping, Pong and ClusterNodes, and is told about
	// disconnected connections.
	cluster *cluster
}
// HandleRequest decodes request's payload into the message type registered
// for its command in cmdInfos, dispatches the message to the cache or the
// cluster, and writes the reply frame, if one was produced, back to conn.
//
// A request whose command has no registered constructor is ignored. Errors
// from decoding the payload and from writing the reply are handed to assert,
// which is defined outside the visible part of this file.
//
// NOTE(review): cmdInfos registers CMDType_TDel, but the switch below has no
// case for it, so a Del request is decoded and then dropped without a reply.
func (r *router) HandleRequest(conn *connection, request *protocol.ProtoFrame) {
	factory := cmdInfos[request.GetCmd()]
	if factory == nil {
		// Unknown command: nothing to decode, nothing to answer.
		return
	}

	msg := factory()
	err := proto.Unmarshal(request.Data, msg)
	assert(err)

	// Every reply echoes the id of the request it answers.
	reqID := request.GetReqId()

	// frame stays nil for commands that produce no reply.
	var frame *Frame
	switch request.Cmd {
	case protocol.CMDType_TPing:
		pong := r.cluster.HandlePing(conn, msg.(*protocol.Ping))
		frame = &Frame{ReqId: reqID, Cmd: protocol.CMDType_TPong, Message: pong}

	case protocol.CMDType_TPong:
		r.cluster.HandlePong(conn, reqID, msg.(*protocol.Pong))

	case protocol.CMDType_TGet:
		getArgs := msg.(*protocol.GetArgs)
		value, getErr := r.cache.Get(getArgs.Key)
		// The reply carries either the value or the error, never both.
		frame = &Frame{ReqId: reqID, Cmd: protocol.CMDType_TGetReply}
		if getErr != nil {
			frame.Error = NewError(500, getErr)
		} else {
			frame.Message = &protocol.GetReply{Value: value}
		}

	case protocol.CMDType_TSet:
		setArgs := msg.(*protocol.SetArgs)
		r.cache.Set(setArgs.Key, setArgs.Value)
		frame = &Frame{ReqId: reqID, Cmd: protocol.CMDType_TSetReply, Message: &protocol.SetReply{}}

	case protocol.CMDType_TClusterNodes:
		frame = &Frame{
			ReqId:   reqID,
			Cmd:     protocol.CMDType_TClusterNodesReply,
			Message: &protocol.ClusterNodeReply{Nodes: r.cluster.Nodes()},
		}
	}

	if frame == nil {
		return
	}
	_, err = conn.Write(conn.encode(frame))
	assert(err)
}
// HandleDisconnected forwards a connection-loss notification, together with
// the error that accompanied it, to the cluster.
func (r *router) HandleDisconnected(conn *connection, err error) {
	r.cluster.HandleDisconnected(conn, err)
}
// Frame is the in-memory form of an outgoing protocol frame, before it is
// passed to a connection's encode method.
type Frame struct {
	// ReqId is the request id; HandleRequest copies it from the request
	// being answered.
	ReqId uint64
	// Cmd is the command type of this frame.
	Cmd protocol.CMDType
	// Message is the payload. HandleRequest leaves it nil when Error is set.
	Message proto.Message
	// Error describes a failed request. It is nil on success.
	Error *protocol.Error
}
// NewError converts err into a protocol.Error carrying the given code and
// err's message text.
//
// A nil err yields a nil *protocol.Error instead of panicking on the
// err.Error() call, so the result can be assigned to Frame.Error directly:
// no error in, no error out.
func NewError(code int32, err error) *protocol.Error {
	if err == nil {
		return nil
	}
	return &protocol.Error{Code: code, Error: err.Error()}
}