import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
import scipy.stats as stats
import copy


def matshow(X, **kwargs):
    """
    Plot the matrix X in grayscale with a white axis style.
    """
    with sns.axes_style("white"):
        ax = plt.matshow(X, aspect='auto', cmap='gray', **kwargs)
    return ax


def rle(x):
    """
    Perform run length encoding on the numpy array x. Returns a tuple
    of start indices, run lengths, and values for each run.
    """
    x = np.asarray(x).flatten()
    # prepend infinity so that x[0] always starts a run
    dx = np.diff(np.insert(x.astype('float'), 0, np.inf))
    starts = np.flatnonzero(dx)
    # make sure stops always include the last element
    stops = np.append(starts[1:], np.size(x))
    runlens = stops - starts
    values = x[starts]
    return starts, runlens, values
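
# Example sketch (illustrative toy input, not from the original module):
# a short binary sequence with three runs.
# >>> rle(np.array([0, 0, 1, 1, 1, 0]))
# (array([0, 2, 5]), array([2, 3, 1]), array([0, 1, 0]))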


def mutual_information_matrix(X, Z):
    """
    Given X, an observations x variables array of observed binary variables,
    and Z, an observations x variables array of posterior probabilities
    p(z = 1), construct a matrix of (normalized) mutual information scores
    between each pair of columns. Output shape is X.shape[1] x Z.shape[1].
    """
    Nx = X.shape[1]
    Nz = Z.shape[1]
    mi_mat = np.empty((Nx, Nz))
    for i in range(Nx):
        for j in range(Nz):
            mi_mat[i, j] = mi(X[:, i], Z[:, j])
    return mi_mat
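
# Example sketch (illustrative; the random toy data are assumptions): columns
# of X pair with columns of Z, so the score matrix is X.shape[1] x Z.shape[1].
# >>> X = (np.random.rand(100, 3) < 0.5).astype(int)
# >>> Z = np.random.rand(100, 2)
# >>> mutual_information_matrix(X, Z).shape
# (3, 2)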


def mi(x, z):
    """
    Calculate mutual information (normalized between 0 and 1) between
    an observed binary sequence x and a sequence of posterior
    probabilities z.
    WARNING: Not particularly careful: can produce negative results.
    """
    zm = np.mean(z)
    xm = np.mean(x)
    # marginal entropies (np.log1p(-p) == np.log(1 - p))
    Hz = -zm * np.log(zm) - (1 - zm) * np.log1p(-zm)
    Hx = -xm * np.log(xm) - (1 - xm) * np.log1p(-xm)
    x0 = x == 0
    x1 = x == 1
    p1 = xm
    p0 = 1 - p1
    pz0 = np.mean(z[x0])
    pz1 = np.mean(z[x1])
    # conditional entropy H(z|x), guarding against log(0)
    Hzx = 0
    if pz0 != 0 and pz0 != 1:
        Hzx += -p0 * (pz0 * np.log(pz0) + (1 - pz0) * np.log1p(-pz0))
    if pz1 != 0 and pz1 != 1:
        Hzx += -p1 * (pz1 * np.log(pz1) + (1 - pz1) * np.log1p(-pz1))
    return (Hz - Hzx) / np.sqrt(Hz * Hx)
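
# Example sketch (illustrative): a z that exactly reproduces a binary x
# gives a normalized score of 1 (up to floating point).
# >>> x = np.array([0, 0, 1, 1])
# >>> mi(x, x.astype('float'))
# 1.0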


def frames_to_times(df):
    """
    Convert a dataframe with movie and frame columns to one with a single
    unique time index.
    """
    # make a dataframe of unique (movie, frame) pairs
    t_index = df[['movie', 'frame']].drop_duplicates().sort_values(by=['movie', 'frame'])
    t_index = t_index.reset_index(drop=True)  # get rid of original index
    t_index.index.name = 'time'  # name integer index
    t_index = t_index.reset_index()  # make time a column
    allframe = pd.merge(df, t_index).copy()  # merge time with original df
    allframe = allframe.drop(['movie', 'frame'], axis=1)  # drop cols
    return allframe
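
# Example sketch (illustrative toy data; the 'rate' column is an assumption):
# three frames across two movies collapse to one integer 'time' column.
# >>> df = pd.DataFrame({'movie': [1, 1, 2], 'frame': [1, 2, 1],
# ...                    'rate': [0.1, 0.2, 0.3]})
# >>> frames_to_times(df)['time'].tolist()
# [0, 1, 2]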


def get_movie_partition(df):
    """
    Get beginning and ending row numbers corresponding to each movie. Return
    a dataframe with columns (start, end).
    """
    # make sure the sort is right
    tmpdf = df.sort_values(by=['movie', 'frame', 'unit'])
    # get rid of the old index, make a column of a new, consecutive index
    tmpdf = tmpdf.reset_index(drop=True).reset_index()
    grp = tmpdf.groupby('movie')
    starts = grp.first()['index']
    ends = grp.last()['index']
    return pd.DataFrame({'start': starts, 'end': ends})
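
# Example sketch (illustrative toy data): rows 0-1 belong to movie 1 and
# row 2 to movie 2, so the (start, end) pairs are (0, 1) and (2, 2).
# >>> df = pd.DataFrame({'movie': [1, 1, 2], 'frame': [1, 2, 1],
# ...                    'unit': [0, 0, 0]})
# >>> get_movie_partition(df)[['start', 'end']].values.tolist()
# [[0, 1], [2, 2]]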


def regularize_zeros(df):
    """
    Given a DataFrame with 0s and NAs, regularize it so that it can be
    put on a log scale.
    """
    new_df = df.copy()
    # replace exact zeros with NA, then fill every NA with its column's
    # smallest nonzero value
    new_df[new_df == 0.0] = np.nan
    new_mins = new_df.min()
    new_df = new_df.fillna(new_mins)
    return new_df
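
# Example sketch (illustrative): the zero is replaced by the smallest
# nonzero value in its column.
# >>> regularize_zeros(pd.DataFrame({'a': [0.0, 2.0, 4.0]}))['a'].tolist()
# [2.0, 2.0, 4.0]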


def gamma_from_hypers(mean_hyps, var_hyps, N=10000):
    """
    Given a pair of hyperparameters for the mean and a pair for the
    variance parameter, draw N samples from the gamma distribution
    that results.
    """
    th = stats.gamma.rvs(a=mean_hyps[0], scale=1./mean_hyps[1], size=N)
    cc = stats.gamma.rvs(a=var_hyps[0], scale=1./var_hyps[1], size=N)
    # shape and rate of the resulting gamma
    aa = cc
    bb = cc * th
    return stats.gamma.rvs(a=aa, scale=1./bb)
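
# Example sketch (the hyperparameter values here are arbitrary assumptions):
# one sample is drawn per (shape, rate) pair, giving N samples total.
# >>> samples = gamma_from_hypers((2.0, 1.0), (3.0, 1.0), N=1000)
# >>> samples.shape
# (1000,)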


def lognormal_from_hypers(mu, scale, shape, rate, N=10000):
    """
    Draw N samples from a lognormal distribution whose (m, s) parameters
    are drawn from a normal-gamma hyperprior with parameters
    (mu, scale, shape, rate).
    """
    tau = stats.gamma.rvs(a=shape, scale=1./rate, size=N)  # precision
    sig = 1. / np.sqrt(tau)  # log-scale standard deviation
    s = 1. / np.sqrt(scale * tau)  # standard deviation of the mean m
    m = stats.norm.rvs(loc=mu, scale=s, size=N)
    x = stats.lognorm.rvs(scale=np.exp(m), s=sig, size=N)
    return x
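
# Example sketch (the hyperparameter values here are arbitrary assumptions):
# >>> x = lognormal_from_hypers(0.0, 1.0, 2.0, 1.0, N=1000)
# >>> x.shape
# (1000,)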


def jitter_array(arr, percent):
    """
    Given an array, arr, return the same array with each entry scaled by
    a factor drawn uniformly from 1 +/- percent.
    """
    arr = np.array(arr)
    return arr * (1 + percent * (2 * np.random.rand(*arr.shape) - 1))
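
# Example sketch (illustrative): with percent=0.1, every entry stays
# within +/- 10% of its original value.
# >>> j = jitter_array(np.ones(3), 0.1)
# >>> bool(np.all((j >= 0.9) & (j <= 1.1)))
# True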


def jitter_inits(init_dict, percent_jitter):
    """
    Make a dict of initial guesses for posterior parameters.
    percent_jitter is the variability of parameters around their values
    in init_dict.
    """
    inits = {}
    keys = [k for k in init_dict if 'post' in k or 'init' in k]
    for key in keys:
        if key == 'z_init':
            # draw a fresh random z and normalize columns to sum to 1
            old_z = init_dict['z_init']
            xi_mat = np.random.rand(*old_z.shape)
            xi_mat = xi_mat.astype('float')
            z_init = xi_mat / np.sum(xi_mat, axis=0, keepdims=True)
            inits['z_init'] = z_init
        elif key == 'zz_init':
            inits['zz_init'] = np.random.rand(*init_dict['zz_init'].shape)
        else:
            inits[key] = jitter_array(init_dict[key], percent_jitter)
    # hack this for now
    new_inits = copy.deepcopy(init_dict)
    new_inits.update(inits)
    return new_inits
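
# Example sketch (illustrative; the key names are assumptions): only keys
# containing 'post' or 'init' are perturbed, everything else passes through.
# >>> inits = jitter_inits({'mu_post': np.ones(3), 'lr': 0.1}, 0.05)
# >>> inits['lr']
# 0.1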