-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathbiased_reporter.py
235 lines (196 loc) · 9.11 KB
/
biased_reporter.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
#
# Class to report case data with periodic biases
#
import math
import numpy as np
import scipy.stats as ss
from datetime import datetime, timedelta
class Reporter():
"""Reporter class for epidemiological case data,
including biased reporters to replicate periodic
trends in real data."""
def __init__(self, case_data, start_date='01/01/2020', fatality_rate=0.02) -> None:
"""Instatinate with case data dataframe.
Parameters
----------
case_data : pd.Dataframe
At minimum, contains case data indexed by day
start_date : str
Date of initial infection, in the form 'DD/MM/YYYY'
fatality_rate : float
Optional factor to convert cases to deaths by uniform factor -
this functionality is a placeholder and is not accurate
"""
self.case_data = case_data
self.start_date = datetime.strptime(start_date, "%d/%m/%Y").date()
self.fatality_rate = fatality_rate
self.case_data['Date'] = self.case_data.apply(lambda row: self.start_date
+ timedelta(days=int(row.name)), axis=1)
def unbiased_report(self, output_path=None, df=None):
"""Save data to .csv format at 'output_path' compatible with
John Hopkins Covid Data, without modifying case data.
Parameters
----------
output_path : str
Path to output csv with reported data. If none,
returns the output dataframe.
df : pd.Dataframe
Input dataframe to report on - used for internal specification.
By default uses self.case_data (recommended for external use).
"""
if df is None:
df = self.case_data.copy()
df['Combined_Key'] = 'Synthetic Data'
if 'Confirmed' not in df.columns:
df.rename(columns={'Cases': 'Confirmed'}, inplace=True)
df['Deaths'] = df['Confirmed'].apply(lambda x: math.floor(x * self.fatality_rate
+ np.random.uniform()))
if output_path is not None:
df.to_csv(output_path)
else:
return df
def fixed_bias_report(self, output_path=None, bias=None, method = "scale"):
"""Adds a fixed weekly reporting bias to the case data.
Save data to .csv format at 'output_loc' compatible with
John Hopkins Covid Data, without modifying data.
Note: There may be slight discrepancies in total cases,
due to rounding errors when scaling.
Parameters
----------
output_path : str
Path to output csv with reported data. If none,
returns the output dataframe.
bias : dict
Dictionary mapping weekday names to reporting factors.
All values in the dictionary should average to unity,
but normalisation will enforce this. Can also be passed
as a list, where the first item corresponds to Monday.
method : str
Method used to bias case data. 'scale' corresponds to a deterministic
scaling, 'poisson' selects the scaled value from a poisson distribution,
and 'multinomial' reallocated cases based on a random multinomial
distribution.
"""
df = self.case_data.copy()
weekdays = ['Monday', 'Tuesday', 'Wednesday', 'Thursday',
'Friday', 'Saturday', 'Sunday']
df['Weekday'] = df['Date'].apply(lambda x: weekdays[x.weekday()])
if bias is None: # Default values from analysis
bias = [1.3, 1.2, 1, 1, 1, 0.9, 0.6]
if isinstance(bias, list):
assert len(bias) == 7, 'Expected seven values for weekday bias'
temp_dict = {}
for i, value in enumerate(bias):
temp_dict[weekdays[i]] = value
bias = temp_dict # Can now overwrite list values
elif isinstance(bias, dict):
assert set(weekdays) == set(bias.keys), 'Keys must match weekdays'
normalisation = sum(bias.values())
for v in bias.values():
v /= normalisation
if method == 'scale':
df['Confirmed'] = df.apply(lambda row: int(row['Cases']
* bias[row['Weekday']]), axis=1)
elif method == 'poisson':
df['Confirmed'] = df.apply(lambda row: ss.poisson.rvs(row['Cases'] * bias[row['Weekday']]), axis=1)
elif method == 'multinomial':
df['Confirmed'] = self._rolling_multinomial(df, bias)
elif method == 'dirichlet':
df['Confirmed'] = self._rolling_dirichlet(df, bias)
else:
raise ValueError(f"Unknown method argument {method} - valid arguments"
+ " are scale, poisson, multinomial and dirichlet")
df.rename(columns={'Cases': 'Ground Truth'}, inplace=True)
self.unbiased_report(output_path, df)
if output_path is None:
return df
def _rolling_multinomial(self, df, bias):
"""For each period of 7 days, redistributes the total number
of cases according to the bias values.
Parameters
df : pandas.Dataframe
Overall dataframe containing cases and date columes
bias : list
List of fixed weights to use for multinomial distribution
"""
data = list(df['Cases'].copy())
weights = [i / 7 for i in bias.values()]
week_count = len(data) // 7
first_day_index = df['Date'][0].weekday()
weights = [weights[(i+first_day_index) % 7] for i in range(7)]
for w in range(week_count):
week_data = data[7*w : 7*(w + 1)]
day_counts = np.random.multinomial(sum(week_data), weights)
data[7*w : 7*(w + 1)] = day_counts
return data
def _rolling_dirichlet(self, df, bias):
"""For each period of 7 days, redistributes the total number
of cases according to the bias values.
By sampling reporting factors for a week at a time (as opposed
to every day), we better preserve the overall case numbers.
Parameters
df : pandas.Dataframe
Overall dataframe containing cases and date columes
bias : list
List of fixed weights to use for Dirichlet distribution
"""
data = list(df['Cases'].copy())
weights = [i / 7 for i in bias.values()]
week_count = len(data) // 7
first_day_index = df['Date'][0].weekday()
weights = [weights[(i+first_day_index) % 7] for i in range(7)]
for w in range(week_count):
week_data = data[7*w : 7*(w + 1)]
rf = np.random.dirichlet(weights)
data[7*w : 7*(w + 1)] = [week_data[i] * (7 * rf[i]) for i in range(7)]
return data
def delay_distribution_report(self, params, output_path=None):
"""Assumes a separate delay distribution for each
day of the week. The distribution is a discretised
gamma, characterised by the mean and std, and capped at
6 days (i.e. no case/death reporting delay can be longer
than 6 days).
Parameters
----------
params : Dict
Dictionary of lists - each list is the set of
weekday values for a named distribution parameter.
output_path : str
Path to output csv with reported data. If none,
return the output dataframe.
"""
df = self.case_data.copy()
case_data = list(df['Cases'])
for i, value in enumerate(list(df['Cases'].copy())):
day_index = df['Date'][i].weekday()
delay_values = self._discrete_gamma_delay(value, params['shape'][day_index],
params['rate'][day_index])
# Update case-data with new counts
case_data[i] -= sum(delay_values)
for j, d_value in enumerate(delay_values):
try:
case_data[i + j] += d_value
except IndexError: # Occurs at end of dataset
continue
df['Confirmed'] = case_data
df.rename(columns={'Cases': 'Ground Truth'}, inplace=True)
self.unbiased_report(output_path, df)
if output_path is None:
return df
def _discrete_gamma_delay(self, value, shape, rate):
"""Takes a value of cases on a given day, and returns the
reported number of cases on each day in the next week (including
the original day) based on a discrete gamma distribution,
with stochastic rounding.
Parameters
----------
value : int
Number of cases on a given day to redistribute
shape : float
Shape parameter for gamma parameter used in redistribution
rate : float
Rate parameter for gamma parameter used in redistribution
"""
unnorm_dist = ss.gamma.pdf(x=list(range(7)), a=shape, scale=1/rate)
data_bias = (unnorm_dist / (sum(unnorm_dist))) * value
return [math.floor(x + np.random.uniform()) for x in data_bias]