-
Notifications
You must be signed in to change notification settings - Fork 0
/
test.py
102 lines (82 loc) · 3.17 KB
/
test.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
import networkx as nx
import argparse
import pprint
import numpy as np
import random
import matplotlib.pyplot as plt
import tqdm
from networkx.algorithms import bipartite
import time
from GraphRicciCurvature.OllivierRicci import OllivierRicci
from GraphRicciCurvature.FormanRicci import FormanRicci
def get_ollivier_curvature(G):
    """Return the mean Ollivier-Ricci curvature over all edges of ``G``.

    An ``OllivierRicci`` calculator is built on ``G`` with ``alpha=0.5`` and
    ``verbose="INFO"``; after ``compute_ricci_curvature()`` runs, the
    ``'ricciCurvature'`` attribute of every edge of the calculator's graph
    (``.G``) is summed and divided by the number of edges.

    If the calculator's graph has no edges, the final division raises
    ``ZeroDivisionError``.
    """
    calculator = OllivierRicci(G, alpha=0.5, verbose="INFO")
    calculator.compute_ricci_curvature()

    # The curvature values live on the calculator's own graph, not on G.
    annotated = calculator.G
    running_total = 0
    for edge in annotated.edges.data():
        # edge is (u, v, attribute_dict)
        running_total += edge[2]['ricciCurvature']
    return running_total / len(annotated.edges())
def get_forman_curvature(G):
    """Return the mean Forman-Ricci curvature over all edges of ``G``.

    A ``FormanRicci`` calculator is built on ``G``; after
    ``compute_ricci_curvature()`` runs, the ``'formanCurvature'`` attribute
    of every edge of the calculator's graph (``.G``) is averaged.

    If the calculator's graph has no edges, the final division raises
    ``ZeroDivisionError``.
    """
    calculator = FormanRicci(G)
    calculator.compute_ricci_curvature()

    annotated = calculator.G
    # Each item of edges.data() is (u, v, attribute_dict).
    per_edge = [attrs['formanCurvature'] for _, _, attrs in annotated.edges.data()]
    edge_count = len(annotated.edges())
    return sum(per_edge) / edge_count
def sample(G, n_samples):
    """Draw ``n_samples`` triangle-based curvature estimates from ``G``.

    Each sample is built as follows:

    1. Pick two distinct random nodes ``b`` and ``c`` whose shortest path
       contains at least three nodes, and let ``m`` be the node at the
       middle index of that path.
    2. Pick a random reference node ``a`` different from ``m``, ``b``, ``c``.
    3. With ``l_xy`` the shortest-path length (edge count) between x and y,
       record::

           (l_am**2 + l_bc**2 / 4 - (l_ab**2 + l_ac**2) / 2) / (2 * l_am)

    Args:
        G: A NetworkX graph. The shortest-path and diameter calls assume
            every pair of nodes is connected (the caller in this file passes
            connected components). Node labels must be mutually sortable.
        n_samples: Number of curvature values to collect.

    Returns:
        A list of ``n_samples`` curvature values, or ``None`` if, after
        exactly 10000 loop iterations, the diameter of ``G`` is found to be
        below 3. In that case any samples collected so far are discarded.

    Raises:
        IndexError: From ``random.choice`` when no node other than ``m``,
            ``b`` and ``c`` exists (i.e. ``G`` has only three nodes).
    """
    nodes = sorted(G)
    # The node count is all the original adjacency matrix was used for;
    # nx.to_scipy_sparse_matrix no longer exists in NetworkX >= 3.0.
    n = len(nodes)
    # O(1) node -> position lookup instead of list.index() in the loop.
    position = {node: i for i, node in enumerate(nodes)}

    curvature = []
    max_iter = 10000
    attempts = 0
    while len(curvature) < n_samples:
        # If max_iter attempts did not yield enough triangles, check once
        # whether the component is simply too small to contain them.
        # NOTE(review): a 3-node path only needs diameter >= 2, so the
        # threshold of 3 may be stricter than necessary -- confirm intent.
        if attempts == max_iter:
            if nx.algorithms.distance_measures.diameter(G) < 3:
                return None
        attempts += 1

        b = random.randint(0, n - 1)
        c = random.randint(0, n - 1)
        if b == c:
            continue

        path = nx.shortest_path(G, source=nodes[b], target=nodes[c])
        if len(path) < 3:
            # b and c are adjacent: there is no interior midpoint.
            continue
        m = position[path[len(path) // 2]]
        l_bc = len(path) - 1

        # Sample the reference node among all nodes except m, b and c.
        excluded = {m, b, c}
        a = random.choice([i for i in range(n) if i not in excluded])

        # Only the distances from a are needed, not the paths themselves.
        l_ab = nx.shortest_path_length(G, source=nodes[a], target=nodes[b])
        l_ac = nx.shortest_path_length(G, source=nodes[a], target=nodes[c])
        l_am = nx.shortest_path_length(G, source=nodes[a], target=nodes[m])

        # a != m, so l_am >= 1 and the division is safe.
        curv = (l_am**2 + l_bc**2 / 4 - (l_ab**2 + l_ac**2) / 2) / (2 * l_am)
        curvature.append(curv)
    return curvature
def sectional_curvature(G):
    """Estimate a mean sampled curvature of ``G`` across its components.

    ``G`` is split into connected components. Each component is given a
    sampling budget of ``int(1000 * size**3 / total)``, where ``size`` is
    its node count and ``total`` is the sum of cubed node counts over all
    components. Components with a zero budget or with three or fewer nodes
    are skipped, as are components for which ``sample`` returns ``None``.

    Returns:
        A ``(mean_curvature, total)`` tuple: the mean of all collected
        samples (plus the seed value described below) and the sum of cubed
        component sizes.
    """
    subgraphs = [G.subgraph(members) for members in nx.connected_components(G)]
    cubed_sizes = [sub.number_of_nodes() ** 3 for sub in subgraphs]
    total = np.sum(cubed_sizes)

    # Seeded with a single 0 so the mean is defined even when no component
    # yields samples.
    # NOTE(review): the seed also takes part in the mean when samples exist,
    # pulling the result slightly toward 0 -- confirm this is intended.
    collected = [0]
    for sub, cubed in zip(subgraphs, cubed_sizes):
        budget = int(1000 * (cubed / total))
        if budget <= 0 or sub.number_of_nodes() <= 3:
            continue
        drawn = sample(sub, budget)
        if drawn is None:
            continue
        collected.extend(drawn)
    return np.mean(collected), total
# --- Script body: runs at import/execution time and times the whole run. ---
start_time = time.time()
# Random directed graph on 500 nodes; the edge-count argument evaluates to
# the float 124750.0 (500*499/2).
G = nx.gnm_random_graph(500, 500*499/2, directed=True)
# Undirected copy, used only for the sectional-curvature estimate below.
U = G.to_undirected()
# Ollivier and Forman curvatures are computed on the directed graph G.
# NOTE(review): sectional curvature uses U instead -- confirm the mix of
# directed and undirected inputs is intended.
ollivier = get_ollivier_curvature(G)
print("ollivier: ", ollivier)
forman = get_forman_curvature(G)
print("forman: ", forman)
# The second return value (sum of cubed component sizes) is discarded.
sectional, _ = sectional_curvature(U)
print("sectional: ", sectional)
# Wall-clock time for graph generation plus all three curvature computations.
print("--- %s seconds ---" % (time.time() - start_time))