# Source code for cases.alpha_d.etl.source

"""AlphaDSource: reads MOOSE simulation_out.e + case_metadata.txt.

Subclasses physicsnemo_curator DataSource.  Each file represents one
parametric-study case.  The source returns raw mesh + field arrays together
with the case parameters (Re, Dr, Lr, geometry dimensions) needed by the
downstream transformation.
"""

import csv
import logging
import sys
from pathlib import Path
from typing import Any

import numpy as np

# Parent of the directory that contains this file (parents[0] would be the
# containing directory itself).
_SRC_DIR = Path(__file__).resolve().parents[1]
# Prepend it to sys.path (only once) so modules that live there can be
# imported by bare name -- presumably what the ``read_exdous`` import below
# relies on; confirm against the repository layout.
if str(_SRC_DIR) not in sys.path:
    sys.path.insert(0, str(_SRC_DIR))

from physicsnemo_curator.etl.data_sources import DataSource
from physicsnemo_curator.etl.processing_config import ProcessingConfig

from read_exdous import ExodusReader

logger = logging.getLogger(__name__)


def _parse_case_metadata(meta_path: Path) -> dict[str, Any]:
    """Read a ``key = value`` text file such as ``case_metadata.txt``.

    Every line is stripped and split at its first ``=``; lines without an
    ``=`` (including blank ones) are ignored.  Keys and values are stripped
    of surrounding whitespace.  A value that ``float()`` accepts is stored
    as a float, anything else is kept as the stripped string.  When a key
    occurs more than once the last occurrence wins.

    Args:
        meta_path: Path of the metadata file to read.

    Returns:
        Mapping from key to float or string value.
    """

    def _coerce(text: str) -> Any:
        # Numeric when possible, otherwise the raw text.
        try:
            return float(text)
        except ValueError:
            return text

    parsed: dict[str, Any] = {}
    for raw in meta_path.read_text().splitlines():
        name, sep, value = raw.strip().partition("=")
        if not sep:
            # No "=" on this line (this also covers empty lines).
            continue
        parsed[name.strip()] = _coerce(value.strip())
    return parsed


def _load_manifest(manifest_path: Path) -> dict[str, dict[str, Any]]:
    """Load ``cases_manifest.csv`` into a mapping keyed by ``case_name``.

    The first CSV row supplies the column names.  Each following row becomes
    a dict in which every cell that ``float()`` accepts is stored as a float
    and every other cell (non-numeric text, empty strings, missing cells) is
    kept as read.  The outer key is the untouched ``case_name`` cell; when
    several rows share a name the last one wins.

    Args:
        manifest_path: Path of the CSV manifest.

    Returns:
        Mapping ``case_name -> {column: value}``.

    Raises:
        KeyError: If a row has no ``case_name`` column.
    """

    def _coerce(cell: Any) -> Any:
        # TypeError covers None (short rows) and lists (surplus cells).
        try:
            return float(cell)
        except (ValueError, TypeError):
            return cell

    manifest: dict[str, dict[str, Any]] = {}
    # newline="" hands line-ending handling to the csv module, as its
    # documentation requires; otherwise line breaks embedded in quoted
    # fields are rewritten by universal-newline translation.
    with manifest_path.open(newline="") as f:
        for row in csv.DictReader(f):
            name = row["case_name"]
            manifest[name] = {col: _coerce(cell) for col, cell in row.items()}
    return manifest


