From c068dd82981bb5cbedd2be53fa7528a4810fb0f8 Mon Sep 17 00:00:00 2001 From: sjangir Date: Mon, 8 Dec 2025 17:56:44 +0530 Subject: [PATCH 1/9] added theta forecasting model --- ads/model/model_metadata.py | 1 + ads/opctl/operator/lowcode/forecast/const.py | 1 + .../lowcode/forecast/model/factory.py | 2 + .../operator/lowcode/forecast/model/theta.py | 390 ++++++++++++++++++ .../operator/lowcode/forecast/schema.yaml | 1 + 5 files changed, 395 insertions(+) create mode 100644 ads/opctl/operator/lowcode/forecast/model/theta.py diff --git a/ads/model/model_metadata.py b/ads/model/model_metadata.py index f0428ec9c..995aba0bd 100644 --- a/ads/model/model_metadata.py +++ b/ads/model/model_metadata.py @@ -165,6 +165,7 @@ class Framework(ExtendedEnum): PYOD = "pyod" SPACY = "spacy" PROPHET = "prophet" + THETA = "theta" SKTIME = "sktime" STATSMODELS = "statsmodels" CUML = "cuml" diff --git a/ads/opctl/operator/lowcode/forecast/const.py b/ads/opctl/operator/lowcode/forecast/const.py index f2265418a..a6e10e3e0 100644 --- a/ads/opctl/operator/lowcode/forecast/const.py +++ b/ads/opctl/operator/lowcode/forecast/const.py @@ -16,6 +16,7 @@ class SupportedModels(ExtendedEnum): LGBForecast = "lgbforecast" AutoMLX = "automlx" AutoTS = "autots" + Theta = "theta" # Auto = "auto" diff --git a/ads/opctl/operator/lowcode/forecast/model/factory.py b/ads/opctl/operator/lowcode/forecast/model/factory.py index 262fe5bbc..1aa4f99b6 100644 --- a/ads/opctl/operator/lowcode/forecast/model/factory.py +++ b/ads/opctl/operator/lowcode/forecast/model/factory.py @@ -4,6 +4,7 @@ # Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/ from ads.opctl.operator.lowcode.common.transformations import Transformations +from .theta import ThetaOperatorModel from ..const import ( AUTO_SELECT, @@ -46,6 +47,7 @@ class ForecastOperatorModelFactory: SupportedModels.LGBForecast: MLForecastOperatorModel, SupportedModels.AutoMLX: AutoMLXOperatorModel, 
SupportedModels.AutoTS: AutoTSOperatorModel, + SupportedModels.Theta: ThetaOperatorModel, } @classmethod diff --git a/ads/opctl/operator/lowcode/forecast/model/theta.py b/ads/opctl/operator/lowcode/forecast/model/theta.py new file mode 100644 index 000000000..50e9c1db8 --- /dev/null +++ b/ads/opctl/operator/lowcode/forecast/model/theta.py @@ -0,0 +1,390 @@ +#!/usr/bin/env python + +import logging +import traceback +from typing import Dict, Any + +import numpy as np +import pandas as pd +import optuna +from joblib import Parallel, delayed +from sktime.forecasting.base import ForecastingHorizon +from sktime.forecasting.theta import ThetaForecaster +from sktime.performance_metrics.forecasting import mean_absolute_error, mean_squared_error, \ + mean_absolute_percentage_error +from sktime.split import ExpandingWindowSplitter + +from ads.opctl import logger +from ads.opctl.operator.lowcode.forecast.operator_config import ForecastOperatorConfig +from ads.opctl.operator.lowcode.forecast.utils import (_label_encode_dataframe) + +from ..const import ( + SupportedModels, ForecastOutputColumns, DEFAULT_TRIALS, +) +from .base_model import ForecastOperatorBaseModel +from .forecast_datasets import ForecastDatasets, ForecastOutput + +logging.getLogger("report_creator").setLevel(logging.WARNING) + + +def freq_to_sp(freq: str) -> int | None: + """ + Convert pandas freq string to seasonal period (sp). 
+ """ + freq = freq.upper() + + if freq == "M": # Monthly + return 12 + if freq == "Q": # Quarterly + return 4 + if freq == "A" or freq == "Y": # Annual + return 1 # Usually no seasonality + + # Weekly data + if freq == "D": # Daily + return 7 # 7-day seasonality is common + + if freq == "H": # Hourly + return 24 # 24 hours/day + + if freq == "T" or freq == "MIN": # Minute data + return 1440 # minutes/day → auto infer later + + # If freq is something like "5T" + if freq.endswith("T"): + minutes = int(freq.replace("T", "")) + return int(1440 / minutes) + + # If freq is something like "15min" + if "MIN" in freq: + minutes = int(freq.replace("MIN", "")) + return int(1440 / minutes) + + logger.warning("Unable to infer data frequency and sp") + return None + +class ThetaOperatorModel(ForecastOperatorBaseModel): + """Theta operator model""" + + def __init__(self, config: ForecastOperatorConfig, datasets: ForecastDatasets): + super().__init__(config=config, datasets=datasets) + self.global_explanation = {} + self.local_explanation = {} + + def set_kwargs(self): + """Prepare kwargs for Theta model from spec. + The operator's 'model_kwargs' is respected. 
+ """ + model_kwargs = self.spec.model_kwargs + model_kwargs["alpha"] = self.spec.model_kwargs.get("alpha", None) + model_kwargs["initial_level"] = self.spec.model_kwargs.get("initial_level", None) + + if self.spec.confidence_interval_width is None: + self.spec.confidence_interval_width = 1 - 0.90 if model_kwargs["alpha"] is None else model_kwargs["alpha"] + + model_kwargs["interval_width"] = self.spec.confidence_interval_width + return model_kwargs + + def preprocess(self, data, series_id): + self.le[series_id], df_encoded = _label_encode_dataframe( + data, + no_encode={self.spec.datetime_column.name, self.original_target_column}, + ) + return df_encoded.set_index(self.spec.datetime_column.name) + + def _train_model(self, i, series_id, df: pd.DataFrame, model_kwargs: Dict[str, Any]): + try: + self.forecast_output.init_series_output(series_id=series_id, data_at_series=df) + data = self.preprocess(df, series_id) + + data_i = self.drop_horizon(data) + target = self.spec.target_column + freq = pd.infer_freq(data_i.index) + data_i = data_i.asfreq(freq) + y = data_i[target] + + model_kwargs["deseasonalize_model"] = "additive" + inferred_sp = freq_to_sp(freq) + model_kwargs["sp"] = 1 if inferred_sp is None else inferred_sp + + # If model already loaded, extract parameters (best-effort) + if self.loaded_models is not None and series_id in self.loaded_models: + previous_res = self.loaded_models[series_id].get("model") + fitted_params = previous_res.get_fitted_params() + model_kwargs["deseasonalize_model"] = previous_res.deseasonalize_model + model_kwargs["sp"] = previous_res.sp + model_kwargs["initial_level"] = fitted_params.get("initial_level", None) + else: + if self.perform_tuning: + model_kwargs = self.run_tuning(y, model_kwargs) + + # Fit ThetaModel using params + model = ThetaForecaster(initial_level=model_kwargs["initial_level"], + deseasonalize_model=model_kwargs["deseasonalize_model"], sp=model_kwargs["sp"]) + model.fit(y) + + fh = ForecastingHorizon(range(1, 
self.spec.horizon + 1), is_relative=True) + fh_in_sample = ForecastingHorizon(range(-len(data_i) + 1, 1)) + fitted_vals = model.predict(fh_in_sample) + forecast_values = model.predict(fh) + forecast_range = model.predict_interval(fh=fh) + + lower = forecast_range[("y", 0.9, "lower")].rename("yhat_lower") + upper = forecast_range[("y", 0.9, "upper")].rename("yhat_upper") + point = forecast_values.rename("yhat") + forecast = pd.DataFrame( + pd.concat([point, lower, upper], axis=1) + ) + logger.debug(f"-----------------Model {i}----------------------") + logger.debug(forecast[["yhat", "yhat_lower", "yhat_upper"]].tail()) + + self.forecast_output.populate_series_output( + series_id=series_id, + fit_val=fitted_vals.values, + forecast_val=forecast["yhat"].values, + upper_bound=forecast["yhat_upper"].values, + lower_bound=forecast["yhat_lower"].values, + ) + self.outputs[series_id] = forecast + self.models[series_id] = {} + self.models[series_id]["model"] = model + self.models[series_id]["le"] = self.le[series_id] + + params = vars(model).copy() + self.model_parameters[series_id] = { + "framework": SupportedModels.Theta, + **params, + } + + logger.debug("===========Done===========") + + except Exception as e: + self.errors_dict[series_id] = { + "model_name": self.spec.model, + "error": str(e), + "error_trace": traceback.format_exc(), + } + logger.error(f"Encountered Error: {e}. 
Skipping.") + logger.error(traceback.format_exc()) + + def _build_model(self) -> pd.DataFrame: + """Build models for all series in parallel and return forecast long format.""" + full_data_dict = self.datasets.get_data_by_series() + self.models = {} + self.outputs = {} + self.explanations_info = {} + model_kwargs = self.set_kwargs() + self.forecast_output = ForecastOutput( + confidence_interval_width=self.spec.confidence_interval_width, + horizon=self.spec.horizon, + target_column=self.original_target_column, + dt_column=self.spec.datetime_column.name, + ) + + Parallel(n_jobs=-1, require="sharedmem")( + delayed(ThetaOperatorModel._train_model)( + self, i, series_id, df, model_kwargs.copy() + ) + for self, (i, (series_id, df)) in zip( + [self] * len(full_data_dict), enumerate(full_data_dict.items()) + ) + ) + + return self.forecast_output.get_forecast_long() + + def run_tuning(self, y: pd.DataFrame, model_kwargs_i: Dict[str, Any]): + + def objective(trial): + # Hyperparameters to tune + initial_level = trial.suggest_float("initial_level", 0.05, 1.0) + sp = trial.suggest_categorical("sp", model_kwargs_i["sp"]) + deseason = trial.suggest_categorical("deseasonalize_model", ["additive", "multiplicative"]) + if deseason == "multiplicative" and (y <= 0).any(): + raise optuna.exceptions.TrialPruned() + + model = ThetaForecaster( + initial_level=initial_level, + sp=sp, + deseasonalize_model=deseason + ) + + cv = ExpandingWindowSplitter( + initial_window=50, + step_length=100 + ) + + scores = [] + for train, test in cv.split(y): + t_data = y.iloc[train] + if len(t_data) < 2 * sp: + continue + model.fit(t_data) + fh = ForecastingHorizon(y.index[test], is_relative=False) + y_pred = model.predict(fh) + + score = mean_absolute_percentage_error( + y.iloc[test], y_pred + ) + scores.append(score) + + return np.mean(scores) + + study = optuna.create_study(direction="minimize") + trials = DEFAULT_TRIALS if self.spec.tuning.n_trials is None else self.spec.tuning.n_trials + 
study.optimize(objective, n_trials=trials) + return study.best_params + + def _generate_report(self): + import report_creator as rc + """The method that needs to be implemented on the particular model level.""" + all_sections = [] + for series_id, sm in self.models.items(): + model = sm["model"] + # ---- Extract details from ThetaModel ---- + fitted_params = model.get_fitted_params() + alpha = fitted_params.get("initial_level", None) + sp = model.sp + deseasonalize_model = model.deseasonalize_model + desasonalized = model.deseasonalize + smoothing_level = fitted_params.get("smoothing_level", None) + n_obs = len(model._y) if hasattr(model, "_y") else "N/A" + + # Date range + if hasattr(model, "_y"): + start_date = model._y.index[0] + end_date = model._y.index[-1] + else: + start_date = "" + end_date = "" + + # ---- Build the text block ---- + meta_df = pd.DataFrame({ + "Metric": [ + "Alpha / Initial Level", + "Smoothing Level", + "No. Observations", + "Deseasonalized", + "Deseasonalization Method", + "Period (sp)", + "Sample Start", + "Sample End" + ], + "Value": [ + alpha, + smoothing_level, + n_obs, + str(desasonalized is not None), + deseasonalize_model, + sp, + start_date, + end_date + ] + }) + + # ---- Add to Report Creator ---- + theta_section = rc.Block( + rc.Heading(f"Theta Model Summary — {series_id}", level=2), + rc.Text("This section provides detailed ThetaModel fit diagnostics."), + rc.DataTable(meta_df), + ) + + all_sections.append(theta_section) + + if self.spec.generate_explanations: + try: + # If the key is present, call the "explain_model" method + self.explain_model() + + # Convert the global explanation data to a DataFrame + global_explanation_df = pd.DataFrame(self.global_explanation) + + self.formatted_global_explanation = ( + global_explanation_df / global_explanation_df.sum(axis=0) * 100 + ) + self.formatted_global_explanation = ( + self.formatted_global_explanation.rename( + {self.spec.datetime_column.name: ForecastOutputColumns.DATE}, + 
axis=1, + ) + ) + aggregate_local_explanations = pd.DataFrame() + for s_id, local_ex_df in self.local_explanation.items(): + local_ex_df_copy = local_ex_df.copy() + local_ex_df_copy["Series"] = s_id + aggregate_local_explanations = pd.concat( + [aggregate_local_explanations, local_ex_df_copy], axis=0 + ) + self.formatted_local_explanation = aggregate_local_explanations + + if not self.target_cat_col: + self.formatted_global_explanation = ( + self.formatted_global_explanation.rename( + {"Series 1": self.original_target_column}, + axis=1, + ) + ) + self.formatted_local_explanation.drop( + "Series", axis=1, inplace=True + ) + + # Create a markdown section for the global explainability + global_explanation_section = rc.Block( + rc.Heading("Global Explanation of Models", level=2), + rc.Text( + "The following tables provide the feature attribution for the global explainability." + ), + rc.DataTable(self.formatted_global_explanation, index=True), + ) + + blocks = [ + rc.DataTable( + local_ex_df.div(local_ex_df.abs().sum(axis=1), axis=0) * 100, + label=s_id if self.target_cat_col else None, + index=True, + ) + for s_id, local_ex_df in self.local_explanation.items() + ] + local_explanation_section = rc.Block( + rc.Heading("Local Explanation of Models", level=2), + rc.Select(blocks=blocks) if len(blocks) > 1 else blocks[0], + ) + + # Append the global explanation text and section to the "all_sections" list + all_sections = all_sections + [ + global_explanation_section, + local_explanation_section, + ] + except Exception as e: + logger.warning(f"Failed to generate Explanations with error: {e}.") + logger.debug(f"Full Traceback: {traceback.format_exc()}") + + model_description = rc.Text( + "A Theta forecaster is a popular and surprisingly effective time series forecasting" + "method that works by decomposing data into long-term trend and short-term components, forecasting them separately," + "and then combining the results, often outperforming complex models by adjusting the 
original series' local" + "curvature using a parameter called theta (θ). It's known for its simplicity, speed, and strong performance, " + "especially in forecasting competitions like the M3, where it served as a strong benchmark, often by using" + "Simple Exponential Smoothing (SES) with drift on a modified series" + ) + other_sections = all_sections + + return ( + model_description, + other_sections, + ) + + def get_explain_predict_fn(self, series_id): + def _custom_predict( + data, + model=self.models[series_id]["model"], + target_col=self.original_target_column, + ): + """ + data: ForecastDatasets.get_data_at_series(s_id) + """ + data = data.drop([target_col], axis=1) + data = self.preprocess(data, series_id) + fh = ForecastingHorizon(pd.to_datetime(data.index), is_relative=False) + return model.predict(fh) + + return _custom_predict diff --git a/ads/opctl/operator/lowcode/forecast/schema.yaml b/ads/opctl/operator/lowcode/forecast/schema.yaml index fe7c90df5..456bd5385 100644 --- a/ads/opctl/operator/lowcode/forecast/schema.yaml +++ b/ads/opctl/operator/lowcode/forecast/schema.yaml @@ -460,6 +460,7 @@ spec: - autots - auto-select - auto-select-series + - theta model_kwargs: type: dict From c1f1767e8aa05e29da4bfacf74b03830a543bc7b Mon Sep 17 00:00:00 2001 From: sjangir Date: Tue, 9 Dec 2025 10:58:46 +0530 Subject: [PATCH 2/9] fixed errors to handle different data in Theta forecaster --- ads/opctl/operator/lowcode/forecast/model/theta.py | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/ads/opctl/operator/lowcode/forecast/model/theta.py b/ads/opctl/operator/lowcode/forecast/model/theta.py index 50e9c1db8..a7469bd40 100644 --- a/ads/opctl/operator/lowcode/forecast/model/theta.py +++ b/ads/opctl/operator/lowcode/forecast/model/theta.py @@ -63,6 +63,7 @@ def freq_to_sp(freq: str) -> int | None: logger.warning("Unable to infer data frequency and sp") return None + class ThetaOperatorModel(ForecastOperatorBaseModel): """Theta operator 
model""" @@ -78,6 +79,7 @@ def set_kwargs(self): model_kwargs = self.spec.model_kwargs model_kwargs["alpha"] = self.spec.model_kwargs.get("alpha", None) model_kwargs["initial_level"] = self.spec.model_kwargs.get("initial_level", None) + model_kwargs["deseasonalize"] = self.spec.model_kwargs.get("deseasonalize", True) if self.spec.confidence_interval_width is None: self.spec.confidence_interval_width = 1 - 0.90 if model_kwargs["alpha"] is None else model_kwargs["alpha"] @@ -117,9 +119,12 @@ def _train_model(self, i, series_id, df: pd.DataFrame, model_kwargs: Dict[str, A else: if self.perform_tuning: model_kwargs = self.run_tuning(y, model_kwargs) + if len(y) < 2 * model_kwargs["sp"]: + model_kwargs["deseasonalize"] = False # Fit ThetaModel using params model = ThetaForecaster(initial_level=model_kwargs["initial_level"], + deseasonalize=model_kwargs["deseasonalize"], deseasonalize_model=model_kwargs["deseasonalize_model"], sp=model_kwargs["sp"]) model.fit(y) @@ -129,8 +134,8 @@ def _train_model(self, i, series_id, df: pd.DataFrame, model_kwargs: Dict[str, A forecast_values = model.predict(fh) forecast_range = model.predict_interval(fh=fh) - lower = forecast_range[("y", 0.9, "lower")].rename("yhat_lower") - upper = forecast_range[("y", 0.9, "upper")].rename("yhat_upper") + lower = forecast_range[(self.original_target_column, 0.9, "lower")].rename("yhat_lower") + upper = forecast_range[(self.original_target_column, 0.9, "upper")].rename("yhat_upper") point = forecast_values.rename("yhat") forecast = pd.DataFrame( pd.concat([point, lower, upper], axis=1) @@ -205,7 +210,8 @@ def objective(trial): model = ThetaForecaster( initial_level=initial_level, sp=sp, - deseasonalize_model=deseason + deseasonalize_model=deseason, + deseasonalize=model_kwargs_i["deseasonalize"], ) cv = ExpandingWindowSplitter( From 687902d43730a7d2cce4ff52626cf5e5b7e404c1 Mon Sep 17 00:00:00 2001 From: sjangir Date: Tue, 9 Dec 2025 16:51:17 +0530 Subject: [PATCH 3/9] added tests for theta forecaster 
--- .../operator/lowcode/forecast/model/theta.py | 48 ++++++++++++++----- tests/operators/forecast/test_datasets.py | 5 +- tests/operators/forecast/test_errors.py | 1 + tests/operators/forecast/test_explainers.py | 1 + 4 files changed, 42 insertions(+), 13 deletions(-) diff --git a/ads/opctl/operator/lowcode/forecast/model/theta.py b/ads/opctl/operator/lowcode/forecast/model/theta.py index a7469bd40..df5b92e31 100644 --- a/ads/opctl/operator/lowcode/forecast/model/theta.py +++ b/ads/opctl/operator/lowcode/forecast/model/theta.py @@ -17,6 +17,7 @@ from ads.opctl import logger from ads.opctl.operator.lowcode.forecast.operator_config import ForecastOperatorConfig from ads.opctl.operator.lowcode.forecast.utils import (_label_encode_dataframe) +from ads.opctl.operator.lowcode.common.utils import seconds_to_datetime from ..const import ( SupportedModels, ForecastOutputColumns, DEFAULT_TRIALS, @@ -39,6 +40,8 @@ def freq_to_sp(freq: str) -> int | None: return 4 if freq == "A" or freq == "Y": # Annual return 1 # Usually no seasonality + if freq.startswith("W"): # Weekly data (W, W-SUN, W-MON, etc.) 
+ return 52 # Weekly data if freq == "D": # Daily @@ -102,6 +105,8 @@ def _train_model(self, i, series_id, df: pd.DataFrame, model_kwargs: Dict[str, A data_i = self.drop_horizon(data) target = self.spec.target_column freq = pd.infer_freq(data_i.index) + if freq.startswith("W-"): + freq = "W" data_i = data_i.asfreq(freq) y = data_i[target] @@ -244,15 +249,18 @@ def _generate_report(self): import report_creator as rc """The method that needs to be implemented on the particular model level.""" all_sections = [] + theta_blocks = [] + for series_id, sm in self.models.items(): model = sm["model"] + # ---- Extract details from ThetaModel ---- fitted_params = model.get_fitted_params() alpha = fitted_params.get("initial_level", None) + smoothing_level = fitted_params.get("smoothing_level", None) sp = model.sp deseasonalize_model = model.deseasonalize_model desasonalized = model.deseasonalize - smoothing_level = fitted_params.get("smoothing_level", None) n_obs = len(model._y) if hasattr(model, "_y") else "N/A" # Date range @@ -263,7 +271,7 @@ def _generate_report(self): start_date = "" end_date = "" - # ---- Build the text block ---- + # ---- Build the DF ---- meta_df = pd.DataFrame({ "Metric": [ "Alpha / Initial Level", @@ -273,7 +281,7 @@ def _generate_report(self): "Deseasonalization Method", "Period (sp)", "Sample Start", - "Sample End" + "Sample End", ], "Value": [ alpha, @@ -283,18 +291,31 @@ def _generate_report(self): deseasonalize_model, sp, start_date, - end_date - ] + end_date, + ], }) - # ---- Add to Report Creator ---- - theta_section = rc.Block( - rc.Heading(f"Theta Model Summary — {series_id}", level=2), - rc.Text("This section provides detailed ThetaModel fit diagnostics."), + # ---- Create a block (NOT a section directly) ---- + theta_block = rc.Block( + rc.Heading(f"Theta Model Summary", level=3), rc.DataTable(meta_df), + label=series_id ) - all_sections.append(theta_section) + # Add with optional label support + theta_blocks.append( + theta_block + ) + + # 
---- Combine into final section like ARIMA example ---- + theta_title = rc.Heading("Theta Model Parameters", level=2) + + if len(theta_blocks) > 1: + theta_section = rc.Select(blocks=theta_blocks) + else: + theta_section = theta_blocks[0] + + all_sections.extend([theta_title, theta_section]) if self.spec.generate_explanations: try: @@ -383,14 +404,19 @@ def get_explain_predict_fn(self, series_id): def _custom_predict( data, model=self.models[series_id]["model"], + dt_column_name=self.datasets._datetime_column_name, target_col=self.original_target_column, ): """ data: ForecastDatasets.get_data_at_series(s_id) """ data = data.drop([target_col], axis=1) + data[dt_column_name] = seconds_to_datetime( + data[dt_column_name], dt_format=self.spec.datetime_column.format + ) data = self.preprocess(data, series_id) - fh = ForecastingHorizon(pd.to_datetime(data.index), is_relative=False) + h = len(data) + fh = ForecastingHorizon(np.arange(1, h + 1), is_relative=True) return model.predict(fh) return _custom_predict diff --git a/tests/operators/forecast/test_datasets.py b/tests/operators/forecast/test_datasets.py index 8460bbea7..4b643b1a8 100644 --- a/tests/operators/forecast/test_datasets.py +++ b/tests/operators/forecast/test_datasets.py @@ -32,6 +32,7 @@ "prophet", "neuralprophet", "autots", + "theta", # "lgbforecast", "auto-select", "auto-select-series", @@ -241,7 +242,7 @@ def test_pandas_to_historical(model): check_output_for_errors(output_data_path) -@pytest.mark.parametrize("model", ["prophet", "neuralprophet"]) +@pytest.mark.parametrize("model", ["prophet", "neuralprophet", "theta"]) def test_pandas_to_historical_test(model): df = pd.read_csv(f"{DATASET_PREFIX}dataset4.csv") df_train = df[:-PERIODS] @@ -268,7 +269,7 @@ def test_pandas_to_historical_test(model): # CostAD -@pytest.mark.parametrize("model", ["prophet", "neuralprophet"]) +@pytest.mark.parametrize("model", ["prophet", "neuralprophet", "theta"]) def test_pandas_to_historical_test2(model): df = 
pd.read_csv(f"{DATASET_PREFIX}dataset5.csv") df_train = df[:-1] diff --git a/tests/operators/forecast/test_errors.py b/tests/operators/forecast/test_errors.py index b102d7127..1b5632d13 100644 --- a/tests/operators/forecast/test_errors.py +++ b/tests/operators/forecast/test_errors.py @@ -142,6 +142,7 @@ "automlx", "prophet", "neuralprophet", + "theta", "autots", # "lgbforecast", ] diff --git a/tests/operators/forecast/test_explainers.py b/tests/operators/forecast/test_explainers.py index 753e324f4..e5e8b20d6 100644 --- a/tests/operators/forecast/test_explainers.py +++ b/tests/operators/forecast/test_explainers.py @@ -20,6 +20,7 @@ # "automlx", # FIXME: automlx is failing, no errors "prophet", "neuralprophet", + "theta", "auto-select-series", ] From 86475d321bafc239a6c811f7a9a217c5798ef7d8 Mon Sep 17 00:00:00 2001 From: sjangir Date: Tue, 9 Dec 2025 17:08:38 +0530 Subject: [PATCH 4/9] added tests for theta forecaster --- tests/operators/forecast/test_datasets.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/operators/forecast/test_datasets.py b/tests/operators/forecast/test_datasets.py index 4b643b1a8..d4782a8c6 100644 --- a/tests/operators/forecast/test_datasets.py +++ b/tests/operators/forecast/test_datasets.py @@ -242,7 +242,7 @@ def test_pandas_to_historical(model): check_output_for_errors(output_data_path) -@pytest.mark.parametrize("model", ["prophet", "neuralprophet", "theta"]) +@pytest.mark.parametrize("model", ["prophet", "neuralprophet"]) def test_pandas_to_historical_test(model): df = pd.read_csv(f"{DATASET_PREFIX}dataset4.csv") df_train = df[:-PERIODS] @@ -269,7 +269,7 @@ def test_pandas_to_historical_test(model): # CostAD -@pytest.mark.parametrize("model", ["prophet", "neuralprophet", "theta"]) +@pytest.mark.parametrize("model", ["prophet", "neuralprophet"]) def test_pandas_to_historical_test2(model): df = pd.read_csv(f"{DATASET_PREFIX}dataset5.csv") df_train = df[:-1] From e89fc969ced40d9ff2c51f7cdda3e9deb5e7e77f Mon Sep 17 
00:00:00 2001 From: sjangir Date: Wed, 10 Dec 2025 12:37:35 +0530 Subject: [PATCH 5/9] - fixed warning: ChainedAssignmentError - added theta forcaster to auto-select-series --- .../operator/lowcode/forecast/meta_selector.py | 6 ++++++ .../lowcode/forecast/model/forecast_datasets.py | 17 ++++++++--------- 2 files changed, 14 insertions(+), 9 deletions(-) diff --git a/ads/opctl/operator/lowcode/forecast/meta_selector.py b/ads/opctl/operator/lowcode/forecast/meta_selector.py index 76390b279..1a04251b5 100644 --- a/ads/opctl/operator/lowcode/forecast/meta_selector.py +++ b/ads/opctl/operator/lowcode/forecast/meta_selector.py @@ -214,6 +214,12 @@ def __init__(self): "model": "autots", "priority": 17, }, + # Rule 18: Theta Forecaster + "theta_0": { + "conditions": [], + "model": "theta", + "priority": 18, + }, } def _evaluate_condition(self, value, operator, threshold): diff --git a/ads/opctl/operator/lowcode/forecast/model/forecast_datasets.py b/ads/opctl/operator/lowcode/forecast/model/forecast_datasets.py index 3019b6839..21b741a26 100644 --- a/ads/opctl/operator/lowcode/forecast/model/forecast_datasets.py +++ b/ads/opctl/operator/lowcode/forecast/model/forecast_datasets.py @@ -346,18 +346,17 @@ def populate_series_output( ) from e if (output_i.shape[0] - self.horizon) == len(fit_val): - output_i["fitted_value"].iloc[: -self.horizon] = ( - fit_val # Note: may need to do len(output_i) - (len(fit_val) + horizon) : -horizon - ) + output_i.loc[output_i.index[ + : -self.horizon], "fitted_value"] = fit_val # Note: may need to do len(output_i) - (len(fit_val) + horizon) : -horizon elif (output_i.shape[0] - self.horizon) > len(fit_val): logger.debug( f"Fitted Values were only generated on a subset ({len(fit_val)}/{(output_i.shape[0] - self.horizon)}) of the data for Series: {series_id}." 
) start_idx = output_i.shape[0] - self.horizon - len(fit_val) - output_i["fitted_value"].iloc[start_idx : -self.horizon] = fit_val + output_i.loc[output_i.index[start_idx: -self.horizon], "fitted_value"] = fit_val else: - output_i["fitted_value"].iloc[start_idx : -self.horizon] = fit_val[ - -(output_i.shape[0] - self.horizon) : + output_i.loc[output_i.index[start_idx: -self.horizon], "fitted_value"] = fit_val[ + -(output_i.shape[0] - self.horizon): ] if len(forecast_val) != self.horizon: @@ -365,21 +364,21 @@ def populate_series_output( f"Attempting to set forecast along horizon ({self.horizon}) for series: {series_id}, however forecast is only length {len(forecast_val)}" f"\nPlease refer to the troubleshooting guide at {TROUBLESHOOTING_GUIDE} for resolution steps." ) - output_i["forecast_value"].iloc[-self.horizon :] = forecast_val + output_i.loc[output_i.index[-self.horizon:], "forecast_value"] = forecast_val if len(upper_bound) != self.horizon: raise ValueError( f"Attempting to set upper_bound along horizon ({self.horizon}) for series: {series_id}, however upper_bound is only length {len(upper_bound)}" f"\nPlease refer to the troubleshooting guide at {TROUBLESHOOTING_GUIDE} for resolution steps." ) - output_i[self.upper_bound_name].iloc[-self.horizon :] = upper_bound + output_i.loc[output_i.index[-self.horizon:], self.upper_bound_name] = upper_bound if len(lower_bound) != self.horizon: raise ValueError( f"Attempting to set lower_bound along horizon ({self.horizon}) for series: {series_id}, however lower_bound is only length {len(lower_bound)}" f"\nPlease refer to the troubleshooting guide at {TROUBLESHOOTING_GUIDE} for resolution steps." 
) - output_i[self.lower_bound_name].iloc[-self.horizon :] = lower_bound + output_i.loc[output_i.index[-self.horizon:], self.lower_bound_name] = lower_bound self.series_id_map[series_id] = output_i self.verify_series_output(series_id) From de9830fb0ae591cfada957d2cbd2092adee0ac57 Mon Sep 17 00:00:00 2001 From: sjangir Date: Wed, 10 Dec 2025 12:42:49 +0530 Subject: [PATCH 6/9] bug fix --- ads/opctl/operator/lowcode/forecast/model/forecast_datasets.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ads/opctl/operator/lowcode/forecast/model/forecast_datasets.py b/ads/opctl/operator/lowcode/forecast/model/forecast_datasets.py index 21b741a26..0c234b906 100644 --- a/ads/opctl/operator/lowcode/forecast/model/forecast_datasets.py +++ b/ads/opctl/operator/lowcode/forecast/model/forecast_datasets.py @@ -345,6 +345,7 @@ def populate_series_output( f"\nPlease refer to the troubleshooting guide at {TROUBLESHOOTING_GUIDE} for resolution steps." ) from e + start_idx = output_i.shape[0] - self.horizon - len(fit_val) if (output_i.shape[0] - self.horizon) == len(fit_val): output_i.loc[output_i.index[ : -self.horizon], "fitted_value"] = fit_val # Note: may need to do len(output_i) - (len(fit_val) + horizon) : -horizon @@ -352,7 +353,6 @@ def populate_series_output( logger.debug( f"Fitted Values were only generated on a subset ({len(fit_val)}/{(output_i.shape[0] - self.horizon)}) of the data for Series: {series_id}." 
) - start_idx = output_i.shape[0] - self.horizon - len(fit_val) output_i.loc[output_i.index[start_idx: -self.horizon], "fitted_value"] = fit_val else: output_i.loc[output_i.index[start_idx: -self.horizon], "fitted_value"] = fit_val[ From c10dd771c45b045651ec38bb22364bfa34d22796 Mon Sep 17 00:00:00 2001 From: sjangir Date: Wed, 10 Dec 2025 18:47:18 +0530 Subject: [PATCH 7/9] fixed tuning and take params from config --- .../operator/lowcode/forecast/model/theta.py | 92 ++++++++++++------- 1 file changed, 57 insertions(+), 35 deletions(-) diff --git a/ads/opctl/operator/lowcode/forecast/model/theta.py b/ads/opctl/operator/lowcode/forecast/model/theta.py index df5b92e31..9cb94530b 100644 --- a/ads/opctl/operator/lowcode/forecast/model/theta.py +++ b/ads/opctl/operator/lowcode/forecast/model/theta.py @@ -16,11 +16,11 @@ from ads.opctl import logger from ads.opctl.operator.lowcode.forecast.operator_config import ForecastOperatorConfig -from ads.opctl.operator.lowcode.forecast.utils import (_label_encode_dataframe) +from ads.opctl.operator.lowcode.forecast.utils import (_label_encode_dataframe, smape) from ads.opctl.operator.lowcode.common.utils import seconds_to_datetime from ..const import ( - SupportedModels, ForecastOutputColumns, DEFAULT_TRIALS, + SupportedModels, ForecastOutputColumns, ) from .base_model import ForecastOperatorBaseModel from .forecast_datasets import ForecastDatasets, ForecastOutput @@ -32,36 +32,42 @@ def freq_to_sp(freq: str) -> int | None: """ Convert pandas freq string to seasonal period (sp). """ + if not freq: + return None + freq = freq.upper() - if freq == "M": # Monthly - return 12 - if freq == "Q": # Quarterly - return 4 - if freq == "A" or freq == "Y": # Annual - return 1 # Usually no seasonality - if freq.startswith("W"): # Weekly data (W, W-SUN, W-MON, etc.) 
+ # Direct mappings + mapping = { + "M": 12, + "Q": 4, + "A": 1, + "Y": 1, + "W": 52, + "D": 7, + "H": 24, + "T": 1440, + "MIN": 1440, + } + if freq in mapping: + return mapping[freq] + + # Weekly variants (W-MON, W-SUN, etc.) + if freq.startswith("W"): return 52 - # Weekly data - if freq == "D": # Daily - return 7 # 7-day seasonality is common - - if freq == "H": # Hourly - return 24 # 24 hours/day - - if freq == "T" or freq == "MIN": # Minute data - return 1440 # minutes/day → auto infer later - - # If freq is something like "5T" + # Minute frequencies like "5T" or "15MIN" if freq.endswith("T"): - minutes = int(freq.replace("T", "")) - return int(1440 / minutes) + try: + return 1440 // int(freq[:-1]) + except ValueError: + pass - # If freq is something like "15min" - if "MIN" in freq: - minutes = int(freq.replace("MIN", "")) - return int(1440 / minutes) + if freq.endswith("MIN"): # e.g., "15MIN" + try: + return 1440 // int(freq[:-3]) + except ValueError: + pass logger.warning("Unable to infer data frequency and sp") return None @@ -83,6 +89,7 @@ def set_kwargs(self): model_kwargs["alpha"] = self.spec.model_kwargs.get("alpha", None) model_kwargs["initial_level"] = self.spec.model_kwargs.get("initial_level", None) model_kwargs["deseasonalize"] = self.spec.model_kwargs.get("deseasonalize", True) + model_kwargs["deseasonalize_model"] = self.spec.model_kwargs.get("deseasonalize_model", "additive") if self.spec.confidence_interval_width is None: self.spec.confidence_interval_width = 1 - 0.90 if model_kwargs["alpha"] is None else model_kwargs["alpha"] @@ -110,7 +117,6 @@ def _train_model(self, i, series_id, df: pd.DataFrame, model_kwargs: Dict[str, A data_i = data_i.asfreq(freq) y = data_i[target] - model_kwargs["deseasonalize_model"] = "additive" inferred_sp = freq_to_sp(freq) model_kwargs["sp"] = 1 if inferred_sp is None else inferred_sp @@ -205,10 +211,10 @@ def _build_model(self) -> pd.DataFrame: def run_tuning(self, y: pd.DataFrame, model_kwargs_i: Dict[str, Any]): 
def objective(trial): - # Hyperparameters to tune - initial_level = trial.suggest_float("initial_level", 0.05, 1.0) - sp = trial.suggest_categorical("sp", model_kwargs_i["sp"]) + initial_level = model_kwargs_i["initial_level"] + sp = model_kwargs_i["sp"] deseason = trial.suggest_categorical("deseasonalize_model", ["additive", "multiplicative"]) + if deseason == "multiplicative" and (y <= 0).any(): raise optuna.exceptions.TrialPruned() @@ -225,25 +231,41 @@ def objective(trial): ) scores = [] + for train, test in cv.split(y): t_data = y.iloc[train] + if t_data.isna().any(): + continue if len(t_data) < 2 * sp: continue + model.fit(t_data) fh = ForecastingHorizon(y.index[test], is_relative=False) y_pred = model.predict(fh) + y_test = y.iloc[test] + if y_test.isna().any(): + continue + + if self.spec.metric == "mape": + score = mean_absolute_percentage_error(y_test, y_pred) + elif self.spec.metric == "rmse": + score = np.sqrt(mean_squared_error(y_test, y_pred)) + elif self.spec.metric == "mse": + score = mean_squared_error(y_test, y_pred) + elif self.spec.metric == "smape": + score = smape(y_test.values, y_pred.values) + else: + score = mean_absolute_percentage_error(y_test, y_pred) - score = mean_absolute_percentage_error( - y.iloc[test], y_pred - ) scores.append(score) return np.mean(scores) study = optuna.create_study(direction="minimize") - trials = DEFAULT_TRIALS if self.spec.tuning.n_trials is None else self.spec.tuning.n_trials + trials = 2 if self.spec.tuning.n_trials is None else self.spec.tuning.n_trials study.optimize(objective, n_trials=trials) - return study.best_params + model_kwargs_i["deseasonalize_model"] = study.best_params["deseasonalize_model"] + return model_kwargs_i def _generate_report(self): import report_creator as rc From e2be97244af5da0b6a291e8cdbbe1ad392d33b83 Mon Sep 17 00:00:00 2001 From: sjangir Date: Wed, 10 Dec 2025 18:48:23 +0530 Subject: [PATCH 8/9] removed meta selector --- ads/opctl/operator/lowcode/forecast/meta_selector.py | 6 
------ 1 file changed, 6 deletions(-) diff --git a/ads/opctl/operator/lowcode/forecast/meta_selector.py b/ads/opctl/operator/lowcode/forecast/meta_selector.py index 1a04251b5..76390b279 100644 --- a/ads/opctl/operator/lowcode/forecast/meta_selector.py +++ b/ads/opctl/operator/lowcode/forecast/meta_selector.py @@ -214,12 +214,6 @@ def __init__(self): "model": "autots", "priority": 17, }, - # Rule 18: Theta Forecaster - "theta_0": { - "conditions": [], - "model": "theta", - "priority": 18, - }, } def _evaluate_condition(self, value, operator, threshold): From 2f1c2705ea5d6457d5709f99238b4a2044e7ea5d Mon Sep 17 00:00:00 2001 From: sjangir Date: Thu, 11 Dec 2025 13:30:15 +0530 Subject: [PATCH 9/9] added theta forecastor to docs resolved review points --- .../operator/lowcode/forecast/model/theta.py | 66 +++++++++---------- .../forecast_operator/yaml_schema.rst | 4 +- 2 files changed, 34 insertions(+), 36 deletions(-) diff --git a/ads/opctl/operator/lowcode/forecast/model/theta.py b/ads/opctl/operator/lowcode/forecast/model/theta.py index 9cb94530b..57546dd85 100644 --- a/ads/opctl/operator/lowcode/forecast/model/theta.py +++ b/ads/opctl/operator/lowcode/forecast/model/theta.py @@ -5,25 +5,24 @@ from typing import Dict, Any import numpy as np -import pandas as pd import optuna +import pandas as pd from joblib import Parallel, delayed from sktime.forecasting.base import ForecastingHorizon from sktime.forecasting.theta import ThetaForecaster -from sktime.performance_metrics.forecasting import mean_absolute_error, mean_squared_error, \ +from sktime.performance_metrics.forecasting import mean_squared_error, \ mean_absolute_percentage_error from sktime.split import ExpandingWindowSplitter from ads.opctl import logger +from ads.opctl.operator.lowcode.common.utils import seconds_to_datetime from ads.opctl.operator.lowcode.forecast.operator_config import ForecastOperatorConfig from ads.opctl.operator.lowcode.forecast.utils import (_label_encode_dataframe, smape) -from 
ads.opctl.operator.lowcode.common.utils import seconds_to_datetime - -from ..const import ( - SupportedModels, ForecastOutputColumns, -) from .base_model import ForecastOperatorBaseModel from .forecast_datasets import ForecastDatasets, ForecastOutput +from ..const import ( + SupportedModels, ForecastOutputColumns, DEFAULT_TRIALS, +) logging.getLogger("report_creator").setLevel(logging.WARNING) @@ -90,6 +89,7 @@ def set_kwargs(self): model_kwargs["initial_level"] = self.spec.model_kwargs.get("initial_level", None) model_kwargs["deseasonalize"] = self.spec.model_kwargs.get("deseasonalize", True) model_kwargs["deseasonalize_model"] = self.spec.model_kwargs.get("deseasonalize_model", "additive") + model_kwargs["sp"] = self.spec.model_kwargs.get("sp", None) if self.spec.confidence_interval_width is None: self.spec.confidence_interval_width = 1 - 0.90 if model_kwargs["alpha"] is None else model_kwargs["alpha"] @@ -111,14 +111,15 @@ def _train_model(self, i, series_id, df: pd.DataFrame, model_kwargs: Dict[str, A data_i = self.drop_horizon(data) target = self.spec.target_column + freq = pd.infer_freq(data_i.index) if freq.startswith("W-"): freq = "W" data_i = data_i.asfreq(freq) y = data_i[target] - - inferred_sp = freq_to_sp(freq) - model_kwargs["sp"] = 1 if inferred_sp is None else inferred_sp + if model_kwargs["sp"] is None: + inferred_sp = freq_to_sp(freq) + model_kwargs["sp"] = 1 if inferred_sp is None else inferred_sp # If model already loaded, extract parameters (best-effort) if self.loaded_models is not None and series_id in self.loaded_models: @@ -126,6 +127,7 @@ def _train_model(self, i, series_id, df: pd.DataFrame, model_kwargs: Dict[str, A fitted_params = previous_res.get_fitted_params() model_kwargs["deseasonalize_model"] = previous_res.deseasonalize_model model_kwargs["sp"] = previous_res.sp + model_kwargs["deseasonalize"] = previous_res.deseasonalize model_kwargs["initial_level"] = fitted_params.get("initial_level", None) else: if self.perform_tuning: @@ 
-210,19 +212,27 @@ def _build_model(self) -> pd.DataFrame: def run_tuning(self, y: pd.DataFrame, model_kwargs_i: Dict[str, Any]): + scoring = { + "mape": lambda y_true, y_pred: mean_absolute_percentage_error(y_true, y_pred), + "rmse": lambda y_true, y_pred: np.sqrt(mean_squared_error(y_true, y_pred)), + "mse": lambda y_true, y_pred: mean_squared_error(y_true, y_pred), + "smape": lambda y_true, y_pred: smape(y_true, y_pred) + } + score_fn = scoring.get(self.spec.metric.lower(), scoring["mape"]) + def objective(trial): initial_level = model_kwargs_i["initial_level"] sp = model_kwargs_i["sp"] - deseason = trial.suggest_categorical("deseasonalize_model", ["additive", "multiplicative"]) - - if deseason == "multiplicative" and (y <= 0).any(): + deseasonalize = trial.suggest_categorical("deseasonalize", [True, False]) + deseasonalize_model = trial.suggest_categorical("deseasonalize_model", ["additive", "multiplicative"]) + if deseasonalize_model == "multiplicative" and (y <= 0).any(): raise optuna.exceptions.TrialPruned() model = ThetaForecaster( initial_level=initial_level, sp=sp, - deseasonalize_model=deseason, - deseasonalize=model_kwargs_i["deseasonalize"], + deseasonalize_model=deseasonalize_model, + deseasonalize=deseasonalize, ) cv = ExpandingWindowSplitter( @@ -245,26 +255,14 @@ def objective(trial): y_test = y.iloc[test] if y_test.isna().any(): continue - - if self.spec.metric == "mape": - score = mean_absolute_percentage_error(y_test, y_pred) - elif self.spec.metric == "rmse": - score = np.sqrt(mean_squared_error(y_test, y_pred)) - elif self.spec.metric == "mse": - score = mean_squared_error(y_test, y_pred) - elif self.spec.metric == "smape": - score = smape(y_test.values, y_pred.values) - else: - score = mean_absolute_percentage_error(y_test, y_pred) - - scores.append(score) - + scores.append(score_fn(y_test, y_pred)) return np.mean(scores) study = optuna.create_study(direction="minimize") - trials = 2 if self.spec.tuning.n_trials is None else 
self.spec.tuning.n_trials + trials = DEFAULT_TRIALS if self.spec.tuning.n_trials is None else self.spec.tuning.n_trials study.optimize(objective, n_trials=trials) model_kwargs_i["deseasonalize_model"] = study.best_params["deseasonalize_model"] + model_kwargs_i["deseasonalize"] = study.best_params["deseasonalize"] return model_kwargs_i def _generate_report(self): @@ -278,7 +276,7 @@ def _generate_report(self): # ---- Extract details from ThetaModel ---- fitted_params = model.get_fitted_params() - alpha = fitted_params.get("initial_level", None) + initial_level = fitted_params.get("initial_level", None) smoothing_level = fitted_params.get("smoothing_level", None) sp = model.sp deseasonalize_model = model.deseasonalize_model @@ -296,8 +294,8 @@ def _generate_report(self): # ---- Build the DF ---- meta_df = pd.DataFrame({ "Metric": [ - "Alpha / Initial Level", - "Smoothing Level", + "Initial Level", + "Smoothing Level / Alpha", "No. Observations", "Deseasonalized", "Deseasonalization Method", @@ -306,7 +304,7 @@ def _generate_report(self): "Sample End", ], "Value": [ - alpha, + initial_level, smoothing_level, n_obs, str(desasonalized is not None), diff --git a/docs/source/user_guide/operators/forecast_operator/yaml_schema.rst b/docs/source/user_guide/operators/forecast_operator/yaml_schema.rst index dc0ee92de..8f73a9e8b 100644 --- a/docs/source/user_guide/operators/forecast_operator/yaml_schema.rst +++ b/docs/source/user_guide/operators/forecast_operator/yaml_schema.rst @@ -137,7 +137,7 @@ Below is an example of a ``forecast.yaml`` file with every parameter specified: - string - No - prophet - - Model to use. Options: prophet, arima, neuralprophet, automlx, autots, auto-select. + - Model to use. Options: prophet, arima, neuralprophet, theta, automlx, autots, auto-select. * - model_kwargs - dict @@ -266,7 +266,7 @@ Further Description * **format**: (Optional) Specify the format for output data (e.g., ``csv``, ``json``, ``excel``). 
* **options**: (Optional) Include any additional arguments, such as connection parameters for storage.
 
-  * **model**: (Optional) The name of the model framework to use. Defaults to ``auto-select``. Available options include ``arima``, ``prophet``, ``neuralprophet``, ``autots``, and ``auto-select``.
+  * **model**: (Optional) The name of the model framework to use. Defaults to ``auto-select``. Available options include ``arima``, ``prophet``, ``neuralprophet``, ``theta``, ``automlx``, ``autots``, and ``auto-select``.
 
   * **model_kwargs**: (Optional) A dictionary of arguments to pass directly to the model framework, allowing for detailed control over modeling.