"""Data loader for feature analysis.
Flattens the per-case zarr stores produced by the alpha_D ETL pipeline
into dense numpy arrays ``(X, y, groups)`` suitable for sklearn.
Leakage controls
----------------
1. ``ALLOWLIST`` hard-codes the input features that are safe to consider.
The YAML config may *restrict* this set via ``selected_from_allowlist``
but cannot extend it. Unknown names raise ``ValueError``. Extending the
allowlist is a code change so it appears in review.
2. ``groups`` is the per-row case index. Callers must use
``sklearn.model_selection.GroupKFold(groups=groups)`` for all CV;
rows inside a case are spatially correlated.
3. Metadata attributes such as ``delta_p_case`` are *never* included as
features here — they are target-adjacent and would leak.
"""
import json
from pathlib import Path
import numpy as np
import zarr
from cases.alpha_d.physics.targets import (
convert_alpha_d_values_between_bases,
is_alpha_d_target,
)
from feature_selection.data import FeatureAnalysisData
BASE_ALLOWLIST: tuple[str, ...] = (
"log10_Re",
"Dr",
"Lr",
"z_hat",
"d_local_over_D",
"A_local_over_A",
"V_local_over_V_bulk",
"dD_dz_local",
)
ENGINEERED_FEATURES: tuple[str, ...] = (
"log10_Re_throat",
"log10_Re_local",
"inv_Dr",
"Dr_times_Lr",
"z_hat_times_Dr",
"z_hat_times_Lr",
"dist_to_throat_start",
"dist_to_throat_end",
"dist_to_nearest_step",
)
# Full candidate set accepted by `selected_from_allowlist`.
ALLOWLIST: tuple[str, ...] = BASE_ALLOWLIST + ENGINEERED_FEATURES
GROUPED_FEATURES: dict[str, tuple[str, ...]] = {
"region_onehot": ("is_upstream", "is_throat", "is_downstream"),
}
def _parse_Dr(case_name: str) -> float:
for part in case_name.split("__"):
if part.startswith("Dr_"):
return float(part[3:].replace("p", "."))
return 0.0
def _resolve_features(selected: list[str] | None) -> list[str]:
if selected is None:
# ``selected_from_allowlist: null`` in the config means "evaluate
# the full advertised candidate set" — base features + the
# engineered names below. The PyCaret selector then prunes down to
# ``n_features_to_select``. Restrict to ``BASE_ALLOWLIST`` only by
# listing it explicitly in the config.
return list(ALLOWLIST)
allowed = set(ALLOWLIST)
unknown = [c for c in selected if c not in allowed]
if unknown:
raise ValueError(
f"selected_from_allowlist contains names outside ALLOWLIST: {unknown}. "
f"To add a feature, edit ALLOWLIST in cases/alpha_d/feature_data.py."
)
seen: set[str] = set()
ordered: list[str] = []
for name in ALLOWLIST:
if name in selected and name not in seen:
ordered.append(name)
seen.add(name)
return ordered
[docs]
def build_engineered_feature_map(
features: np.ndarray,
raw_feature_names: list[str],
) -> dict[str, np.ndarray]:
"""Build leakage-safe engineered features from raw per-row feature columns."""
feat_map = {n: i for i, n in enumerate(raw_feature_names)}
log10_Re = features[:, feat_map["log10_Re"]].astype(np.float64)
Dr = np.maximum(features[:, feat_map["Dr"]].astype(np.float64), 1e-12)
Lr = np.maximum(features[:, feat_map["Lr"]].astype(np.float64), 1e-12)
z_hat = features[:, feat_map["z_hat"]].astype(np.float64)
d_local_over_D = np.maximum(
features[:, feat_map["d_local_over_D"]].astype(np.float64),
1e-12,
)
V_local_over_V_bulk = np.maximum(
np.abs(features[:, feat_map["V_local_over_V_bulk"]].astype(np.float64)),
1e-12,
)
# Locate throat interfaces from one-hot region flags directly.
throat_mask = features[:, feat_map["is_throat"]].astype(np.float64) > 0.5
if np.any(throat_mask):
z_throat_start = float(np.min(z_hat[throat_mask]))
z_throat_end = float(np.max(z_hat[throat_mask]))
else:
# Fallback: if mask is absent/corrupted, put interfaces outside range.
z_throat_start = 2.0
z_throat_end = 2.0
# Sign convention matches the ETL: both signed distances are positive
# toward the throat centre (entry: past start; exit: upstream of end).
dist_to_throat_start = z_hat - z_throat_start
dist_to_throat_end = z_throat_end - z_hat
dist_to_nearest_step = np.minimum(
np.abs(dist_to_throat_start),
np.abs(dist_to_throat_end),
)
# Re_local = V_local * D_h_local / nu_molecular, with D_h_local = d_local
# for circular cross-section and nu_molecular = D / Re_global per case
# (V_bulk = rho = 1 in the simulation). This evaluates to
# Re_global * (V_local / V_bulk) * (d_local / D)
# so log10 Re_local = log10_Re + log10(V_local/V_bulk) + log10(d_local/D).
return {
"log10_Re_throat": (log10_Re - np.log10(Dr)).astype(np.float32),
"log10_Re_local": (
log10_Re + np.log10(V_local_over_V_bulk) + np.log10(d_local_over_D)
).astype(np.float32),
"inv_Dr": (1.0 / Dr).astype(np.float32),
"Dr_times_Lr": (Dr * Lr).astype(np.float32),
"z_hat_times_Dr": (z_hat * Dr).astype(np.float32),
"z_hat_times_Lr": (z_hat * Lr).astype(np.float32),
"dist_to_throat_start": dist_to_throat_start.astype(np.float32),
"dist_to_throat_end": dist_to_throat_end.astype(np.float32),
"dist_to_nearest_step": dist_to_nearest_step.astype(np.float32),
}
[docs]
def engineered_features_spec():
"""Entrypoint returning ``(names, builder)`` for TabularPairDataset.
Used by the pointwise adapter via the
``data.engineered_features_entrypoint`` config key.
"""
return list(ENGINEERED_FEATURES), build_engineered_feature_map
[docs]
def load_feature_matrix(
zarr_dir: str | Path,
*,
target: str = "log_alpha_D",
selected_from_allowlist: list[str] | None = None,
local_velocity_normalization: bool = True,
min_Dr: float | None = None,
exclude_cases: list[str] | None = None,
) -> FeatureAnalysisData:
"""Load all cases under ``zarr_dir`` into a flat ``FeatureAnalysisData``.
Parameters
----------
zarr_dir
Directory containing ``*.zarr`` stores.
target
Target column name. Must exist in zarr ``target_names``.
selected_from_allowlist
Restrict the ALLOWLIST to this subset. Cannot add new names.
local_velocity_normalization
If True, rescale alpha_D-family targets to the local-velocity
basis before returning them. Matches training behaviour when the
MLP is trained with ``local_velocity_normalization: true``.
min_Dr
Drop cases with diameter ratio below this value (matches training).
exclude_cases
Drop cases whose stem is in this list.
"""
zarr_dir = Path(zarr_dir)
sim_paths = sorted(zarr_dir.glob("*.zarr"))
if not sim_paths:
raise FileNotFoundError(f"No .zarr stores found in {zarr_dir}")
if exclude_cases:
exclude_set = set(exclude_cases)
sim_paths = [sp for sp in sim_paths if sp.stem not in exclude_set]
if min_Dr is not None:
sim_paths = [sp for sp in sim_paths if _parse_Dr(sp.stem) >= min_Dr]
if not sim_paths:
raise ValueError("All cases filtered out; check min_Dr / exclude_cases.")
feat_cols = _resolve_features(selected_from_allowlist)
x_chunks: list[np.ndarray] = []
y_chunks: list[np.ndarray] = []
case_ids: list[str] = []
rows_per_case: list[int] = []
raw_feature_names: list[str] | None = None
raw_target_names: list[str] | None = None
tgt_col: int | None = None
d_over_D_col_raw: int | None = None
apply_local_velocity_normalization = False
for sp in sim_paths:
root = zarr.open(store=str(sp), mode="r")
features = np.array(root["features"][:], dtype=np.float32)
targets = np.array(root["targets"][:], dtype=np.float32)
meta = root["metadata"]
if raw_feature_names is None:
raw_feature_names = list(json.loads(meta.attrs.get("feature_names", "[]")))
raw_target_names = list(json.loads(meta.attrs.get("target_names", "[]")))
raw_set = set(raw_feature_names)
eng_set = set(ENGINEERED_FEATURES)
missing = [c for c in feat_cols if c not in raw_set and c not in eng_set]
if missing:
raise ValueError(
"selected_from_allowlist entries not available from raw + engineered "
f"features: {missing}"
)
if target not in raw_target_names:
raise ValueError(f"target={target!r} not in zarr target_names={raw_target_names}")
tgt_col = raw_target_names.index(target)
if local_velocity_normalization and is_alpha_d_target(target):
if "d_local_over_D" not in raw_feature_names:
raise ValueError(
"local_velocity_normalization requires 'd_local_over_D' "
"in zarr feature_names."
)
d_over_D_col_raw = raw_feature_names.index("d_local_over_D")
apply_local_velocity_normalization = True
engineered = build_engineered_feature_map(features, raw_feature_names)
row_cols: list[np.ndarray] = []
for name in feat_cols:
if name in raw_feature_names:
row_cols.append(features[:, raw_feature_names.index(name)].astype(np.float32))
else:
row_cols.append(engineered[name])
x_chunks.append(np.column_stack(row_cols).astype(np.float32))
y_case = targets[:, tgt_col].astype(np.float64)
if d_over_D_col_raw is not None:
d_over_D = features[:, d_over_D_col_raw].astype(np.float64)
y_case = convert_alpha_d_values_between_bases(
y_case,
target_name=target,
d_over_D=d_over_D,
from_local_velocity_normalization=False,
to_local_velocity_normalization=True,
)
y_chunks.append(y_case.astype(np.float32))
case_id = str(meta.attrs.get("case_id", sp.stem))
case_ids.append(case_id)
rows_per_case.append(features.shape[0])
X = np.concatenate(x_chunks, axis=0)
y = np.concatenate(y_chunks, axis=0)
groups = np.concatenate([np.full(n, i, dtype=np.int32) for i, n in enumerate(rows_per_case)])
return FeatureAnalysisData(
X=X,
y=y,
groups=groups,
feature_names=feat_cols,
target_name=target,
case_ids=case_ids,
rows_per_case=rows_per_case,
local_velocity_normalization=apply_local_velocity_normalization,
)