Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
146 changes: 141 additions & 5 deletions python/statsforecast/arima.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,14 @@
__all__ = ['predict_arima', 'arima_string', 'forecast_arima', 'fitted_arima', 'auto_arima_f', 'print_statsforecast_ARIMA',
'ARIMASummary', 'AutoARIMA']
__all__ = [
"predict_arima",
"arima_string",
"forecast_arima",
"fitted_arima",
"auto_arima_f",
"print_statsforecast_ARIMA",
"ARIMASummary",
"AutoARIMA",
"simulate_arima",
]


import math
Expand Down Expand Up @@ -50,7 +59,6 @@ def getQ0(phi, theta):


def arima_transpar(params_in, arma, trans):
# TODO check trans=True results
return _arima.arima_transpar(params_in, arma, trans)


Expand Down Expand Up @@ -668,8 +676,6 @@ def predict_arima(model, n_ahead, newxreg=None, se_fit=True):

narma = sum(arma[:4])
if len(coefs) > narma:
# check intercept
# i think xreg is unused
if ncoefs[narma] == "intercept":
intercept = np.ones(n_ahead, dtype=np.float64).reshape(-1, 1)
if newxreg is None:
Expand Down Expand Up @@ -1225,6 +1231,136 @@ def forecast_arima(
return ans


def simulate_arima(
model,
h,
n_paths,
xreg=None,
seed=None,
error_distribution="normal",
error_params=None,
):
"""
Simulate future paths from a fitted ARIMA model.

Parameters
----------
model : dict
Fitted ARIMA model dictionary.
h : int
Forecast horizon.
n_paths : int
Number of simulation paths to generate.
xreg : np.ndarray, optional
Future exogenous regressors of shape (h, n_x).
seed : int, optional
Random seed for reproducibility.
error_distribution : str, default='normal'
Distribution for error terms. Options: 'normal', 't', 'bootstrap',
'laplace', 'skew-normal', 'ged'.
error_params : dict, optional
Distribution-specific parameters. E.g., {'df': 5} for t-distribution.

Returns
-------
np.ndarray
Simulated paths of shape (n_paths, h).
"""
from statsforecast.simulation import sample_errors

if model.get("constant", False):
return np.full((n_paths, h), model["x"][-1])

# Set up random generator
rng = np.random.default_rng(seed)
if seed is not None:
np.random.seed(seed) # For backward compatibility

# Regression component (similar to forecast_arima and predict_arima)
use_drift = "drift" in model["coef"].keys()
if use_drift:
n = len(model["x"])
drift = np.arange(1, h + 1, dtype=np.float64).reshape(-1, 1)
drift += n
if xreg is not None:
xreg = np.concatenate([drift, xreg], axis=1)
else:
xreg = drift
arma = model["arma"]
ncoefs = list(model["coef"].keys())
coefs = np.array(list(model["coef"].values()))
narma = sum(arma[:4])

newxreg = xreg
if len(coefs) > narma:
if ncoefs[narma] == "intercept":
intercept = np.ones(h, dtype=np.float64).reshape(-1, 1)
if newxreg is None:
newxreg = intercept
else:
newxreg = np.concatenate([intercept, newxreg], axis=1)

if newxreg is not None:
if narma == 0:
needed_cols = len(coefs)
reg_coefs = coefs
else:
needed_cols = len(coefs) - narma
reg_coefs = coefs[narma:]

if newxreg.shape[1] != needed_cols:
if newxreg.shape[1] > needed_cols:
xm = np.matmul(newxreg[:, :needed_cols], reg_coefs)
else:
raise Exception(
f"Number of regressors ({newxreg.shape[1]}) does not match fitted model ({needed_cols})"
)
else:
xm = np.matmul(newxreg, reg_coefs)
xm = xm.flatten()
else:
xm = 0
else:
xm = 0

# Kalman filter state-space components
Z, a_init, T, V, obs_h = [model["model"][var] for var in ["Z", "a", "T", "V", "h"]]
sigma2 = model["sigma2"]

paths = np.empty((n_paths, h))

# V can be singular, so we use SVD-based sampling for state transition noise
u, s, _ = np.linalg.svd(V * sigma2)
state_noise_std = u * np.sqrt(s)

obs_noise_std = np.sqrt(obs_h * sigma2)
residuals = model.get("residuals", None)

for i in range(n_paths):
a = a_init.copy()
for t in range(h):
# State transition noise (Gaussian for state-space consistency)
state_noise = rng.normal(size=a.size)
a = T @ a + state_noise_std @ state_noise

# Observation noise with selected distribution
obs_noise = sample_errors(
size=1,
sigma=obs_noise_std,
distribution=error_distribution,
params=error_params,
residuals=residuals,
rng=rng,
)[0]
paths[i, t] = (Z @ a) + obs_noise

if not isinstance(xm, int):
paths += xm

return paths



def fitted_arima(model, h=1):
"""Returns h-step forecasts for the data used in fitting the model.

Expand Down
76 changes: 75 additions & 1 deletion python/statsforecast/ces.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
__all__ = ['ces_target_fn']
__all__ = ["auto_ces", "simulate_ces"]


import math
Expand Down Expand Up @@ -802,3 +802,77 @@ def forward_ces(fitted_model, y):
beta_0=beta_0,
beta_1=beta_1,
)


def simulate_ces(
model,
h,
n_paths,
seed=None,
error_distribution="normal",
error_params=None,
):
"""
Simulate future paths from a fitted CES model.

Parameters
----------
model : dict
Fitted CES model dictionary.
h : int
Forecast horizon.
n_paths : int
Number of simulation paths to generate.
seed : int, optional
Random seed for reproducibility.
error_distribution : str, default='normal'
Distribution for error terms. Options: 'normal', 't', 'bootstrap',
'laplace', 'skew-normal', 'ged'.
error_params : dict, optional
Distribution-specific parameters. E.g., {'df': 5} for t-distribution.

Returns
-------
np.ndarray
Simulated paths of shape (n_paths, h).
"""
from statsforecast.simulation import sample_errors

# Set up random generator
rng = np.random.default_rng(seed)
if seed is not None:
np.random.seed(seed)

y_path = np.zeros([n_paths, h])

sigma = np.sqrt(model["sigma2"])
states_shape = model["states"].shape

# Get residuals for bootstrap if needed
residuals = model.get("residuals", None)

for k in range(n_paths):
# Sample state noise from specified distribution
e = sample_errors(
size=states_shape,
sigma=sigma,
distribution=error_distribution,
params=error_params,
residuals=residuals,
rng=rng,
)
states = model["states"]
fcsts = np.zeros(h, dtype=np.float32)
cesforecast(
states=states + e,
n=model["n"],
m=model["m"],
season=switch_ces(model["seasontype"]),
h=h,
f=fcsts,
**model["par"],
)
y_path[k,] = fcsts

return y_path

Loading