class AlphaDSource(DataSource):
    """Reads ``simulation_out.e`` files from the parametric study.

    Each case lives in its own sub-directory of ``input_dir``.  Case
    parameters come from that directory's ``case_metadata.txt`` or, failing
    that, from the manifest row whose ``case_name`` equals the directory
    name.  The source is read-only: the write hooks raise
    ``NotImplementedError``.

    Args:
        cfg: ProcessingConfig.
        input_dir: Root of parametric study (contains case sub-directories).
        manifest: Path to ``cases_manifest.csv``.  When omitted,
            ``<input_dir>/cases_manifest.csv`` is used if it exists;
            otherwise the manifest is empty.
        mesh_scale: Scale factor applied to mesh coordinates (default 1.0).
        exodus_filename: Name of the output Exodus file inside each case dir.
    """

    def __init__(
        self,
        cfg: ProcessingConfig,
        input_dir: str,
        manifest: str | None = None,
        mesh_scale: float = 1.0,
        exodus_filename: str = "simulation_out.e",
    ):
        super().__init__(cfg)
        self.input_dir = Path(input_dir)
        self.mesh_scale = mesh_scale
        self.exodus_filename = exodus_filename
        self._reader = ExodusReader(use_rich=False)

        if manifest:
            self._manifest = _load_manifest(Path(manifest))
        else:
            # Fall back to the conventional location inside the study root.
            manifest_path = self.input_dir / "cases_manifest.csv"
            if manifest_path.exists():
                self._manifest = _load_manifest(manifest_path)
            else:
                self._manifest = {}

    def get_file_list(self) -> list[str]:
        """Return paths to simulation_out.e files in case sub-directories.

        Only direct sub-directories of ``input_dir`` are inspected, in sorted
        order.  A warning is logged when nothing is found.
        """
        files: list[str] = []
        for case_dir in sorted(self.input_dir.iterdir()):
            if not case_dir.is_dir():
                continue
            exodus_path = case_dir / self.exodus_filename
            if exodus_path.exists():
                files.append(str(exodus_path))
        if not files:
            logger.warning("No '%s' files found under %s", self.exodus_filename, self.input_dir)
        return files

    def read_file(self, filename: str) -> dict[str, Any]:
        """Read one case's Exodus output and metadata.

        Args:
            filename: Path of the Exodus file; its parent directory name is
                used as the case name.

        Returns:
            Dict with keys ``case_name``, ``case_meta``, ``coords`` (already
            multiplied by ``mesh_scale``), ``connectivity``, ``field_names``,
            ``fields`` and ``time_steps``.

        Raises:
            FileNotFoundError: If the case has neither a
                ``case_metadata.txt`` nor a manifest entry.
        """
        # Imported lazily so the module can be imported without netCDF4.
        from netCDF4 import Dataset as NC4Dataset

        path = Path(filename)
        case_dir = path.parent
        case_name = case_dir.name
        self.logger.info("Reading case: %s", case_name)

        # --- Case parameters ---
        meta_path = case_dir / "case_metadata.txt"
        if meta_path.exists():
            case_meta = _parse_case_metadata(meta_path)
        elif case_name in self._manifest:
            # Copy so callers cannot mutate the cached manifest row.
            case_meta = dict(self._manifest[case_name])
        else:
            raise FileNotFoundError(
                f"No case_metadata.txt or manifest entry for case '{case_name}'."
            )

        # --- Exodus data ---
        # The dataset is a context manager; it is closed even if extraction fails.
        with NC4Dataset(str(path), "r") as ds:
            coords, connectivity, field_names, fields, time_steps = self._extract(ds)

        # Apply mesh scaling to coordinates
        coords = coords * self.mesh_scale

        self.logger.info(
            " nodes=%d elements=%d fields=%s time_steps=%d",
            coords.shape[0],
            connectivity.shape[0],
            field_names,
            len(time_steps),
        )

        return {
            "case_name": case_name,
            "case_meta": case_meta,
            "coords": coords,
            "connectivity": connectivity,
            "field_names": field_names,
            "fields": fields,
            "time_steps": time_steps,
        }

    @staticmethod
    def _connect_sort_key(var_name: str) -> tuple[int, int, str]:
        """Order ``connect<N>`` variables by block number instead of by text."""
        suffix = var_name[len("connect"):]
        if suffix.isdecimal():
            return (0, int(suffix), var_name)
        # Names without a purely numeric suffix keep text order, after the rest.
        return (1, 0, var_name)

    def _extract(self, ds):
        """Extract geometry and fields from an open netCDF4 Dataset.

        Returns:
            ``(coords, connectivity, elem_field_names, fields, time_steps)``.
            ``coords`` stacks ``coordx``/``coordy``/``coordz`` along axis 1.
            ``connectivity`` is all ``connect*`` blocks concatenated in block
            order and shifted by -1 (Exodus node indices are 1-based).
            ``fields`` has shape ``(num_time, num_elem, num_fields)``; entries
            for which the file holds no values stay zero.
        """
        coords = np.stack(
            [
                np.array(ds.variables[axis][:], dtype=np.float64)
                for axis in ("coordx", "coordy", "coordz")
            ],
            axis=1,
        )

        # Element connectivity (multi-block support).  Blocks are ordered by
        # their numeric index so that block 10 follows block 9 rather than
        # block 1, matching the ``eb<N>`` numbering of the element variables.
        conn_names = sorted(
            (
                name
                for name in ds.variables
                if name.startswith("connect") and not name.endswith("names")
            ),
            key=self._connect_sort_key,
        )
        conn_blocks = []
        block_rows: dict[int, slice] = {}  # block index -> rows in `connectivity`
        offset = 0
        for var_name in conn_names:
            arr = np.array(ds.variables[var_name][:], dtype=np.int32) - 1
            conn_blocks.append(arr)
            suffix = var_name[len("connect"):]
            if suffix.isdecimal():
                block_rows[int(suffix)] = slice(offset, offset + arr.shape[0])
            offset += arr.shape[0]

        if conn_blocks:
            connectivity = np.concatenate(conn_blocks, axis=0)
        else:
            connectivity = np.empty((0, 4), dtype=np.int32)

        time_steps = np.array(ds.variables["time_whole"][:], dtype=np.float64)

        names_by_kind = self._reader.build_name_lookup(ds)
        elem_field_names: list[str] = names_by_kind.get("element", [])

        num_time = len(time_steps)
        num_elem = connectivity.shape[0]
        num_fields = len(elem_field_names)

        fields = np.zeros((num_time, num_elem, num_fields), dtype=np.float64)
        for fi in range(num_fields):
            # Write each block's values into that block's own row range.  A
            # variable that is not stored for some block simply leaves zeros
            # there instead of shifting or discarding the remaining blocks.
            found_per_block = False
            for block_id, rows in block_rows.items():
                var_name = f"vals_elem_var{fi + 1}eb{block_id}"
                if var_name in ds.variables:
                    fields[:, rows, fi] = np.array(
                        ds.variables[var_name][:], dtype=np.float64
                    )
                    found_per_block = True
            if not found_per_block:
                # Fallback: a single variable without a per-block suffix.
                var_name = f"vals_elem_var{fi + 1}"
                if var_name in ds.variables:
                    fields[:, :, fi] = np.array(
                        ds.variables[var_name][:], dtype=np.float64
                    )

        return coords, connectivity, elem_field_names, fields, time_steps

    # --- Write stubs (read-only source) ---

    def _get_output_path(self, filename: str) -> Path:
        """Unsupported: this source never writes output."""
        raise NotImplementedError("AlphaDSource is read-only.")

    def _write_impl_temp_file(self, data: dict[str, Any], output_path: Path) -> None:
        """Unsupported: this source never writes output."""
        raise NotImplementedError("AlphaDSource is read-only.")