# Source code for feature_selection.pycaret_selection

"""PyCaret-based feature selection (generic, case-driven).

Callers pass a ``FeatureAnalysisData`` (whose construction stays
case-side, so the case enforces its own ALLOWLIST upstream) together
with the post-selection ``allowlist`` to validate against.

V1 contract
-----------
- The DataFrame handed to PyCaret is built exclusively from a
  ``FeatureAnalysisData`` instance. PyCaret never reads Zarr directly,
  so the caller-supplied allowlist remains the single guard against
  target-adjacent columns leaking back into the selected feature set.

- ``setup()`` locks ``polynomial_features``, ``pca``, and
  ``group_features`` off (PyCaret 2.x's ``feature_interaction`` is covered
  by ``polynomial_features`` in 3.x). Those settings would synthesize
  columns that ``TabularPairDataset`` cannot reproduce from the Zarr
  stores, which silently breaks the handoff to PhysicsNeMo training via
  ``data.input_columns_file``.

- Case-level train/test split runs *before* ``setup()``. Row-level
  holdout inside PyCaret would place rows from the same case into both
  splits (rows inside a case are spatially correlated, so that leaks).
  The pre-split test frame is passed through via ``test_data``.

- Inside ``setup()``, ``fold_strategy='groupkfold'`` with
  ``fold_groups='case_id'`` keeps internal CV group-safe.
- Output ``selected_features.txt`` is one name per line, no header,
  drop-in for ``data.input_columns_file`` in the MLP training config.
"""

import csv
import json
import logging
from collections.abc import Collection
from pathlib import Path
from typing import Any

import numpy as np

from feature_selection.data import FeatureAnalysisData

logger = logging.getLogger(__name__)


# setup() arguments that are locked in v1. PyCaret-generated polynomial
# or PCA columns cannot be reproduced from Zarr by TabularPairDataset;
# group_features would collapse raw columns into a surrogate feature
# TabularPairDataset also cannot reproduce. (PyCaret 2.x's
# ``feature_interaction`` was folded into ``polynomial_features`` in 3.x.)
# ``run_pycaret_selection`` rejects any config that tries to set one of
# these keys and always passes the values below to ``setup()``.
_V1_LOCKED_SETUP_ARGS: dict[str, Any] = {
    "polynomial_features": False,
    "pca": False,
    "group_features": None,
}

# Reserved name of the per-row case-label column appended by
# ``build_dataframe``. It drives the case-level holdout split and the
# GroupKFold grouping, and is filtered out of the selected feature list.
_CASE_ID_COL = "case_id"


# ---------------------------------------------------------------------------
# DataFrame bridge
# ---------------------------------------------------------------------------


def build_dataframe(data: FeatureAnalysisData):
    """Turn a ``FeatureAnalysisData`` into a row-level pandas DataFrame.

    The frame holds one column per entry of ``data.feature_names``,
    followed by the target column (``data.target_name``) and finally the
    reserved ``case_id`` column carrying each row's case label. That last
    column exists only for the case-level holdout split and for
    GroupKFold inside PyCaret; it is never part of the final selection
    artifact.

    Raises:
        ValueError: if the reserved ``case_id`` name is already used as a
            feature name or as the target name.
    """
    import pandas as pd

    feature_columns = data.feature_names
    if _CASE_ID_COL in feature_columns:
        raise ValueError(
            f"FeatureAnalysisData.feature_names contains reserved column {_CASE_ID_COL!r}."
        )

    target_column = data.target_name
    if target_column == _CASE_ID_COL:
        raise ValueError(f"target_name collides with reserved column {_CASE_ID_COL!r}.")

    # ``data.groups`` stores, per row, an index into ``data.case_ids``;
    # translate those indices into the case labels themselves. Rows whose
    # index matches no case keep the ``None`` placeholder.
    group_index = data.groups
    case_labels = np.empty(group_index.shape[0], dtype=object)
    for position, label in enumerate(data.case_ids):
        case_labels[group_index == position] = label

    frame = pd.DataFrame(data.X, columns=list(feature_columns))
    frame[target_column] = np.asarray(data.y)
    frame[_CASE_ID_COL] = case_labels
    return frame
