Source code for cases.alpha_d.etl.transform

"""AlphaDTransformation: extract contraction ROI, compute Darcy coefficient.

Implements the DataTransformation ABC from physicsnemo-curator.

For each CFD case the transformation:
  1. Computes element centroids from node coords + connectivity.
  2. Identifies the contraction region from geometry parameters.
  3. Bins elements into axial (z) stations.
  4. Computes cross-section-averaged pressure at each station.
  5. Derives the Darcy resistance coefficient alpha_D via the pressure
     gradient and local hydraulic diameter.
  6. Constructs the feature table that the MLP will consume.

Output dict contains ``features`` [N_stations, D_in], ``targets``
[N_stations, 1], and metadata.
"""

import logging
import math
from typing import Any, Optional

import numpy as np
from physicsnemo_curator.etl.data_transformations import DataTransformation
from physicsnemo_curator.etl.processing_config import ProcessingConfig

from cases.alpha_d.physics.targets import encode_alpha_d_target

logger = logging.getLogger(__name__)


# ---------------------------------------------------------------------------
# Geometry helpers
# ---------------------------------------------------------------------------


def _local_diameter(
    z: np.ndarray, z_throat_start: float, z_throat_end: float, D_big: float, D_small: float
) -> np.ndarray:
    """Compute local pipe diameter along the z-axis.

    Upstream/downstream of the contraction the diameter is D_big.
    In the contraction region between z_throat_start and z_throat_end,
    the diameter is D_small (abrupt contraction model).
    """
    d = np.full_like(z, D_big, dtype=np.float64)
    mask = (z >= z_throat_start) & (z <= z_throat_end)
    d[mask] = D_small
    return d


def _region_flags(
    z_hat: np.ndarray, z_norm_throat_start: float, z_norm_throat_end: float
) -> dict[str, np.ndarray]:
    """Compute region flags from normalized z coordinate.

    Returns ``is_upstream``, ``is_throat``, ``is_downstream`` as float32
    one-hot arrays.  The ``is_contraction`` and ``is_expansion`` flags are
    omitted because the abrupt geometry makes them always zero.
    """
    upstream = (z_hat < z_norm_throat_start).astype(np.float32)
    downstream = (z_hat > z_norm_throat_end).astype(np.float32)
    throat = ((z_hat >= z_norm_throat_start) & (z_hat <= z_norm_throat_end)).astype(np.float32)
    return {
        "is_upstream": upstream,
        "is_throat": throat,
        "is_downstream": downstream,
    }


def _sample_weights(region_flags: dict[str, np.ndarray]) -> np.ndarray:
    """Assign per-station sample weights for future weighted loss."""
    w = np.ones(len(region_flags["is_upstream"]), dtype=np.float32)
    w[region_flags["is_throat"] > 0.5] = 5.0
    return w


