-
Notifications
You must be signed in to change notification settings - Fork 3
/
Copy pathdelegate.go
119 lines (95 loc) · 2.75 KB
/
delegate.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
package ringman
import (
"encoding/json"
"errors"
"github.com/Nitro/memberlist"
log "github.com/sirupsen/logrus"
)
type NodeMetadata struct {
ServicePort string
}
// Delegate is a Memberlist delegate that is responsible for handling
// integration between the hash ring and Memberlist messages. See
// the Memberlist documentation for detailed explanations of the
// callback methods.
type Delegate struct {
RingMan *HashRingManager
nodeMetadata *NodeMetadata
}
func NewDelegate(ringMan *HashRingManager, meta *NodeMetadata) *Delegate {
delegate := Delegate{
RingMan: ringMan,
nodeMetadata: meta,
}
return &delegate
}
func (d *Delegate) NodeMeta(limit int) []byte {
data, err := json.Marshal(d.nodeMetadata)
if err != nil {
log.Error("Error encoding Node metadata!")
data = []byte("{}")
}
log.Debugf("Setting metadata to: %s", string(data))
return data
}
func (d *Delegate) NotifyMsg(message []byte) {
log.Debugf("NotifyMsg(): %s", string(message))
}
func (d *Delegate) GetBroadcasts(overhead, limit int) [][]byte {
//log.Debugf("GetBroadcasts(): %d %d", overhead, limit)
return [][]byte{}
}
func (d *Delegate) LocalState(join bool) []byte {
log.Debugf("LocalState(): %t", join)
return []byte{}
}
func (d *Delegate) MergeRemoteState(buf []byte, join bool) {
log.Debugf("MergeRemoteState(): %s %t", string(buf), join)
}
func (d *Delegate) NotifyJoin(node *memberlist.Node) {
log.Debugf("NotifyJoin(): %s %s", node.Name, string(node.Meta))
if d.RingMan == nil {
log.Warn("Ring manager was nil in delegate!")
return
}
nodeKey, err := d.keyForNode(node)
if err != nil {
log.Errorf("NotifyJoin: %s", err)
return
}
d.RingMan.AddNode(nodeKey)
}
// keyForNode takes a node and returns the key we use to store it in the
// hashring. Currently based on the IP address and service port.
func (d *Delegate) keyForNode(node *memberlist.Node) (string, error) {
meta, err := DecodeNodeMetadata(node.Meta)
if err != nil {
return "", errors.New("Unable to decode metadata for " + node.Name + ", unable to add")
}
return node.Addr.String() + ":" + meta.ServicePort, nil
}
func (d *Delegate) NotifyLeave(node *memberlist.Node) {
log.Debugf("NotifyLeave(): %s", node.Name)
if d.RingMan == nil {
log.Error("Ring manager was nil in delegate!")
return
}
nodeKey, err := d.keyForNode(node)
if err != nil {
log.Errorf("NotifyLeave: %s", err)
return
}
d.RingMan.RemoveNode(nodeKey)
}
func (d *Delegate) NotifyUpdate(node *memberlist.Node) {
log.Debugf("NotifyUpdate(): %s - %s", node.Name, node.Meta)
}
// DecodeNodeMetadata takes a byte slice and deserializes it
func DecodeNodeMetadata(data []byte) (*NodeMetadata, error) {
var meta NodeMetadata
err := json.Unmarshal(data, &meta)
if err != nil {
return nil, err
}
return &meta, nil
}