"""Calibration artifact helpers for quasilinear transport models."""
from __future__ import annotations
from dataclasses import asdict, dataclass
from pathlib import Path
from typing import Any, Iterable
import json
import numpy as np
from spectraxgk.quasilinear_window import (
NonlinearWindowConvergenceConfig,
nonlinear_window_convergence_report,
nonlinear_window_stats_promotion_ready,
)
_NETCDF_HEAT_FLUX_COLUMNS = {
"heat_flux": "Diagnostics/HeatFlux_st",
"heat_flux_es": "Diagnostics/HeatFluxES_st",
"heat_flux_apar": "Diagnostics/HeatFluxApar_st",
"heat_flux_bpar": "Diagnostics/HeatFluxBpar_st",
}
[docs]
@dataclass(frozen=True)
class QuasilinearCalibrationPoint:
"""One quasilinear-vs-nonlinear transport comparison point."""
case: str
split: str
predicted_heat_flux: float
observed_heat_flux: float
saturation_rule: str
raw_predicted_heat_flux: float | None = None
calibration_scale: float | None = None
geometry: str = "unspecified"
electron_model: str = "unspecified"
ky: float | None = None
observed_heat_flux_std: float | None = None
nonlinear_window_stats: dict[str, Any] | None = None
quasilinear_artifact: str | None = None
nonlinear_artifact: str | None = None
notes: str | None = None
def to_dict(self) -> dict[str, Any]:
return asdict(self)
def _point_from_mapping(
item: QuasilinearCalibrationPoint | dict[str, Any],
) -> QuasilinearCalibrationPoint:
if isinstance(item, QuasilinearCalibrationPoint):
return item
return QuasilinearCalibrationPoint(**dict(item))
def _split_metrics(
points: list[QuasilinearCalibrationPoint], *, observed_floor: float
) -> dict[str, Any]:
if not points:
return {
"n": 0,
"rmse": None,
"mean_abs_relative_error": None,
"max_abs_relative_error": None,
}
pred = np.asarray([p.predicted_heat_flux for p in points], dtype=float)
obs = np.asarray([p.observed_heat_flux for p in points], dtype=float)
residual = pred - obs
denom = np.maximum(np.abs(obs), float(observed_floor))
rel = np.abs(residual) / denom
return {
"n": int(pred.size),
"rmse": float(np.sqrt(np.mean(residual**2))),
"mean_abs_relative_error": float(np.mean(rel)),
"max_abs_relative_error": float(np.max(rel)),
}
def _holdout_window_convergence_summary(
points: list[QuasilinearCalibrationPoint],
) -> dict[str, Any]:
holdouts = [point for point in points if point.split == "holdout"]
failures: list[str] = []
passed_cases: list[str] = []
for point in holdouts:
ready, point_failures = nonlinear_window_stats_promotion_ready(
point.nonlinear_window_stats
)
if ready:
passed_cases.append(point.case)
else:
failures.extend(f"{point.case}: {failure}" for failure in point_failures)
return {
"passed": bool(holdouts) and not failures,
"n_holdout": len(holdouts),
"n_passed": len(passed_cases),
"passed_cases": passed_cases,
"failures": failures,
}
[docs]
def fit_train_heat_flux_scale(
points: Iterable[QuasilinearCalibrationPoint | dict[str, Any]],
*,
train_split: str = "train",
prediction_floor: float = 1.0e-300,
) -> dict[str, Any]:
"""Fit one multiplicative heat-flux scale from training points.
The fit is a through-origin least-squares estimate,
``scale = sum(q_i Q_i) / sum(q_i^2)``, where ``q_i`` is the raw
quasilinear heat-flux estimate and ``Q_i`` is the nonlinear window mean.
This is the minimal calibration constant used by simple mixing-length
models; any held-out failure after this fit is therefore a model failure,
not a missing constant factor.
"""
floor = float(prediction_floor)
if not np.isfinite(floor) or floor < 0.0:
raise ValueError("prediction_floor must be finite and non-negative")
pts = [_point_from_mapping(item) for item in points]
train = [
p
for p in pts
if p.split == train_split
and np.isfinite(p.predicted_heat_flux)
and np.isfinite(p.observed_heat_flux)
and abs(p.predicted_heat_flux) > floor
]
if not train:
raise ValueError(
f"no finite nonzero '{train_split}' points available for scale fit"
)
pred = np.asarray([p.predicted_heat_flux for p in train], dtype=float)
obs = np.asarray([p.observed_heat_flux for p in train], dtype=float)
denom = float(np.dot(pred, pred))
if not np.isfinite(denom) or denom <= floor:
raise ValueError("training predictions are too small to fit a stable scale")
scale = float(np.dot(pred, obs) / denom)
if scale < 0.0:
raise ValueError(
"fitted heat-flux scale is negative; do not treat this as a saturation constant"
)
scaled_residual = scale * pred - obs
return {
"scale": scale,
"train_split": str(train_split),
"n_train": int(pred.size),
"fit_kind": "through_origin_least_squares",
"prediction_floor": floor,
"train_rmse": float(np.sqrt(np.mean(scaled_residual**2))),
}
[docs]
def apply_heat_flux_scale(
points: Iterable[QuasilinearCalibrationPoint | dict[str, Any]],
*,
scale: float,
note_label: str = "heat_flux_scale",
) -> list[QuasilinearCalibrationPoint]:
"""Return calibration points with heat-flux predictions multiplied by ``scale``."""
if not np.isfinite(scale) or scale < 0.0:
raise ValueError("scale must be finite and non-negative")
scaled = []
for item in points:
point = _point_from_mapping(item)
notes = [point.notes, f"{note_label}={scale:.12g}"]
scaled.append(
QuasilinearCalibrationPoint(
**{
**point.to_dict(),
"predicted_heat_flux": float(scale * point.predicted_heat_flux),
"raw_predicted_heat_flux": float(point.predicted_heat_flux),
"calibration_scale": float(scale),
"notes": "; ".join(str(note) for note in notes if note),
}
)
)
return scaled
[docs]
def quasilinear_calibration_report(
points: Iterable[QuasilinearCalibrationPoint | dict[str, Any]],
*,
saturation_rule: str,
version: str = "0.1",
holdout_mean_rel_gate: float = 0.35,
observed_floor: float = 1.0e-12,
fit_train_scale: bool = False,
metadata: dict[str, Any] | None = None,
) -> dict[str, Any]:
"""Build a JSON-friendly calibration/holdout report.
A report is considered a calibrated absolute-flux claim only when it has at
least one training point, at least one holdout point, and the holdout mean
relative error passes the supplied gate.
"""
pts = [_point_from_mapping(item) for item in points]
if not pts:
raise ValueError("at least one calibration point is required")
if observed_floor <= 0.0:
raise ValueError("observed_floor must be positive")
if holdout_mean_rel_gate <= 0.0:
raise ValueError("holdout_mean_rel_gate must be positive")
allowed_splits = {"train", "holdout", "audit"}
bad_splits = sorted({p.split for p in pts if p.split not in allowed_splits})
if bad_splits:
raise ValueError(f"unsupported calibration split(s): {bad_splits}")
nonfinite_cases = [
p.case
for p in pts
if not np.isfinite(p.predicted_heat_flux)
or not np.isfinite(p.observed_heat_flux)
or (
p.observed_heat_flux_std is not None
and not np.isfinite(p.observed_heat_flux_std)
)
]
if nonfinite_cases:
raise ValueError(
f"calibration points contain non-finite values: {nonfinite_cases}"
)
negative_std_cases = [
p.case
for p in pts
if p.observed_heat_flux_std is not None and p.observed_heat_flux_std < 0.0
]
if negative_std_cases:
raise ValueError(
f"calibration points contain negative observed_heat_flux_std: {negative_std_cases}"
)
point_rules = {p.saturation_rule for p in pts}
if point_rules != {str(saturation_rule)}:
raise ValueError(
"all calibration points must use the report saturation_rule "
f"{saturation_rule!r}; found {sorted(point_rules)}"
)
scale_fit = None
if fit_train_scale:
scale_fit = fit_train_heat_flux_scale(pts)
pts = apply_heat_flux_scale(
pts,
scale=float(scale_fit["scale"]),
note_label="train_fitted_heat_flux_scale",
)
by_split_points = {
split: [p for p in pts if p.split == split] for split in allowed_splits
}
by_split = {
split: _split_metrics(split_points, observed_floor=observed_floor)
for split, split_points in sorted(by_split_points.items())
}
all_metrics = _split_metrics(pts, observed_floor=observed_floor)
holdout = by_split["holdout"]
has_train = by_split["train"]["n"] > 0
has_holdout = holdout["n"] > 0
holdout_error = holdout["mean_abs_relative_error"]
holdout_window_convergence = _holdout_window_convergence_summary(pts)
passed = bool(
has_train
and has_holdout
and holdout_error is not None
and holdout_error <= holdout_mean_rel_gate
and holdout_window_convergence["passed"]
)
claim_level = "calibrated_absolute_flux" if passed else "calibration_dataset"
if not has_holdout:
claim_level = "training_or_audit_only"
return {
"kind": "quasilinear_calibration_report",
"version": str(version),
"saturation_rule": str(saturation_rule),
"claim_level": claim_level,
"passed": passed,
"holdout_mean_rel_gate": float(holdout_mean_rel_gate),
"observed_floor": float(observed_floor),
"metrics": all_metrics,
"by_split": by_split,
"points": [p.to_dict() for p in pts],
"metadata": {
**dict(metadata or {}),
"holdout_window_convergence": holdout_window_convergence,
**({} if scale_fit is None else {"heat_flux_scale_fit": scale_fit}),
},
}
[docs]
def integrated_quasilinear_flux_from_spectrum(
spectrum_csv: str | Path,
*,
column: str = "saturated_heat_flux_total",
ky_column: str = "ky",
method: str = "sum",
delta_ky: float | None = None,
) -> dict[str, Any]:
"""Integrate one quasilinear spectrum column into a scalar flux estimate.
``method="sum"`` preserves the discrete spectral-sum convention used by
most runtime diagnostics. ``method="trapezoid"`` is available for smooth
scan studies where the CSV is treated as a sampled function of ``ky``.
"""
path = Path(spectrum_csv)
data = np.genfromtxt(path, delimiter=",", names=True)
if data.shape == ():
data = np.asarray([data], dtype=data.dtype)
names = set(data.dtype.names or ())
if column not in names:
raise ValueError(f"{path} is missing quasilinear column '{column}'")
values = np.asarray(data[column], dtype=float)
finite = np.isfinite(values)
if ky_column in names:
ky = np.asarray(data[ky_column], dtype=float)
finite &= np.isfinite(ky)
else:
ky = np.arange(values.size, dtype=float)
if not np.any(finite):
raise ValueError(f"{path} contains no finite samples in column '{column}'")
values = values[finite]
ky = ky[finite]
method_key = method.strip().lower()
if method_key == "sum":
estimate = float(np.sum(values))
if delta_ky is not None:
width = float(delta_ky)
if not np.isfinite(width) or width <= 0.0:
raise ValueError("delta_ky must be finite and positive")
estimate *= width
elif method_key == "mean":
estimate = float(np.mean(values))
elif method_key == "trapezoid":
if values.size < 2:
raise ValueError(
"trapezoid integration requires at least two finite spectrum samples"
)
order = np.argsort(ky)
estimate = float(np.trapezoid(values[order], ky[order]))
else:
raise ValueError("method must be one of {'sum', 'mean', 'trapezoid'}")
return {
"estimate": estimate,
"method": method_key,
"column": str(column),
"ky_column": str(ky_column),
"delta_ky": None if delta_ky is None else float(delta_ky),
"n_samples": int(values.size),
"ky_min": float(np.min(ky)),
"ky_max": float(np.max(ky)),
"artifact": str(path),
}
def _resolve_summary_artifact(summary_path: Path, source: object) -> Path:
diag_path = Path(str(source))
if diag_path.is_absolute():
return diag_path
candidates = (
(summary_path.parent / diag_path).resolve(),
(summary_path.parent.parent / diag_path).resolve(),
(Path.cwd() / diag_path).resolve(),
)
return next(
(candidate for candidate in candidates if candidate.exists()), candidates[0]
)
def _window_from_summary(summary: dict[str, Any], t: np.ndarray) -> tuple[float, float]:
raw_tmin = summary.get("tmin")
raw_tmax = summary.get("tmax")
tmin = float(np.nanmin(t) if raw_tmin is None else raw_tmin)
tmax = float(np.nanmax(t) if raw_tmax is None else raw_tmax)
return tmin, tmax
def _read_csv_window_heat_flux(
path: Path,
summary: dict[str, Any],
*,
heat_flux_column: str,
) -> dict[str, Any]:
data = np.genfromtxt(path, delimiter=",", names=True)
if data.shape == ():
data = np.asarray([data], dtype=data.dtype)
names = set(data.dtype.names or ())
if "t" not in names:
raise ValueError(f"{path} is missing a 't' column")
if heat_flux_column not in names:
raise ValueError(f"{path} is missing heat-flux column '{heat_flux_column}'")
t = np.asarray(data["t"], dtype=float)
heat = np.asarray(data[heat_flux_column], dtype=float)
tmin, tmax = _window_from_summary(summary, t)
mask = (t >= tmin) & (t <= tmax) & np.isfinite(heat)
if not np.any(mask):
raise ValueError(f"no finite heat-flux samples in [{tmin}, {tmax}] from {path}")
return {
"mean": float(np.mean(heat[mask])),
"std": float(np.std(heat[mask])),
"tmin": tmin,
"tmax": tmax,
"n_samples": int(np.count_nonzero(mask)),
"variable": heat_flux_column,
}
def _csv_heat_flux_trace(
path: Path, *, heat_flux_column: str
) -> tuple[np.ndarray, np.ndarray]:
data = np.genfromtxt(path, delimiter=",", names=True)
if data.shape == ():
data = np.asarray([data], dtype=data.dtype)
names = set(data.dtype.names or ())
if "t" not in names:
raise ValueError(f"{path} is missing a 't' column")
if heat_flux_column not in names:
raise ValueError(f"{path} is missing heat-flux column '{heat_flux_column}'")
return np.asarray(data["t"], dtype=float), np.asarray(
data[heat_flux_column], dtype=float
)
def _netcdf_variable(root: Any, path: str) -> Any:
current = root
parts = [part for part in path.strip("/").split("/") if part]
if not parts:
raise ValueError("NetCDF variable path must not be empty")
for group in parts[:-1]:
if group not in current.groups:
raise KeyError(f"NetCDF group '{group}' not found in '{path}'")
current = current.groups[group]
name = parts[-1]
if name not in current.variables:
raise KeyError(f"NetCDF variable '{name}' not found in '{path}'")
return current.variables[name]
def _netcdf_heat_flux_variable(heat_flux_column: str) -> str:
key = str(heat_flux_column).strip()
if "/" in key:
return key
if key.startswith("Diagnostics"):
return key
if key in _NETCDF_HEAT_FLUX_COLUMNS:
return _NETCDF_HEAT_FLUX_COLUMNS[key]
if key.endswith("_st"):
return f"Diagnostics/{key}"
raise ValueError(
f"unknown NetCDF heat-flux column '{heat_flux_column}'. "
"Use one of heat_flux, heat_flux_es, heat_flux_apar, heat_flux_bpar, "
"or an explicit NetCDF path such as Diagnostics/HeatFlux_st."
)
def _read_netcdf_window_heat_flux(
path: Path,
summary: dict[str, Any],
*,
heat_flux_column: str,
species_index: int | None,
) -> dict[str, Any]:
try:
import netCDF4
except (
ImportError
) as exc: # pragma: no cover - exercised only without optional dependency.
raise RuntimeError(
"netCDF4 is required to read nonlinear NetCDF calibration windows"
) from exc
variable_path = _netcdf_heat_flux_variable(heat_flux_column)
with netCDF4.Dataset(path) as root:
t = np.asarray(_netcdf_variable(root, "Grids/time")[:], dtype=float)
values = np.asarray(_netcdf_variable(root, variable_path)[:], dtype=float)
if values.shape[0] != t.size:
raise ValueError(
f"{variable_path} first dimension does not match Grids/time in {path}"
)
if values.ndim == 1:
heat = values
elif values.ndim == 2:
if species_index is None:
heat = np.sum(values, axis=1)
else:
if species_index < 0 or species_index >= values.shape[1]:
raise ValueError(
f"species_index {species_index} is out of bounds for {values.shape[1]} species"
)
heat = values[:, int(species_index)]
else:
raise ValueError(
f"{variable_path} must have shape (time,) or (time, species), got {values.shape}"
)
tmin, tmax = _window_from_summary(summary, t)
mask = (t >= tmin) & (t <= tmax) & np.isfinite(heat)
if not np.any(mask):
raise ValueError(f"no finite heat-flux samples in [{tmin}, {tmax}] from {path}")
return {
"mean": float(np.mean(heat[mask])),
"std": float(np.std(heat[mask])),
"tmin": tmin,
"tmax": tmax,
"n_samples": int(np.count_nonzero(mask)),
"variable": variable_path,
}
def _netcdf_heat_flux_trace(
path: Path,
*,
heat_flux_column: str,
species_index: int | None,
) -> tuple[np.ndarray, np.ndarray, str]:
try:
import netCDF4
except (
ImportError
) as exc: # pragma: no cover - exercised only without optional dependency.
raise RuntimeError(
"netCDF4 is required to read nonlinear NetCDF calibration windows"
) from exc
variable_path = _netcdf_heat_flux_variable(heat_flux_column)
with netCDF4.Dataset(path) as root:
t = np.asarray(_netcdf_variable(root, "Grids/time")[:], dtype=float)
values = np.asarray(_netcdf_variable(root, variable_path)[:], dtype=float)
if values.shape[0] != t.size:
raise ValueError(
f"{variable_path} first dimension does not match Grids/time in {path}"
)
if values.ndim == 1:
heat = values
elif values.ndim == 2:
if species_index is None:
heat = np.sum(values, axis=1)
else:
if species_index < 0 or species_index >= values.shape[1]:
raise ValueError(
f"species_index {species_index} is out of bounds for {values.shape[1]} species"
)
heat = values[:, int(species_index)]
else:
raise ValueError(
f"{variable_path} must have shape (time,) or (time, species), got {values.shape}"
)
return t, heat, variable_path
def _window_convergence_config(
window: dict[str, Any],
config: NonlinearWindowConvergenceConfig | None,
) -> NonlinearWindowConvergenceConfig:
if config is not None:
return config
return NonlinearWindowConvergenceConfig(
tmin=window.get("tmin"),
tmax=window.get("tmax"),
transient_fraction=0.0,
)
[docs]
def calibration_point_from_nonlinear_window_summary(
summary_json: str | Path,
*,
predicted_heat_flux: float,
split: str,
saturation_rule: str,
diagnostics_source: str = "spectrax",
heat_flux_column: str = "heat_flux",
case: str | None = None,
geometry: str = "unspecified",
electron_model: str = "unspecified",
quasilinear_artifact: str | None = None,
species_index: int | None = None,
window_convergence_config: NonlinearWindowConvergenceConfig | None = None,
notes: str | None = None,
) -> QuasilinearCalibrationPoint:
"""Create a calibration point from a nonlinear window-summary JSON.
The helper reads the window bounds from a tracked nonlinear gate summary and
computes the mean/std of a heat-flux column from the selected diagnostics
CSV or runtime NetCDF. For NetCDF inputs, ``heat_flux_column='heat_flux'``
maps to ``Diagnostics/HeatFlux_st`` and species are summed by default.
"""
summary_path = Path(summary_json)
summary = json.loads(summary_path.read_text(encoding="utf-8"))
source = summary.get(diagnostics_source)
if source is None:
raise ValueError(
f"summary does not contain diagnostics source '{diagnostics_source}'"
)
diag_path = _resolve_summary_artifact(summary_path, source)
suffixes = [suffix.lower() for suffix in diag_path.suffixes]
if diag_path.suffix.lower() == ".csv":
window = _read_csv_window_heat_flux(
diag_path, summary, heat_flux_column=heat_flux_column
)
trace_t, trace_heat = _csv_heat_flux_trace(
diag_path, heat_flux_column=heat_flux_column
)
convergence_variable = heat_flux_column
elif suffixes[-2:] == [".out", ".nc"] or diag_path.suffix.lower() == ".nc":
window = _read_netcdf_window_heat_flux(
diag_path,
summary,
heat_flux_column=heat_flux_column,
species_index=species_index,
)
trace_t, trace_heat, convergence_variable = _netcdf_heat_flux_trace(
diag_path,
heat_flux_column=heat_flux_column,
species_index=species_index,
)
else:
raise NotImplementedError(
"nonlinear calibration ingestion supports diagnostics CSV and NetCDF files"
)
convergence = nonlinear_window_convergence_report(
trace_t,
trace_heat,
case=str(case or summary.get("case", summary_path.stem)),
observable=str(convergence_variable),
source_artifact=str(diag_path),
summary_artifact=str(summary_path),
config=_window_convergence_config(window, window_convergence_config),
)
note_items = [
notes,
None
if diag_path.suffix.lower() == ".csv"
else f"nonlinear_variable={window['variable']}",
f"nonlinear_window=[{window['tmin']:.12g},{window['tmax']:.12g}]",
f"nonlinear_window_samples={window['n_samples']}",
]
return QuasilinearCalibrationPoint(
case=str(case or summary.get("case", summary_path.stem)),
split=str(split),
predicted_heat_flux=float(predicted_heat_flux),
observed_heat_flux=float(window["mean"]),
observed_heat_flux_std=float(window["std"]),
nonlinear_window_stats=convergence,
saturation_rule=str(saturation_rule),
geometry=str(geometry),
electron_model=str(electron_model),
quasilinear_artifact=quasilinear_artifact,
nonlinear_artifact=str(diag_path),
notes="; ".join(str(item) for item in note_items if item),
)
[docs]
def calibration_point_from_spectrum_and_nonlinear_window(
spectrum_csv: str | Path,
summary_json: str | Path,
*,
split: str,
saturation_rule: str,
spectrum_column: str = "saturated_heat_flux_total",
spectrum_method: str = "sum",
delta_ky: float | None = None,
diagnostics_source: str = "spectrax",
heat_flux_column: str = "heat_flux",
case: str | None = None,
geometry: str = "unspecified",
electron_model: str = "unspecified",
species_index: int | None = None,
window_convergence_config: NonlinearWindowConvergenceConfig | None = None,
notes: str | None = None,
) -> QuasilinearCalibrationPoint:
"""Create a calibration point from a quasilinear spectrum and nonlinear window."""
estimate = integrated_quasilinear_flux_from_spectrum(
spectrum_csv,
column=spectrum_column,
method=spectrum_method,
delta_ky=delta_ky,
)
point = calibration_point_from_nonlinear_window_summary(
summary_json,
predicted_heat_flux=float(estimate["estimate"]),
split=split,
saturation_rule=saturation_rule,
diagnostics_source=diagnostics_source,
heat_flux_column=heat_flux_column,
case=case,
geometry=geometry,
electron_model=electron_model,
quasilinear_artifact=str(spectrum_csv),
species_index=species_index,
window_convergence_config=window_convergence_config,
notes=notes,
)
ratio = None
if point.predicted_heat_flux != 0.0 and np.isfinite(point.predicted_heat_flux):
ratio = point.observed_heat_flux / point.predicted_heat_flux
merged_notes = [
item
for item in (
point.notes,
f"ql_spectrum_method={estimate['method']}",
f"ql_spectrum_column={estimate['column']}",
None if ratio is None else f"observed_to_predicted={ratio:.6g}",
)
if item
]
return QuasilinearCalibrationPoint(
**{
**point.to_dict(),
"notes": "; ".join(merged_notes),
}
)
[docs]
def write_quasilinear_calibration_report(
path: str | Path, report: dict[str, Any]
) -> Path:
"""Write a quasilinear calibration report to JSON."""
out = Path(path)
out.parent.mkdir(parents=True, exist_ok=True)
out.write_text(
json.dumps(report, indent=2, sort_keys=True) + "\n", encoding="utf-8"
)
return out
__all__ = [
"QuasilinearCalibrationPoint",
"apply_heat_flux_scale",
"calibration_point_from_nonlinear_window_summary",
"calibration_point_from_spectrum_and_nonlinear_window",
"fit_train_heat_flux_scale",
"integrated_quasilinear_flux_from_spectrum",
"quasilinear_calibration_report",
"write_quasilinear_calibration_report",
]