-
Notifications
You must be signed in to change notification settings - Fork 9
/
Copy pathtest.py
131 lines (109 loc) · 3.99 KB
/
test.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
from txgossip.gossip import Gossiper, _address_to_peer_name, Participant as _Participant
from twisted.internet import reactor
import random
# Number of gossip participants this script starts; also the number of 'x'
# change notifications after which reactor.seconds() is printed.
CNT = 20
# Running count of 'x' change notifications, shared by all participants.
cnt = 0
class Participant(_Participant):
    """Gossip participant that takes part in a priority-based leader election.

    Each instance votes for whichever of itself and its live peers has the
    highest ``/leader-election/priority`` and publishes that choice under
    ``/leader-election/vote``.  Once every live peer reports the same vote
    as the local one, that name is published under
    ``/leader-election/master``.

    NOTE(review): ``self.gossiper`` is never assigned in this class; it is
    presumably attached by ``Gossiper`` when the participant is handed to
    it -- confirm against txgossip.
    """

    # Pending delayed call for the next vote, or None when none is scheduled.
    _election_timeout = None

    def __init__(self, name):
        """Store ``name``, the label used as a prefix in printed log lines."""
        self.name = name

    def _unanimous(self, key):
        """Check whether every live peer shares the local value for ``key``.

        Returns ``(True, value)`` when all live peers report the same value
        as the local gossiper, otherwise ``(False, None)``.  A ``KeyError``
        from the local lookup or from any peer lookup counts as no consensus.
        """
        try:
            mine = self.gossiper.get(key)
            for live_peer in self.gossiper.live_peers:
                if live_peer.get(key) != mine:
                    return False, None
        except KeyError:
            return False, None
        return True, mine

    def value_changed(self, peer, key, value):
        """React to a gossiped key changing.

        * ``__heartbeat__`` changes are ignored.
        * ``x`` changes increment the module-level ``cnt``; when it reaches
          ``CNT`` the current reactor time is printed.
        * ``/leader-election/vote`` changes publish the local vote as
          ``/leader-election/master`` once all live peers agree with it.
        * ``/leader-election/master`` changes print an announcement once all
          live peers agree with the local master value.

        ``peer`` and ``value`` are part of the callback signature but are
        not used here.
        """
        global cnt
        if key == '__heartbeat__':
            return
        if key == 'x':
            cnt += 1
            if cnt == CNT:
                print(reactor.seconds())
            return
        if key == '/leader-election/vote':
            agreed, vote = self._unanimous(key)
            if agreed:
                # Publish the agreed vote itself rather than the last value
                # read inside the peer loop: the two are equal whenever the
                # loop ran, and this stays defined when there are no live
                # peers at all.
                self.gossiper.set('/leader-election/master', vote)
        elif key == '/leader-election/master':
            agreed, master = self._unanimous(key)
            if agreed:
                print("%s WE GOT A NEW MASTER %s %s"
                      % (self.name, master, reactor.seconds()))

    def peer_alive(self, peer):
        """Log that ``peer`` is alive, print its priority, restart the election."""
        print("%s thinks %s is alive %s"
              % (self.name, peer.name, peer.keys()))
        print(peer.get('/leader-election/priority'))
        self._start_election()

    def _vote(self):
        """Pick the highest-priority candidate and publish it as our vote.

        The local gossiper is the initial candidate; a live peer replaces it
        only when its priority is strictly greater.  The vote key is written
        only when it differs from the value already stored.
        """
        # The delayed call that invoked us has fired; nothing is pending now.
        self._election_timeout = None
        suggested_peer = self.gossiper.name
        arrogance = self.gossiper.get('/leader-election/priority')
        for candidate in self.gossiper.live_peers:
            priority = candidate.get('/leader-election/priority')
            if priority > arrogance:
                suggested_peer = candidate.name
                arrogance = priority
        print("%s votes for %s" % (self.name, suggested_peer))
        try:
            current_vote = self.gossiper.get('/leader-election/vote')
        except KeyError:
            # No vote stored yet: fall through and publish one.
            pass
        else:
            if current_vote == suggested_peer:
                return
        self.gossiper.set('/leader-election/vote', suggested_peer)

    def _start_election(self):
        """(Re)schedule a vote 5 seconds from now, cancelling any pending one."""
        if self._election_timeout is not None:
            self._election_timeout.cancel()
        self._election_timeout = reactor.callLater(5, self._vote)

    def peer_dead(self, peer):
        """Log that ``peer`` is considered dead and restart the election."""
        print("%s thinks %s is dead" % (self.name, peer.name))
        self._start_election()

    def peer_stable(self, peer):
        """Log that ``peer`` is stable."""
        print("stable %s" % (peer,))
# Start CNT gossipers on consecutive localhost UDP ports beginning at 9000.
# Each one advertises its index as its leader-election priority.
members = []
for i in range(CNT):
    port_number = 9000 + i
    participant = Participant('127.0.0.1:%d' % port_number)
    gossiper = Gossiper(reactor, participant, '127.0.0.1')
    gossiper.set('/leader-election/priority', i)
    p = reactor.listenUDP(port_number, gossiper)
    members.append((gossiper, p, participant))

# Every member except the first is seeded with the first member's address.
for member in members[1:]:
    member[0].seed(['127.0.0.1:9000'])

# The first gossiper is the one the test functions write through.
seed = members[0][0]
def prop_test():
    """Begin the propagation test.

    Prints a banner and the current reactor time, then sets key 'x' on the
    seed gossiper.
    """
    print("START PROP TEST")
    started_at = reactor.seconds()
    print(started_at)
    seed.set('x', 'value')
# NOTE(review): not referenced anywhere else in the visible file -- looks unused.
pending = []
def kill_some():
    """Stop the last member's UDP port while more than half of CNT remain.

    Does nothing once ``members`` has shrunk to ``CNT / 2`` entries or fewer.
    """
    if len(members) <= (CNT / 2):
        return
    # Always take the most recently added member off the end of the list.
    _gossiper, port, _participant = members.pop()
    print(" ".join(["killing", str(port.getHost())]))
    port.stopListening()
def test():
    """Set an initial key on the seed, then schedule the later test phases.

    ``prop_test`` is scheduled 5 seconds out and ``kill_some`` 30 seconds out.
    """
    seed.set('test', 'value')
    for delay, step in ((5, prop_test), (30, kill_some)):
        reactor.callLater(delay, step)
# Register test() to be called once the reactor is running, then start the
# reactor's event loop.
reactor.callWhenRunning(test)
reactor.run()