"""High-level benchmark helpers for scans and eigenfunction extraction."""
from __future__ import annotations
from dataclasses import dataclass
import json
from pathlib import Path
from typing import Callable
import netCDF4 as nc
import numpy as np
from spectraxgk.analysis import (
extract_eigenfunction,
extract_mode_time_series,
fit_growth_rate,
fit_growth_rate_auto,
)
from spectraxgk.benchmarks import LinearRunResult, LinearScanResult
from spectraxgk.grids import SpectralGrid, build_spectral_grid
from spectraxgk.validation_gates import (
BranchContinuationMetrics,
DiagnosticTimeSeries,
EigenfunctionComparisonMetrics,
EigenfunctionReferenceBundle,
GateReport as GateReport,
LateTimeLinearMetrics,
NonlinearHeatFluxConvergenceMetrics,
NonlinearWindowMetrics,
ObservedOrderMetrics,
ScalarGateResult as ScalarGateResult,
ZonalFlowResponseMetrics,
branch_continuity_gate_report as branch_continuity_gate_report,
eigenfunction_gate_report as eigenfunction_gate_report,
evaluate_scalar_gate as evaluate_scalar_gate,
gate_report as gate_report,
gate_report_to_dict as gate_report_to_dict,
linear_metrics_gate_report as linear_metrics_gate_report,
nonlinear_heat_flux_convergence_gate_report as nonlinear_heat_flux_convergence_gate_report,
nonlinear_window_gate_report as nonlinear_window_gate_report,
observed_order_gate_report as observed_order_gate_report,
zonal_response_gate_report as zonal_response_gate_report,
)
[docs]
@dataclass(frozen=True)
class ScanAndModeResult:
scan: LinearScanResult
eigenfunction: np.ndarray
grid: SpectralGrid
ky_selected: float
tmin: float | None
tmax: float | None
[docs]
def normalize_eigenfunction(eigenfunction: np.ndarray, z: np.ndarray) -> np.ndarray:
"""Normalize an eigenfunction by its value at theta=0 (nearest z=0)."""
idx = int(np.argmin(np.abs(z)))
scale = eigenfunction[idx]
if scale == 0:
return eigenfunction
return eigenfunction / scale
[docs]
def phase_align_eigenfunction(eigenfunction: np.ndarray, reference: np.ndarray) -> tuple[np.ndarray, float]:
"""Phase-align ``eigenfunction`` to ``reference`` using the global complex phase."""
lhs = np.asarray(eigenfunction, dtype=np.complex128)
rhs = np.asarray(reference, dtype=np.complex128)
if lhs.shape != rhs.shape:
raise ValueError("eigenfunction and reference must have the same shape")
phase = np.vdot(lhs, rhs)
if abs(phase) <= 1.0e-30:
return lhs, 0.0
phase_shift = float(np.angle(phase))
return lhs * np.exp(1j * phase_shift), phase_shift
[docs]
def compare_eigenfunctions(eigenfunction: np.ndarray, reference: np.ndarray) -> EigenfunctionComparisonMetrics:
"""Return normalized overlap and relative L2 error after global phase alignment."""
lhs = np.asarray(eigenfunction, dtype=np.complex128)
rhs = np.asarray(reference, dtype=np.complex128)
if lhs.shape != rhs.shape:
raise ValueError("eigenfunction and reference must have the same shape")
lhs_norm = float(np.linalg.norm(lhs))
rhs_norm = float(np.linalg.norm(rhs))
if lhs_norm <= 0.0 or rhs_norm <= 0.0:
return EigenfunctionComparisonMetrics(
overlap=float("nan"),
relative_l2=float("nan"),
phase_shift=0.0,
)
lhs_aligned, phase_shift = phase_align_eigenfunction(lhs, rhs)
overlap = float(np.abs(np.vdot(lhs, rhs)) / (lhs_norm * rhs_norm))
rel_l2 = float(np.linalg.norm(lhs_aligned - rhs) / rhs_norm)
return EigenfunctionComparisonMetrics(
overlap=overlap,
relative_l2=rel_l2,
phase_shift=phase_shift,
)
[docs]
def save_eigenfunction_reference_bundle(
path: str | Path,
*,
theta: np.ndarray,
mode: np.ndarray,
source: str,
case: str,
metadata: dict[str, object] | None = None,
) -> Path:
"""Write a frozen reference eigenfunction bundle as ``.npz``."""
out = Path(path)
out.parent.mkdir(parents=True, exist_ok=True)
np.savez(
out,
theta=np.asarray(theta, dtype=float),
mode=np.asarray(mode, dtype=np.complex128),
source=np.asarray(str(source)),
case=np.asarray(str(case)),
metadata_json=np.asarray(json.dumps(metadata or {}, sort_keys=True)),
)
return out
[docs]
def load_eigenfunction_reference_bundle(path: str | Path) -> EigenfunctionReferenceBundle:
"""Load a frozen reference eigenfunction bundle."""
data = np.load(Path(path), allow_pickle=False)
metadata_json = str(np.asarray(data["metadata_json"]).item()) if "metadata_json" in data else "{}"
return EigenfunctionReferenceBundle(
theta=np.asarray(data["theta"], dtype=float),
mode=np.asarray(data["mode"], dtype=np.complex128),
source=str(np.asarray(data["source"]).item()),
case=str(np.asarray(data["case"]).item()),
metadata=json.loads(metadata_json),
)
[docs]
def load_diagnostic_time_series(
path: str | Path,
*,
variable: str,
diagnostics_group: str = "Diagnostics",
time_group: str = "Grids",
time_var: str = "time",
kx_index: int | None = None,
component: str = "real",
align_phase: bool = False,
) -> DiagnosticTimeSeries:
"""Load a 1D diagnostics time series from a GX-style ``out.nc`` artifact."""
src = Path(path)
with nc.Dataset(src) as ds:
diag_group = ds.groups.get(diagnostics_group)
if diag_group is None:
raise ValueError(f"missing NetCDF group {diagnostics_group!r} in {src}")
if variable not in diag_group.variables:
raise ValueError(f"missing diagnostics variable {variable!r} in {src}")
var = diag_group.variables[variable]
raw = np.asarray(var[:])
dims = tuple(getattr(var, "dimensions", ()))
if dims and dims[-1] == "ri":
raw = np.asarray(raw[..., 0] + 1j * raw[..., 1], dtype=np.complex128)
if raw.ndim == 1:
values = raw
elif raw.ndim == 2:
if kx_index is None:
raise ValueError(f"diagnostics variable {variable!r} requires kx_index for 2D extraction")
values = raw[:, int(kx_index)]
else:
raise ValueError(f"diagnostics variable {variable!r} must reduce to a 1D time series")
if time_group in ds.groups and time_var in ds.groups[time_group].variables:
t = np.asarray(ds.groups[time_group].variables[time_var][:], dtype=float)
elif time_var in ds.variables:
t = np.asarray(ds.variables[time_var][:], dtype=float)
else:
raise ValueError(f"missing time variable {time_group}/{time_var} in {src}")
if t.ndim != 1 or t.size != values.size:
raise ValueError(f"time axis for {variable!r} must be one-dimensional and match the diagnostics length")
values_arr = np.asarray(values)
if np.iscomplexobj(values_arr):
if align_phase:
finite = np.isfinite(values_arr)
nz = finite & (np.abs(values_arr) > 1.0e-30)
if np.any(nz):
first = values_arr[np.flatnonzero(nz)[0]]
values_arr = values_arr * np.exp(-1j * np.angle(first))
component_key = str(component).lower()
if component_key == "complex":
selected = values_arr
elif component_key == "real":
selected = np.real(values_arr)
elif component_key == "imag":
selected = np.imag(values_arr)
elif component_key == "abs":
selected = np.abs(values_arr)
else:
raise ValueError("component must be one of {'real', 'imag', 'abs', 'complex'}")
else:
if component not in {"real", "abs"}:
raise ValueError("real diagnostics only support component='real' or 'abs'")
selected = np.abs(values_arr) if component == "abs" else np.asarray(values_arr, dtype=float)
return DiagnosticTimeSeries(
t=t,
values=np.asarray(selected),
variable=str(variable),
source_path=str(src),
)
def _tail_window(t: np.ndarray, tail_fraction: float) -> tuple[np.ndarray, float | None, float | None]:
if not 0.0 < float(tail_fraction) <= 1.0:
raise ValueError("tail_fraction must be in (0, 1]")
if t.ndim != 1:
raise ValueError("t must be one-dimensional")
if t.size == 0:
raise ValueError("t must be non-empty")
start = max(0, int(np.floor((1.0 - float(tail_fraction)) * t.size)))
mask = np.zeros_like(t, dtype=bool)
mask[start:] = True
if not np.any(mask):
mask[-1] = True
tt = np.asarray(t[mask], dtype=float)
return mask, float(tt[0]) if tt.size else None, float(tt[-1]) if tt.size else None
[docs]
def late_time_window(t: np.ndarray, *, tail_fraction: float = 0.4) -> tuple[float, float]:
"""Return the start/end of a late-time tail window.
This is the windowing convention used for manuscript-facing eigenfunction
extraction when the growth-rate fit window is not the same object as the
late-time mode-shape window.
"""
_mask, tmin, tmax = _tail_window(np.asarray(t, dtype=float), float(tail_fraction))
if tmin is None or tmax is None:
raise ValueError("late-time window requires a non-empty time axis")
return float(tmin), float(tmax)
[docs]
def infer_triple_dealiased_ny(nky_positive: int) -> int:
"""Infer the full ``Ny`` from the number of positive ``k_y`` points.
GX-style reference outputs typically store only the non-negative
``k_y`` branch. For the linked-boundary spectral grid used here, the
corresponding real-space ``Ny`` follows ``Ny = 3 * (nky - 1) + 1``.
"""
nky = int(nky_positive)
if nky < 2:
raise ValueError("nky_positive must be >= 2")
return 3 * (nky - 1) + 1
def _tail_stats(arr: np.ndarray, mask: np.ndarray) -> tuple[float, float]:
vals = np.asarray(arr, dtype=float)[mask]
vals = vals[np.isfinite(vals)]
if vals.size == 0:
return float("nan"), float("nan")
return float(np.mean(vals)), float(np.std(vals))
def _leading_window(
t: np.ndarray,
lead_fraction: float,
) -> tuple[np.ndarray, float | None, float | None]:
if not 0.0 < float(lead_fraction) <= 1.0:
raise ValueError("lead_fraction must be in (0, 1]")
if t.ndim != 1:
raise ValueError("t must be one-dimensional")
if t.size == 0:
raise ValueError("t must be non-empty")
stop = max(1, int(np.ceil(float(lead_fraction) * t.size)))
mask = np.zeros_like(t, dtype=bool)
mask[:stop] = True
tt = np.asarray(t[mask], dtype=float)
return mask, float(tt[0]) if tt.size else None, float(tt[-1]) if tt.size else None
def _explicit_time_window(
t: np.ndarray,
*,
tmin: float | None = None,
tmax: float | None = None,
) -> tuple[np.ndarray, float, float]:
mask = np.ones_like(t, dtype=bool)
if tmin is not None:
mask &= t >= float(tmin)
if tmax is not None:
mask &= t <= float(tmax)
if not np.any(mask):
raise ValueError("explicit fit window is empty")
tt = np.asarray(t[mask], dtype=float)
return mask, float(tt[0]), float(tt[-1])
def _analytic_signal(signal: np.ndarray) -> np.ndarray:
x = np.asarray(signal, dtype=float)
if x.ndim != 1 or x.size == 0:
raise ValueError("signal must be a non-empty one-dimensional array")
spec = np.fft.fft(x)
filt = np.zeros(x.size, dtype=float)
if x.size % 2 == 0:
filt[0] = 1.0
filt[x.size // 2] = 1.0
filt[1 : x.size // 2] = 2.0
else:
filt[0] = 1.0
filt[1 : (x.size + 1) // 2] = 2.0
return np.fft.ifft(spec * filt)
[docs]
def zonal_flow_response_metrics(
t: np.ndarray,
response: np.ndarray,
*,
tail_fraction: float = 0.3,
initial_fraction: float = 0.1,
initial_policy: str = "window_abs_mean",
initial_level_override: float | None = None,
peak_fit_max_peaks: int | None = None,
damping_fit_mode: str = "combined_envelope",
frequency_fit_mode: str = "peak_spacing",
fit_window_tmin: float | None = None,
fit_window_tmax: float | None = None,
hilbert_trim_fraction: float = 0.2,
) -> ZonalFlowResponseMetrics:
"""Estimate residual level and GAM envelope metrics from a zonal response.
The input ``response`` should be a scalar zonal observable such as zonal
potential or a normalized zonal-energy proxy on a uniform time trace.
``initial_policy="first_abs"`` follows Rosenbluth-Hinton/GAM convention by
normalizing to the initial potential magnitude; ``"window_abs_mean"`` keeps
the older robust behavior for generic noisy traces. ``initial_level_override``
supports benchmarks whose published normalization is an external initial
amplitude, for example a Gaussian potential maximum rather than the first
line-averaged sample.
"""
t_arr = np.asarray(t, dtype=float)
resp = np.asarray(response, dtype=float)
if t_arr.ndim != 1 or resp.ndim != 1 or t_arr.size != resp.size:
raise ValueError("t and response must be one-dimensional arrays of equal length")
if t_arr.size < 4:
raise ValueError("zonal-flow response requires at least four samples")
finite = np.isfinite(t_arr) & np.isfinite(resp)
t_arr = t_arr[finite]
resp = resp[finite]
if t_arr.size < 4:
raise ValueError("zonal-flow response requires at least four finite samples")
policy = str(initial_policy).strip().lower().replace("-", "_")
if policy not in {"window_abs_mean", "first_abs"}:
raise ValueError("initial_policy must be one of {'window_abs_mean', 'first_abs'}")
if peak_fit_max_peaks is not None and int(peak_fit_max_peaks) <= 0:
raise ValueError("peak_fit_max_peaks must be > 0 when provided")
damping_mode = str(damping_fit_mode).strip().lower().replace("-", "_")
if damping_mode not in {"combined_envelope", "branchwise_extrema"}:
raise ValueError("damping_fit_mode must be one of {'combined_envelope', 'branchwise_extrema'}")
frequency_mode = str(frequency_fit_mode).strip().lower().replace("-", "_")
if frequency_mode not in {"peak_spacing", "hilbert_phase"}:
raise ValueError("frequency_fit_mode must be one of {'peak_spacing', 'hilbert_phase'}")
if not 0.0 <= float(hilbert_trim_fraction) < 0.5:
raise ValueError("hilbert_trim_fraction must be in [0, 0.5)")
tail_mask, tail_tmin, tail_tmax = _tail_window(t_arr, float(tail_fraction))
tail_vals = resp[tail_mask]
if tail_vals.size == 0:
raise ValueError("response windows must be non-empty")
if initial_level_override is not None:
initial_level = float(initial_level_override)
elif policy == "first_abs":
initial_level = float(abs(resp[0]))
else:
lead_mask, _lead_tmin, _lead_tmax = _leading_window(t_arr, float(initial_fraction))
initial_vals = resp[lead_mask]
if initial_vals.size == 0:
raise ValueError("response windows must be non-empty")
initial_level = float(np.mean(np.abs(initial_vals)))
if initial_level <= 0.0 or not np.isfinite(initial_level):
raise ValueError("initial response level must be finite and positive")
residual = float(np.mean(tail_vals))
residual_std = float(np.std(tail_vals))
response_norm = resp / initial_level
residual_norm = residual / initial_level
residual_std_norm = residual_std / initial_level
response_rms = float(np.sqrt(np.mean(np.square(response_norm[tail_mask]))))
detrended_norm = response_norm - residual_norm
fit_mask, fit_tmin, fit_tmax = _explicit_time_window(
t_arr,
tmin=fit_window_tmin,
tmax=fit_window_tmax,
)
max_peak_idx = np.asarray([], dtype=int)
min_peak_idx = np.asarray([], dtype=int)
peak_idx = np.asarray([], dtype=int)
if detrended_norm.size >= 3:
max_peak_idx = (
np.flatnonzero(
(detrended_norm[1:-1] > detrended_norm[:-2])
& (detrended_norm[1:-1] >= detrended_norm[2:])
& (detrended_norm[1:-1] > 1.0e-12)
)
+ 1
)
min_peak_idx = (
np.flatnonzero(
(detrended_norm[1:-1] < detrended_norm[:-2])
& (detrended_norm[1:-1] <= detrended_norm[2:])
& (detrended_norm[1:-1] < -1.0e-12)
)
+ 1
)
peak_idx = np.sort(np.concatenate([max_peak_idx, min_peak_idx]))
peak_times = t_arr[peak_idx]
peak_values = np.abs(detrended_norm[peak_idx])
max_peak_times = t_arr[max_peak_idx]
max_peak_values = response_norm[max_peak_idx]
min_peak_times = t_arr[min_peak_idx]
min_peak_values = response_norm[min_peak_idx]
gam_frequency = float("nan")
gam_damping = float("nan")
peak_fit_count = 0
if damping_mode == "combined_envelope":
peak_fit_times = peak_times[fit_mask[peak_idx]]
peak_fit_values = peak_values[fit_mask[peak_idx]]
if peak_fit_max_peaks is not None and peak_fit_times.size:
nfit = min(int(peak_fit_max_peaks), int(peak_fit_times.size))
peak_fit_times = peak_fit_times[:nfit]
peak_fit_values = peak_fit_values[:nfit]
peak_fit_count = int(peak_fit_times.size)
valid = np.isfinite(peak_fit_values) & (peak_fit_values > 0.0)
if np.count_nonzero(valid) >= 2:
slope, _offset = np.polyfit(peak_fit_times[valid], np.log(peak_fit_values[valid]), 1)
gam_damping = float(-slope)
else:
branch_gammas: list[float] = []
branch_counts: list[int] = []
for branch_idx in (max_peak_idx, min_peak_idx):
idx = branch_idx[fit_mask[branch_idx]]
if peak_fit_max_peaks is not None and idx.size:
idx = idx[: min(int(peak_fit_max_peaks), int(idx.size))]
amp = np.abs(detrended_norm[idx])
valid = np.isfinite(amp) & (amp > 0.0)
if np.count_nonzero(valid) >= 2:
slope, _offset = np.polyfit(t_arr[idx][valid], np.log(amp[valid]), 1)
branch_gammas.append(float(-slope))
branch_counts.append(int(np.count_nonzero(valid)))
if branch_gammas:
gam_damping = float(np.mean(branch_gammas))
peak_fit_count = int(np.sum(branch_counts))
fit_peak_times = peak_times[fit_mask[peak_idx]]
if peak_fit_max_peaks is not None and damping_mode == "combined_envelope" and fit_peak_times.size:
fit_peak_times = fit_peak_times[: min(int(peak_fit_max_peaks), int(fit_peak_times.size))]
if frequency_mode == "peak_spacing":
freq_peak_times = fit_peak_times if fit_peak_times.size >= 2 else peak_times[fit_mask[peak_idx]]
if freq_peak_times.size >= 2:
dt_peaks = np.diff(freq_peak_times)
dt_peaks = dt_peaks[np.isfinite(dt_peaks) & (dt_peaks > 0.0)]
if dt_peaks.size:
gam_frequency = float(np.pi / np.mean(dt_peaks))
else:
fit_t = t_arr[fit_mask]
fit_signal = detrended_norm[fit_mask]
if fit_t.size >= 8:
analytic = _analytic_signal(fit_signal)
phase = np.unwrap(np.angle(analytic))
omega = np.gradient(phase, fit_t)
trim = int(np.floor(float(hilbert_trim_fraction) * fit_t.size))
trim_mask = np.ones_like(fit_t, dtype=bool)
if trim > 0:
trim_mask[:trim] = False
trim_mask[-trim:] = False
amp = np.abs(analytic)
valid = np.isfinite(omega) & np.isfinite(amp) & (amp > 1.0e-6) & trim_mask
if np.count_nonzero(valid) >= 2:
gam_frequency = float(np.mean(omega[valid]))
return ZonalFlowResponseMetrics(
initial_level=initial_level,
initial_policy=policy,
residual_level=residual_norm,
residual_std=residual_std_norm,
response_rms=response_rms,
gam_frequency=gam_frequency,
gam_damping_rate=gam_damping,
damping_method=damping_mode,
frequency_method=frequency_mode,
peak_count=int(peak_times.size),
peak_fit_count=int(peak_fit_count),
tmin=float(tail_tmin if tail_tmin is not None else t_arr[0]),
tmax=float(tail_tmax if tail_tmax is not None else t_arr[-1]),
fit_tmin=float(fit_tmin),
fit_tmax=float(fit_tmax),
peak_times=np.asarray(peak_times, dtype=float),
peak_envelope=np.asarray(peak_values, dtype=float),
max_peak_times=np.asarray(max_peak_times, dtype=float),
max_peak_values=np.asarray(max_peak_values, dtype=float),
min_peak_times=np.asarray(min_peak_times, dtype=float),
min_peak_values=np.asarray(min_peak_values, dtype=float),
)
[docs]
def late_time_linear_metrics(
result: object,
*,
tail_fraction: float = 0.5,
mode_method: str = "project",
) -> LateTimeLinearMetrics:
"""Return late-time growth/frequency metrics from a linear benchmark/runtime result."""
t = getattr(result, "t", None)
if t is None:
gamma = float(getattr(result, "gamma"))
omega = float(getattr(result, "omega"))
return LateTimeLinearMetrics(
gamma_fit=gamma,
omega_fit=omega,
gamma_tail_mean=gamma,
omega_tail_mean=omega,
gamma_tail_std=0.0,
omega_tail_std=0.0,
tmin=None,
tmax=None,
nsamples=1,
signal_source="scalar",
)
t_arr = np.asarray(t, dtype=float)
mask, tmin, tmax = _tail_window(t_arr, tail_fraction)
gamma_series = getattr(result, "gamma_t", None)
omega_series = getattr(result, "omega_t", None)
signal_source = "scalar"
gamma_fit = float(getattr(result, "gamma"))
omega_fit = float(getattr(result, "omega"))
signal = getattr(result, "signal", None)
if signal is not None:
signal_arr = np.asarray(signal, dtype=np.complex128)
signal_source = "signal"
elif hasattr(result, "phi_t") and hasattr(result, "selection"):
signal_arr = np.asarray(
extract_mode_time_series(
np.asarray(getattr(result, "phi_t")),
getattr(result, "selection"),
method=mode_method,
),
dtype=np.complex128,
)
signal_source = f"phi_t:{mode_method}"
else:
signal_arr = None
if signal_arr is not None:
finite = np.isfinite(signal_arr)
signal_tail = signal_arr[mask & finite]
t_tail = t_arr[mask & finite]
if t_tail.size >= 2:
gamma_fit, omega_fit = fit_growth_rate(t_tail, signal_tail)
if gamma_series is not None:
gamma_mean, gamma_std = _tail_stats(np.asarray(gamma_series), mask)
else:
gamma_mean, gamma_std = gamma_fit, 0.0
if omega_series is not None:
omega_mean, omega_std = _tail_stats(np.asarray(omega_series), mask)
else:
omega_mean, omega_std = omega_fit, 0.0
nsamples = int(np.count_nonzero(mask))
return LateTimeLinearMetrics(
gamma_fit=float(gamma_fit),
omega_fit=float(omega_fit),
gamma_tail_mean=float(gamma_mean),
omega_tail_mean=float(omega_mean),
gamma_tail_std=float(gamma_std),
omega_tail_std=float(omega_std),
tmin=tmin,
tmax=tmax,
nsamples=nsamples,
signal_source=signal_source,
)
[docs]
def windowed_nonlinear_metrics(
result: object,
*,
start_fraction: float = 0.5,
) -> NonlinearWindowMetrics:
"""Return late-window transport and envelope metrics from a nonlinear runtime result."""
diagnostics = getattr(result, "diagnostics", result)
if diagnostics is None:
raise ValueError("nonlinear diagnostics are required")
if not 0.0 <= float(start_fraction) < 1.0:
raise ValueError("start_fraction must be in [0, 1)")
t = np.asarray(getattr(diagnostics, "t", None), dtype=float)
if t.ndim != 1 or t.size == 0:
raise ValueError("diagnostics.t must be a non-empty one-dimensional array")
tail_fraction = max(np.finfo(float).eps, 1.0 - float(start_fraction))
mask, tmin, tmax = _tail_window(t, tail_fraction)
heat_flux = np.asarray(getattr(diagnostics, "heat_flux_t"), dtype=float)[mask]
wphi = np.asarray(getattr(diagnostics, "Wphi_t"), dtype=float)[mask]
wg = np.asarray(getattr(diagnostics, "Wg_t"), dtype=float)[mask]
heat_flux = heat_flux[np.isfinite(heat_flux)]
wphi = wphi[np.isfinite(wphi)]
wg = wg[np.isfinite(wg)]
if heat_flux.size == 0 or wphi.size == 0 or wg.size == 0:
raise ValueError("windowed diagnostics must contain finite heat/Wphi/Wg samples")
phi_mode = getattr(diagnostics, "phi_mode_t", None)
envelope_mean: float | None = None
envelope_std: float | None = None
envelope_max: float | None = None
if phi_mode is not None:
envelope = np.abs(np.asarray(phi_mode)[mask])
envelope = envelope[np.isfinite(envelope)]
if envelope.size:
envelope_mean = float(np.mean(envelope))
envelope_std = float(np.std(envelope))
envelope_max = float(np.max(envelope))
return NonlinearWindowMetrics(
tmin=float(tmin if tmin is not None else t[0]),
tmax=float(tmax if tmax is not None else t[-1]),
nsamples=int(np.count_nonzero(mask)),
heat_flux_mean=float(np.mean(heat_flux)),
heat_flux_std=float(np.std(heat_flux)),
heat_flux_rms=float(np.sqrt(np.mean(np.square(heat_flux)))),
wphi_mean=float(np.mean(wphi)),
wphi_std=float(np.std(wphi)),
wg_mean=float(np.mean(wg)),
wg_std=float(np.std(wg)),
phi_mode_envelope_mean=envelope_mean,
phi_mode_envelope_std=envelope_std,
phi_mode_envelope_max=envelope_max,
)
[docs]
def nonlinear_heat_flux_convergence_metrics(
t: np.ndarray,
heat_flux: np.ndarray,
*,
start_fraction: float = 0.5,
terminal_fraction: float = 0.5,
mean_floor: float = 1.0e-30,
) -> NonlinearHeatFluxConvergenceMetrics:
"""Summarize whether a post-transient heat-flux average is stable.
``start_fraction`` discards startup samples. ``terminal_fraction`` compares
the retained post-transient mean with the final subwindow of that retained
region. The normalized trend is the least-squares slope multiplied by the
post-transient time span and divided by the absolute post-transient mean.
"""
t_arr = np.asarray(t, dtype=float)
q_arr = np.asarray(heat_flux, dtype=float)
if t_arr.ndim != 1 or q_arr.ndim != 1 or t_arr.size != q_arr.size:
raise ValueError("t and heat_flux must be one-dimensional arrays of equal length")
if t_arr.size == 0:
raise ValueError("t and heat_flux must be non-empty")
if not 0.0 <= float(start_fraction) < 1.0:
raise ValueError("start_fraction must be in [0, 1)")
if not 0.0 < float(terminal_fraction) <= 1.0:
raise ValueError("terminal_fraction must be in (0, 1]")
if float(mean_floor) < 0.0:
raise ValueError("mean_floor must be non-negative")
finite = np.isfinite(t_arr) & np.isfinite(q_arr)
t_arr = t_arr[finite]
q_arr = q_arr[finite]
if t_arr.size == 0:
raise ValueError("t and heat_flux must contain at least one finite paired sample")
if t_arr.size > 1 and np.any(np.diff(t_arr) <= 0.0):
raise ValueError("t must be strictly increasing after finite-sample filtering")
tail_fraction = max(np.finfo(float).eps, 1.0 - float(start_fraction))
mask, tmin, tmax = _tail_window(t_arr, tail_fraction)
t_win = t_arr[mask]
q_win = q_arr[mask]
if q_win.size == 0:
raise ValueError("post-transient heat-flux window is empty")
terminal_start = max(0, int(np.floor((1.0 - float(terminal_fraction)) * q_win.size)))
t_terminal = t_win[terminal_start:]
q_terminal = q_win[terminal_start:]
if q_terminal.size == 0:
raise ValueError("terminal heat-flux window is empty")
mean = float(np.mean(q_win))
std = float(np.std(q_win))
rms = float(np.sqrt(np.mean(np.square(q_win))))
terminal_mean = float(np.mean(q_terminal))
scale = max(abs(mean), float(mean_floor))
cv = float(std / scale) if scale > 0.0 else float("inf")
mean_rel_delta = float(abs(terminal_mean - mean) / scale) if scale > 0.0 else float("inf")
trend = 0.0
if t_win.size >= 2 and float(t_win[-1] - t_win[0]) > 0.0:
slope, _offset = np.polyfit(t_win, q_win, 1)
trend = float(slope * (t_win[-1] - t_win[0]) / scale) if scale > 0.0 else float("inf")
return NonlinearHeatFluxConvergenceMetrics(
tmin=float(tmin if tmin is not None else t_win[0]),
tmax=float(tmax if tmax is not None else t_win[-1]),
nsamples=int(q_win.size),
heat_flux_mean=mean,
heat_flux_std=std,
heat_flux_cv=cv,
heat_flux_rms=rms,
terminal_tmin=float(t_terminal[0]),
terminal_tmax=float(t_terminal[-1]),
terminal_nsamples=int(q_terminal.size),
terminal_heat_flux_mean=terminal_mean,
mean_rel_delta=mean_rel_delta,
trend=trend,
abs_trend=float(abs(trend)),
start_fraction=float(start_fraction),
terminal_fraction=float(terminal_fraction),
)
[docs]
def estimate_observed_order(step_sizes: np.ndarray, errors: np.ndarray) -> ObservedOrderMetrics:
"""Estimate observed order from successive step-size refinements."""
h = np.asarray(step_sizes, dtype=float)
err = np.asarray(errors, dtype=float)
if h.ndim != 1 or err.ndim != 1 or h.size != err.size or h.size < 2:
raise ValueError("step_sizes and errors must be one-dimensional arrays of equal length >= 2")
if np.any(~np.isfinite(h)) or np.any(~np.isfinite(err)):
raise ValueError("step_sizes and errors must be finite")
if np.any(h <= 0.0):
raise ValueError("step_sizes must be positive")
if np.any(err <= 0.0):
raise ValueError("errors must be positive")
orders: list[float] = []
for i in range(h.size - 1):
if np.isclose(h[i], h[i + 1]):
raise ValueError("successive step sizes must differ")
orders.append(float(np.log(err[i] / err[i + 1]) / np.log(h[i] / h[i + 1])))
orders_arr = np.asarray(orders, dtype=float)
return ObservedOrderMetrics(
step_sizes=h,
errors=err,
orders=orders_arr,
asymptotic_order=float(orders_arr[-1]),
)
[docs]
def branch_continuity_metrics(
ky: np.ndarray,
gamma: np.ndarray,
omega: np.ndarray,
*,
successive_overlap: np.ndarray | None = None,
floor_fraction: float = 1.0e-8,
) -> BranchContinuationMetrics:
"""Compute branch-continuity diagnostics for a linear scan.
The relative jump normalization uses a local scale from adjacent values,
with a floor tied to the largest value in the scan. This avoids false
blow-ups near marginal points while still flagging branch jumps.
"""
ky_arr = np.asarray(ky, dtype=float)
gamma_arr = np.asarray(gamma, dtype=float)
omega_arr = np.asarray(omega, dtype=float)
if ky_arr.ndim != 1 or gamma_arr.ndim != 1 or omega_arr.ndim != 1:
raise ValueError("ky, gamma, and omega must be one-dimensional arrays")
if not (ky_arr.size == gamma_arr.size == omega_arr.size):
raise ValueError("ky, gamma, and omega must have equal length")
if ky_arr.size < 2:
raise ValueError("branch continuity requires at least two ky samples")
if np.any(~np.isfinite(ky_arr)) or np.any(~np.isfinite(gamma_arr)) or np.any(~np.isfinite(omega_arr)):
raise ValueError("ky, gamma, and omega must be finite")
floor = float(floor_fraction)
if floor < 0.0:
raise ValueError("floor_fraction must be non-negative")
def _relative_jumps(values: np.ndarray) -> np.ndarray:
jumps = np.abs(np.diff(values))
global_floor = max(float(np.nanmax(np.abs(values))) * floor, 1.0e-30)
local_scale = np.maximum(np.maximum(np.abs(values[:-1]), np.abs(values[1:])), global_floor)
return jumps / local_scale
overlap_min: float | None = None
if successive_overlap is not None:
overlap = np.asarray(successive_overlap, dtype=float)
if overlap.ndim != 1 or overlap.size != ky_arr.size - 1:
raise ValueError("successive_overlap must have length len(ky) - 1")
if np.any(~np.isfinite(overlap)):
raise ValueError("successive_overlap must be finite")
overlap_min = float(np.min(overlap))
gamma_jumps = _relative_jumps(gamma_arr)
omega_jumps = _relative_jumps(omega_arr)
return BranchContinuationMetrics(
ky=ky_arr,
gamma=gamma_arr,
omega=omega_arr,
rel_gamma_jumps=gamma_jumps,
rel_omega_jumps=omega_jumps,
max_rel_gamma_jump=float(np.max(gamma_jumps)),
max_rel_omega_jump=float(np.max(omega_jumps)),
min_successive_overlap=overlap_min,
)
[docs]
def run_linear_scan(
*,
ky_values: np.ndarray,
run_linear_fn,
cfg,
Nl: int,
Nm: int,
dt: float | np.ndarray,
steps: int | np.ndarray,
method: str,
solver: str,
krylov_cfg,
window_kw: dict,
tmin: float | np.ndarray | None = None,
tmax: float | np.ndarray | None = None,
auto_window: bool = True,
run_kwargs: dict | None = None,
resolution_policy: Callable[[float], tuple[int, int]] | None = None,
krylov_policy: Callable[[float], object] | None = None,
) -> LinearScanResult:
"""Run a linear scan over ky values."""
gammas: list[float] = []
omegas: list[float] = []
ky_out: list[float] = []
for i, ky in enumerate(ky_values):
if resolution_policy is not None:
Nl_i, Nm_i = resolution_policy(float(ky))
else:
Nl_i, Nm_i = int(Nl), int(Nm)
dt_i = float(dt[i]) if isinstance(dt, np.ndarray) else float(dt)
steps_i = int(steps[i]) if isinstance(steps, np.ndarray) else int(steps)
tmin_i = tmin[i] if isinstance(tmin, np.ndarray) else tmin
tmax_i = tmax[i] if isinstance(tmax, np.ndarray) else tmax
krylov_cfg_use = krylov_policy(float(ky)) if krylov_policy is not None else krylov_cfg
result = run_linear_fn(
ky_target=float(ky),
cfg=cfg,
Nl=int(Nl_i),
Nm=int(Nm_i),
dt=dt_i,
steps=steps_i,
method=method,
solver=solver,
krylov_cfg=krylov_cfg_use,
auto_window=auto_window,
tmin=tmin_i,
tmax=tmax_i,
**window_kw,
**(run_kwargs or {}),
)
gammas.append(float(result.gamma))
omegas.append(float(result.omega))
ky_out.append(float(result.ky))
return LinearScanResult(ky=np.array(ky_out), gamma=np.array(gammas), omega=np.array(omegas))
[docs]
def run_scan_and_mode(
*,
ky_values: np.ndarray,
scan_fn,
linear_fn,
cfg,
Nl: int,
Nm: int,
dt: float | np.ndarray,
steps: int | np.ndarray,
method: str,
solver: str,
mode_solver: str,
krylov_cfg,
window_kw: dict,
tmin: float | np.ndarray | None = None,
tmax: float | np.ndarray | None = None,
auto_window: bool = True,
run_kwargs: dict | None = None,
mode_kwargs: dict | None = None,
resolution_policy: Callable[[float], tuple[int, int]] | None = None,
krylov_policy: Callable[[float], object] | None = None,
select_ky: Callable[[LinearScanResult], float] | None = None,
) -> ScanAndModeResult:
"""Run a scan and extract a representative eigenfunction."""
if select_ky is None:
def select_ky(scan: LinearScanResult) -> float:
return float(scan.ky[int(np.nanargmax(scan.gamma))])
scan = run_linear_scan(
ky_values=ky_values,
run_linear_fn=linear_fn,
cfg=cfg,
Nl=Nl,
Nm=Nm,
dt=dt,
steps=steps,
method=method,
solver=solver,
krylov_cfg=krylov_cfg,
window_kw=window_kw,
tmin=tmin,
tmax=tmax,
auto_window=auto_window,
run_kwargs=run_kwargs,
resolution_policy=resolution_policy,
krylov_policy=krylov_policy,
)
ky_sel = float(select_ky(scan))
if resolution_policy is not None:
Nl_mode, Nm_mode = resolution_policy(ky_sel)
else:
Nl_mode, Nm_mode = int(Nl), int(Nm)
idx = int(np.argmin(np.abs(scan.ky - ky_sel)))
dt_mode = float(dt[idx]) if isinstance(dt, np.ndarray) else float(dt)
steps_mode = int(steps[idx]) if isinstance(steps, np.ndarray) else int(steps)
run: LinearRunResult = linear_fn(
cfg=cfg,
ky_target=ky_sel,
Nl=int(Nl_mode),
Nm=int(Nm_mode),
dt=dt_mode,
steps=steps_mode,
method=method,
solver=mode_solver,
**window_kw,
**(mode_kwargs or {}),
)
grid = build_spectral_grid(cfg.grid)
if run.t.size < 2:
tmin_fit = None
tmax_fit = None
else:
signal = extract_mode_time_series(run.phi_t, run.selection, method="project")
_g, _w, tmin_fit, tmax_fit = fit_growth_rate_auto(run.t, signal, **window_kw)
z_np = np.asarray(grid.z)
eig = extract_eigenfunction(
run.phi_t, run.t, run.selection, z=z_np, method="snapshot", tmin=tmin_fit, tmax=tmax_fit
)
return ScanAndModeResult(
scan=scan,
eigenfunction=eig,
grid=grid,
ky_selected=ky_sel,
tmin=tmin_fit,
tmax=tmax_fit,
)