-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathestimator.py
329 lines (268 loc) · 12.1 KB
/
estimator.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
import os
import time
from abc import ABC, abstractmethod
from datetime import datetime, timedelta
from typing import Dict, Any
import optuna
optuna.logging.set_verbosity(optuna.logging.WARNING)
import pandas as pd
import pytz
from optuna.storages import RDBStorage
from evaluation_utils import calculate_opt_metrics
class EstimatorManager:
    """
    Manages the storage and setup for estimator tuning results.

    Attributes:
        results_dir (str): Directory where results are stored.
        use_db (bool): Whether a SQLite database backs the Optuna studies.
        db_path (str | None): Path to the SQLite database for Optuna storage; None when no database is used.
        db_url (str | None): URL for the SQLite database; None when no database is used.
        storage (RDBStorage | None): Optuna storage object for managing study results. If None, no database is used.
    """

    def __init__(self, results_dir: str, use_db=True):
        """
        Initializes the EstimatorManager with the given results directory.

        When ``use_db`` is True, the ``<results_dir>/tuning`` directory is created if needed and an
        ``RDBStorage`` pointing at ``optuna_master.db`` inside it is opened.

        Args:
            results_dir (str): Directory where results are stored.
            use_db (bool): Whether to use a sqlite database for Optuna storage.
        """
        self.results_dir = results_dir
        self.use_db = use_db

        # Always define the database attributes so that reading them never raises
        # AttributeError when the manager runs without a database.
        self.db_path = None
        self.db_url = None
        self.storage = None

        if self.use_db:
            self.db_path = os.path.join(results_dir, 'tuning', 'optuna_master.db')
            os.makedirs(os.path.dirname(self.db_path), exist_ok=True)
            self.db_url = f"sqlite:///{self.db_path}"
            self.storage = RDBStorage(self.db_url)

    def get_storage(self):
        """
        Returns the Optuna storage object.

        Returns:
            RDBStorage | None: Optuna storage object for managing study results, or None when
            ``use_db`` is False.
        """
        return self.storage if self.use_db else None
