Source code for cases.moose_grid.etl.data_sources.exodus_source

"""ExodusDataSource: reads MOOSE Exodus (.e) simulation output files.

Subclasses physicsnemo_curator's DataSource ABC.

Each Exodus file represents one simulation run.  The reader:
  1. Extracts mesh geometry (node coordinates, element connectivity).
  2. Extracts element solution fields for every time step.
  3. Optionally co-reads matching CSV line-probe files via CSVProbeSource.

The returned dict is keyed so that MOOSEDataTransformation can consume it
directly without field-name guessing.
"""

import logging
import sys
from pathlib import Path
from typing import Any

import numpy as np

# ---------------------------------------------------------------------------
# Ensure the src/ directory is on the path so we can import read_exdous helpers
# ---------------------------------------------------------------------------
_SRC_DIR = Path(__file__).resolve().parents[2]  # src/
if str(_SRC_DIR) not in sys.path:
    sys.path.insert(0, str(_SRC_DIR))

# Reuse decoding helpers from read_exdous.py
from physicsnemo_curator.etl.data_sources import DataSource
from physicsnemo_curator.etl.processing_config import ProcessingConfig

from cases.moose_grid.etl.data_sources.csv_source import CSVProbeSource
from cases.moose_grid.etl.schemas import MOOSERawData
from read_exdous import ExodusReader

logger = logging.getLogger(__name__)


[docs] class ExodusDataSource(DataSource): """Reads MOOSE Exodus files and co-reads matching CSV probe files. Args: cfg : ProcessingConfig from the curator framework. input_dir : Directory containing Exodus (.e) files. data_dir : Directory containing CSV probe files. If omitted, defaults to input_dir. """ def __init__( self, cfg: ProcessingConfig, input_dir: str, data_dir: str | None = None, ): super().__init__(cfg) self.input_dir = Path(input_dir) self.data_dir = Path(data_dir) if data_dir else self.input_dir self._csv_source = CSVProbeSource(self.data_dir) self._exodus_reader = ExodusReader(use_rich=False) # ------------------------------------------------------------------ # DataSource interface # ------------------------------------------------------------------
[docs] def get_file_list(self) -> list[str]: """Return sorted list of Exodus file paths.""" files = sorted(self.input_dir.glob("**/*.e")) if not files: logger.warning("No Exodus (.e) files found in %s", self.input_dir) return [str(f) for f in files]
[docs] def read_file(self, filename: str) -> dict[str, Any]: """Read one Exodus file and its associated CSV probes. Returns a dict that can be passed directly to MOOSEDataTransformation. """ from netCDF4 import Dataset as NC4Dataset # local import — not always installed path = Path(filename) sim_name = path.stem # e.g. 'lid-driven-segregated_out' self.logger.info("Reading Exodus file: %s", path.name) ds = NC4Dataset(str(path), "r") try: raw = self._extract_exodus(ds, sim_name) finally: ds.close() # Co-read CSV probes probe_data, probe_columns = self._csv_source.read_all(sim_name) raw.probe_data = probe_data raw.probe_columns = probe_columns self.logger.info( " nodes=%d elements=%d time_steps=%d probes=%d", raw.coords.shape[0], raw.connectivity.shape[0], len(raw.time_steps), len(raw.probe_data), ) # Return as plain dict for the curator pipeline return { "coords": raw.coords, "connectivity": raw.connectivity, "field_names": raw.field_names, "fields": raw.fields, "time_steps": raw.time_steps, "probe_data": raw.probe_data, "probe_columns": raw.probe_columns, "sim_name": raw.sim_name, }
# ------------------------------------------------------------------ # Sink stubs — this class is read-only # ------------------------------------------------------------------ def _get_output_path(self, filename: str) -> Path: raise NotImplementedError("ExodusDataSource is read-only; use MOOSEZarrSink for writing.") def _write_impl_temp_file(self, data: dict[str, Any], output_path: Path) -> None: raise NotImplementedError("ExodusDataSource is read-only; use MOOSEZarrSink for writing.") # ------------------------------------------------------------------ # Internal helpers # ------------------------------------------------------------------ def _extract_exodus(self, ds, sim_name: str) -> MOOSERawData: """Extract geometry, fields, and time from an open netCDF4 Dataset.""" # --- Coordinates --- coordx = np.array(ds.variables["coordx"][:], dtype=np.float32) coordy = np.array(ds.variables["coordy"][:], dtype=np.float32) if "coordz" in ds.variables: coordz = np.array(ds.variables["coordz"][:], dtype=np.float32) coords = np.stack([coordx, coordy, coordz], axis=1) else: coords = np.stack([coordx, coordy], axis=1) # --- Connectivity (0-indexed) --- # Exodus stores 1-indexed connectivity; subtract 1 for 0-indexed. connectivity_1 = np.array(ds.variables["connect1"][:], dtype=np.int32) connectivity = connectivity_1 - 1 # [E, K] # --- Time steps --- time_steps = np.array(ds.variables["time_whole"][:], dtype=np.float32) # --- Element variable names --- names_by_kind = self._exodus_reader.build_name_lookup(ds) elem_field_names: list[str] = names_by_kind.get("element", []) # --- Element variable arrays --- # Exodus names element vars: vals_elem_var{i}eb{block} # We collect all blocks but typically MOOSE writes one block (eb1). num_time = len(time_steps) num_elem = connectivity.shape[0] num_fields = len(elem_field_names) if num_fields == 0: logger.warning("No element variables found in %s", sim_name) fields = np.empty((num_time, num_elem, 0), dtype=np.float32) else: fields = np.zeros((num_time, num_elem, num_fields), dtype=np.float32) for fi, _ in enumerate(elem_field_names): # Try block 1 first, then unqualified variable name var_name = f"vals_elem_var{fi + 1}eb1" if var_name not in ds.variables: var_name = f"vals_elem_var{fi + 1}" if var_name in ds.variables: fields[:, :, fi] = np.array(ds.variables[var_name][:], dtype=np.float32) else: logger.warning("Element variable index %d not found in %s", fi + 1, sim_name) return MOOSERawData( coords=coords, connectivity=connectivity, field_names=elem_field_names, fields=fields, time_steps=time_steps, probe_data={}, # filled by caller probe_columns=[], # filled by caller sim_name=sim_name, )