-
Notifications
You must be signed in to change notification settings - Fork 40
/
pregel.py
99 lines (83 loc) · 3.47 KB
/
pregel.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
"""pregel.py is a Python 2.6 module implementing a toy single-machine
version of Google's Pregel system for large-scale graph processing."""
import collections
import threading
class Vertex:
    """One vertex of the graph, plus its per-superstep bookkeeping.

    This class defines no ``update`` method itself, yet
    ``Worker.superstep`` calls ``vertex.update()`` on every active
    vertex, so a usable vertex has to supply one (presumably through
    a subclass).

    Attributes set by the constructor:
      id                -- identifier passed in by the caller.  Kept
                           mainly because the Pregel paper describes
                           it; the pagerank example uses it briefly
                           but not in any essential way.
      value             -- the vertex's current value, as passed in.
      out_vertices      -- the object passed in as ``out_vertices``
                           (stored as-is, not copied).
      incoming_messages -- starts as an empty list.
      outgoing_messages -- starts as a separate empty list.
      active            -- starts as True.
      superstep         -- starts at 0.  Every vertex carries its own
                           copy of this global counter; duplicating
                           global state is arguably unwise, but
                           Pregel's synchronous nature lets us get
                           away with it.
    """

    def __init__(self, id, value, out_vertices):
        # Caller-supplied state, stored exactly as given.
        self.id, self.value, self.out_vertices = id, value, out_vertices

        # Two distinct, initially empty mailboxes.
        self.incoming_messages, self.outgoing_messages = [], []

        # Every vertex begins active, at superstep zero.
        self.active = True
        self.superstep = 0
class Pregel():
    """Drives a toy, single-machine Pregel computation.

    The vertices are split among worker threads once, at the start of
    run().  Each iteration then runs one superstep (every active
    vertex is updated by its worker) followed by a message exchange,
    until no vertex is active any more.
    """

    def __init__(self, vertices, num_workers):
        """Stores the vertices to process and the number of workers.

        vertices    -- iterable of vertex objects; it is iterated
                       several times, so it must be re-iterable.
        num_workers -- modulus used by worker() to assign vertices;
                       a value of 0 makes worker() raise
                       ZeroDivisionError.
        """
        self.vertices = vertices
        self.num_workers = num_workers

    def run(self):
        """Runs the Pregel instance.

        The partition is computed once here and reused by every
        superstep; it is stored in self.partition.
        """
        self.partition = self.partition_vertices()
        while self.check_active():
            self.superstep()
            self.redistribute_messages()

    def partition_vertices(self):
        """Returns a defaultdict mapping worker id to the list of
        vertices assigned to that worker.

        Only worker ids that received at least one vertex appear as
        keys, so the dict can have fewer than self.num_workers
        entries."""
        partition = collections.defaultdict(list)
        for vertex in self.vertices:
            partition[self.worker(vertex)].append(vertex)
        return partition

    def worker(self, vertex):
        """Returns the id of the worker that vertex is assigned to,
        namely hash(vertex) modulo self.num_workers."""
        return hash(vertex) % self.num_workers

    def superstep(self):
        """Completes a single superstep.

        Requires self.partition, which run() sets up.

        Note that in this implementation, worker threads are spawned,
        and then destroyed during each superstep.  This creation and
        destruction causes some overhead, and it would be better to
        make the workers persistent, and to use a locking mechanism to
        synchronize.  The Pregel paper suggests that this is how
        Google's Pregel implementation works."""
        workers = []
        for vertex_list in self.partition.values():
            worker = Worker(vertex_list)
            workers.append(worker)
            worker.start()
        # Barrier: the superstep is over only once every worker is done.
        for worker in workers:
            worker.join()

    def redistribute_messages(self):
        """Updates the message lists for all vertices.

        Advances every vertex's superstep counter, empties every
        inbox, then delivers each queued (receiver, message) pair to
        the receiver's inbox as a (sender, message) pair.

        Each outbox is emptied once delivered.  Otherwise a vertex
        whose update() is no longer called (because it is inactive),
        or which sends nothing in a later superstep, would have its
        old messages delivered again after every following superstep.
        """
        # First pass: reset all inboxes before any delivery, so that a
        # message delivered below is never wiped by a later reset.
        for vertex in self.vertices:
            vertex.superstep += 1
            vertex.incoming_messages = []
        # Second pass: deliver, then drop, each vertex's queued messages.
        for vertex in self.vertices:
            for (receiving_vertex, message) in vertex.outgoing_messages:
                receiving_vertex.incoming_messages.append((vertex, message))
            vertex.outgoing_messages = []

    def check_active(self):
        """Returns True if there are any active vertices, and False
        otherwise."""
        # Generator expression: any() can stop at the first active
        # vertex without building an intermediate list.
        return any(vertex.active for vertex in self.vertices)
class Worker(threading.Thread):
    """Thread that carries one list of vertices through one superstep."""

    def __init__(self, vertices):
        """Sets up the thread and remembers the vertices it handles."""
        super(Worker, self).__init__()
        self.vertices = vertices

    def run(self):
        """Thread entry point; performs this worker's superstep."""
        self.superstep()

    def superstep(self):
        """Calls update() on each vertex in self.vertices that is
        active, in list order; inactive vertices are skipped."""
        for current in self.vertices:
            if not current.active:
                continue
            current.update()