qii_lib.py

""" Various QII related computations. """
import pandas as pd
import numpy
from builtins import range

# When True, the influence routines also populate and return the intervened
# data points ("counterfactuals") alongside the influence scores.
RECORD_COUNTERFACTUALS = False


def intervene(X, features, x0):
    """ Constant intervention """
    X = numpy.array(X, copy=True)
    x0 = x0.T
    for f in features:
        X[:, f] = x0[f]
    return X


def causal_measure(clf, X, ep_state, f, x0):
    """ Causal Measure with a constant intervention. """
    c0 = clf.predict(x0)
    X1 = intervene(X, ep_state, x0)
    p1 = numpy.mean(1.*(clf.predict(X1) == c0))
    X2 = intervene(X, ep_state + [f], x0)
    p2 = numpy.mean(1.*(clf.predict(X2) == c0))
    return p2 - p1


def random_intervene(X, cols):
    """ Randomly intervene on a set of columns of X. """
    n = X.shape[0]
    order = numpy.random.permutation(range(n))
    X_int = numpy.array(X)
    # All columns share the same permutation, so the joint structure
    # within the intervened set of columns is preserved.
    for c in cols:
        X_int[:, c] = X_int[order, c]
    return X_int


def random_intervene_point(X, cols, x0):
    """ Randomly intervene on a set of columns of x0 with values from X. """
    n = X.shape[0]
    order = numpy.random.permutation(range(n))
    X_int = numpy.tile(x0, (n, 1))
    for c in cols:
        X_int[:, c] = X[order, c]
    return X_int
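

# A minimal, hypothetical sketch (numpy only) of what random_intervene_point
# produces: copies of x0 whose chosen columns are jointly resampled from X.
# The _demo_* name is illustrative and not part of the original library.
def _demo_random_intervene_point():
    X = numpy.arange(12).reshape(4, 3)          # 4 rows, 3 features
    x0 = numpy.array([100, 200, 300])
    X_int = random_intervene_point(X, [1], x0)
    # Columns 0 and 2 stay constant at x0; column 1 is a permutation
    # of X[:, 1] = [1, 4, 7, 10].
    assert (X_int[:, 0] == 100).all() and (X_int[:, 2] == 300).all()
    assert sorted(X_int[:, 1]) == [1, 4, 7, 10]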


def discrim(X, cls, sens):
    """ Absolute difference in mean prediction between the non-sensitive
    (sens == 0) and sensitive (sens == 1) groups. """
    not_sens = 1 - sens
    y_pred = cls.predict(X)
    discrim = numpy.abs(numpy.dot(y_pred, not_sens)/sum(not_sens)
                        - numpy.dot(y_pred, sens)/sum(sens))
    return discrim


def discrim_ratio(X, cls, sens):
    """ Ratio of mean predictions: non-sensitive group over sensitive group. """
    not_sens = 1 - sens
    y_pred = cls.predict(X)
    sens_rate = numpy.dot(y_pred, sens)/sum(sens)
    not_sens_rate = numpy.dot(y_pred, not_sens)/sum(not_sens)
    return not_sens_rate/sens_rate


def discrim_influence(dataset, cls, X_test, sens_test):
    """ Measure influence of each super-feature on discrimination. """
    discrim_inf = {}
    f_columns = dataset.num_data.columns
    sup_ind = dataset.sup_ind
    for sf in sup_ind:
        ls = [f_columns.get_loc(f) for f in sup_ind[sf]]
        X_inter = random_intervene(numpy.array(X_test), ls)
        discrim_inter = discrim(X_inter, cls, numpy.array(sens_test))
        discrim_inf[sf] = discrim_inter
        print('Discrimination %s: %.3f' % (sf, discrim_inf[sf]))
    return discrim_inf


def average_local_influence(dataset, cls, X):
    """ Average unary QII: for each super-feature, the average probability
    that randomly intervening on it changes the classifier's output. """
    average_local_inf = {}
    counterfactuals = {}
    iters = 10
    f_columns = dataset.num_data.columns
    sup_ind = dataset.sup_ind
    y_pred = cls.predict(X)
    for sf in sup_ind:
        local_influence = numpy.zeros(y_pred.shape[0])
        if RECORD_COUNTERFACTUALS:
            counterfactuals[sf] = (numpy.tile(X, (iters, 1)),
                                   numpy.tile(X, (iters, 1)))
        ls = [f_columns.get_loc(f) for f in sup_ind[sf]]
        for i in range(0, iters):
            X_inter = random_intervene(numpy.array(X), ls)
            y_pred_inter = cls.predict(X_inter)
            local_influence = local_influence + (y_pred == y_pred_inter)*1.
            if RECORD_COUNTERFACTUALS:
                n = X_inter.shape[0]
                counterfactuals[sf][1][i*n:(i+1)*n] = X_inter
        average_local_inf[sf] = 1 - (local_influence/iters).mean()
        #print('Influence %s: %.3f' % (sf, average_local_inf[sf]))
    return (average_local_inf, counterfactuals)


def unary_individual_influence(dataset, cls, x_ind, X):
    """ Unary QII for a single individual x_ind: for each super-feature,
    the probability that resampling it from X changes x_ind's prediction. """
    y_pred = cls.predict(x_ind.reshape(1, -1))
    average_local_inf = {}
    counterfactuals = {}
    iters = 1
    f_columns = dataset.num_data.columns
    sup_ind = dataset.sup_ind
    for sf in sup_ind:
        local_influence = numpy.zeros(y_pred.shape[0])
        if RECORD_COUNTERFACTUALS:
            counterfactuals[sf] = (numpy.tile(X, (iters, 1)),
                                   numpy.tile(X, (iters, 1)))
        ls = [f_columns.get_loc(f) for f in sup_ind[sf]]
        for i in range(0, iters):
            X_inter = random_intervene_point(numpy.array(X), ls, x_ind)
            y_pred_inter = cls.predict(X_inter)
            local_influence = local_influence + (y_pred == y_pred_inter)*1.
            if RECORD_COUNTERFACTUALS:
                n = X_inter.shape[0]
                counterfactuals[sf][1][i*n:(i+1)*n] = X_inter
        average_local_inf[sf] = 1 - (local_influence/iters).mean()
        #print('Influence %s: %.3f' % (sf, average_local_inf[sf]))
    return (average_local_inf, counterfactuals)


def shapley_influence(dataset, cls, x_individual, X_test):
    """ Shapley QII for x_individual, estimated by sampling random
    permutations of the super-features. """
    p_samples = 600
    s_samples = 600

    def intervene_in_place(S_feature, X_values, X_inter):
        # In-place variant of intervene(): overwrite the chosen columns.
        for f in S_feature:
            X_values[:, f] = X_inter[:, f]

    def P(X_values):
        return ((cls.predict(X_values) == y0) * 1.).mean()

    y0 = cls.predict(x_individual)
    b = numpy.random.randint(0, X_test.shape[0], p_samples)
    X_sample = numpy.array(X_test.iloc[b])  # .ix was removed from pandas
    f_columns = dataset.num_data.columns
    sup_ind = dataset.sup_ind
    super_indices = list(sup_ind.keys())
    # translate into integer indices
    ls = {}
    for si in super_indices:
        ls[si] = [f_columns.get_loc(f) for f in sup_ind[si]]
    shapley = dict.fromkeys(super_indices, 0)
    if RECORD_COUNTERFACTUALS:
        base = numpy.tile(x_individual, (2*p_samples*s_samples, 1))
        counterfactuals = dict([(sf, (base,
                                      numpy.zeros((p_samples*s_samples*2,
                                                   X_test.shape[1]))))
                                for sf in dataset.sup_ind.keys()])
    else:
        counterfactuals = {}
    for sample in range(0, s_samples):
        perm = numpy.random.permutation(len(super_indices))
        # X_data is x_individual intervened with some features from X_sample.
        # Invariant: X_data is x_individual intervened with the union of
        # si[perm[0 ... i-1]].
        X_data = numpy.tile(x_individual, (p_samples, 1))
        # p for X_data; trivially 1.0 at the start.
        p_S_si = 1.
        for i in range(0, len(super_indices)):
            si = super_indices[perm[i]]
            if RECORD_COUNTERFACTUALS:
                start_ind = 2*sample*p_samples
                mid_ind = (2*sample+1)*p_samples
                counterfactuals[si][1][start_ind:mid_ind] = X_data
            intervene_in_place(ls[si], X_data, X_sample)
            if RECORD_COUNTERFACTUALS:
                mid_ind = (2*sample+1)*p_samples
                end_ind = 2*(sample+1)*p_samples
                counterfactuals[si][1][mid_ind:end_ind] = X_data
            p_S = P(X_data)
            # Marginal contribution of si, averaged over permutations.
            shapley[si] = shapley[si] - (p_S - p_S_si)/s_samples
            p_S_si = p_S
    return (shapley, counterfactuals)
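

# A hedged usage sketch for shapley_influence. Based on the call sites above,
# `dataset` is assumed to expose `num_data` (a pandas DataFrame of features)
# and `sup_ind` (a dict mapping super-feature names to column-name lists);
# the SimpleNamespace stand-in and scikit-learn classifier are hypothetical.
def _demo_shapley_influence():
    from types import SimpleNamespace
    from sklearn.linear_model import LogisticRegression
    cols = ['a', 'b', 'c']
    X = pd.DataFrame(numpy.random.random((200, 3)), columns=cols)
    y = (X['a'] + X['b'] > 1.0).astype(int)
    cls = LogisticRegression().fit(X, y)
    dataset = SimpleNamespace(num_data=X,
                              sup_ind={'ab': ['a', 'b'], 'c': ['c']})
    x_ind = X.iloc[[0]].values              # one row, kept 2-D for predict
    shapley, _ = shapley_influence(dataset, cls, x_ind, X)
    print(shapley)                          # 'ab' should outweigh 'c'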


def banzhaf_influence(dataset, cls, x_individual, X_test):
    """ Banzhaf QII for x_individual, estimated by sampling random
    subsets of the super-features. """
    p_samples = 600
    s_samples = 600

    def v(S, x, X_inter):
        x_rep = numpy.tile(x, (p_samples, 1))
        for f in S:
            x_rep[:, f] = X_inter[:, f]
        p = ((cls.predict(x_rep) == y0)*1.).mean()
        return p

    y0 = cls.predict(x_individual)
    b = numpy.random.randint(0, X_test.shape[0], p_samples)
    X_sample = numpy.array(X_test.iloc[b])  # .ix was removed from pandas
    f_columns = dataset.num_data.columns
    sup_ind = dataset.sup_ind
    super_indices = list(dataset.sup_ind.keys())
    banzhaf = dict.fromkeys(super_indices, 0)
    for sample in range(0, s_samples):
        # Choose a random subset S of the super-features.
        r = numpy.random.random(len(super_indices))
        S = [super_indices[i] for i in range(0, len(super_indices)) if r[i] > 0.5]
        for si in super_indices:
            # Get the column names of S excluding si, by flattening.
            S_m_si = sum([sup_ind[x] for x in S if x != si], [])
            # Translate into integer indices.
            ls_m_si = [f_columns.get_loc(f) for f in S_m_si]
            p_S = v(ls_m_si, x_individual, X_sample)
            # Also intervene on si.
            ls_si = [f_columns.get_loc(f) for f in sup_ind[si]]
            p_S_si = v(ls_m_si + ls_si, x_individual, X_sample)
            banzhaf[si] = banzhaf[si] - (p_S - p_S_si)/s_samples
    return banzhaf


def analyze_outliers(counterfactuals, out_cls, cls):
    """ Compare outlier rates of original points and their counterfactuals,
    using out_cls as an outlier detector (predicting -1 for outliers). """
    outlier_fracs = {}
    new_outlier_fracs = {}
    qii = {}
    for sf, pairs in counterfactuals.items():
        X = pairs[0]
        X_cf = pairs[1]
        outs_X = out_cls.predict(X) == -1
        outs_X_cf = out_cls.predict(X_cf) == -1
        outlier_fracs[sf] = numpy.mean(outs_X_cf)
        lnot = numpy.logical_not
        land = numpy.logical_and
        # Fraction of inliers among the originals, used for normalization.
        old_inlier_frac = numpy.mean(lnot(outs_X))
        new_outlier_fracs[sf] = numpy.mean(land(lnot(outs_X), outs_X_cf))/old_inlier_frac
        qii[sf] = numpy.mean(cls.predict(X) != cls.predict(X_cf))
        print('QII %s %.3f' % (sf, qii[sf]))
    return (outlier_fracs, new_outlier_fracs)
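

# A hedged end-to-end sketch tying average_local_influence and discrim to the
# same hypothetical setup as _demo_shapley_influence above; run this file
# directly to execute all three demos (assumes scikit-learn is installed).
def _demo_average_local_influence():
    from types import SimpleNamespace
    from sklearn.linear_model import LogisticRegression
    cols = ['a', 'b', 'c']
    X = pd.DataFrame(numpy.random.random((200, 3)), columns=cols)
    y = (X['a'] > 0.5).astype(int)
    cls = LogisticRegression().fit(X, y)
    dataset = SimpleNamespace(num_data=X,
                              sup_ind={f: [f] for f in cols})
    influences, _ = average_local_influence(dataset, cls, X.values)
    print(influences)                       # 'a' should dominate 'b' and 'c'
    sens = (X['b'] > 0.5).values.astype(int)
    print('discrim: %.3f' % discrim(X.values, cls, sens))


if __name__ == "__main__":
    _demo_random_intervene_point()
    _demo_shapley_influence()
    _demo_average_local_influence()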