-
Notifications
You must be signed in to change notification settings - Fork 1
/
bdd-optimized.py
118 lines (79 loc) · 2.48 KB
/
bdd-optimized.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
from typing import List
from time import time
from graph import Graph, Vertex, Edge
from dd.autoref import BDD, Function
def example_complex():
g = Graph()
network = g.create_vertex(0, "Network")
a = g.create_vertex(100, "A")
b = g.create_vertex(100, "B")
c = g.create_vertex(100, "C")
d = g.create_vertex(100, "D")
e = g.create_vertex(100, "Target")
network.create_edge(a, 0.0)
network.create_edge(b, 0.0)
a.create_edge(b, 0.5)
a.create_edge(c, 0.5)
b.create_edge(d, 0.5)
c.create_edge(d, 0.5)
c.create_edge(e, 0.5)
d.create_edge(e, 0.5)
return g
start = time()
graph = example_complex()
network = graph.get_vertex("Network")
target = graph.get_vertex("Target")
bdd = BDD()
edges = {}
for e in list(set([e for v in graph.vertices for e in v.edges])):
edges[str(e)] = e
bdd.declare(str(e))
def traverse(v: Vertex, visited: List[Vertex]) -> Function:
res = None
for e in v.edges:
next = e.get_other(v)
if next in visited:
continue
term = bdd.var(str(e))
expr = traverse(next, visited + [v])
expr = expr.implies(term) if expr else term
res = res | expr if res else expr
return res
top = traverse(target, [network])
def terminate(failed: List[Edge]):
# Compute the set of directly impacted vertices
disabled = set()
queue = [target]
while len(queue) > 0:
v = queue.pop()
if v in disabled:
continue
disabled.add(v)
for e in v.edges:
if e not in failed:
continue
next = e.get_other(v)
queue.append(next)
cutoff = set(graph.vertices)
queue = [network]
while len(queue) > 0:
v = queue.pop()
if v not in cutoff:
continue
if v in disabled:
continue
cutoff.remove(v)
for n in v.neighbours:
queue.append(n)
return sum([v.households for v in cutoff])
def evaluate(top: Function, failed: List[Edge]):
edge = edges[top.var]
hi = evaluate(top.high, failed + [edge]) + terminate(failed + [edge]) if top.high.var else terminate(failed + [edge])
lo = evaluate(top.low, failed) if top.low.var else terminate(failed)
return edge.weight * hi + (1.0 - edge.weight) * lo
influence = evaluate(top, [])
print("Total influence: {}".format(influence))
end = time()
print("That took {}s".format(end - start))
bdd.collect_garbage()
bdd.dump('bdd.pdf', roots=[top])