-
Notifications
You must be signed in to change notification settings - Fork 0
/
hashing.go
149 lines (124 loc) · 3.64 KB
/
hashing.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
package inmemory
import (
"hash/crc32"
"sort"
"sync"
)
// Server describes one backend that can be placed on the hash circle.
type Server struct {
	// Addr is the server address. Its bytes seed the hashes of the server's
	// virtual nodes.
	Addr string `json:"addr"`

	// Weight is the number of virtual nodes created for the server when it is
	// added to a Circle; a larger weight gives the server more positions on
	// the circle.
	Weight int `json:"weight"`
}
// node is the 32-bit hash identifying a single virtual node on the circle.
// Spreading several virtual nodes per server evens out key distribution.
type node uint32

// nodes is a list of virtual node hashes. It implements sort.Interface and
// orders hashes ascending.
type nodes []node

// Len reports how many virtual nodes the list holds.
func (ns nodes) Len() int {
	return len(ns)
}

// Swap exchanges the virtual nodes at positions i and j.
func (ns nodes) Swap(i, j int) {
	ns[j], ns[i] = ns[i], ns[j]
}

// Less reports whether the hash at position i sorts before the one at j.
func (ns nodes) Less(i, j int) bool {
	return ns[j] > ns[i]
}
// Circle is a consistent-hashing ring: it holds the virtual nodes of every
// registered server so that servers can be added or removed without major key
// shifts.
//
// The embedded RWMutex guards all fields below. The unexported helpers add,
// remove and search do no locking of their own; they are called from the
// exported methods, which take the lock around them.
type Circle struct {
	sync.RWMutex

	// nodes lists the virtual node hashes. Adjust and AddServer re-sort it
	// after adding, because Get locates a key with a binary search over it.
	nodes nodes

	// servers is the set of registered servers, keyed by pointer identity.
	servers map[*Server]struct{}

	// node2server maps a virtual node hash to the server that owns it.
	node2server map[node]*Server
}
// NewCircle allocates an empty Circle whose lookup maps are initialised, so
// servers can be added to it right away.
func NewCircle() *Circle {
	c := new(Circle)
	c.servers = make(map[*Server]struct{})
	c.node2server = make(map[node]*Server)
	return c
}
// Adjust makes the circle's pool match the given servers: servers that are
// registered but not listed are removed, and listed servers that are not yet
// registered are added. Servers are compared by pointer identity. The virtual
// nodes are re-sorted before returning.
func (c *Circle) Adjust(servers ...*Server) {
	// Collapse the argument list into a set so membership checks are cheap
	// and duplicates are ignored.
	wanted := make(map[*Server]struct{}, len(servers))
	for i := range servers {
		wanted[servers[i]] = struct{}{}
	}

	c.Lock()
	defer c.Unlock()

	// Removal runs first, before any additions.
	for current := range c.servers {
		if _, keep := wanted[current]; keep {
			continue
		}
		c.remove(current)
	}

	for candidate := range wanted {
		if _, known := c.servers[candidate]; known {
			continue
		}
		c.add(candidate)
	}

	// add appends hashes unsorted; lookups need them in ascending order.
	sort.Sort(c.nodes)
}
// AddServer registers server on the circle under the write lock and then
// re-sorts the virtual nodes.
func (c *Circle) AddServer(server *Server) {
	c.Lock()
	defer c.Unlock()
	// add does nothing when server is already registered.
	c.add(server)
	// add appends the new hashes at the end of c.nodes, so ordering has to be
	// restored before the next lookup.
	sort.Sort(c.nodes)
}
// RemoveServer unregisters s and drops its virtual nodes from the circle.
// It does nothing when s is not registered.
func (c *Circle) RemoveServer(s *Server) {
	// The membership check has to happen under the write lock: reading
	// c.servers unlocked races with concurrent Adjust/AddServer calls, and the
	// answer could be stale by the time the lock is acquired.
	c.Lock()
	defer c.Unlock()

	if _, ok := c.servers[s]; !ok {
		return
	}
	c.remove(s)
}
// Get returns the server responsible for key: the owner of the first virtual
// node whose hash is greater than or equal to the key's hash, wrapping around
// to the first node when the key hashes past the last one. How many keys a
// server receives depends on its weight.
//
// Get returns nil when the circle holds no virtual nodes.
func (c *Circle) Get(key string) *Server {
	c.RLock()
	defer c.RUnlock()

	// With no nodes there is nothing to index; without this guard the
	// wrap-around below would read c.nodes[0] and panic.
	if len(c.nodes) == 0 {
		return nil
	}

	// search for the virtual node associated with the key
	i := c.search(key)
	if i >= len(c.nodes) {
		// The key hashes beyond the last node: wrap to the start of the circle.
		i = 0
	}
	return c.node2server[c.nodes[i]]
}
// add registers server and places server.Weight virtual nodes for it on the
// circle. It is a no-op for a nil server or one that is already registered.
//
// Virtual node hashes are the CRC32 of the server address with one more '_'
// appended for each successive node. New hashes are appended to c.nodes
// unsorted, so the caller must re-sort afterwards. The caller must also hold
// the write lock.
func (c *Circle) add(server *Server) {
	// Reject nil before touching any state; otherwise nil would be recorded in
	// c.servers and the dereference below would panic.
	if server == nil {
		return
	}
	if _, ok := c.servers[server]; ok {
		return
	}
	c.servers[server] = struct{}{}

	serverBytes := []byte(server.Addr)
	for i := 0; i < server.Weight; i++ {
		vnodeHash := node(crc32.ChecksumIEEE(serverBytes))
		// A hash may already be on the circle (another server with the same
		// address, or a CRC32 collision). Keep c.nodes free of duplicates.
		if _, taken := c.node2server[vnodeHash]; !taken {
			c.nodes = append(c.nodes, vnodeHash)
		}
		// associate virtual node hash with the server (last writer wins)
		c.node2server[vnodeHash] = server
		// update input to generate new hash
		serverBytes = append(serverBytes, '_')
	}
}
// remove unregisters s and deletes every virtual node owned by a server equal
// in value to *s (same Addr and Weight). The relative order of the remaining
// nodes is preserved, so a sorted c.nodes stays sorted. Removing the last
// server leaves the circle empty. A nil s is ignored. The caller must hold
// the write lock.
func (c *Circle) remove(s *Server) {
	if s == nil {
		return
	}

	// Filter in place into the same backing array. Deleting from c.nodes while
	// ranging over it would skip the element after each deletion and later
	// visit stale tail slots whose map entry is already gone.
	kept := c.nodes[:0]
	for _, nodeHash := range c.nodes {
		owner := c.node2server[nodeHash]
		switch {
		case owner == nil:
			// No owner recorded (e.g. a duplicate hash whose entry was
			// already deleted): drop the orphan so lookups never hit it.
			delete(c.node2server, nodeHash)
		case *owner == *s:
			delete(c.node2server, nodeHash)
		default:
			kept = append(kept, nodeHash)
		}
	}
	c.nodes = kept

	delete(c.servers, s)
}
// search returns the index of the first virtual node whose hash is greater
// than or equal to the CRC32 hash of key, or len(c.nodes) when every node
// hashes lower. c.nodes must be sorted ascending and the caller must hold at
// least the read lock.
func (c *Circle) search(key string) int {
	// Hash the key once rather than on every probe of the binary search.
	target := node(crc32.ChecksumIEEE([]byte(key)))
	return sort.Search(len(c.nodes), func(i int) bool {
		return c.nodes[i] >= target
	})
}