class Estimator(ABC):
    """
    Abstract base class for estimators.

    Subclasses provide the model-specific pieces (``set_model_params``, ``fit``, ``predict``,
    ``split_data``, ``prepare_data``, ``define_hyperparameter_space``); this base class supplies
    timing wrappers, the Optuna-driven ``optimize`` loop and the re-optimization policy.

    Attributes:
        name (str): The name of the estimator.
        manager (EstimatorManager): The manager responsible for handling estimator-related operations.
        hyperparameters (dict): Dictionary to store hyperparameters.
        last_optimization_date (datetime): The date when the estimator was last optimized (None until then).
        required_history (timedelta): Amount of history required by the estimator.
        optimization_frequency (timedelta): Frequency at which the estimator should be optimized.
        optimization_wait (timedelta): Minimum time between two optimizations.
        min_opt_days (int): Minimum number of days since training start before optimizing.
        n_trials (int): Number of Optuna trials run per optimization.
        performance_threshold (float): Performance threshold for triggering optimization.
        eval_metric (str): Evaluation metric used for optimization.
        best_performance (float): Best performance achieved by the estimator.
        utc (pytz.UTC): UTC timezone object.
        fit_time (float): Time taken to fit the estimator, in seconds.
        predict_time (float): Time taken to make predictions, in seconds.
        optimize_time (float): Time taken to optimize the estimator, in seconds.
    """

    def __init__(self, name: str, results_dir: str, use_db: bool = False, required_history: int = 0,
                 min_opt_days: int = 1):
        """
        Initializes the Estimator with the given name and manager.

        Args:
            name (str): The name of the estimator.
            results_dir (str): Directory where results are stored.
            use_db (bool): Whether to use a sqlite database for Optuna storage.
            required_history (int): Number of days of history required for the estimator (usually to construct lags).
            min_opt_days (int): Minimum number of days before the estimator can be optimized.
        """
        self.name = name
        self.manager = EstimatorManager(results_dir, use_db)
        self.hyperparameters = {}
        self.last_optimization_date = None
        self.required_history = timedelta(days=required_history)
        self.optimization_frequency = timedelta(days=30)
        self.optimization_wait = timedelta(days=7)
        self.min_opt_days = min_opt_days
        self.n_trials = 50
        self.performance_threshold = 0.1
        self.eval_metric = "MAE"
        self.best_performance = float('inf')
        self.utc = pytz.UTC
        self.fit_time = 0
        self.predict_time = 0
        self.optimize_time = 0

    @abstractmethod
    def set_model_params(self, **params):
        """
        Set the parameters of the model.

        This method should be implemented by each specific estimator.
        """
        pass

    @abstractmethod
    def fit(self, train_data: Dict[str, pd.DataFrame]):
        """
        Abstract method to fit the estimator to the training data.

        Args:
            train_data (Dict[str, pd.DataFrame]): The training data.
        """
        pass

    @abstractmethod
    def predict(self, test_data: Dict[str, pd.DataFrame]) -> pd.DataFrame:
        """
        Abstract method to make predictions on the test data.

        Args:
            test_data (Dict[str, pd.DataFrame]): The test data.

        Returns:
            pd.DataFrame: The predictions.
        """
        pass

    def timed_fit(self, train_data: Dict[str, pd.DataFrame]):
        """
        Fits the estimator to the training data and records the time taken in ``fit_time``.

        Args:
            train_data (Dict[str, pd.DataFrame]): The training data.
        """
        # perf_counter is monotonic, so the measured duration cannot be distorted by clock changes.
        start_time = time.perf_counter()
        self.fit(train_data)
        self.fit_time = time.perf_counter() - start_time

    def timed_predict(self, test_data: Dict[str, pd.DataFrame]) -> pd.DataFrame:
        """
        Makes predictions on the test data and records the time taken in ``predict_time``.

        Args:
            test_data (Dict[str, pd.DataFrame]): The test data.

        Returns:
            pd.DataFrame: The predictions.
        """
        start_time = time.perf_counter()
        predictions = self.predict(test_data)
        self.predict_time = time.perf_counter() - start_time
        return predictions

    @abstractmethod
    def split_data(self, train_data: Dict[str, pd.DataFrame]) -> Dict[str, Dict[str, pd.DataFrame]]:
        """
        Split the training data into training and validation sets.

        Args:
            train_data (Dict[str, pd.DataFrame]): Dictionary containing raw training data.

        Returns:
            Dict[str, Dict[str, pd.DataFrame]]: Dictionary with 'train' and 'valid' keys, each containing a data dictionary.
        """
        pass

    def optimize(self, train_data: Dict[str, pd.DataFrame], current_date: datetime):
        """
        Optimizes the estimator's hyperparameters with Optuna on a train/validation split.

        Runs ``self.n_trials`` trials minimizing ``self.eval_metric``, applies the best parameters
        through ``set_model_params``, and records ``last_optimization_date`` and ``optimize_time``.
        When the manager uses a database, summary attributes are also stored on the study.

        Args:
            train_data (Dict[str, pd.DataFrame]): The training data.
            current_date (datetime): The current date; used in the study name and stored as the
                last optimization date.
        """
        use_db = self.manager.use_db
        # get_storage() already returns None when no database is configured.
        storage = self.manager.get_storage()
        study_name = f"{self.name}_optimization_{current_date.strftime('%Y-%m-%d')}"
        start_time = time.perf_counter()

        split_data = self.split_data(train_data)
        train_subset = split_data['train']
        valid_subset = split_data['valid']
        prepared_train_data = self.prepare_data(train_subset, is_train=True)
        prepared_valid_data = self.prepare_data(valid_subset, is_train=True)  # val subset can be larger than 1 day

        def objective(trial):
            """Fit with the trial's hyperparameters and return the validation score to minimize."""
            params = self.define_hyperparameter_space(trial)
            self.hyperparameters = params
            self.set_model_params(**params)  # Set model parameters
            self.fit(prepared_train_data)
            predictions = self.predict(prepared_valid_data)

            # Align actuals with the predictions by keeping only the trailing rows.
            len_predictions = len(predictions)
            actuals = valid_subset['day_ahead_prices'].values[-len_predictions:]
            preds = predictions.values
            metrics = calculate_opt_metrics(preds, actuals)

            # Compute custom metric if implemented
            custom_metric = self.compute_custom_metric(actuals, preds)
            if custom_metric is not None:
                metrics['custom_metric'] = custom_metric
            return metrics[self.eval_metric]

        if use_db:
            study = optuna.create_study(direction="minimize", storage=storage, study_name=study_name,
                                        load_if_exists=True)
        else:
            study = optuna.create_study(direction="minimize", study_name=study_name)
        study.optimize(objective, n_trials=self.n_trials)

        best_params = study.best_params
        if best_params:
            self.set_model_params(**best_params)
        # NOTE(review): self.hyperparameters still holds the last trial's values here, and
        # self.best_performance is not updated from study.best_value - confirm whether callers
        # or subclasses are expected to maintain them.
        self.last_optimization_date = current_date
        optimization_time = time.perf_counter() - start_time

        # Log
        if use_db:
            study.set_user_attr('estimator_name', self.name)
            study.set_user_attr('study_creation_time', datetime.now().isoformat())
            study.set_user_attr('optimization_datetime', current_date.isoformat())
            study.set_user_attr('optimization_date', current_date.strftime('%Y-%m-%d'))
            study.set_user_attr('optimization_time', optimization_time)
            study.set_user_attr('best_params', best_params)
            study.set_user_attr('best_value', study.best_value)
        self.optimize_time = optimization_time

    def get_execution_times(self):
        """
        Returns the execution times for fitting, predicting, and optimizing.

        The stored optimization time is reset to 0 after being read, so it is reported only once.

        Returns:
            dict: Dictionary with 'fit_time', 'predict_time', 'optimize_time' and 'total_time'
            (the sum of the three reported values).
        """
        optimization_time = self.optimize_time
        self.optimize_time = 0  # reset to avoid double counting
        return {
            'fit_time': self.fit_time,
            'predict_time': self.predict_time,
            'optimize_time': optimization_time,
            # Use the captured value: self.optimize_time has just been reset to 0.
            'total_time': self.fit_time + self.predict_time + optimization_time
        }

    @abstractmethod
    def prepare_data(self, data: Dict[str, pd.DataFrame], is_train: bool) -> Dict[str, Any]:
        """
        Prepare the data for fitting or prediction.

        Args:
            data (Dict[str, pd.DataFrame]): Dictionary containing raw data.
            is_train (bool): Whether the data is for training or prediction.

        Returns:
            Dict[str, Any]: Dictionary containing prepared data.
        """
        pass

    @abstractmethod
    def define_hyperparameter_space(self, trial: "optuna.Trial") -> Dict[str, Any]:
        """
        Define the hyperparameter space for optimization.

        Args:
            trial (optuna.Trial): Optuna trial object.

        Returns:
            Dict[str, Any]: Dictionary of hyperparameters.
        """
        pass

    def should_optimize(self, train_start: datetime, current_date: datetime, recent_performance: float) -> bool:
        """
        Determines whether the estimator should be optimized based on the current date and recent performance.

        Args:
            train_start (datetime): The date training started; optimization is skipped until at
                least ``min_opt_days`` whole days have elapsed since then.
            current_date (datetime): The current date.
            recent_performance (float): The recent performance metric of the estimator.

        Returns:
            bool: True if the estimator should be optimized, False otherwise.
        """
        days_since_first_train = (current_date - train_start).days
        if days_since_first_train < self.min_opt_days:
            return False

        # Never optimized before: do it as soon as enough days have elapsed.
        if self.last_optimization_date is None:
            return True

        # Enforce the minimum wait between optimizations regardless of performance.
        time_since_last_optimization = current_date - self.last_optimization_date
        if time_since_last_optimization < self.optimization_wait:
            return False

        time_condition = time_since_last_optimization >= self.optimization_frequency
        performance_condition = recent_performance > (1 + self.performance_threshold) * self.best_performance
        return time_condition or performance_condition

    def set_optimization_params(self, frequency: timedelta, threshold: float, metric: str):
        """
        Sets the optimization parameters for the estimator.

        Args:
            frequency (timedelta): Frequency at which the estimator should be optimized.
            threshold (float): Performance threshold for triggering optimization.
            metric (str): Evaluation metric used for optimization.
        """
        self.optimization_frequency = frequency
        self.performance_threshold = threshold
        self.eval_metric = metric

    def compute_custom_metric(self, y_true, y_pred):
        """
        Compute a custom metric for the model.

        Subclasses can override this method to implement custom metrics.

        Args:
            y_true: True target values
            y_pred: Predicted target values

        Returns:
            float or None: The computed custom metric, or None if not implemented
        """
        return None