def case_level_split(df, *, case_id_col: str, test_ratio: float, seed: int):
    """Split a row-level DataFrame into train/test without crossing cases.

    Every row that shares a value in ``case_id_col`` lands in the same
    split, so correlated rows of one case never straddle the boundary.

    Args:
        df: Row-level frame containing ``case_id_col``.
        case_id_col: Name of the column holding the per-row case label.
        test_ratio: Fraction of *cases* (not rows) to hold out; must lie
            strictly between 0 and 1.
        seed: Random state forwarded to ``GroupShuffleSplit``.

    Returns:
        ``(train_df, test_df)`` as independent copies of the selected rows.

    Raises:
        ValueError: if ``test_ratio`` is outside ``(0, 1)`` or ``df``
            holds fewer than two distinct cases.
        KeyError: if ``case_id_col`` is not a column of ``df``.
    """
    # Validate before the lazy sklearn import so a bad config fails fast
    # (and identically) whether or not the optional dependency is present.
    if not 0.0 < test_ratio < 1.0:
        raise ValueError(f"test_ratio must be in (0, 1); got {test_ratio}.")

    case_labels = df[case_id_col]
    # dropna=False keeps this check no stricter than sklearn's own
    # grouping, which counts a missing label as a group of its own.
    n_cases = int(case_labels.nunique(dropna=False))
    if n_cases < 2:
        # GroupShuffleSplit would also refuse, but with a message phrased
        # in terms of sample counts that hides the real cause.
        raise ValueError(
            f"case-level split needs at least two distinct values in "
            f"{case_id_col!r}; got {n_cases}."
        )

    from sklearn.model_selection import GroupShuffleSplit

    splitter = GroupShuffleSplit(n_splits=1, test_size=test_ratio, random_state=seed)
    ((train_idx, test_idx),) = splitter.split(df, groups=case_labels.values)
    return df.iloc[train_idx].copy(), df.iloc[test_idx].copy()
def enforce_allowlist(selected: list[str], allowlist: Collection[str]) -> None:
    """Reject a selection that contains names the caller did not permit.

    This lives at module level, free of any PyCaret import, so the v1
    contract can be unit-tested on its own. The allowlist always comes
    from the caller, which keeps the library case-agnostic.

    Args:
        selected: Feature names chosen by the selection step.
        allowlist: Every name that is permitted to appear in ``selected``.

    Raises:
        RuntimeError: if at least one selected name is not in ``allowlist``;
            the offending names are listed in selection order.
    """
    permitted = set(allowlist)
    offenders = []
    for name in selected:
        if name not in permitted:
            offenders.append(name)

    if not offenders:
        return

    raise RuntimeError(
        f"PyCaret selected features outside allowlist: {offenders}. "
        "v1 forbids synthesized columns; check _V1_LOCKED_SETUP_ARGS and "
        "setup() kwargs in your config."
    )
# ---------------------------------------------------------------------------
# PyCaret wrappers (lazy-imported)
# ---------------------------------------------------------------------------


def _require_pycaret():
    """Fail with an actionable message when PyCaret cannot be imported."""
    install_hint = (
        "PyCaret is not available. Install the optional dependency: "
        "`pip install 'pycaret>=3.0'`."
    )
    try:
        from pycaret import regression  # noqa: F401
    except Exception as exc:
        # Any failure during the import is reported as "not available";
        # the original exception stays attached as the cause.
        raise RuntimeError(install_hint) from exc


def _extract_selected_features(exp, target_name: str) -> list[str]:
    """Return the feature names PyCaret kept after ``setup()``.

    With ``setup(feature_selection=True, ...)`` the transformed training
    matrix (``X_train_transformed``) holds only the columns that survived
    low-variance filtering, multicollinearity removal, and feature
    selection, so its column list is the selection.
    """
    transformed = exp.get_config("X_train_transformed")
    # Neither the target nor case_id is expected among these columns, but
    # the selected_features.txt contract is strict, so drop them regardless.
    dropped = (target_name, _CASE_ID_COL)
    return [name for name in map(str, transformed.columns) if name not in dropped]


