diff --git a/ads/model/model_metadata.py b/ads/model/model_metadata.py index f0428ec9c..995aba0bd 100644 --- a/ads/model/model_metadata.py +++ b/ads/model/model_metadata.py @@ -165,6 +165,7 @@ class Framework(ExtendedEnum): PYOD = "pyod" SPACY = "spacy" PROPHET = "prophet" + THETA = "theta" SKTIME = "sktime" STATSMODELS = "statsmodels" CUML = "cuml" diff --git a/ads/opctl/operator/lowcode/forecast/const.py b/ads/opctl/operator/lowcode/forecast/const.py index f2265418a..a6e10e3e0 100644 --- a/ads/opctl/operator/lowcode/forecast/const.py +++ b/ads/opctl/operator/lowcode/forecast/const.py @@ -16,6 +16,7 @@ class SupportedModels(ExtendedEnum): LGBForecast = "lgbforecast" AutoMLX = "automlx" AutoTS = "autots" + Theta = "theta" # Auto = "auto" diff --git a/ads/opctl/operator/lowcode/forecast/model/factory.py b/ads/opctl/operator/lowcode/forecast/model/factory.py index 262fe5bbc..1aa4f99b6 100644 --- a/ads/opctl/operator/lowcode/forecast/model/factory.py +++ b/ads/opctl/operator/lowcode/forecast/model/factory.py @@ -4,6 +4,7 @@ # Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/ from ads.opctl.operator.lowcode.common.transformations import Transformations +from .theta import ThetaOperatorModel from ..const import ( AUTO_SELECT, @@ -46,6 +47,7 @@ class ForecastOperatorModelFactory: SupportedModels.LGBForecast: MLForecastOperatorModel, SupportedModels.AutoMLX: AutoMLXOperatorModel, SupportedModels.AutoTS: AutoTSOperatorModel, + SupportedModels.Theta: ThetaOperatorModel, } @classmethod diff --git a/ads/opctl/operator/lowcode/forecast/model/forecast_datasets.py b/ads/opctl/operator/lowcode/forecast/model/forecast_datasets.py index 3019b6839..0c234b906 100644 --- a/ads/opctl/operator/lowcode/forecast/model/forecast_datasets.py +++ b/ads/opctl/operator/lowcode/forecast/model/forecast_datasets.py @@ -345,19 +345,18 @@ def populate_series_output( f"\nPlease refer to the troubleshooting guide at {TROUBLESHOOTING_GUIDE} for resolution steps." ) from e + start_idx = output_i.shape[0] - self.horizon - len(fit_val) if (output_i.shape[0] - self.horizon) == len(fit_val): - output_i["fitted_value"].iloc[: -self.horizon] = ( - fit_val # Note: may need to do len(output_i) - (len(fit_val) + horizon) : -horizon - ) + output_i.loc[output_i.index[ + : -self.horizon], "fitted_value"] = fit_val # Note: may need to do len(output_i) - (len(fit_val) + horizon) : -horizon elif (output_i.shape[0] - self.horizon) > len(fit_val): logger.debug( f"Fitted Values were only generated on a subset ({len(fit_val)}/{(output_i.shape[0] - self.horizon)}) of the data for Series: {series_id}." ) - start_idx = output_i.shape[0] - self.horizon - len(fit_val) - output_i["fitted_value"].iloc[start_idx : -self.horizon] = fit_val + output_i.loc[output_i.index[start_idx: -self.horizon], "fitted_value"] = fit_val else: - output_i["fitted_value"].iloc[start_idx : -self.horizon] = fit_val[ - -(output_i.shape[0] - self.horizon) : + output_i.loc[output_i.index[start_idx: -self.horizon], "fitted_value"] = fit_val[ + -(output_i.shape[0] - self.horizon): ] if len(forecast_val) != self.horizon: @@ -365,21 +364,21 @@ def populate_series_output( f"Attempting to set forecast along horizon ({self.horizon}) for series: {series_id}, however forecast is only length {len(forecast_val)}" f"\nPlease refer to the troubleshooting guide at {TROUBLESHOOTING_GUIDE} for resolution steps." 
) - output_i["forecast_value"].iloc[-self.horizon :] = forecast_val + output_i.loc[output_i.index[-self.horizon:], "forecast_value"] = forecast_val if len(upper_bound) != self.horizon: raise ValueError( f"Attempting to set upper_bound along horizon ({self.horizon}) for series: {series_id}, however upper_bound is only length {len(upper_bound)}" f"\nPlease refer to the troubleshooting guide at {TROUBLESHOOTING_GUIDE} for resolution steps." ) - output_i[self.upper_bound_name].iloc[-self.horizon :] = upper_bound + output_i.loc[output_i.index[-self.horizon:], self.upper_bound_name] = upper_bound if len(lower_bound) != self.horizon: raise ValueError( f"Attempting to set lower_bound along horizon ({self.horizon}) for series: {series_id}, however lower_bound is only length {len(lower_bound)}" f"\nPlease refer to the troubleshooting guide at {TROUBLESHOOTING_GUIDE} for resolution steps." ) - output_i[self.lower_bound_name].iloc[-self.horizon :] = lower_bound + output_i.loc[output_i.index[-self.horizon:], self.lower_bound_name] = lower_bound self.series_id_map[series_id] = output_i self.verify_series_output(series_id) diff --git a/ads/opctl/operator/lowcode/forecast/model/theta.py b/ads/opctl/operator/lowcode/forecast/model/theta.py new file mode 100644 index 000000000..57546dd85 --- /dev/null +++ b/ads/opctl/operator/lowcode/forecast/model/theta.py @@ -0,0 +1,442 @@ +#!/usr/bin/env python + +import logging +import traceback +from typing import Dict, Any + +import numpy as np +import optuna +import pandas as pd +from joblib import Parallel, delayed +from sktime.forecasting.base import ForecastingHorizon +from sktime.forecasting.theta import ThetaForecaster +from sktime.performance_metrics.forecasting import mean_squared_error, \ + mean_absolute_percentage_error +from sktime.split import ExpandingWindowSplitter + +from ads.opctl import logger +from ads.opctl.operator.lowcode.common.utils import seconds_to_datetime +from ads.opctl.operator.lowcode.forecast.operator_config import ForecastOperatorConfig +from ads.opctl.operator.lowcode.forecast.utils import (_label_encode_dataframe, smape) +from .base_model import ForecastOperatorBaseModel +from .forecast_datasets import ForecastDatasets, ForecastOutput +from ..const import ( + SupportedModels, ForecastOutputColumns, DEFAULT_TRIALS, +) + +logging.getLogger("report_creator").setLevel(logging.WARNING) + + +def freq_to_sp(freq: str) -> int | None: + """ + Convert pandas freq string to seasonal period (sp). + """ + if not freq: + return None + + freq = freq.upper() + + # Direct mappings + mapping = { + "M": 12, + "Q": 4, + "A": 1, + "Y": 1, + "W": 52, + "D": 7, + "H": 24, + "T": 1440, + "MIN": 1440, + } + if freq in mapping: + return mapping[freq] + + # Weekly variants (W-MON, W-SUN, etc.) + if freq.startswith("W"): + return 52 + + # Minute frequencies like "5T" or "15MIN" + if freq.endswith("T"): + try: + return 1440 // int(freq[:-1]) + except ValueError: + pass + + if freq.endswith("MIN"): # e.g., "15MIN" + try: + return 1440 // int(freq[:-3]) + except ValueError: + pass + + logger.warning("Unable to infer data frequency and sp") + return None + + +class ThetaOperatorModel(ForecastOperatorBaseModel): + """Theta operator model""" + + def __init__(self, config: ForecastOperatorConfig, datasets: ForecastDatasets): + super().__init__(config=config, datasets=datasets) + self.global_explanation = {} + self.local_explanation = {} + + def set_kwargs(self): + """Prepare kwargs for Theta model from spec. + The operator's 'model_kwargs' is respected. 
+ """ + model_kwargs = self.spec.model_kwargs + model_kwargs["alpha"] = self.spec.model_kwargs.get("alpha", None) + model_kwargs["initial_level"] = self.spec.model_kwargs.get("initial_level", None) + model_kwargs["deseasonalize"] = self.spec.model_kwargs.get("deseasonalize", True) + model_kwargs["deseasonalize_model"] = self.spec.model_kwargs.get("deseasonalize_model", "additive") + model_kwargs["sp"] = self.spec.model_kwargs.get("sp", None) + + if self.spec.confidence_interval_width is None: + self.spec.confidence_interval_width = 1 - 0.90 if model_kwargs["alpha"] is None else model_kwargs["alpha"] + + model_kwargs["interval_width"] = self.spec.confidence_interval_width + return model_kwargs + + def preprocess(self, data, series_id): + self.le[series_id], df_encoded = _label_encode_dataframe( + data, + no_encode={self.spec.datetime_column.name, self.original_target_column}, + ) + return df_encoded.set_index(self.spec.datetime_column.name) + + def _train_model(self, i, series_id, df: pd.DataFrame, model_kwargs: Dict[str, Any]): + try: + self.forecast_output.init_series_output(series_id=series_id, data_at_series=df) + data = self.preprocess(df, series_id) + + data_i = self.drop_horizon(data) + target = self.spec.target_column + + freq = pd.infer_freq(data_i.index) + if freq.startswith("W-"): + freq = "W" + data_i = data_i.asfreq(freq) + y = data_i[target] + if model_kwargs["sp"] is None: + inferred_sp = freq_to_sp(freq) + model_kwargs["sp"] = 1 if inferred_sp is None else inferred_sp + + # If model already loaded, extract parameters (best-effort) + if self.loaded_models is not None and series_id in self.loaded_models: + previous_res = self.loaded_models[series_id].get("model") + fitted_params = previous_res.get_fitted_params() + model_kwargs["deseasonalize_model"] = previous_res.deseasonalize_model + model_kwargs["sp"] = previous_res.sp + model_kwargs["deseasonalize"] = previous_res.deseasonalize + model_kwargs["initial_level"] = fitted_params.get("initial_level", None) + else: + if self.perform_tuning: + model_kwargs = self.run_tuning(y, model_kwargs) + if len(y) < 2 * model_kwargs["sp"]: + model_kwargs["deseasonalize"] = False + + # Fit ThetaModel using params + model = ThetaForecaster(initial_level=model_kwargs["initial_level"], + deseasonalize=model_kwargs["deseasonalize"], + deseasonalize_model=model_kwargs["deseasonalize_model"], sp=model_kwargs["sp"]) + model.fit(y) + + fh = ForecastingHorizon(range(1, self.spec.horizon + 1), is_relative=True) + fh_in_sample = ForecastingHorizon(range(-len(data_i) + 1, 1)) + fitted_vals = model.predict(fh_in_sample) + forecast_values = model.predict(fh) + forecast_range = model.predict_interval(fh=fh) + + lower = forecast_range[(self.original_target_column, 0.9, "lower")].rename("yhat_lower") + upper = forecast_range[(self.original_target_column, 0.9, "upper")].rename("yhat_upper") + point = forecast_values.rename("yhat") + forecast = pd.DataFrame( + pd.concat([point, lower, upper], axis=1) + ) + logger.debug(f"-----------------Model {i}----------------------") + logger.debug(forecast[["yhat", "yhat_lower", "yhat_upper"]].tail()) + + self.forecast_output.populate_series_output( + series_id=series_id, + fit_val=fitted_vals.values, + forecast_val=forecast["yhat"].values, + upper_bound=forecast["yhat_upper"].values, + lower_bound=forecast["yhat_lower"].values, + ) + self.outputs[series_id] = forecast + self.models[series_id] = {} + self.models[series_id]["model"] = model + self.models[series_id]["le"] = self.le[series_id] + + params = vars(model).copy() 
+ self.model_parameters[series_id] = { + "framework": SupportedModels.Theta, + **params, + } + + logger.debug("===========Done===========") + + except Exception as e: + self.errors_dict[series_id] = { + "model_name": self.spec.model, + "error": str(e), + "error_trace": traceback.format_exc(), + } + logger.error(f"Encountered Error: {e}. Skipping.") + logger.error(traceback.format_exc()) + + def _build_model(self) -> pd.DataFrame: + """Build models for all series in parallel and return forecast long format.""" + full_data_dict = self.datasets.get_data_by_series() + self.models = {} + self.outputs = {} + self.explanations_info = {} + model_kwargs = self.set_kwargs() + self.forecast_output = ForecastOutput( + confidence_interval_width=self.spec.confidence_interval_width, + horizon=self.spec.horizon, + target_column=self.original_target_column, + dt_column=self.spec.datetime_column.name, + ) + + Parallel(n_jobs=-1, require="sharedmem")( + delayed(ThetaOperatorModel._train_model)( + self, i, series_id, df, model_kwargs.copy() + ) + for self, (i, (series_id, df)) in zip( + [self] * len(full_data_dict), enumerate(full_data_dict.items()) + ) + ) + + return self.forecast_output.get_forecast_long() + + def run_tuning(self, y: pd.DataFrame, model_kwargs_i: Dict[str, Any]): + + scoring = { + "mape": lambda y_true, y_pred: mean_absolute_percentage_error(y_true, y_pred), + "rmse": lambda y_true, y_pred: np.sqrt(mean_squared_error(y_true, y_pred)), + "mse": lambda y_true, y_pred: mean_squared_error(y_true, y_pred), + "smape": lambda y_true, y_pred: smape(y_true, y_pred) + } + score_fn = scoring.get(self.spec.metric.lower(), scoring["mape"]) + + def objective(trial): + initial_level = model_kwargs_i["initial_level"] + sp = model_kwargs_i["sp"] + deseasonalize = trial.suggest_categorical("deseasonalize", [True, False]) + deseasonalize_model = trial.suggest_categorical("deseasonalize_model", ["additive", "multiplicative"]) + if deseasonalize_model == "multiplicative" and (y <= 0).any(): + raise optuna.exceptions.TrialPruned() + + model = ThetaForecaster( + initial_level=initial_level, + sp=sp, + deseasonalize_model=deseasonalize_model, + deseasonalize=deseasonalize, + ) + + cv = ExpandingWindowSplitter( + initial_window=50, + step_length=100 + ) + + scores = [] + + for train, test in cv.split(y): + t_data = y.iloc[train] + if t_data.isna().any(): + continue + if len(t_data) < 2 * sp: + continue + + model.fit(t_data) + fh = ForecastingHorizon(y.index[test], is_relative=False) + y_pred = model.predict(fh) + y_test = y.iloc[test] + if y_test.isna().any(): + continue + scores.append(score_fn(y_test, y_pred)) + return np.mean(scores) + + study = optuna.create_study(direction="minimize") + trials = DEFAULT_TRIALS if self.spec.tuning.n_trials is None else self.spec.tuning.n_trials + study.optimize(objective, n_trials=trials) + model_kwargs_i["deseasonalize_model"] = study.best_params["deseasonalize_model"] + model_kwargs_i["deseasonalize"] = study.best_params["deseasonalize"] + return model_kwargs_i + + def _generate_report(self): + import report_creator as rc + """The method that needs to be implemented on the particular model level.""" + all_sections = [] + theta_blocks = [] + + for series_id, sm in self.models.items(): + model = sm["model"] + + # ---- Extract details from ThetaModel ---- + fitted_params = model.get_fitted_params() + initial_level = fitted_params.get("initial_level", None) + smoothing_level = fitted_params.get("smoothing_level", None) + sp = model.sp + deseasonalize_model = 
model.deseasonalize_model + deseasonalized = model.deseasonalize + n_obs = len(model._y) if hasattr(model, "_y") else "N/A" + + # Date range + if hasattr(model, "_y"): + start_date = model._y.index[0] + end_date = model._y.index[-1] + else: + start_date = "" + end_date = "" + + # ---- Build the DF ---- + meta_df = pd.DataFrame({ + "Metric": [ + "Initial Level", + "Smoothing Level / Alpha", + "No. Observations", + "Deseasonalized", + "Deseasonalization Method", + "Period (sp)", + "Sample Start", + "Sample End", + ], + "Value": [ + initial_level, + smoothing_level, + n_obs, + str(deseasonalized), + deseasonalize_model, + sp, + start_date, + end_date, + ], + }) + + # ---- Create a block (NOT a section directly) ---- + theta_block = rc.Block( + rc.Heading("Theta Model Summary", level=3), + rc.DataTable(meta_df), + label=series_id + ) + + # Add with optional label support + theta_blocks.append( + theta_block + ) + + # ---- Combine into final section like ARIMA example ---- + theta_title = rc.Heading("Theta Model Parameters", level=2) + + if len(theta_blocks) > 1: + theta_section = rc.Select(blocks=theta_blocks) + else: + theta_section = theta_blocks[0] + + all_sections.extend([theta_title, theta_section]) + + if self.spec.generate_explanations: + try: + # If the key is present, call the "explain_model" method + self.explain_model() + + # Convert the global explanation data to a DataFrame + global_explanation_df = pd.DataFrame(self.global_explanation) + + self.formatted_global_explanation = ( + global_explanation_df / global_explanation_df.sum(axis=0) * 100 + ) + self.formatted_global_explanation = ( + self.formatted_global_explanation.rename( + {self.spec.datetime_column.name: ForecastOutputColumns.DATE}, + axis=1, + ) + ) + aggregate_local_explanations = pd.DataFrame() + for s_id, local_ex_df in self.local_explanation.items(): + local_ex_df_copy = local_ex_df.copy() + local_ex_df_copy["Series"] = s_id + aggregate_local_explanations = pd.concat( + [aggregate_local_explanations, local_ex_df_copy], axis=0 + ) + self.formatted_local_explanation = aggregate_local_explanations + + if not self.target_cat_col: + self.formatted_global_explanation = ( + self.formatted_global_explanation.rename( + {"Series 1": self.original_target_column}, + axis=1, + ) + ) + self.formatted_local_explanation.drop( + "Series", axis=1, inplace=True + ) + + # Create a markdown section for the global explainability + global_explanation_section = rc.Block( + rc.Heading("Global Explanation of Models", level=2), + rc.Text( + "The following tables provide the feature attribution for the global explainability."
+ ), + rc.DataTable(self.formatted_global_explanation, index=True), + ) + + blocks = [ + rc.DataTable( + local_ex_df.div(local_ex_df.abs().sum(axis=1), axis=0) * 100, + label=s_id if self.target_cat_col else None, + index=True, + ) + for s_id, local_ex_df in self.local_explanation.items() + ] + local_explanation_section = rc.Block( + rc.Heading("Local Explanation of Models", level=2), + rc.Select(blocks=blocks) if len(blocks) > 1 else blocks[0], + ) + + # Append the global explanation text and section to the "all_sections" list + all_sections = all_sections + [ + global_explanation_section, + local_explanation_section, + ] + except Exception as e: + logger.warning(f"Failed to generate Explanations with error: {e}.") + logger.debug(f"Full Traceback: {traceback.format_exc()}") + + model_description = rc.Text( + "A Theta forecaster is a popular and surprisingly effective time series forecasting " + "method that works by decomposing data into long-term trend and short-term components, forecasting them separately, " + "and then combining the results, often outperforming complex models by adjusting the original series' local " + "curvature using a parameter called theta (θ). It's known for its simplicity, speed, and strong performance, " + "especially in forecasting competitions like the M3, where it served as a strong benchmark, often by using " + "Simple Exponential Smoothing (SES) with drift on a modified series." + ) + other_sections = all_sections + + return ( + model_description, + other_sections, + ) + + def get_explain_predict_fn(self, series_id): + def _custom_predict( + data, + model=self.models[series_id]["model"], + dt_column_name=self.datasets._datetime_column_name, + target_col=self.original_target_column, + ): + """ + data: ForecastDatasets.get_data_at_series(s_id) + """ + data = data.drop([target_col], axis=1) + data[dt_column_name] = seconds_to_datetime( + data[dt_column_name], dt_format=self.spec.datetime_column.format + ) + data = self.preprocess(data, series_id) + h = len(data) + fh = ForecastingHorizon(np.arange(1, h + 1), is_relative=True) + return model.predict(fh) + + return _custom_predict diff --git a/ads/opctl/operator/lowcode/forecast/schema.yaml b/ads/opctl/operator/lowcode/forecast/schema.yaml index fe7c90df5..456bd5385 100644 --- a/ads/opctl/operator/lowcode/forecast/schema.yaml +++ b/ads/opctl/operator/lowcode/forecast/schema.yaml @@ -460,6 +460,7 @@ spec: - autots - auto-select - auto-select-series + - theta model_kwargs: type: dict diff --git a/docs/source/user_guide/operators/forecast_operator/yaml_schema.rst b/docs/source/user_guide/operators/forecast_operator/yaml_schema.rst index dc0ee92de..8f73a9e8b 100644 --- a/docs/source/user_guide/operators/forecast_operator/yaml_schema.rst +++ b/docs/source/user_guide/operators/forecast_operator/yaml_schema.rst @@ -137,7 +137,7 @@ Below is an example of a ``forecast.yaml`` file with every parameter specified: - string - No - prophet - - Model to use. Options: prophet, arima, neuralprophet, automlx, autots, auto-select. + - Model to use. Options: prophet, arima, neuralprophet, theta, automlx, autots, auto-select. * - model_kwargs - dict @@ -266,7 +266,7 @@ Further Description * **format**: (Optional) Specify the format for output data (e.g., ``csv``, ``json``, ``excel``). * **options**: (Optional) Include any additional arguments, such as connection parameters for storage. - * **model**: (Optional) The name of the model framework to use. Defaults to ``auto-select``. 
Available options include ``arima``, ``prophet``, ``neuralprophet``, ``autots``, and ``auto-select``. + * **model**: (Optional) The name of the model framework to use. Defaults to ``auto-select``. Available options include ``arima``, ``prophet``, ``neuralprophet``, ``theta``, ``autots``, and ``auto-select``. * **model_kwargs**: (Optional) A dictionary of arguments to pass directly to the model framework, allowing for detailed control over modeling. diff --git a/tests/operators/forecast/test_datasets.py b/tests/operators/forecast/test_datasets.py index 8460bbea7..d4782a8c6 100644 --- a/tests/operators/forecast/test_datasets.py +++ b/tests/operators/forecast/test_datasets.py @@ -32,6 +32,7 @@ "prophet", "neuralprophet", "autots", + "theta", # "lgbforecast", "auto-select", "auto-select-series", diff --git a/tests/operators/forecast/test_errors.py b/tests/operators/forecast/test_errors.py index b102d7127..1b5632d13 100644 --- a/tests/operators/forecast/test_errors.py +++ b/tests/operators/forecast/test_errors.py @@ -142,6 +142,7 @@ "automlx", "prophet", "neuralprophet", + "theta", "autots", # "lgbforecast", ] diff --git a/tests/operators/forecast/test_explainers.py b/tests/operators/forecast/test_explainers.py index 753e324f4..e5e8b20d6 100644 --- a/tests/operators/forecast/test_explainers.py +++ b/tests/operators/forecast/test_explainers.py @@ -20,6 +20,7 @@ # "automlx", # FIXME: automlx is failing, no errors "prophet", "neuralprophet", + "theta", "auto-select-series", ]
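
Reviewer note: the sketch below shows the core sktime calls that _train_model wires together — fit a ThetaForecaster, then produce point forecasts and 90% prediction intervals over a relative horizon. The toy monthly series and sp=12 are illustrative assumptions; the operator itself derives y, sp, and the horizon from the dataset and spec, and selects this model via model: theta in forecast.yaml (with optional model_kwargs such as sp and deseasonalize passed through set_kwargs).

import pandas as pd
from sktime.forecasting.base import ForecastingHorizon
from sktime.forecasting.theta import ThetaForecaster

# Toy monthly series (assumption): 4 years of positive values, so there are
# at least two full seasonal cycles available for deseasonalization.
y = pd.Series(
    [float(i) for i in range(1, 49)],
    index=pd.period_range("2020-01", periods=48, freq="M"),
)

# sp=12 mirrors what freq_to_sp("M") would infer for monthly data.
model = ThetaForecaster(deseasonalize=True, sp=12)
model.fit(y)

# Relative 12-step horizon, as in _train_model (in-sample fitted values there
# use a negative horizon instead).
fh = ForecastingHorizon(range(1, 13), is_relative=True)
point_forecast = model.predict(fh)         # becomes "yhat"
intervals = model.predict_interval(fh=fh)  # default 90% coverage -> yhat_lower / yhat_upper

print(point_forecast.tail())
print(intervals.tail())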