# hyperbelle_tree_yy_01.py
from collections import defaultdict
import numpy as np
from sklearn.base import BaseEstimator
from scipy import optimize

# Column indices into the per-event hit array used by predict_single_event()
# (a hit-id column at number_id is appended there).
layer_id = 0
phi_id = 1
x_id = 2
y_id = 3
number_id = 4

def funk(x, y, p=[], formula="p[0] + ( (p[1]*x) + ( p[2]*(x**2) ) )"):
    # Evaluate an arbitrary formula string in terms of x, y and the
    # parameter list p (note: uses eval, so only pass trusted strings).
    return eval(formula)


def calc_R(x, y):
    # Distance of the point(s) (x, y) from the origin.
    return funk(x, y, [], "np.sqrt(x**2+y**2)")
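
# Illustrative check (not in the original file): funk simply eval()s the
# formula string, so calc_R(3.0, 4.0) == 5.0, and with the default quadratic
# funk(2.0, 0.0, [1, 2, 3]) == 1 + 2*2 + 3*2**2 == 17.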


class Clusterer(BaseEstimator):
    def __init__(self, cut=0.0001, duplicate_cut=0.1):
        # cut: minimal learned weight a continuation hit needs to be followed at all
        # duplicate_cut: window below the best weight in which competing hits are also tried
        self.cut = cut
        self.duplicate_cut = duplicate_cut
        self.layers = list(range(9))
        self.hit_masks_grouped_by_layer = {}
        self.not_used_mask = None
        # Learned |dphi| histograms: one row per adjacent-layer pair, 50 bins
        self.weights = np.zeros((8, 50))
        # |dphi| values in [0, dphiRange) are histogrammed; larger ones are ignored
        self.dphiRange = 0.1

    def fit(self, X, y):
        # Learn, per adjacent-layer pair, the distribution of |dphi| between
        # hits that belong to the same particle.
        # Note: here X apparently carries an extra leading column, so layer/x/y
        # sit at indices 1/3/4 instead of the *_id constants above.
        events = y[:, 0]
        for event in np.unique(events):
            event_indices = events == event
            X_event = X[event_indices]
            pixelx = X_event[:, 3]
            pixely = X_event[:, 4]
            phi = np.arctan2(pixely, pixelx)
            layer = X_event[:, 1]
            particles = y[event_indices][:, 1]
            for particle in np.unique(particles):
                hits_particle = particles == particle
                hits_layer = layer[hits_particle]
                hits_phi = phi[hits_particle]
                # Order the hits of this particle by layer
                sort = np.argsort(hits_layer)
                dlayer = hits_layer[sort][1:] - hits_layer[sort][:-1]
                # Only use pairs of hits on directly adjacent layers
                use = dlayer == 1
                hits_layer_end = hits_layer[sort][:-1][use]
                dphi = self.abs_phi_dist(hits_phi[sort][1:], hits_phi[sort][:-1])[use]
                # Histogram |dphi| into the bins covering [0, dphiRange)
                dphi_bin = np.round(dphi / self.dphiRange * self.weights.shape[1])
                in_range = dphi_bin < self.weights.shape[1]
                self.weights[hits_layer_end[in_range].astype('int'), dphi_bin[in_range].astype('int')] += 1
        # Normalize each layer pair's histogram to its maximum bin
        for layer in range(self.weights.shape[0]):
            # self.weights[layer] /= np.sum(self.weights[layer])
            self.weights[layer] /= np.max(self.weights[layer])

    @staticmethod
    def fit_track(track_list, X_event):
        # Least-squares circle fit through the hits of a track candidate.
        # Returns the fitted radius, an angle estimate for the track and the
        # sum of squared residuals.
        if len(track_list) < 3:
            return 0, 0, 0
        x_values = X_event[track_list, x_id]
        y_values = X_event[track_list, y_id]
        x_m = np.mean(x_values)
        y_m = np.mean(y_values)

        def f_2(c):
            # Deviation of each hit's distance to the center c from the mean distance
            Ri = calc_R(x_values - c[0], y_values - c[1])
            return Ri - Ri.mean()

        center_estimate = x_m, y_m
        center_2, ier = optimize.leastsq(f_2, center_estimate)
        xc_2, yc_2 = center_2
        Ri_2 = calc_R(x_values - xc_2, y_values - yc_2)
        R_2 = Ri_2.mean()
        residu_2 = sum((Ri_2 - R_2) ** 2)
        phi_values = np.arctan2(y_values - yc_2, x_values - xc_2)
        phi_2 = np.arctan2(yc_2, xc_2) + np.sign(np.mean(phi_values)) * np.pi
        return R_2, phi_2, residu_2

    @staticmethod
    def abs_phi_dist(phi_1, phi_2):
        # Absolute angular difference, wrapped into [0, pi]
        dphi = abs(phi_1 - phi_2)
        dphi = (dphi > np.pi) * (2 * np.pi - dphi) + (dphi <= np.pi) * dphi
        return dphi
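
    # Illustrative check (not in the original file): a raw difference of
    # 6.0 rad, e.g. abs_phi_dist(3.0, -3.0), is wrapped around +-pi and
    # comes out as 2*pi - 6.0 ~= 0.283.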

    @staticmethod
    def get_weight(phi_1, phi_2, dphiRange, weights):
        # Look up the learned weight for the phi distance between one hit and
        # an array of candidate hits; pairs outside dphiRange get weight -1.
        dphi = Clusterer.abs_phi_dist(phi_1, phi_2)
        dphi_bin = np.round(dphi / dphiRange * len(weights))
        in_range = dphi_bin < len(weights)
        w = -np.ones(len(phi_2))
        w[in_range] = weights[dphi_bin[in_range].astype('int')]
        return w

    def get_quality(self, track, X_event):
        # Quality of a track candidate: the negative residuum of its circle fit
        R, phi, residuum = self.fit_track(track, X_event)
        return -residuum

    def walk(self, hit, track, X_event):
        # Recursively extend a track candidate towards the innermost layer,
        # yielding every candidate that cannot be extended further.
        # Abort criterion: innermost layer reached
        if hit[layer_id] == 0:
            yield track
            return
        unused_hits_on_next_layer_mask = self.hit_masks_grouped_by_layer[hit[layer_id] - 1] & self.not_used_mask
        unused_hits_on_next_layer = X_event[unused_hits_on_next_layer_mask]
        # Abort criterion: no unused hits left on the next layer
        if np.all(~unused_hits_on_next_layer_mask):
            yield track
            return
        weights = self.get_weight(hit[phi_id], unused_hits_on_next_layer[:, phi_id],
                                  self.dphiRange, self.weights[int(hit[layer_id] - 1)])
        maximal_weight = np.max(weights)
        # Abort criterion: even the best continuation is below the cut
        if maximal_weight < self.cut:
            yield track
            return
        # Follow every hit whose weight is within duplicate_cut of the best one
        possible_next_hits_mask = maximal_weight - weights < self.duplicate_cut
        for possible_next_hit in unused_hits_on_next_layer[possible_next_hits_mask]:
            for track_candidate in self.walk(possible_next_hit,
                                             track + [int(possible_next_hit[number_id])],
                                             X_event):
                yield track_candidate

    def extrapolate(self, track_list, X_event):
        # Merge pairs of unfinished track fragments that are separated by a
        # single missing layer, choosing the partner with the best fit quality.
        unfinished_tracks_end = defaultdict(list)
        unfinished_tracks_begin = defaultdict(list)
        for track in track_list:
            if len(track) == len(self.layers):
                continue
            hits_of_track = X_event[track]
            last_layer = max(hits_of_track[:, layer_id])
            first_layer = min(hits_of_track[:, layer_id])
            if last_layer != len(self.layers) - 1:
                # Fragment does not reach the outermost layer
                unfinished_tracks_end[last_layer].append(track)
            elif first_layer != 0:
                # Fragment does not reach the innermost layer
                unfinished_tracks_begin[first_layer].append(track)
        for last_layer, unfinished_tracks in unfinished_tracks_end.items():
            for unfinished_track in unfinished_tracks:
                # Partners starting one layer beyond the gap
                other_unfinished_tracks = unfinished_tracks_begin[last_layer + 2]
                if len(other_unfinished_tracks) == 0:
                    continue
                best_unfinished_track = max(other_unfinished_tracks,
                                            key=lambda track: self.get_quality(unfinished_track + track, X_event))
                # Merge in place and remove the partner from the candidate pool
                unfinished_track += best_unfinished_track
                del other_unfinished_tracks[other_unfinished_tracks.index(best_unfinished_track)]

    def predict_single_event(self, X_event):
        # Attention! We are redefining the iphi column here, as we do not need it
        X_event[:, phi_id] = np.arctan2(X_event[:, y_id], X_event[:, x_id])
        # Add another column to store the hit ids
        number_of_hits = len(X_event)
        X_event = np.concatenate([X_event, np.reshape(np.arange(number_of_hits), (number_of_hits, 1))], axis=1)
        # Precompute one hit mask per layer
        for layer in self.layers:
            self.hit_masks_grouped_by_layer[layer] = X_event[:, layer_id] == layer
        self.not_used_mask = np.ones(len(X_event)).astype("bool")
        labels = -1 * np.ones(len(X_event))
        track_list = []
        # Seed tracks on the outermost layers first and walk inwards
        for layer in reversed(self.layers):
            while True:
                unused_mask_in_this_layer = self.hit_masks_grouped_by_layer[layer] & self.not_used_mask
                if not np.any(unused_mask_in_this_layer):
                    break
                start_hit = X_event[unused_mask_in_this_layer][0]
                found_track_list = list(self.walk(start_hit, [int(start_hit[number_id])], X_event))
                if len(found_track_list) != 1:
                    # Several candidates: keep the one with the best fit quality
                    best_track = max(found_track_list, key=lambda track: self.get_quality(track, X_event))
                else:
                    best_track = found_track_list[0]
                # Store the best track and mark its hits as used
                self.not_used_mask[best_track] = False
                track_list.append(best_track)
        # Try to bridge single-layer gaps between the found fragments
        self.extrapolate(track_list, X_event)
        # One label per track; hits that were never assigned keep label -1
        for i, track in enumerate(track_list):
            labels[track] = i
        return labels
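

# Illustrative usage sketch -- NOT part of the original file. The column
# layouts are assumptions read off the indexing above: fit() is fed hits as
# [hit_id, layer, iphi, x, y] with truth labels [event, particle], while
# predict_single_event() expects [layer, iphi, x, y]. The toy "tracks" below
# are synthetic placeholders, not real detector data.
if __name__ == "__main__":
    rng = np.random.RandomState(0)
    rows_X, rows_y = [], []
    hit_id = 0
    for event in range(5):
        for particle in range(3):
            phi0 = rng.uniform(-np.pi, np.pi)
            for layer in range(9):
                r = 1.0 + layer
                phi = phi0 + 0.01 * layer  # small, constant bend per layer
                rows_X.append([hit_id, layer, 0.0, r * np.cos(phi), r * np.sin(phi)])
                rows_y.append([event, particle])
                hit_id += 1
    X, y = np.array(rows_X), np.array(rows_y)

    clusterer = Clusterer()
    clusterer.fit(X, y)

    # Predict one event; drop the assumed leading hit-id column so the
    # remaining columns line up with the *_id constants.
    first_event = y[:, 0] == 0
    labels = clusterer.predict_single_event(X[first_event][:, 1:])
    print(labels)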