def _extract_ranking(exp, ranker_id: str) -> list[dict[str, Any]]:
    """Fit one ranker and return its features ordered by importance.

    The ranker id is persisted in ``pycaret_setup.json`` so a run can be
    reproduced. Importances come from ``feature_importances_`` when the
    estimator has it, otherwise from ``|coef_|``. When neither attribute
    exists, or the importance vector does not line up with the column
    list, every feature is returned in column order with ``importance``
    set to ``None``.
    """
    # A single fit on the full train set is enough: only the importances
    # are consumed, so k-fold CV metrics would be wasted compute (roughly
    # 10x slower for sklearn GBR at the default fold=10). verbose=True
    # keeps PyCaret's tqdm progress bar visible for that one fit.
    fitted = exp.create_model(ranker_id, cross_validation=False, verbose=True)

    transformed = exp.get_config("X_train_transformed")
    feature_names = [str(col) for col in transformed.columns if col != _CASE_ID_COL]

    # create_model may hand back a Pipeline; the last step is the estimator.
    estimator = fitted.steps[-1][1] if hasattr(fitted, "steps") else fitted

    scores: np.ndarray | None = None
    if hasattr(estimator, "feature_importances_"):
        scores = np.asarray(estimator.feature_importances_, dtype=float)
    elif hasattr(estimator, "coef_"):
        weights = np.asarray(estimator.coef_, dtype=float)
        # A coefficient array with more than one dimension contributes
        # only its first row.
        scores = np.abs(weights[0] if weights.ndim > 1 else weights)

    if scores is None or len(scores) != len(feature_names):
        return [
            {"feature": name, "importance": None, "rank": position}
            for position, name in enumerate(feature_names, start=1)
        ]

    ranked: list[dict[str, Any]] = []
    for position, column in enumerate(np.argsort(-scores), start=1):
        ranked.append(
            {
                "feature": feature_names[column],
                "importance": float(scores[column]),
                "rank": position,
            }
        )
    return ranked


# ---------------------------------------------------------------------------
# Orchestrator
# ---------------------------------------------------------------------------
# setup() arguments that encode the v1 split/CV contract. They are always
# supplied by run_pycaret_selection and may not come from the user config.
_V1_STRUCTURAL_SETUP_ARGS: tuple[str, ...] = (
    "data",
    "target",
    "test_data",
    "fold_strategy",
    "fold_groups",
)


def _integer_feature_cap(requested: Any) -> int | None:
    """Return an absolute feature cap, or ``None`` when no trimming applies.

    PyCaret reads ``n_features_to_select`` values below 1 as a fraction of
    its own starting-column count. That count is internal to PyCaret, so a
    fractional request is left exactly as PyCaret resolved it rather than
    being truncated to zero here.
    """
    if requested is None:
        return None
    if float(requested) < 1:
        return None
    return int(requested)


