diff --git a/python/statsforecast/arima.py b/python/statsforecast/arima.py index b018eea87..3b732d9de 100644 --- a/python/statsforecast/arima.py +++ b/python/statsforecast/arima.py @@ -1,5 +1,14 @@ -__all__ = ['predict_arima', 'arima_string', 'forecast_arima', 'fitted_arima', 'auto_arima_f', 'print_statsforecast_ARIMA', - 'ARIMASummary', 'AutoARIMA'] +__all__ = [ + "predict_arima", + "arima_string", + "forecast_arima", + "fitted_arima", + "auto_arima_f", + "print_statsforecast_ARIMA", + "ARIMASummary", + "AutoARIMA", + "simulate_arima", +] import math @@ -50,7 +59,6 @@ def getQ0(phi, theta): def arima_transpar(params_in, arma, trans): - # TODO check trans=True results return _arima.arima_transpar(params_in, arma, trans) @@ -668,8 +676,6 @@ def predict_arima(model, n_ahead, newxreg=None, se_fit=True): narma = sum(arma[:4]) if len(coefs) > narma: - # check intercept - # i think xreg is unused if ncoefs[narma] == "intercept": intercept = np.ones(n_ahead, dtype=np.float64).reshape(-1, 1) if newxreg is None: @@ -1225,6 +1231,136 @@ def forecast_arima( return ans +def simulate_arima( + model, + h, + n_paths, + xreg=None, + seed=None, + error_distribution="normal", + error_params=None, +): + """ + Simulate future paths from a fitted ARIMA model. + + Parameters + ---------- + model : dict + Fitted ARIMA model dictionary. + h : int + Forecast horizon. + n_paths : int + Number of simulation paths to generate. + xreg : np.ndarray, optional + Future exogenous regressors of shape (h, n_x). + seed : int, optional + Random seed for reproducibility. + error_distribution : str, default='normal' + Distribution for error terms. Options: 'normal', 't', 'bootstrap', + 'laplace', 'skew-normal', 'ged'. + error_params : dict, optional + Distribution-specific parameters. E.g., {'df': 5} for t-distribution. + + Returns + ------- + np.ndarray + Simulated paths of shape (n_paths, h). + """ + from statsforecast.simulation import sample_errors + + if model.get("constant", False): + return np.full((n_paths, h), model["x"][-1]) + + # Set up random generator + rng = np.random.default_rng(seed) + if seed is not None: + np.random.seed(seed) # For backward compatibility + + # Regression component (similar to forecast_arima and predict_arima) + use_drift = "drift" in model["coef"].keys() + if use_drift: + n = len(model["x"]) + drift = np.arange(1, h + 1, dtype=np.float64).reshape(-1, 1) + drift += n + if xreg is not None: + xreg = np.concatenate([drift, xreg], axis=1) + else: + xreg = drift + arma = model["arma"] + ncoefs = list(model["coef"].keys()) + coefs = np.array(list(model["coef"].values())) + narma = sum(arma[:4]) + + newxreg = xreg + if len(coefs) > narma: + if ncoefs[narma] == "intercept": + intercept = np.ones(h, dtype=np.float64).reshape(-1, 1) + if newxreg is None: + newxreg = intercept + else: + newxreg = np.concatenate([intercept, newxreg], axis=1) + + if newxreg is not None: + if narma == 0: + needed_cols = len(coefs) + reg_coefs = coefs + else: + needed_cols = len(coefs) - narma + reg_coefs = coefs[narma:] + + if newxreg.shape[1] != needed_cols: + if newxreg.shape[1] > needed_cols: + xm = np.matmul(newxreg[:, :needed_cols], reg_coefs) + else: + raise Exception( + f"Number of regressors ({newxreg.shape[1]}) does not match fitted model ({needed_cols})" + ) + else: + xm = np.matmul(newxreg, reg_coefs) + xm = xm.flatten() + else: + xm = 0 + else: + xm = 0 + + # Kalman filter state-space components + Z, a_init, T, V, obs_h = [model["model"][var] for var in ["Z", "a", "T", "V", "h"]] + sigma2 = model["sigma2"] + + paths = np.empty((n_paths, h)) + + # V can be singular, so we use SVD-based sampling for state transition noise + u, s, _ = np.linalg.svd(V * sigma2) + state_noise_std = u * np.sqrt(s) + + obs_noise_std = np.sqrt(obs_h * sigma2) + residuals = model.get("residuals", None) + + for i in range(n_paths): + a = a_init.copy() + for t in range(h): + # State transition noise (Gaussian for state-space consistency) + state_noise = rng.normal(size=a.size) + a = T @ a + state_noise_std @ state_noise + + # Observation noise with selected distribution + obs_noise = sample_errors( + size=1, + sigma=obs_noise_std, + distribution=error_distribution, + params=error_params, + residuals=residuals, + rng=rng, + )[0] + paths[i, t] = (Z @ a) + obs_noise + + if not isinstance(xm, int): + paths += xm + + return paths + + + def fitted_arima(model, h=1): """Returns h-step forecasts for the data used in fitting the model. diff --git a/python/statsforecast/ces.py b/python/statsforecast/ces.py index 176d9d82e..d43ac9143 100644 --- a/python/statsforecast/ces.py +++ b/python/statsforecast/ces.py @@ -1,4 +1,4 @@ -__all__ = ['ces_target_fn'] +__all__ = ["auto_ces", "simulate_ces"] import math @@ -802,3 +802,77 @@ def forward_ces(fitted_model, y): beta_0=beta_0, beta_1=beta_1, ) + + +def simulate_ces( + model, + h, + n_paths, + seed=None, + error_distribution="normal", + error_params=None, +): + """ + Simulate future paths from a fitted CES model. + + Parameters + ---------- + model : dict + Fitted CES model dictionary. + h : int + Forecast horizon. + n_paths : int + Number of simulation paths to generate. + seed : int, optional + Random seed for reproducibility. + error_distribution : str, default='normal' + Distribution for error terms. Options: 'normal', 't', 'bootstrap', + 'laplace', 'skew-normal', 'ged'. + error_params : dict, optional + Distribution-specific parameters. E.g., {'df': 5} for t-distribution. + + Returns + ------- + np.ndarray + Simulated paths of shape (n_paths, h). + """ + from statsforecast.simulation import sample_errors + + # Set up random generator + rng = np.random.default_rng(seed) + if seed is not None: + np.random.seed(seed) + + y_path = np.zeros([n_paths, h]) + + sigma = np.sqrt(model["sigma2"]) + states_shape = model["states"].shape + + # Get residuals for bootstrap if needed + residuals = model.get("residuals", None) + + for k in range(n_paths): + # Sample state noise from specified distribution + e = sample_errors( + size=states_shape, + sigma=sigma, + distribution=error_distribution, + params=error_params, + residuals=residuals, + rng=rng, + ) + states = model["states"] + fcsts = np.zeros(h, dtype=np.float32) + cesforecast( + states=states + e, + n=model["n"], + m=model["m"], + season=switch_ces(model["seasontype"]), + h=h, + f=fcsts, + **model["par"], + ) + y_path[k,] = fcsts + + return y_path + diff --git a/python/statsforecast/core.py b/python/statsforecast/core.py index 373e7b9d8..534484862 100644 --- a/python/statsforecast/core.py +++ b/python/statsforecast/core.py @@ -1,4 +1,4 @@ -__all__ = ['StatsForecast'] +__all__ = ["StatsForecast"] import datetime as dt @@ -463,6 +463,71 @@ def _single_threaded_cross_validation( target_col=target_col, ) + def simulate( + self, + h, + n_paths, + models, + X=None, + seed=None, + seeds=None, + error_distribution="normal", + error_params=None, + ): + n_groups = self.n_groups + n_models = len(models) + if seeds is None and seed is not None: + np.random.seed(seed) + seeds = np.random.randint(0, 2**31, size=n_groups) + out = np.full( + (n_groups * n_paths * h, n_models), fill_value=np.nan, dtype=self.data.dtype + ) + for i_model, model in enumerate(models): + for i, grp in enumerate(self): + y = grp[:, 0] if grp.ndim == 2 else grp + X_in = grp[:, 1:] if (grp.ndim == 2 and grp.shape[1] > 1) else None + if X is not None: + X_future = X[i] + else: + X_future = None + + group_seed = seeds[i] if seeds is not None else None + paths = model.simulate( + h=h, + n_paths=n_paths, + y=y, + X=X_in, + X_future=X_future, + seed=group_seed, + error_distribution=error_distribution, + error_params=error_params, + ) + out[i * n_paths * h : (i + 1) * n_paths * h, i_model] = paths.flatten() + return {"forecasts": out, "cols": [repr(m) for m in models]} + + @_controller.wrap(limits=1) + def _single_threaded_simulate( + self, + h, + n_paths, + models, + X=None, + seed=None, + seeds=None, + error_distribution="normal", + error_params=None, + ): + return self.simulate( + h=h, + n_paths=n_paths, + models=models, + X=X, + seed=seed, + seeds=seeds, + error_distribution=error_distribution, + error_params=error_params, + ) + def _get_n_jobs(n_groups, n_jobs): if n_jobs == -1 or (n_jobs is None): @@ -517,7 +582,6 @@ def __init__( execution (when n_jobs=1). """ - # TODO @fede: needed for residuals, think about it later self.models = models self._validate_model_names() self.freq = freq @@ -871,6 +935,132 @@ def forecast( self.forecast_times_ = res_fcsts["times"] return fcsts_df + def _simulate_parallel( + self, h, n_paths, X, seed, error_distribution="normal", error_params=None + ): + gas, Xs = self._get_gas_Xs(X=X, tasks_per_job=1) + + # Pre-calculate seeds for each group to ensure consistency across models + # and different seeds across groups even in parallel + if seed is not None: + np.random.seed(seed) + all_seeds = np.random.randint(0, 2**31, size=self.ga.n_groups) + else: + all_seeds = None + + results = [None] * len(gas) + cumsum_groups = np.cumsum([0] + [ga.n_groups for ga in gas]) + + with ProcessPoolExecutor(self.n_jobs) as executor: + future2pos = { + executor.submit( + ga._single_threaded_simulate, + h=h, + n_paths=n_paths, + models=self.models, + X=X, + seed=None, # Already handled by passing seeds + seeds=all_seeds[cumsum_groups[i] : cumsum_groups[i + 1]] + if all_seeds is not None + else None, + error_distribution=error_distribution, + error_params=error_params, + ): i + for i, (ga, X) in enumerate(zip(gas, Xs)) + } + iterable = tqdm( + as_completed(future2pos), + disable=not self.verbose, + total=len(future2pos), + desc="Simulate", + bar_format="{l_bar}{bar}| {n_fmt}/{total_fmt} [Elapsed: {elapsed}{postfix}]", + ) + for future in iterable: + i = future2pos[future] + results[i] = future.result() + + return { + "cols": results[0]["cols"], + "forecasts": np.vstack([r["forecasts"] for r in results]), + } + + def simulate( + self, + h: int, + df: DataFrame, + X_df: Optional[DataFrame] = None, + n_paths: int = 1, + id_col: str = "unique_id", + time_col: str = "ds", + target_col: str = "y", + seed: Optional[int] = None, + error_distribution: str = "normal", + error_params: Optional[Dict] = None, + ) -> DataFrame: + """Generate sample trajectories (simulated paths). + + This method generates `n_paths` simulated trajectories for each time series + in the input DataFrame. It's useful for scenario planning and risk analysis. + + Args: + h (int): Forecast horizon. + df (DataFrame): Input DataFrame containing time series data. + X_df (DataFrame, optional): Future exogenous variables. + n_paths (int): Number of paths to simulate. + id_col (str): Name of the column containing unique identifiers. + time_col (str): Name of the column containing timestamps. + target_col (str): Name of the column containing target values. + seed (int, optional): Random seed for reproducibility. + error_distribution (str, optional): Error distribution for the simulation. + Options: 'normal', 't', 'bootstrap', 'laplace', 'skew-normal', 'ged'. + error_params (dict, optional): Distribution-specific parameters. + + Returns: + DataFrame with simulated paths, including a `sample_id` column. + """ + self._prepare_fit(df, id_col, time_col, target_col) + self._validate_exog(X_df) + X, _ = self._parse_X_level(h, X_df, None) + + if self.n_jobs == 1: + res_sim = self.ga.simulate( + h=h, + n_paths=n_paths, + models=self.models, + X=X, + seed=seed, + error_distribution=error_distribution, + error_params=error_params, + ) + else: + res_sim = self._simulate_parallel( + h=h, + n_paths=n_paths, + X=X, + seed=seed, + error_distribution=error_distribution, + error_params=error_params, + ) + + fcsts = res_sim["forecasts"] + cols = res_sim["cols"] + + uids = np.repeat(self.uids, n_paths * h) + dates = np.tile( + self._make_future_df(h)[self.time_col].to_numpy().reshape(-1, h), + (1, n_paths), + ).flatten() + + res_dict = { + self.id_col: uids, + self.time_col: dates, + "sample_id": np.tile(np.repeat(np.arange(n_paths), h), len(self.uids)), + } + for i, col in enumerate(cols): + res_dict[col] = fcsts[:, i] + + return self.df_constructor(res_dict) + def forecast_fitted_values(self): """Retrieve in-sample predictions from the forecast method. @@ -1592,6 +1782,72 @@ def forecast( target_col=target_col, ) + def simulate( + self, + h: int, + df: Any, + X_df: Optional[DataFrame] = None, + n_paths: int = 1, + id_col: str = "unique_id", + time_col: str = "ds", + target_col: str = "y", + seed: Optional[int] = None, + error_distribution: str = "normal", + error_params: Optional[Dict] = None, + ) -> DataFrame: + """Generate sample trajectories (simulated paths). + + This method generates `n_paths` simulated trajectories for each time series + in the input DataFrame. It's useful for scenario planning and risk analysis. + + Args: + h (int): Forecast horizon, the number of time steps ahead to predict. + df (DataFrame): Input DataFrame containing time series data. Must have + columns for series identifiers, timestamps, and target values. + X_df (DataFrame, optional): DataFrame containing future exogenous variables. + n_paths (int): Number of paths to simulate. + id_col (str, optional): Name of the column containing unique identifiers. + time_col (str, optional): Name of the column containing timestamps. + target_col (str, optional): Name of the column containing target values. + seed (int, optional): Random seed for reproducibility. + error_distribution (str, optional): Error distribution for the simulation. + Options: 'normal', 't', 'bootstrap', 'laplace', 'skew-normal', 'ged'. + error_params (dict, optional): Distribution-specific parameters. + + Returns: + DataFrame with simulated paths, including a `sample_id` column. + """ + if self._is_native(df=df): + return super().simulate( + h=h, + df=df, + X_df=X_df, + n_paths=n_paths, + id_col=id_col, + time_col=time_col, + target_col=target_col, + seed=seed, + error_distribution=error_distribution, + error_params=error_params, + ) + assert df is not None + # Distributed simulation not implemented yet, fallback to native if possible or raise + warnings.warn( + "Distributed simulation is not yet implemented. Falling back to native execution." + ) + return super().simulate( + h=h, + df=df, + X_df=X_df, + n_paths=n_paths, + id_col=id_col, + time_col=time_col, + target_col=target_col, + seed=seed, + error_distribution=error_distribution, + error_params=error_params, + ) + def forecast_fitted_values(self): if hasattr(self, "_backend"): res = self._backend.forecast_fitted_values() diff --git a/python/statsforecast/ets.py b/python/statsforecast/ets.py index 14b872535..2306adf5b 100644 --- a/python/statsforecast/ets.py +++ b/python/statsforecast/ets.py @@ -1,4 +1,4 @@ -__all__ = ['ets_f'] +__all__ = ["ets_f", "simulate_ets"] import math @@ -39,6 +39,7 @@ def etssimulate( m = 1 # Copy initial state components l = x[0] + b = 0.0 if trend != _ets.Component.Nothing: b = x[1] if season != _ets.Component.Nothing: @@ -720,7 +721,6 @@ def ets_f( upper = np.array([0.9999, 0.9999, 0.9999, _PHI_UPPER]) if any(upper < lower): raise ValueError("Lower limits must be less than upper limits") - # check if y is contant if is_constant(y): return etsmodel( y=y, @@ -1245,3 +1245,99 @@ def forecast_ets(obj, h, level=None): def forward_ets(fitted_model, y): return ets_f(y=y, m=fitted_model["m"], model=fitted_model) + + +def simulate_ets( + model, + h, + n_paths, + seed=None, + error_distribution="normal", + error_params=None, +): + """ + Simulate future paths from a fitted ETS model. + + Parameters + ---------- + model : dict + Fitted ETS model dictionary. + h : int + Forecast horizon. + n_paths : int + Number of simulation paths to generate. + seed : int, optional + Random seed for reproducibility. + error_distribution : str, default='normal' + Distribution for error terms. Options: 'normal', 't', 'bootstrap', + 'laplace', 'skew-normal', 'ged'. + error_params : dict, optional + Distribution-specific parameters. E.g., {'df': 5} for t-distribution. + + Returns + ------- + np.ndarray + Simulated paths of shape (n_paths, h). + """ + from statsforecast.simulation import sample_errors + + # Set up random generator + rng = np.random.default_rng(seed) + if seed is not None: + np.random.seed(seed) # For backward compatibility + + sigma = model["sigma2"] + season_length = model["m"] + last_state = model["states"][-1] + model_type = model["components"] + error = model_type[0] + trend = model_type[1] + seasonality = model_type[2] + + # parameters + alpha = model["par"][0] + beta = model["par"][1] + gamma = model["par"][2] + phi = model["par"][3] + + if math.isnan(beta): + beta = 0 + if math.isnan(gamma): + gamma = 0 + if math.isnan(phi): + phi = 0 + + y_path = np.zeros([n_paths, h]) + + # Get residuals for bootstrap if needed + residuals = model.get("residuals", None) + + for k in range(n_paths): + # Sample errors from specified distribution + e = sample_errors( + size=h, + sigma=np.sqrt(sigma), + distribution=error_distribution, + params=error_params, + residuals=residuals, + rng=rng, + ) + yhat = np.zeros(h) + etssimulate( + last_state, + season_length, + switch(error), + switch(trend), + switch(seasonality), + alpha, + beta, + gamma, + phi, + h, + yhat, + e, + ) + y_path[k,] = yhat + + return y_path + diff --git a/python/statsforecast/models.py b/python/statsforecast/models.py index d84a01cbe..60b377326 100644 --- a/python/statsforecast/models.py +++ b/python/statsforecast/models.py @@ -1,10 +1,44 @@ -__all__ = ['AutoARIMA', 'AutoETS', 'AutoCES', 'AutoTheta', 'AutoMFLES', 'AutoTBATS', 'ARIMA', 'AutoRegressive', - 'SimpleExponentialSmoothing', 'SimpleExponentialSmoothingOptimized', 'SeasonalExponentialSmoothing', - 'SeasonalExponentialSmoothingOptimized', 'Holt', 'HoltWinters', 'HistoricAverage', 'Naive', - 'RandomWalkWithDrift', 'SeasonalNaive', 'WindowAverage', 'SeasonalWindowAverage', 'ADIDA', 'CrostonClassic', - 'CrostonOptimized', 'CrostonSBA', 'IMAPA', 'TSB', 'MSTL', 'MFLES', 'TBATS', 'Theta', 'OptimizedTheta', - 'DynamicTheta', 'DynamicOptimizedTheta', 'GARCH', 'ARCH', 'SklearnModel', 'ConstantModel', 'ZeroModel', - 'NaNModel'] +__all__ = [ + "AutoARIMA", + "AutoETS", + "AutoCES", + "AutoTheta", + "AutoMFLES", + "AutoTBATS", + "ARIMA", + "AutoRegressive", + "SimpleExponentialSmoothing", + "SimpleExponentialSmoothingOptimized", + "SeasonalExponentialSmoothing", + "SeasonalExponentialSmoothingOptimized", + "Holt", + "HoltWinters", + "HistoricAverage", + "Naive", + "RandomWalkWithDrift", + "SeasonalNaive", + "WindowAverage", + "SeasonalWindowAverage", + "ADIDA", + "CrostonClassic", + "CrostonOptimized", + "CrostonSBA", + "IMAPA", + "TSB", + "MSTL", + "MFLES", + "TBATS", + "Theta", + "OptimizedTheta", + "DynamicTheta", + "DynamicOptimizedTheta", + "GARCH", + "ARCH", + "SklearnModel", + "ConstantModel", + "ZeroModel", + "NaNModel", +] import warnings @@ -169,6 +203,19 @@ def _add_conformal_intervals(self, fcst, y, X, level): def _add_predict_conformal_intervals(self, fcst, level): return self._add_conformal_intervals(fcst=fcst, y=None, X=None, level=level) + def simulate( + self, + h: int, + n_paths: int, + y: Optional[np.ndarray] = None, + X: Optional[np.ndarray] = None, + X_future: Optional[np.ndarray] = None, + seed: Optional[int] = None, + error_distribution: str = "normal", + error_params: Optional[Dict] = None, + ): + raise NotImplementedError + class AutoARIMA(_TS): r"""AutoARIMA model. @@ -527,6 +574,100 @@ def forward( res = _add_fitted_pi(res=res, se=se, level=level) return res + def simulate( + self, + h: int, + n_paths: int, + y: Optional[np.ndarray] = None, + X: Optional[np.ndarray] = None, + X_future: Optional[np.ndarray] = None, + seed: Optional[int] = None, + error_distribution: str = "normal", + error_params: Optional[Dict] = None, + ): + """ + Simulate future paths from a fitted or newly estimated AutoARIMA model. + + Parameters + ---------- + h : int + Forecast horizon. + n_paths : int + Number of simulation paths to generate. + y : np.ndarray, optional + Time series data. If provided, fits a new model; otherwise uses fitted model. + X : np.ndarray, optional + Exogenous regressors for fitting. + X_future : np.ndarray, optional + Future exogenous regressors of shape (h, n_x). + seed : int, optional + Random seed for reproducibility. + error_distribution : str, default='normal' + Distribution for error terms. Options: 'normal', 't', 'bootstrap', + 'laplace', 'skew-normal', 'ged'. + error_params : dict, optional + Distribution-specific parameters. E.g., {'df': 5} for t-distribution. + + Returns + ------- + np.ndarray + Simulated paths of shape (n_paths, h). + """ + if y is not None: + y = _ensure_float(y) + with np.errstate(invalid="ignore"): + mod = auto_arima_f( + x=y, + d=self.d, + D=self.D, + max_p=self.max_p, + max_q=self.max_q, + max_P=self.max_P, + max_Q=self.max_Q, + max_order=self.max_order, + max_d=self.max_d, + max_D=self.max_D, + start_p=self.start_p, + start_q=self.start_q, + start_P=self.start_P, + start_Q=self.start_Q, + stationary=self.stationary, + seasonal=self.seasonal, + ic=self.ic, + stepwise=self.stepwise, + nmodels=self.nmodels, + trace=self.trace, + approximation=self.approximation, + method=self.method, + truncate=self.truncate, + xreg=X, + test=self.test, + test_kwargs=self.test_kwargs, + seasonal_test=self.seasonal_test, + seasonal_test_kwargs=self.seasonal_test_kwargs, + allowdrift=self.allowdrift, + allowmean=self.allowmean, + blambda=self.blambda, + biasadj=self.biasadj, + period=self.season_length, + ) + else: + if not hasattr(self, "model_"): + raise Exception("You have to use the `fit` method first") + mod = self.model_ + + from statsforecast.arima import simulate_arima + + return simulate_arima( + model=mod, + h=h, + n_paths=n_paths, + xreg=X_future, + seed=seed, + error_distribution=error_distribution, + error_params=error_params, + ) + class AutoETS(_TS): r"""Automatic Error, Trend, Seasonal Model. @@ -748,6 +889,70 @@ def forward( res = _add_fitted_pi(res=res, se=se, level=level) return res + def simulate( + self, + h: int, + n_paths: int, + y: Optional[np.ndarray] = None, + X: Optional[np.ndarray] = None, + X_future: Optional[np.ndarray] = None, + seed: Optional[int] = None, + error_distribution: str = "normal", + error_params: Optional[Dict] = None, + ): + """ + Simulate future paths from a fitted or newly estimated AutoETS model. + + Parameters + ---------- + h : int + Forecast horizon. + n_paths : int + Number of simulation paths to generate. + y : np.ndarray, optional + Time series data. If provided, fits a new model; otherwise uses fitted model. + X : np.ndarray, optional + Exogenous regressors (unused, for API consistency). + X_future : np.ndarray, optional + Future exogenous regressors (unused, for API consistency). + seed : int, optional + Random seed for reproducibility. + error_distribution : str, default='normal' + Distribution for error terms. Options: 'normal', 't', 'bootstrap', + 'laplace', 'skew-normal', 'ged'. + error_params : dict, optional + Distribution-specific parameters. E.g., {'df': 5} for t-distribution. + + Returns + ------- + np.ndarray + Simulated paths of shape (n_paths, h). + """ + if y is not None: + y = _ensure_float(y) + mod = ets_f( + y, + m=self.season_length, + model=self.model, + damped=self.damped, + phi=self.phi, + ) + else: + if not hasattr(self, "model_"): + raise Exception("You have to use the `fit` method first") + mod = self.model_ + + from statsforecast.ets import simulate_ets + + return simulate_ets( + model=mod, + h=h, + n_paths=n_paths, + seed=seed, + error_distribution=error_distribution, + error_params=error_params, + ) + class AutoCES(_TS): r"""Complex Exponential Smoothing model. @@ -963,6 +1168,74 @@ def forward( res = _add_fitted_pi(res=res, se=se, level=level) return res + def simulate( + self, + h: int, + n_paths: int, + y: Optional[np.ndarray] = None, + X: Optional[np.ndarray] = None, + X_future: Optional[np.ndarray] = None, + seed: Optional[int] = None, + error_distribution: str = "normal", + error_params: Optional[Dict] = None, + ): + """ + Simulate future paths from a fitted or newly estimated AutoCES model. + + Parameters + ---------- + h : int + Forecast horizon. + n_paths : int + Number of simulation paths to generate. + y : np.ndarray, optional + Time series data. If provided, fits a new model; otherwise uses fitted model. + X : np.ndarray, optional + Exogenous regressors (unused, for API consistency). + X_future : np.ndarray, optional + Future exogenous regressors (unused, for API consistency). + seed : int, optional + Random seed for reproducibility. + error_distribution : str, default='normal' + Distribution for error terms. Options: 'normal', 't', 'bootstrap', + 'laplace', 'skew-normal', 'ged'. + error_params : dict, optional + Distribution-specific parameters. E.g., {'df': 5} for t-distribution. + + Returns + ------- + np.ndarray + Simulated paths of shape (n_paths, h). + """ + if y is not None: + y = _ensure_float(y) + if is_constant(y): + return Naive(alias=self.alias).simulate( + h=h, + n_paths=n_paths, + y=y, + X=X, + seed=seed, + error_distribution=error_distribution, + error_params=error_params, + ) + mod = auto_ces(y, m=self.season_length, model=self.model) + else: + if not hasattr(self, "model_"): + raise Exception("You have to use the `fit` method first") + mod = self.model_ + + from statsforecast.ces import simulate_ces + + return simulate_ces( + model=mod, + h=h, + n_paths=n_paths, + seed=seed, + error_distribution=error_distribution, + error_params=error_params, + ) + class AutoTheta(_TS): r"""AutoTheta model. @@ -1061,6 +1334,79 @@ def predict_in_sample(self, level: Optional[List[int]] = None): res = _add_fitted_pi(res=res, se=se, level=level) return res + def simulate( + self, + h: int, + n_paths: int, + y: Optional[np.ndarray] = None, + X: Optional[np.ndarray] = None, + X_future: Optional[np.ndarray] = None, + seed: Optional[int] = None, + error_distribution: str = "normal", + error_params: Optional[Dict] = None, + ): + """ + Simulate future paths from a fitted or newly estimated AutoTheta model. + + Parameters + ---------- + h : int + Forecast horizon. + n_paths : int + Number of simulation paths to generate. + y : np.ndarray, optional + Time series data. If provided, fits a new model; otherwise uses fitted model. + X : np.ndarray, optional + Exogenous regressors (unused, for API consistency). + X_future : np.ndarray, optional + Future exogenous regressors (unused, for API consistency). + seed : int, optional + Random seed for reproducibility. + error_distribution : str, default='normal' + Distribution for error terms. Options: 'normal', 't', 'bootstrap', + 'laplace', 'skew-normal', 'ged'. + error_params : dict, optional + Distribution-specific parameters. E.g., {'df': 5} for t-distribution. + + Returns + ------- + np.ndarray + Simulated paths of shape (n_paths, h). + """ + if y is not None: + y = _ensure_float(y) + if is_constant(y): + return Naive(alias=self.alias).simulate( + h=h, + n_paths=n_paths, + y=y, + X=X, + seed=seed, + error_distribution=error_distribution, + error_params=error_params, + ) + mod = auto_theta( + y=y, + m=self.season_length, + model=self.model, + decomposition_type=self.decomposition_type, + ) + else: + if not hasattr(self, "model_"): + raise Exception("You have to use the `fit` method first") + mod = self.model_ + + from statsforecast.theta import simulate_theta + + return simulate_theta( + model=mod, + h=h, + n_paths=n_paths, + seed=seed, + error_distribution=error_distribution, + error_params=error_params, + ) + def forecast( self, y: np.ndarray, @@ -1870,7 +2216,7 @@ def _optimized_ses_forecast( args=(x,), ).x forecast, fitted = _ses_forecast(x, alpha) - return forecast, fitted + return forecast, fitted, alpha def _chunk_sums(array: np.ndarray, chunk_size: int) -> np.ndarray: @@ -1889,7 +2235,7 @@ def _ses( alpha: float, # smoothing parameter ) -> Dict[str, np.ndarray]: fcst, fitted_vals = _ses_forecast(y, alpha) - out = {"mean": _repeat_val(val=fcst, h=h)} + out = {"mean": _repeat_val(val=fcst, h=h), "alpha": alpha} if fitted: out["fitted"] = fitted_vals return out @@ -1945,7 +2291,11 @@ def fit( """ y = _ensure_float(y) mod = _ses(y=y, alpha=self.alpha, h=1, fitted=True) - self.model_ = dict(mod) + mod = dict(mod) + residuals = y - mod["fitted"] + mod["sigma"] = _calculate_sigma(residuals, len(residuals) - 1) + mod["residuals"] = residuals + self.model_ = mod self._store_cs(y=y, X=X) return self @@ -1973,9 +2323,62 @@ def predict( if self.prediction_intervals is not None: res = self._add_predict_conformal_intervals(res, level) else: - raise Exception("You must pass `prediction_intervals` to compute them.") + # SES Pi: sigma * sqrt(1 + alpha^2 * (h-1)) + alpha = self.model_["alpha"] + sigma = self.model_["sigma"] + steps = np.arange(1, h + 1) + sigmah = sigma * np.sqrt(1 + (steps - 1) * alpha**2) + pred_int = _calculate_intervals(res, level, h, sigmah) + res = {**res, **pred_int} return res + def simulate( + self, + h: int, + n_paths: int, + y: Optional[np.ndarray] = None, + X: Optional[np.ndarray] = None, + X_future: Optional[np.ndarray] = None, + seed: Optional[int] = None, + error_distribution: str = "normal", + error_params: Optional[Dict] = None, + ): + """ + Simulate future paths from a fitted SimpleExponentialSmoothing model. + """ + from statsforecast.simulation import sample_errors + + if y is not None: + self.fit(y=y, X=X) + if not hasattr(self, "model_"): + raise Exception("You have to use the `fit` method first") + + rng = np.random.default_rng(seed) + + alpha = self.model_["alpha"] + sigma = self.model_["sigma"] + last_level = self.model_["mean"][0] + residuals = self.model_.get("residuals", None) + + errors = sample_errors( + size=(n_paths, h), + sigma=sigma, + distribution=error_distribution, + params=error_params, + residuals=residuals, + rng=rng, + ) + + paths = np.empty((n_paths, h)) + levels = np.empty(n_paths) + levels[:] = last_level + + for i in range(h): + paths[:, i] = levels + errors[:, i] + levels = alpha * paths[:, i] + (1 - alpha) * levels + + return paths + def predict_in_sample(self): r"""Access fitted SimpleExponentialSmoothing insample predictions. @@ -2029,9 +2432,9 @@ def _ses_optimized( h: int, # forecasting horizon fitted: bool, # fitted values ): - fcst_, fitted_vals = _optimized_ses_forecast(y, (0.01, 0.99)) + fcst_, fitted_vals, alpha = _optimized_ses_forecast(y, (0.01, 0.99)) mean = _repeat_val(val=fcst_, h=h) - fcst = {"mean": mean} + fcst = {"mean": mean, "alpha": alpha} if fitted: fcst["fitted"] = fitted_vals return fcst @@ -2083,7 +2486,11 @@ def fit( """ y = _ensure_float(y) mod = _ses_optimized(y=y, h=1, fitted=True) - self.model_ = dict(mod) + mod = dict(mod) + residuals = y - mod["fitted"] + mod["sigma"] = _calculate_sigma(residuals, len(residuals) - 1) + mod["residuals"] = residuals + self.model_ = mod self._store_cs(y=y, X=X) return self @@ -2111,9 +2518,62 @@ def predict( if self.prediction_intervals is not None: res = self._add_predict_conformal_intervals(res, level) else: - raise Exception("You must pass `prediction_intervals` to compute them.") + # SES Pi: sigma * sqrt(1 + alpha^2 * (h-1)) + alpha = self.model_["alpha"] + sigma = self.model_["sigma"] + steps = np.arange(1, h + 1) + sigmah = sigma * np.sqrt(1 + (steps - 1) * alpha**2) + pred_int = _calculate_intervals(res, level, h, sigmah) + res = {**res, **pred_int} return res + def simulate( + self, + h: int, + n_paths: int, + y: Optional[np.ndarray] = None, + X: Optional[np.ndarray] = None, + X_future: Optional[np.ndarray] = None, + seed: Optional[int] = None, + error_distribution: str = "normal", + error_params: Optional[Dict] = None, + ): + """ + Simulate future paths from a fitted SimpleExponentialSmoothingOptimized model. + """ + from statsforecast.simulation import sample_errors + + if y is not None: + self.fit(y=y, X=X) + if not hasattr(self, "model_"): + raise Exception("You have to use the `fit` method first") + + rng = np.random.default_rng(seed) + + alpha = self.model_["alpha"] + sigma = self.model_["sigma"] + last_level = self.model_["mean"][0] + residuals = self.model_.get("residuals", None) + + errors = sample_errors( + size=(n_paths, h), + sigma=sigma, + distribution=error_distribution, + params=error_params, + residuals=residuals, + rng=rng, + ) + + paths = np.empty((n_paths, h)) + levels = np.empty(n_paths) + levels[:] = last_level + + for i in range(h): + paths[:, i] = levels + errors[:, i] + levels = alpha * paths[:, i] + (1 - alpha) * levels + + return paths + def predict_in_sample(self): r"""Access fitted SimpleExponentialSmoothingOptimized insample predictions. @@ -2174,13 +2634,15 @@ def _seasonal_exponential_smoothing( return {"mean": np.full(h, np.nan, dtype=y.dtype)} season_vals = np.empty(season_length, dtype=y.dtype) fitted_vals = np.full_like(y, np.nan) + alphas = np.empty(season_length, dtype=y.dtype) for i in range(season_length): init_idx = i + n % season_length season_vals[i], fitted_vals[init_idx::season_length] = _ses_forecast( y[init_idx::season_length], alpha ) + alphas[i] = alpha out = _repeat_val_seas(season_vals=season_vals, h=h) - fcst = {"mean": out} + fcst = {"mean": out, "alpha": alphas} if fitted: fcst["fitted"] = fitted_vals return fcst @@ -2248,7 +2710,11 @@ def fit( fitted=True, h=self.season_length, ) - self.model_ = dict(mod) + mod = dict(mod) + residuals = y - mod["fitted"] + mod["sigma"] = _calculate_sigma(residuals, len(y) - self.season_length) + mod["residuals"] = residuals + self.model_ = mod self._store_cs(y=y, X=X) return self @@ -2276,9 +2742,66 @@ def predict( if self.prediction_intervals is not None: res = self._add_predict_conformal_intervals(res, level) else: - raise Exception("You must pass `prediction_intervals` to compute them.") + # Seasonal SES Pi: sigma * sqrt(1 + alpha^2 * (k-1)) where k = floor((h-1)/m) + 1 + alpha = self.model_["alpha"] + sigma = self.model_["sigma"] + m = self.season_length + steps = np.arange(1, h + 1) + k = ((steps - 1) // m) + 1 + sigmah = sigma * np.sqrt(1 + (k - 1) * alpha**2) + pred_int = _calculate_intervals(res, level, h, sigmah) + res = {**res, **pred_int} return res + def simulate( + self, + h: int, + n_paths: int, + y: Optional[np.ndarray] = None, + X: Optional[np.ndarray] = None, + X_future: Optional[np.ndarray] = None, + seed: Optional[int] = None, + error_distribution: str = "normal", + error_params: Optional[Dict] = None, + ): + """ + Simulate future paths from a fitted SeasonalExponentialSmoothing model. + """ + from statsforecast.simulation import sample_errors + + if y is not None: + self.fit(y=y, X=X) + if not hasattr(self, "model_"): + raise Exception("You have to use the `fit` method first") + + rng = np.random.default_rng(seed) + + alpha = self.model_["alpha"] + sigma = self.model_["sigma"] + m = self.season_length + last_cycle = self.model_["mean"] + residuals = self.model_.get("residuals", None) + + errors = sample_errors( + size=(n_paths, h), + sigma=sigma, + distribution=error_distribution, + params=error_params, + residuals=residuals, + rng=rng, + ) + + paths = np.empty((n_paths, h)) + # levels for each seasonal component + levels = np.tile(last_cycle, (n_paths, 1)) + + for i in range(h): + s_idx = i % m + paths[:, i] = levels[:, s_idx] + errors[:, i] + levels[:, s_idx] = alpha * paths[:, i] + (1 - alpha) * levels[:, s_idx] + + return paths + def predict_in_sample(self): r"""Access fitted SeasonalExponentialSmoothing insample predictions. @@ -2340,13 +2863,15 @@ def _seasonal_ses_optimized( return {"mean": np.full(h, np.nan, dtype=y.dtype)} season_vals = np.empty(season_length, dtype=y.dtype) fitted_vals = np.full_like(y, np.nan) + alphas = np.empty(season_length, dtype=y.dtype) for i in range(season_length): init_idx = i + n % season_length - season_vals[i], fitted_vals[init_idx::season_length] = _optimized_ses_forecast( + season_vals[i], fitted_vals[init_idx::season_length], alpha_ = _optimized_ses_forecast( y[init_idx::season_length], (0.01, 0.99) ) + alphas[i] = alpha_ out = _repeat_val_seas(season_vals=season_vals, h=h) - fcst = {"mean": out} + fcst = {"mean": out, "alpha": alphas} if fitted: fcst["fitted"] = fitted_vals return fcst @@ -2411,7 +2936,11 @@ def fit( fitted=True, h=self.season_length, ) - self.model_ = dict(mod) + mod = dict(mod) + residuals = y - mod["fitted"] + mod["sigma"] = _calculate_sigma(residuals, len(y) - self.season_length) + mod["residuals"] = residuals + self.model_ = mod self._store_cs(y=y, X=X) return self @@ -2439,9 +2968,66 @@ def predict( if self.prediction_intervals is not None: res = self._add_predict_conformal_intervals(res, level) else: - raise Exception("You must pass `prediction_intervals` to compute them.") + # Seasonal SES Pi: sigma * sqrt(1 + alpha^2 * (k-1)) where k = floor((h-1)/m) + 1 + alpha = self.model_["alpha"] + sigma = self.model_["sigma"] + m = self.season_length + steps = np.arange(1, h + 1) + k = ((steps - 1) // m) + 1 + sigmah = sigma * np.sqrt(1 + (k - 1) * alpha**2) + pred_int = _calculate_intervals(res, level, h, sigmah) + res = {**res, **pred_int} return res + def simulate( + self, + h: int, + n_paths: int, + y: Optional[np.ndarray] = None, + X: Optional[np.ndarray] = None, + X_future: Optional[np.ndarray] = None, + seed: Optional[int] = None, + error_distribution: str = "normal", + error_params: Optional[Dict] = None, + ): + """ + Simulate future paths from a fitted SeasonalExponentialSmoothingOptimized model. + """ + from statsforecast.simulation import sample_errors + + if y is not None: + self.fit(y=y, X=X) + if not hasattr(self, "model_"): + raise Exception("You have to use the `fit` method first") + + rng = np.random.default_rng(seed) + + alpha = self.model_["alpha"] + sigma = self.model_["sigma"] + m = self.season_length + last_cycle = self.model_["mean"] + residuals = self.model_.get("residuals", None) + + errors = sample_errors( + size=(n_paths, h), + sigma=sigma, + distribution=error_distribution, + params=error_params, + residuals=residuals, + rng=rng, + ) + + paths = np.empty((n_paths, h)) + # levels for each seasonal component + levels = np.tile(last_cycle, (n_paths, 1)) + + for i in range(h): + s_idx = i % m + paths[:, i] = levels[:, s_idx] + errors[:, i] + levels[:, s_idx] = alpha * paths[:, i] + (1 - alpha) * levels[:, s_idx] + + return paths + def predict_in_sample(self): r"""Access fitted SeasonalExponentialSmoothingOptimized insample predictions. @@ -2621,6 +3207,7 @@ def fit( mod = dict(mod) residuals = y - mod["fitted"] mod["sigma"] = _calculate_sigma(residuals, len(residuals) - 1) + mod["residuals"] = residuals mod["n"] = len(y) self.model_ = mod self._store_cs(y=y, X=X) @@ -2658,6 +3245,72 @@ def predict( return res + def simulate( + self, + h: int, + n_paths: int, + y: Optional[np.ndarray] = None, + X: Optional[np.ndarray] = None, + X_future: Optional[np.ndarray] = None, + seed: Optional[int] = None, + error_distribution: str = "normal", + error_params: Optional[Dict] = None, + ): + """ + Simulate future paths from a fitted HistoricAverage model. + + Parameters + ---------- + h : int + Forecast horizon. + n_paths : int + Number of simulation paths to generate. + y : np.ndarray, optional + Time series data. If provided, fits a new model; otherwise uses fitted model. + X : np.ndarray, optional + Exogenous regressors (unused, for API consistency). + X_future : np.ndarray, optional + Future exogenous regressors (unused, for API consistency). + seed : int, optional + Random seed for reproducibility. + error_distribution : str, default='normal' + Distribution for error terms. Options: 'normal', 't', 'bootstrap', + 'laplace', 'skew-normal', 'ged'. + error_params : dict, optional + Distribution-specific parameters. + + Returns + ------- + np.ndarray + Simulated paths of shape (n_paths, h). + """ + from statsforecast.simulation import sample_errors + + if y is not None: + self.fit(y=y, X=X) + if not hasattr(self, "model_"): + raise Exception("You have to use the `fit` method first") + + rng = np.random.default_rng(seed) + + sigma = self.model_["sigma"] + n = self.model_["n"] + mean = self.model_["mean"][0] + residuals = self.model_.get("residuals", None) + + # For HistoricAverage, the prediction variance is sigma^2 * (1 + 1/n) + adj_sigma = sigma * np.sqrt(1 + 1 / n) + + errors = sample_errors( + size=(n_paths, h), + sigma=adj_sigma, + distribution=error_distribution, + params=error_params, + residuals=residuals, + rng=rng, + ) + return mean + errors + def predict_in_sample(self, level: Optional[List[int]] = None): r"""Access fitted HistoricAverage insample predictions. @@ -2769,10 +3422,77 @@ def fit( residuals = y - mod["fitted"] sigma = _calculate_sigma(residuals, len(residuals) - 1) mod["sigma"] = sigma + mod["residuals"] = residuals self.model_ = mod self._store_cs(y=y, X=X) return self + def simulate( + self, + h: int, + n_paths: int, + y: Optional[np.ndarray] = None, + X: Optional[np.ndarray] = None, + X_future: Optional[np.ndarray] = None, + seed: Optional[int] = None, + error_distribution: str = "normal", + error_params: Optional[Dict] = None, + ): + """ + Simulate future paths from a fitted Naive model. + + Parameters + ---------- + h : int + Forecast horizon. + n_paths : int + Number of simulation paths to generate. + y : np.ndarray, optional + Time series data. If provided, fits a new model; otherwise uses fitted model. + X : np.ndarray, optional + Exogenous regressors (unused, for API consistency). + X_future : np.ndarray, optional + Future exogenous regressors (unused, for API consistency). + seed : int, optional + Random seed for reproducibility. + error_distribution : str, default='normal' + Distribution for error terms. Options: 'normal', 't', 'bootstrap', + 'laplace', 'skew-normal', 'ged'. + error_params : dict, optional + Distribution-specific parameters. E.g., {'df': 5} for t-distribution. + + Returns + ------- + np.ndarray + Simulated paths of shape (n_paths, h). + """ + from statsforecast.simulation import sample_errors + + if y is not None: + self.fit(y=y, X=X) + if not hasattr(self, "model_"): + raise Exception("You have to use the `fit` method first") + + rng = np.random.default_rng(seed) + if seed is not None: + np.random.seed(seed) + + # Naive: y_t = y_{t-1} + e_t + # Simulation: y_{T+h} = y_T + sum_{i=1}^h e_{T+i} + sigma = self.model_["sigma"] + last_value = self.model_["mean"][0] + residuals = self.model_.get("residuals", None) + + drifts = sample_errors( + size=(n_paths, h), + sigma=sigma, + distribution=error_distribution, + params=error_params, + residuals=residuals, + rng=rng, + ) + return last_value + np.cumsum(drifts, axis=1) + def predict( self, h: int, # forecasting horizon @@ -2966,6 +3686,7 @@ def fit( residuals = y - mod["fitted"] sigma = _calculate_sigma(residuals, len(residuals) - 1) mod["sigma"] = sigma + mod["residuals"] = residuals mod["n"] = len(y) self.model_ = mod self._store_cs(y=y, X=X) @@ -3001,6 +3722,77 @@ def predict( res = {**res, **pred_int} return res + def simulate( + self, + h: int, + n_paths: int, + y: Optional[np.ndarray] = None, + X: Optional[np.ndarray] = None, + X_future: Optional[np.ndarray] = None, + seed: Optional[int] = None, + error_distribution: str = "normal", + error_params: Optional[Dict] = None, + ): + """ + Simulate future paths from a fitted RandomWalkWithDrift model. + + Parameters + ---------- + h : int + Forecast horizon. + n_paths : int + Number of simulation paths to generate. + y : np.ndarray, optional + Time series data. If provided, fits a new model; otherwise uses fitted model. + X : np.ndarray, optional + Exogenous regressors (unused, for API consistency). + X_future : np.ndarray, optional + Future exogenous regressors (unused, for API consistency). + seed : int, optional + Random seed for reproducibility. + error_distribution : str, default='normal' + Distribution for error terms. Options: 'normal', 't', 'bootstrap', + 'laplace', 'skew-normal', 'ged'. + error_params : dict, optional + Distribution-specific parameters. + + Returns + ------- + np.ndarray + Simulated paths of shape (n_paths, h). + """ + from statsforecast.simulation import sample_errors + + if y is not None: + self.fit(y=y, X=X) + if not hasattr(self, "model_"): + raise Exception("You have to use the `fit` method first") + + rng = np.random.default_rng(seed) + + sigma = self.model_["sigma"] + n = self.model_["n"] + last_y = self.model_["last_y"] + slope = self.model_["slope"] + residuals = self.model_.get("residuals", None) + + # Simulation: y_{T+h} = y_T + h*drift + sum_{i=1}^h e_i + # Where Var(e_i) = sigma^2 * (1 + 1/(n-1)) + adj_sigma = sigma * np.sqrt(1 + 1 / (n - 1)) + + errors = sample_errors( + size=(n_paths, h), + sigma=adj_sigma, + distribution=error_distribution, + params=error_params, + residuals=residuals, + rng=rng, + ) + # Cumulative sum of errors + trend + hrange = np.arange(1, h + 1) + trend = last_y + slope * hrange + return trend + np.cumsum(errors, axis=1) + def predict_in_sample(self, level: Optional[List[int]] = None): r"""Access fitted RandomWalkWithDrift insample predictions. @@ -3117,10 +3909,83 @@ def fit( mod = dict(mod) residuals = y - mod["fitted"] mod["sigma"] = _calculate_sigma(residuals, len(y) - self.season_length) + mod["residuals"] = residuals self.model_ = mod self._store_cs(y=y, X=X) return self + def simulate( + self, + h: int, + n_paths: int, + y: Optional[np.ndarray] = None, + X: Optional[np.ndarray] = None, + X_future: Optional[np.ndarray] = None, + seed: Optional[int] = None, + error_distribution: str = "normal", + error_params: Optional[Dict] = None, + ): + """ + Simulate future paths from a fitted SeasonalNaive model. + + Parameters + ---------- + h : int + Forecast horizon. + n_paths : int + Number of simulation paths to generate. + y : np.ndarray, optional + Time series data. If provided, fits a new model; otherwise uses fitted model. + X : np.ndarray, optional + Exogenous regressors (unused, for API consistency). + X_future : np.ndarray, optional + Future exogenous regressors (unused, for API consistency). + seed : int, optional + Random seed for reproducibility. + error_distribution : str, default='normal' + Distribution for error terms. Options: 'normal', 't', 'bootstrap', + 'laplace', 'skew-normal', 'ged'. + error_params : dict, optional + Distribution-specific parameters. E.g., {'df': 5} for t-distribution. + + Returns + ------- + np.ndarray + Simulated paths of shape (n_paths, h). + """ + from statsforecast.simulation import sample_errors + + if y is not None: + self.fit(y=y, X=X) + if not hasattr(self, "model_"): + raise Exception("You have to use the `fit` method first") + + rng = np.random.default_rng(seed) + if seed is not None: + np.random.seed(seed) + + m = self.season_length + sigma = self.model_["sigma"] + last_cycle = self.model_["mean"] + residuals = self.model_.get("residuals", None) + + errors = sample_errors( + size=(n_paths, h), + sigma=sigma, + distribution=error_distribution, + params=error_params, + residuals=residuals, + rng=rng, + ) + paths = np.zeros((n_paths, h)) + for i in range(h): + if i < m: + prev_vals = last_cycle[i] + else: + prev_vals = paths[:, i - m] + paths[:, i] = prev_vals + errors[:, i] + return paths + def predict( self, h: int, @@ -4678,7 +5543,6 @@ def __init__( if repr(trend_forecaster) == "AutoETS": if trend_forecaster.model[2] != "N": raise Exception("Trend forecaster should not adjust seasonal models.") - # check if trend forecaster has season_length=1 if hasattr(trend_forecaster, "season_length"): if trend_forecaster.season_length != 1: raise Exception( @@ -5292,7 +6156,7 @@ class GARCH(_TS): ``` math y_t = v_t \sigma_t ``` - + with diff --git a/python/statsforecast/mstl.py b/python/statsforecast/mstl.py index 714e25656..f426155b0 100644 --- a/python/statsforecast/mstl.py +++ b/python/statsforecast/mstl.py @@ -32,11 +32,11 @@ def mstl( raise Exception( "`mstl` cannot handle missing values. " "Please raise an issue to include this feature." - ) # we should interpolate here + ) if blambda is not None: raise Exception( "`blambda` not implemented yet. " - "Please rise an issue to include this feature." + "Please raise an issue to include this feature." ) stl_kwargs = {"seasonal_deg": 0, **stl_kwargs} if msts[0] > 1: diff --git a/python/statsforecast/simulation.py b/python/statsforecast/simulation.py new file mode 100644 index 000000000..3b8b4b869 --- /dev/null +++ b/python/statsforecast/simulation.py @@ -0,0 +1,214 @@ +""" +Simulation utilities for StatsForecast. + +This module provides error distribution sampling functions for Monte Carlo +simulation of forecast paths. Supports multiple error distributions beyond +the standard normal distribution. +""" + +__all__ = [ + "sample_errors", + "SUPPORTED_DISTRIBUTIONS", +] + +from typing import Dict, Optional, Tuple, Union + +import numpy as np +from scipy import stats + + +SUPPORTED_DISTRIBUTIONS = frozenset([ + "normal", + "t", + "bootstrap", + "laplace", + "skew-normal", + "ged", +]) + + +def _validate_distribution(distribution: str) -> None: + """Validate that the distribution is supported.""" + if distribution not in SUPPORTED_DISTRIBUTIONS: + raise ValueError( + f"Unsupported error distribution: '{distribution}'. " + f"Supported distributions are: {sorted(SUPPORTED_DISTRIBUTIONS)}" + ) + + +def sample_errors( + size: Union[int, Tuple[int, ...]], + sigma: float, + distribution: str = "normal", + params: Optional[Dict] = None, + residuals: Optional[np.ndarray] = None, + rng: Optional[np.random.Generator] = None, +) -> np.ndarray: + """ + Sample errors from a specified distribution for simulation. + + Parameters + ---------- + size : int or tuple of ints + Output shape. If (n_paths, h), generates a matrix of errors. + sigma : float + Scale parameter (standard deviation for normal-like distributions). + distribution : str, default='normal' + The error distribution to use. Options: + - 'normal': Standard normal distribution (default) + - 't': Student's t-distribution (requires 'df' in params) + - 'bootstrap': Resample from empirical residuals (requires residuals) + - 'laplace': Laplace (double exponential) distribution + - 'skew-normal': Skewed normal distribution (optional 'skewness' in params) + - 'ged': Generalized Error Distribution (optional 'shape' in params) + params : dict, optional + Distribution-specific parameters: + - For 't': {'df': degrees_of_freedom} (default: 5) + - For 'skew-normal': {'skewness': alpha} (default: 0) + - For 'ged': {'shape': beta} (default: 2, which equals normal) + residuals : np.ndarray, optional + Fitted residuals for bootstrap resampling. Required when distribution='bootstrap'. + rng : np.random.Generator, optional + Random number generator for reproducibility. If None, uses numpy's default. + + Returns + ------- + np.ndarray + Array of sampled errors with the specified shape. + + Examples + -------- + >>> # Normal errors + >>> errors = sample_errors(size=(100, 10), sigma=1.0) + + >>> # Student's t with 5 degrees of freedom + >>> errors = sample_errors(size=(100, 10), sigma=1.0, distribution='t', params={'df': 5}) + + >>> # Bootstrap from residuals + >>> errors = sample_errors(size=(100, 10), sigma=1.0, distribution='bootstrap', residuals=resid) + """ + _validate_distribution(distribution) + + if params is None: + params = {} + + if rng is None: + rng = np.random.default_rng() + + if distribution == "normal": + return rng.normal(0, sigma, size) + + elif distribution == "t": + df = params.get("df", 5) + if df <= 2: + raise ValueError("Degrees of freedom (df) must be > 2 for finite variance.") + # Scale t-distribution to have the desired standard deviation + # Var(t) = df / (df - 2) for df > 2 + t_scale = sigma * np.sqrt((df - 2) / df) + return stats.t.rvs(df, scale=t_scale, size=size, random_state=rng) + + elif distribution == "bootstrap": + if residuals is None: + raise ValueError( + "Bootstrap distribution requires 'residuals' parameter. " + "Pass the fitted model residuals." + ) + # Remove NaN values from residuals + clean_residuals = residuals[~np.isnan(residuals)] + if len(clean_residuals) == 0: + raise ValueError("No valid residuals available for bootstrap sampling.") + # Resample from empirical residuals + if isinstance(size, int): + total_size = size + else: + total_size = np.prod(size) + sampled = rng.choice(clean_residuals, size=total_size, replace=True) + return sampled.reshape(size) + + elif distribution == "laplace": + # Laplace scale parameter: sigma = scale * sqrt(2) + laplace_scale = sigma / np.sqrt(2) + return rng.laplace(0, laplace_scale, size) + + elif distribution == "skew-normal": + skewness = params.get("skewness", 0) + # Sample from skew-normal and scale to desired sigma + # scipy's skewnorm uses 'a' as shape parameter (skewness) + raw_samples = stats.skewnorm.rvs(skewness, size=size, random_state=rng) + # Standardize and rescale + if np.std(raw_samples) > 0: + return raw_samples * sigma / np.std(raw_samples) + return raw_samples * sigma + + elif distribution == "ged": + # Generalized Error Distribution (generalized normal) + # shape=2 -> normal, shape=1 -> Laplace, shape->inf -> uniform + shape = params.get("shape", 2) + if shape <= 0: + raise ValueError("GED shape parameter must be positive.") + # scipy's gennorm uses beta as the shape parameter + raw_samples = stats.gennorm.rvs(shape, size=size, random_state=rng) + # Scale to desired sigma + ged_var = stats.gennorm.var(shape) + if ged_var > 0: + return raw_samples * sigma / np.sqrt(ged_var) + return raw_samples * sigma + + else: + # This should not be reached due to validation, but just in case + raise ValueError(f"Unknown distribution: {distribution}") + + +def get_distribution_info(distribution: str) -> Dict: + """ + Get information about a supported distribution. + + Parameters + ---------- + distribution : str + Name of the distribution. + + Returns + ------- + dict + Dictionary with keys: + - 'name': Full name of the distribution + - 'params': List of supported parameters + - 'description': Brief description + """ + _validate_distribution(distribution) + + info = { + "normal": { + "name": "Normal (Gaussian)", + "params": [], + "description": "Standard normal distribution. Default choice for most applications.", + }, + "t": { + "name": "Student's t", + "params": ["df (degrees of freedom, default=5)"], + "description": "Heavy-tailed distribution ideal for financial time series.", + }, + "bootstrap": { + "name": "Bootstrap (Empirical)", + "params": [], + "description": "Non-parametric resampling from fitted residuals.", + }, + "laplace": { + "name": "Laplace (Double Exponential)", + "params": [], + "description": "Sharper peak with heavier tails than normal. Good for intermittent demand.", + }, + "skew-normal": { + "name": "Skewed Normal", + "params": ["skewness (alpha, default=0)"], + "description": "Asymmetric normal distribution. Use for skewed forecast errors.", + }, + "ged": { + "name": "Generalized Error Distribution", + "params": ["shape (beta, default=2)"], + "description": "Flexible distribution: shape=2 is normal, shape=1 is Laplace.", + }, + } + + return info[distribution] diff --git a/python/statsforecast/theta.py b/python/statsforecast/theta.py index 037bd1845..3527af9aa 100644 --- a/python/statsforecast/theta.py +++ b/python/statsforecast/theta.py @@ -1,4 +1,4 @@ -__all__ = ['forecast_theta', 'auto_theta', 'forward_theta'] +__all__ = ["forecast_theta", "auto_theta", "forward_theta", "simulate_theta"] import math @@ -195,17 +195,54 @@ def thetamodel( def compute_pi_samples( - n, h, states, sigma, alpha, theta, mean_y, seed=0, n_samples=200 + n, + h, + states, + sigma, + alpha, + theta, + mean_y, + seed=0, + n_samples=200, + error_distribution="normal", + error_params=None, + residuals=None, ): + """ + Compute prediction interval samples for Theta model. + + Parameters + ---------- + error_distribution : str, default='normal' + Distribution for error terms. Options: 'normal', 't', 'bootstrap', + 'laplace', 'skew-normal', 'ged'. + error_params : dict, optional + Distribution-specific parameters. + residuals : np.ndarray, optional + Residuals for bootstrap sampling. + """ + from statsforecast.simulation import sample_errors + samples = np.full((h, n_samples), fill_value=np.nan, dtype=np.float32) # states: level, meany, An, Bn, mu smoothed, _, A, B, _ = states[-1] - np.random.seed(seed) + + rng = np.random.default_rng(seed) + for i in range(n, n + h): samples[i - n] = smoothed + (1 - 1 / theta) * ( A * ((1 - alpha) ** i) + B * (1 - (1 - alpha) ** (i + 1)) / alpha ) - samples[i - n] += np.random.normal(scale=sigma, size=n_samples) + # Sample errors from specified distribution + errors = sample_errors( + size=n_samples, + sigma=sigma, + distribution=error_distribution, + params=error_params, + residuals=residuals, + rng=rng, + ) + samples[i - n] += errors smoothed = alpha * samples[i - n] + (1 - alpha) * smoothed mean_y = (i * mean_y + samples[i - n]) / (i + 1) B = ((i - 1) * B + 6 * (samples[i - n] - mean_y) / (i + 1)) / (i + 2) @@ -213,6 +250,72 @@ def compute_pi_samples( return samples +def simulate_theta( + model, + h, + n_paths, + seed=None, + error_distribution="normal", + error_params=None, +): + """ + Simulate future paths from a fitted Theta model. + + Parameters + ---------- + model : dict + Fitted Theta model dictionary. + h : int + Forecast horizon. + n_paths : int + Number of simulation paths to generate. + seed : int, optional + Random seed for reproducibility. + error_distribution : str, default='normal' + Distribution for error terms. Options: 'normal', 't', 'bootstrap', + 'laplace', 'skew-normal', 'ged'. + error_params : dict, optional + Distribution-specific parameters. E.g., {'df': 5} for t-distribution. + + Returns + ------- + np.ndarray + Simulated paths of shape (n_paths, h). + """ + # Set up random generator + if seed is not None: + np.random.seed(seed) + + sigma = np.std(model["residuals"][3:], ddof=1) + residuals = model["residuals"][3:] # For bootstrap + + samples = compute_pi_samples( + n=model["n"], + h=h, + states=model["states"], + sigma=sigma, + alpha=model["par"]["alpha"], + theta=model["par"]["theta"], + mean_y=model["mean_y"], + seed=seed if seed is not None else 0, + n_samples=n_paths, + error_distribution=error_distribution, + error_params=error_params, + residuals=residuals, + ) + + res = samples.T + + if model.get("decompose", False): + seas_forecast = _repeat_val_seas(model["seas_forecast"]["mean"], h=h) + if model["decomposition_type"] == "multiplicative": + res = res * seas_forecast + else: + res = res + seas_forecast + return res + + + def forecast_theta(obj, h, level=None): forecast = np.empty(h) n = obj["n"] diff --git a/tests/test_simulation.py b/tests/test_simulation.py new file mode 100644 index 000000000..20e1ad23d --- /dev/null +++ b/tests/test_simulation.py @@ -0,0 +1,206 @@ +import numpy as np +import pandas as pd +import pytest +from statsforecast import StatsForecast +from statsforecast.models import AutoETS, AutoARIMA, AutoCES, AutoTheta + + +def test_simulate_basic(): + df = pd.DataFrame( + { + "unique_id": [1] * 20 + [2] * 20, + "ds": pd.to_datetime(["2020-01-01"] * 40) + + pd.to_timedelta(np.tile(np.arange(20), 2), unit="D"), + "y": np.concatenate( + [ + np.arange(20) + np.random.normal(0, 0.1, 20), + np.arange(20, 0, -1) + np.random.normal(0, 0.1, 20), + ] + ), + } + ) + + models = [ + AutoETS(season_length=1), + AutoARIMA(season_length=1), + AutoCES(season_length=1), + AutoTheta(season_length=1), + ] + sf = StatsForecast(models=models, freq="D") + + h = 5 + n_paths = 10 + seed = 42 + + sim_df = sf.simulate(h=h, df=df, n_paths=n_paths, seed=seed) + + # Check shape: 2 groups * 10 paths * 5 horizon = 100 + assert len(sim_df) == 100 + + # Check columns + expected_cols = { + "unique_id", + "ds", + "sample_id", + "AutoETS", + "AutoARIMA", + "CES", + "AutoTheta", + } + assert expected_cols.issubset(set(sim_df.columns)) + + # Check sample_id range + assert sim_df["sample_id"].nunique() == n_paths + assert sim_df["sample_id"].min() == 0 + assert sim_df["sample_id"].max() == n_paths - 1 + + # Check unique_id + assert set(sim_df["unique_id"]) == {1, 2} + + # Check reproducibility + sim_df2 = sf.simulate(h=h, df=df, n_paths=n_paths, seed=seed) + pd.testing.assert_frame_equal(sim_df, sim_df2) + + # Check different seed + sim_df3 = sf.simulate(h=h, df=df, n_paths=n_paths, seed=seed + 1) + with pytest.raises(AssertionError): + pd.testing.assert_frame_equal(sim_df, sim_df3) + + +def test_simulate_with_exog(): + # Create data with exogenous + n_history = 100 + exog1 = np.random.randn(n_history) + df = pd.DataFrame( + { + "unique_id": [1] * n_history, + "ds": pd.to_datetime(["2020-01-01"] * n_history) + + pd.to_timedelta(np.arange(n_history), unit="D"), + "y": np.arange(n_history) + 10 * exog1 + 0.1 * np.random.randn(n_history), + "exog1": exog1, + } + ) + + h = 5 + X_df = pd.DataFrame( + { + "unique_id": [1] * h, + "ds": pd.to_datetime(["2020-01-01"] * h) + + pd.to_timedelta(np.arange(n_history, n_history + h), unit="D"), + "exog1": np.random.randn(h), + } + ) + + models = [AutoARIMA(season_length=1)] + sf = StatsForecast(models=models, freq="D") + + n_paths = 3 + sim_df = sf.simulate(h=h, df=df, X_df=X_df, n_paths=n_paths, seed=42) + + assert len(sim_df) == h * n_paths + assert "AutoARIMA" in sim_df.columns + + +def test_simulate_parallel(): + df = pd.DataFrame( + { + "unique_id": np.repeat(np.arange(10), 20), + "ds": pd.to_datetime(["2020-01-01"] * 200) + + pd.to_timedelta(np.tile(np.arange(20), 10), unit="D"), + "y": np.random.randn(200), + } + ) + + models = [AutoETS(season_length=1)] + sf = StatsForecast(models=models, freq="D", n_jobs=2) + + h = 3 + n_paths = 2 + sim_df = sf.simulate(h=h, df=df, n_paths=n_paths, seed=42) + + # Check reproducibility with parallel + sim_df2 = sf.simulate(h=h, df=df, n_paths=n_paths, seed=42) + assert len(sim_df2) == 60 + pd.testing.assert_frame_equal(sim_df, sim_df2) + + +def test_simulate_edge_cases(): + # Need at least 15 observations for AutoETS to avoid 'tiny datasets' error + df = pd.DataFrame( + { + "unique_id": [1] * 15, + "ds": pd.to_datetime(["2020-01-01"] * 15) + + pd.to_timedelta(np.arange(15), unit="D"), + "y": np.arange(15), + } + ) + + models = [AutoETS(season_length=1)] + sf = StatsForecast(models=models, freq="D") + + # h=1, n_paths=1 + sim_df = sf.simulate(h=1, df=df, n_paths=1, seed=42) + assert len(sim_df) == 1 + assert sim_df["sample_id"].iloc[0] == 0 + + +def test_simulate_constant_series(): + df = pd.DataFrame( + { + "unique_id": [1] * 10, + "ds": pd.to_datetime(["2020-01-01"] * 10) + + pd.to_timedelta(np.arange(10), unit="D"), + "y": [10.0] * 10, + } + ) + + models = [AutoARIMA(season_length=1), AutoCES(season_length=1)] + sf = StatsForecast(models=models, freq="D") + + sim_df = sf.simulate(h=3, df=df, n_paths=2, seed=42) + assert len(sim_df) == 6 + np.testing.assert_allclose(sim_df["AutoARIMA"], 10.0) + np.testing.assert_allclose(sim_df["CES"], 10.0) + + +def test_simulate_custom_aliases(): + df = pd.DataFrame( + { + "unique_id": [1] * 15, + "ds": pd.to_datetime(["2020-01-01"] * 15) + + pd.to_timedelta(np.arange(15), unit="D"), + "y": np.arange(15), + } + ) + + models = [AutoETS(season_length=1, alias="MyETS")] + sf = StatsForecast(models=models, freq="D") + + sim_df = sf.simulate(h=2, df=df, n_paths=1) + assert "MyETS" in sim_df.columns + assert "AutoETS" not in sim_df.columns + + +def test_simulate_multiple_configs(): + df = pd.DataFrame( + { + "unique_id": [1] * 20, + "ds": pd.to_datetime(["2020-01-01"] * 20) + + pd.to_timedelta(np.arange(20), unit="D"), + "y": np.arange(20) + np.random.normal(0, 0.1, 20), + } + ) + + models = [ + AutoETS(season_length=1, alias="ETS_1"), + AutoETS(season_length=1, alias="ETS_2"), + ] + sf = StatsForecast(models=models, freq="D") + + sim_df = sf.simulate(h=5, df=df, n_paths=2, seed=42) + assert "ETS_1" in sim_df.columns + assert "ETS_2" in sim_df.columns + + +if __name__ == "__main__": + pytest.main([__file__]) diff --git a/tests/test_simulation_distributions.py b/tests/test_simulation_distributions.py new file mode 100644 index 000000000..b8b991b22 --- /dev/null +++ b/tests/test_simulation_distributions.py @@ -0,0 +1,103 @@ +import numpy as np +import pandas as pd +import pytest +from statsforecast import StatsForecast +from statsforecast.models import ( + AutoARIMA, AutoETS, AutoCES, AutoTheta, Naive, + HistoricAverage, RandomWalkWithDrift, SimpleExponentialSmoothing +) + +def test_simulation_distributions_basic(): + df = pd.DataFrame({ + 'unique_id': [1, 1, 1, 1, 1], + 'ds': pd.date_range('2000-01-01', periods=5), + 'y': [1, 2, 3, 4, 10] + }) + + models = [Naive()] + sf = StatsForecast(models=models, freq='D') + + # Test normal (default) + res_normal = sf.simulate(h=3, df=df, n_paths=10, seed=42) + assert 'Naive' in res_normal.columns + assert res_normal.shape == (30, 4) # 3 steps * 10 paths = 30 rows + + # Test Student's t + res_t = sf.simulate(h=3, df=df, n_paths=10, seed=42, + error_distribution='t', error_params={'df': 3}) + assert 'Naive' in res_t.columns + + # Test Laplace + res_laplace = sf.simulate(h=3, df=df, n_paths=10, seed=42, + error_distribution='laplace') + assert 'Naive' in res_laplace.columns + + # Ensure different distributions produce different results with same seed (if sigma is same) + # Actually, they might not be wildly different for h=1, but let's check values + assert not np.array_equal(res_normal['Naive'].values, res_t['Naive'].values) + assert not np.array_equal(res_normal['Naive'].values, res_laplace['Naive'].values) + +def test_simulation_bootstrap(): + df = pd.DataFrame({ + 'unique_id': [1] * 100, + 'ds': pd.date_range('2000-01-01', periods=100), + 'y': np.random.normal(0, 1, 100).cumsum() + }) + + # Use ARIMA for bootstrap as it stores residuals + models = [AutoARIMA()] + sf = StatsForecast(models=models, freq='D') + + res_bootstrap = sf.simulate(h=5, df=df, n_paths=20, seed=42, + error_distribution='bootstrap') + assert 'AutoARIMA' in res_bootstrap.columns + assert res_bootstrap.shape == (100, 4) # 5 steps * 20 paths = 100 rows + +def test_simulation_all_models_distributions(): + df = pd.DataFrame({ + 'unique_id': [1] * 20, + 'ds': pd.date_range('2000-01-01', periods=20), + 'y': np.random.normal(0, 1, 20).cumsum() + }) + + models = [ + AutoARIMA(), + AutoETS(season_length=4), + AutoCES(season_length=4), + AutoTheta(season_length=4), + Naive(), + HistoricAverage(), + RandomWalkWithDrift(), + SimpleExponentialSmoothing(alpha=0.5) + ] + sf = StatsForecast(models=models, freq='D') + + # Test common distribution for all + res = sf.simulate(h=2, df=df, n_paths=5, seed=42, + error_distribution='t', error_params={'df': 5}) + + for model in models: + assert repr(model) in res.columns + +def test_invalid_distribution(): + df = pd.DataFrame({ + 'unique_id': [1, 2], + 'ds': [1, 1], + 'y': [1, 2] + }) + sf = StatsForecast(models=[Naive()], freq=1) + with pytest.raises(ValueError, match="Unsupported error distribution"): + sf.simulate(h=1, df=df, error_distribution='invalid_dist') + +def test_reproducibility_with_distributions(): + df = pd.DataFrame({ + 'unique_id': [1] * 10, + 'ds': range(10), + 'y': np.random.randn(10) + }) + sf = StatsForecast(models=[Naive()], freq=1) + + res1 = sf.simulate(h=5, df=df, n_paths=10, seed=42, error_distribution='t', error_params={'df': 3}) + res2 = sf.simulate(h=5, df=df, n_paths=10, seed=42, error_distribution='t', error_params={'df': 3}) + + np.testing.assert_array_almost_equal(res1['Naive'].values, res2['Naive'].values)