[docs] class AlphaDTransformation(DataTransformation): """Extract contraction-region Darcy resistance profiles. Args: cfg : ProcessingConfig. n_stations : Number of axial stations to bin into. buffer_diams : Number of pipe diameters of upstream/downstream buffer. rho : Fluid density (kg/m^3), matching the CFD setup. min_elements : Minimum elements required per station; skip case if not met. """ def __init__( self, cfg: ProcessingConfig, n_stations: int = 50, buffer_diams: float = 1.0, rho: float = 1.0, min_elements: int = 3, ): super().__init__(cfg) self.n_stations = n_stations self.buffer_diams = buffer_diams self.rho = rho self.min_elements = min_elements
[docs] def transform(self, data: dict[str, Any]) -> Optional[dict[str, Any]]: """Compute alpha_D axial profile for one CFD case.""" case_name: str = data["case_name"] case_meta: dict = data["case_meta"] coords: np.ndarray = data["coords"] # [N, 3] in meters connectivity: np.ndarray = data["connectivity"] field_names: list[str] = data["field_names"] fields: np.ndarray = data["fields"] # [T, E, F] # --- Extract case parameters --- Re = float(case_meta.get("Re", 0)) Dr = float(case_meta.get("diameter_ratio", 0)) Lr = float(case_meta.get("length_ratio", 0)) pipe_radius_m = float(case_meta.get("pipe_radius_m", 0.1)) D_big = 2.0 * pipe_radius_m D_contraction_m = float(case_meta.get("D_contraction_m", D_big * Dr)) if Re <= 0 or Dr <= 0 or Lr <= 0: logger.warning( "Skipping %s: invalid case parameters (Re=%s Dr=%s Lr=%s)", case_name, Re, Dr, Lr ) return None # --- Geometry: locate the contraction region --- # Geometry is handled in meter units after source mesh scaling. # Lower pipe center at z=0, inner cylinder center at # z = outer_height/2 + inner_height/2 outer_height_m = 1.0 inner_height_m = outer_height_m * Lr z_throat_start = outer_height_m / 2.0 # 0.5 m z_throat_end = z_throat_start + inner_height_m # ROI: throat +/- buffer buffer = self.buffer_diams * D_big z_roi_start = z_throat_start - buffer z_roi_end = z_throat_end + buffer # --- Compute element centroids --- # coords: [N, 3], connectivity: [E, K] elem_centroids = coords[connectivity].mean(axis=1) # [E, 3] elem_z = elem_centroids[:, 2] # z-component # --- Identify pressure field --- if "pressure" not in field_names: logger.warning("Skipping %s: no 'pressure' field found.", case_name) return None p_idx = field_names.index("pressure") # --- Identify axial velocity field (vel_z is the streamwise component) --- if "vel_z" not in field_names: logger.warning("Skipping %s: no 'vel_z' field found.", case_name) return None vz_idx = field_names.index("vel_z") # Use last time step (converged steady state) pressure = fields[-1, :, p_idx] # [E] vel_z = fields[-1, :, vz_idx] # [E] # --- Select ROI elements --- roi_mask = (elem_z >= z_roi_start) & (elem_z <= z_roi_end) n_roi = roi_mask.sum() if n_roi < self.n_stations * self.min_elements: logger.warning( "Skipping %s: only %d ROI elements (need %d).", case_name, n_roi, self.n_stations * self.min_elements, ) return None # --- Bin into axial stations --- station_edges = np.linspace(z_roi_start, z_roi_end, self.n_stations + 1) station_z = 0.5 * (station_edges[:-1] + station_edges[1:]) # Compute element cross-sectional areas (projected onto z-plane) # For hex/tet elements, approximate as the area of the polygon # formed by node x,y coordinates projected onto the z-plane. # Simplified: use radial position to weight by annular area for # axisymmetric meshes. elem_r = np.sqrt(elem_centroids[:, 0] ** 2 + elem_centroids[:, 1] ** 2) p_avg = np.zeros(self.n_stations, dtype=np.float64) vz_avg = np.zeros(self.n_stations, dtype=np.float64) counts = np.zeros(self.n_stations, dtype=np.int32) for s in range(self.n_stations): mask = roi_mask & (elem_z >= station_edges[s]) & (elem_z < station_edges[s + 1]) if mask.sum() == 0: continue # Area-weighted average: weight by r for axisymmetric r_weights = elem_r[mask] r_weights = np.maximum(r_weights, 1e-12) # avoid zero p_avg[s] = np.average(pressure[mask], weights=r_weights) vz_avg[s] = np.average(vel_z[mask], weights=r_weights) counts[s] = mask.sum() # Handle last bin edge if counts[-1] == 0 and counts[-2] > 0: p_avg[-1] = p_avg[-2] vz_avg[-1] = vz_avg[-2] counts[-1] = 1 # Skip if too many empty stations valid = counts > 0 if valid.sum() < 5: logger.warning("Skipping %s: too few valid stations (%d).", case_name, valid.sum()) return None # Interpolate missing stations if not valid.all(): p_avg[~valid] = np.interp(station_z[~valid], station_z[valid], p_avg[valid]) vz_avg[~valid] = np.interp(station_z[~valid], station_z[valid], vz_avg[valid]) # --- Compute pressure gradient dP/dz --- dz = station_z[1] - station_z[0] dp_dz = np.gradient(p_avg, dz) # --- Compute Darcy resistance coefficient --- # alpha_D = -dP/dz / (0.5 * rho * V_bulk^2 / D_h) # V_bulk = inlet_u = 1.0 m/s (from simulation.i) V_bulk = 1.0 D_local = _local_diameter(station_z, z_throat_start, z_throat_end, D_big, D_contraction_m) D_h = D_local # hydraulic diameter = pipe diameter for circular cross-section # Darcy friction formulation: -dP/dz = alpha_D * (rho * V^2) / (2 * D_h) # => alpha_D = -dP/dz * 2 * D_h / (rho * V^2) alpha_D = -dp_dz * 2.0 * D_h / (self.rho * V_bulk**2) # --- Total pressure drop across ROI --- delta_p_case = float(p_avg[0] - p_avg[-1]) # --- Build feature vectors for ALL stations --- z_hat = (station_z - z_roi_start) / (z_roi_end - z_roi_start) d_local_over_D = D_local / D_big A_local_over_A = (D_local / D_big) ** 2 # area ratio # Area-mean axial velocity normalized by inlet bulk velocity. In ideal # incompressible flow this equals 1 / A_local_over_A; here we expose # the simulation-derived value so the model gets the as-computed # quantity rather than relying on the round-pipe identity. V_local_over_V_bulk = vz_avg / V_bulk z_throat_start_norm = (z_throat_start - z_roi_start) / (z_roi_end - z_roi_start) z_throat_end_norm = (z_throat_end - z_roi_start) / (z_roi_end - z_roi_start) regions = _region_flags(z_hat, z_throat_start_norm, z_throat_end_norm) # Neighbour / structural features. Pointwise MLP can't see the # contraction shape from local features alone; these expose # geometry context so it doesn't have to. # dD_dz_local: spike at the contraction edges (np.gradient of a # piecewise-constant signal is non-zero only at the jumps). # dist_to_throat_*: signed distance to throat entry / exit in # z_hat units. Negative upstream of the entry, 0 at the # boundary, positive on the other side; the model can use # these to localise its prediction relative to the # separation / reattachment regions. dD_dz_local = np.gradient(d_local_over_D, z_hat) dist_to_throat_start = z_hat - z_throat_start_norm dist_to_throat_end = z_throat_end_norm - z_hat # --- Encode targets --- # Keep the historical positive-only log target for backward # compatibility, and add a signed target that preserves favorable # pressure-gradient regions for physics-consistent training. alpha_D_floor = 1e-3 log_alpha_D = encode_alpha_d_target( np.maximum(alpha_D, alpha_D_floor), target_name="log_alpha_D", ).astype(np.float32) signed_log1p_alpha_D = encode_alpha_d_target( alpha_D, target_name="signed_log1p_alpha_D", ).astype(np.float32) weights = _sample_weights(regions) feature_names = [ "log10_Re", "Dr", "Lr", "z_hat", "d_local_over_D", "A_local_over_A", "V_local_over_V_bulk", "is_upstream", "is_throat", "is_downstream", "dD_dz_local", "dist_to_throat_start", "dist_to_throat_end", ] target_names = ["log_alpha_D", "signed_log1p_alpha_D"] n_out = self.n_stations features = np.column_stack( [ np.full(n_out, math.log10(Re), dtype=np.float32), np.full(n_out, Dr, dtype=np.float32), np.full(n_out, Lr, dtype=np.float32), z_hat.astype(np.float32), d_local_over_D.astype(np.float32), A_local_over_A.astype(np.float32), V_local_over_V_bulk.astype(np.float32), regions["is_upstream"], regions["is_throat"], regions["is_downstream"], dD_dz_local.astype(np.float32), dist_to_throat_start.astype(np.float32), dist_to_throat_end.astype(np.float32), ] ) # [n_out, 13] targets = np.column_stack([log_alpha_D, signed_log1p_alpha_D]).astype(np.float32) self.logger.info( " %s: %d stations, delta_p=%.4f Pa, " "log_alpha_D range=[%.2f, %.2f], " "signed_log1p_alpha_D range=[%.2f, %.2f]", case_name, n_out, delta_p_case, float(log_alpha_D.min()), float(log_alpha_D.max()), float(signed_log1p_alpha_D.min()), float(signed_log1p_alpha_D.max()), ) return { "case_name": case_name, "features": features, "targets": targets, "feature_names": feature_names, "target_names": target_names, "sample_weight": weights, "delta_p_case": delta_p_case, "Re": Re, "Dr": Dr, "Lr": Lr, # Geometry constants needed to reconstruct the ROI in eval. "D_big": D_big, "outer_height_m": outer_height_m, "buffer_diams": float(self.buffer_diams), "rho": float(self.rho), "V_bulk": V_bulk, }