def run_pycaret_selection(
    data: FeatureAnalysisData,
    *,
    pycaret_cfg: dict[str, Any],
    output_dir: Path,
    allowlist: Collection[str],
) -> dict[str, Any]:
    """Run the v1 PyCaret selection path and write artifacts.

    Args:
        data: Feature matrix, target, and per-row case grouping.
        pycaret_cfg: Config mapping. Recognised keys are ``seed``
            (default 42), ``test_ratio`` (default 0.2), ``ranker``
            (default ``"gbr"``), and ``setup`` (extra ``setup()`` kwargs).
        output_dir: Directory for the artifacts; created if missing.
        allowlist: Permitted post-selection feature names. Callers supply
            it (typically the case-side ALLOWLIST constant) so this
            library stays case-agnostic.

    Returns:
        A dict with ``selected``, ``ranking``, and the row/case counts of
        the train and test splits.

    Writes:
        ``selected_features.txt``, ``feature_ranking.csv``, and
        ``pycaret_setup.json`` into ``output_dir``.

    Raises:
        RuntimeError: if PyCaret cannot be imported, or the selection
            contains a name outside ``allowlist``.
        ValueError: if ``setup`` tries to set a locked or structural
            ``setup()`` argument, or the split parameters are invalid.
    """
    _require_pycaret()
    from pycaret.regression import RegressionExperiment

    output_dir = Path(output_dir)
    output_dir.mkdir(parents=True, exist_ok=True)

    seed = int(pycaret_cfg.get("seed", 42))
    test_ratio = float(pycaret_cfg.get("test_ratio", 0.2))
    ranker_id = str(pycaret_cfg.get("ranker", "gbr"))

    df = build_dataframe(data)
    train_df, test_df = case_level_split(
        df,
        case_id_col=_CASE_ID_COL,
        test_ratio=test_ratio,
        seed=seed,
    )
    logger.info(
        "PyCaret split: train=%d rows / %d cases, test=%d rows / %d cases",
        len(train_df),
        train_df[_CASE_ID_COL].nunique(),
        len(test_df),
        test_df[_CASE_ID_COL].nunique(),
    )

    user_setup: dict[str, Any] = dict(pycaret_cfg.get("setup") or {})
    locked_overrides = [k for k in _V1_LOCKED_SETUP_ARGS if k in user_setup]
    if locked_overrides:
        raise ValueError(
            f"setup keys locked in v1: {locked_overrides}. Remove them from the "
            "config; see feature_selection.pycaret_selection._V1_LOCKED_SETUP_ARGS."
        )
    # The pre-split frames and the group-safe CV settings are the v1
    # contract and are recorded verbatim in pycaret_setup.json, so letting
    # the config replace them would both break the contract and falsify
    # the record.
    structural_overrides = [k for k in _V1_STRUCTURAL_SETUP_ARGS if k in user_setup]
    if structural_overrides:
        raise ValueError(
            f"setup keys managed by run_pycaret_selection: {structural_overrides}. "
            "Remove them from the config; the case-level split and group-safe "
            "CV settings are fixed in v1."
        )

    # Defaults when unspecified by the user.
    user_setup.setdefault("normalize", True)
    user_setup.setdefault("remove_multicollinearity", True)
    user_setup.setdefault("feature_selection", True)

    # A user-supplied ignore_features list is extended, never replaced:
    # case_id must stay ignored or the grouping column becomes a feature.
    user_ignored = user_setup.get("ignore_features") or []
    if isinstance(user_ignored, str):
        user_ignored = [user_ignored]
    ignore_features = [_CASE_ID_COL]
    ignore_features.extend(c for c in user_ignored if c != _CASE_ID_COL)

    passthrough = {k: v for k, v in user_setup.items() if k != "ignore_features"}
    setup_kwargs: dict[str, Any] = {
        # Overridable defaults come first ...
        "session_id": seed,
        "html": False,
        "verbose": False,
        **passthrough,
        # ... contract-critical and locked kwargs go last so they cannot
        # be silently overridden.
        "data": train_df,
        "target": data.target_name,
        "test_data": test_df,
        "fold_strategy": "groupkfold",
        "fold_groups": _CASE_ID_COL,
        "ignore_features": ignore_features,
        **_V1_LOCKED_SETUP_ARGS,
    }

    exp = RegressionExperiment()
    exp.setup(**setup_kwargs)

    selected = _extract_selected_features(exp, data.target_name)
    enforce_allowlist(selected, allowlist)

    ranking = _extract_ranking(exp, ranker_id)

    # Trim to n_features_to_select by ranker importance. PyCaret's
    # ``classic`` selection_method can leave extras above the requested
    # count; honour the cap so the artifact matches the user's request.
    # Only absolute counts are re-applied; see _integer_feature_cap.
    cap = _integer_feature_cap(user_setup.get("n_features_to_select"))
    if cap is not None and len(selected) > cap:
        kept = set(selected)
        selected = [r["feature"] for r in ranking if r["feature"] in kept][:cap]

    # --- Artifacts -----------------------------------------------------
    write_selected_features(
        output_dir / "selected_features.txt",
        selected,
        allowlist=allowlist,
    )

    ranking_path = output_dir / "feature_ranking.csv"
    with ranking_path.open("w", newline="", encoding="utf-8") as fh:
        writer = csv.DictWriter(fh, fieldnames=["rank", "feature", "importance"])
        writer.writeheader()
        for row in ranking:
            writer.writerow(
                {
                    "rank": row["rank"],
                    "feature": row["feature"],
                    # Unranked fallback rows carry no importance value.
                    "importance": ("" if row["importance"] is None else f"{row['importance']:.8e}"),
                }
            )

    setup_record = {
        "user_setup": user_setup,
        "locked_setup": _V1_LOCKED_SETUP_ARGS,
        "fold_strategy": "groupkfold",
        "fold_groups": _CASE_ID_COL,
        # The effective list actually passed to setup().
        "ignore_features": ignore_features,
        "ranker": ranker_id,
        "seed": seed,
        "test_ratio": test_ratio,
        "ranking_source": (f"create_model('{ranker_id}') feature_importances_ / |coef_|"),
    }
    (output_dir / "pycaret_setup.json").write_text(
        json.dumps(setup_record, indent=2, sort_keys=True),
        encoding="utf-8",
    )

    return {
        "selected": selected,
        "ranking": ranking,
        "n_train_rows": int(len(train_df)),
        "n_test_rows": int(len(test_df)),
        "n_train_cases": int(train_df[_CASE_ID_COL].nunique()),
        "n_test_cases": int(test_df[_CASE_ID_COL].nunique()),
    }
[docs] def write_selected_features( path: Path, selected: list[str], *, allowlist: Collection[str] | None = None, ) -> None: """Write ``selected_features.txt``. One name per line, no header, no blank lines, trailing newline. Drop-in for ``data.input_columns_file`` in the training config. When ``allowlist`` is supplied (typically the case-side ALLOWLIST), the file is rejected if any name falls outside it. """ if any(not s or s != s.strip() for s in selected): raise ValueError("selected_features contains empty or whitespace-padded names.") if allowlist is not None: enforce_allowlist(selected, allowlist) Path(path).write_text("\n".join(selected) + "\n")