"""Admission policy for VMEC-JAX transport-optimization candidates.
This module is deliberately small and independent of VMEC-JAX internals. It
operates on JSON-safe candidate summaries produced by the optimization and
ladder tools, then answers the release-critical question: which solved WOUT, if
any, is physically admissible for expensive long-window nonlinear audits?
"""
from __future__ import annotations
from collections.abc import Mapping, Sequence
from dataclasses import dataclass
from math import pi
from typing import Any, cast
import numpy as np
DEFAULT_TRANSPORT_METRIC_KEYS = (
"transport_objective_final",
"spectrax_objective_final",
"transport_metric_final",
"objective_final",
)
def _finite_float_or_none(value: Any) -> float | None:
try:
out = float(value)
except Exception:
return None
return out if np.isfinite(out) else None
[docs]
@dataclass(frozen=True)
class VMECJAXTransportAdmissionPolicy:
"""Fail-closed policy for selecting transport-aware VMEC candidates."""
metric_keys: tuple[str, ...] = DEFAULT_TRANSPORT_METRIC_KEYS
minimum_relative_improvement: float = 0.0
lower_is_better: bool = True
require_authoritative_gate: bool = True
allow_baseline_fallback: bool = True
[docs]
def to_dict(self) -> dict[str, Any]:
"""Return a JSON-safe representation."""
return {
"metric_keys": list(self.metric_keys),
"minimum_relative_improvement": float(self.minimum_relative_improvement),
"lower_is_better": bool(self.lower_is_better),
"require_authoritative_gate": bool(self.require_authoritative_gate),
"allow_baseline_fallback": bool(self.allow_baseline_fallback),
}
[docs]
@dataclass(frozen=True)
class VMECJAXNonlinearAuditPolicy:
"""Policy for promoting or redesigning VMEC-JAX transport candidates.
Reduced growth/quasilinear/nonlinear-window objectives are useful only if
they transfer to late-window nonlinear transport. This policy encodes the
minimum replicated-audit evidence and sample coverage required before a
candidate can be promoted beyond local reduced-metric admission.
"""
minimum_relative_reduction: float = 0.02
minimum_uncertainty_z_score: float = 1.0
maximum_combined_sem_rel: float = 0.25
minimum_replicate_count: int = 3
minimum_surface_count: int = 3
minimum_alpha_count: int = 2
minimum_ky_count: int = 3
minimum_sample_count: int = 12
recommended_surfaces: tuple[float, ...] = (0.45, 0.64, 0.78)
recommended_alphas: tuple[float, ...] = (0.0, pi / 4.0)
recommended_ky_values: tuple[float, ...] = (0.10, 0.30, 0.50)
[docs]
def to_dict(self) -> dict[str, Any]:
"""Return a JSON-safe representation."""
return {
"minimum_relative_reduction": float(self.minimum_relative_reduction),
"minimum_uncertainty_z_score": float(self.minimum_uncertainty_z_score),
"maximum_combined_sem_rel": float(self.maximum_combined_sem_rel),
"minimum_replicate_count": int(self.minimum_replicate_count),
"minimum_surface_count": int(self.minimum_surface_count),
"minimum_alpha_count": int(self.minimum_alpha_count),
"minimum_ky_count": int(self.minimum_ky_count),
"minimum_sample_count": int(self.minimum_sample_count),
"recommended_surfaces": [float(item) for item in self.recommended_surfaces],
"recommended_alphas": [float(item) for item in self.recommended_alphas],
"recommended_ky_values": [float(item) for item in self.recommended_ky_values],
}
[docs]
@dataclass(frozen=True)
class VMECJAXReducedPrelaunchPolicy:
"""Fail-closed reduced-objective gate before expensive nonlinear audits."""
metric_key: str = "nonlinear_window_heat_flux"
minimum_relative_reduction: float = 0.04
failed_reference_safety_factor: float = 1.5
require_sample_coverage: bool = True
maximum_cross_sample_sem_rel: float = 0.35
[docs]
def to_dict(self) -> dict[str, Any]:
"""Return a JSON-safe representation."""
return {
"metric_key": str(self.metric_key),
"minimum_relative_reduction": float(self.minimum_relative_reduction),
"failed_reference_safety_factor": float(self.failed_reference_safety_factor),
"require_sample_coverage": bool(self.require_sample_coverage),
"maximum_cross_sample_sem_rel": float(self.maximum_cross_sample_sem_rel),
}
[docs]
@dataclass(frozen=True)
class VMECJAXNonlinearCampaignPolicy:
"""Admission limits for launching the next nonlinear optimizer campaign.
This gate sits between a reduced candidate screen and a broader optimizer
campaign. Passing it means the next campaign is worth launching; it does
not promote a production nonlinear turbulent-flux optimization claim.
"""
minimum_landscape_relative_reduction: float = 0.10
minimum_landscape_uncertainty_z_score: float = 3.0
maximum_landscape_sem_rel: float = 0.05
minimum_landscape_replicate_count: int = 3
require_reduced_prelaunch_passed: bool = True
require_reduced_cross_sample_gate: bool = True
require_landscape_admission_passed: bool = True
[docs]
def to_dict(self) -> dict[str, Any]:
"""Return a JSON-safe representation."""
return {
"minimum_landscape_relative_reduction": float(
self.minimum_landscape_relative_reduction
),
"minimum_landscape_uncertainty_z_score": float(
self.minimum_landscape_uncertainty_z_score
),
"maximum_landscape_sem_rel": float(self.maximum_landscape_sem_rel),
"minimum_landscape_replicate_count": int(
self.minimum_landscape_replicate_count
),
"require_reduced_prelaunch_passed": bool(
self.require_reduced_prelaunch_passed
),
"require_reduced_cross_sample_gate": bool(
self.require_reduced_cross_sample_gate
),
"require_landscape_admission_passed": bool(
self.require_landscape_admission_passed
),
}
def _ensemble_statistics(ensemble: Mapping[str, Any]) -> dict[str, Any]:
stats = ensemble.get("statistics", {})
if not isinstance(stats, Mapping):
stats = {}
return {
"passed": bool(ensemble.get("passed", False)),
"ensemble_mean": _finite_float_or_none(stats.get("ensemble_mean")),
"combined_sem": _finite_float_or_none(stats.get("combined_sem")),
"combined_sem_rel": _finite_float_or_none(stats.get("combined_sem_rel")),
"n_reports": int(stats.get("n_reports", 0) or 0),
"case": ensemble.get("case"),
}
def _sample_statistics_summary(
sample_statistics: Mapping[str, Any] | None,
*,
policy: VMECJAXReducedPrelaunchPolicy,
role: str,
) -> dict[str, Any]:
"""Return a gate row for deterministic reduced-metric sample dispersion."""
if not isinstance(sample_statistics, Mapping):
return {
"role": role,
"available": False,
"weighted_mean": None,
"weighted_standard_error": None,
"cross_sample_sem_rel": None,
"passed": None,
"blockers": [],
}
weighted_mean = _finite_float_or_none(sample_statistics.get("weighted_mean"))
weighted_sem = _finite_float_or_none(
sample_statistics.get("weighted_standard_error")
)
blockers: list[str] = []
sem_rel = None
if weighted_mean is None:
blockers.append(f"{role}_missing_cross_sample_weighted_mean")
if weighted_sem is None:
blockers.append(f"{role}_missing_cross_sample_weighted_sem")
if weighted_mean is not None and weighted_sem is not None:
sem_rel = float(abs(weighted_sem) / max(abs(weighted_mean), 1.0e-300))
if sem_rel > float(policy.maximum_cross_sample_sem_rel):
blockers.append(f"{role}_cross_sample_sem_rel_too_large")
return {
"role": role,
"available": True,
"weighted_mean": weighted_mean,
"weighted_standard_error": weighted_sem,
"cross_sample_sem_rel": sem_rel,
"passed": not blockers,
"blockers": blockers,
}
def _ensemble_blockers(
stats: Mapping[str, Any],
*,
role: str,
policy: VMECJAXNonlinearAuditPolicy,
) -> list[str]:
blockers: list[str] = []
if not bool(stats.get("passed")):
blockers.append(f"{role}_ensemble_failed")
if stats.get("ensemble_mean") is None:
blockers.append(f"{role}_missing_ensemble_mean")
if stats.get("combined_sem") is None:
blockers.append(f"{role}_missing_combined_sem")
sem_rel = stats.get("combined_sem_rel")
if sem_rel is None:
blockers.append(f"{role}_missing_combined_sem_rel")
elif float(sem_rel) > float(policy.maximum_combined_sem_rel):
blockers.append(f"{role}_combined_sem_rel_too_large")
if int(stats.get("n_reports", 0) or 0) < int(policy.minimum_replicate_count):
blockers.append(f"{role}_insufficient_replicates")
return blockers
[docs]
def build_nonlinear_landscape_admission_report(
baseline_ensemble: Mapping[str, Any],
candidate_ensembles: Sequence[Mapping[str, Any]],
*,
candidate_labels: Sequence[str] | None = None,
policy: VMECJAXNonlinearAuditPolicy | None = None,
) -> dict[str, Any]:
"""Select an uncertainty-separated nonlinear candidate from a landscape.
This gate is for boundary-coefficient or line-search landscapes where a
small number of selected points have replicated late-window nonlinear
ensembles. It does not validate multi-coefficient global optimization by
itself; it only answers whether any supplied candidate has a statistically
resolved lower heat flux than the supplied baseline ensemble.
"""
policy = policy or VMECJAXNonlinearAuditPolicy()
labels = (
tuple(str(item) for item in candidate_labels)
if candidate_labels is not None
else tuple(f"candidate_{index}" for index, _ in enumerate(candidate_ensembles))
)
if len(labels) != len(candidate_ensembles):
raise ValueError("candidate_labels must have the same length as candidate_ensembles")
baseline_stats = _ensemble_statistics(baseline_ensemble)
baseline_blockers = _ensemble_blockers(baseline_stats, role="baseline", policy=policy)
rows: list[dict[str, Any]] = []
admitted: list[dict[str, Any]] = []
baseline_mean = cast(float | None, baseline_stats.get("ensemble_mean"))
baseline_sem = cast(float | None, baseline_stats.get("combined_sem"))
for label, ensemble in zip(labels, candidate_ensembles, strict=True):
stats = _ensemble_statistics(ensemble)
blockers = list(baseline_blockers)
blockers.extend(_ensemble_blockers(stats, role="candidate", policy=policy))
candidate_mean = cast(float | None, stats.get("ensemble_mean"))
candidate_sem = cast(float | None, stats.get("combined_sem"))
relative_reduction = None
uncertainty_z_score = None
if baseline_mean is not None and candidate_mean is not None:
relative_reduction = float((baseline_mean - candidate_mean) / max(abs(baseline_mean), 1.0e-300))
if relative_reduction < float(policy.minimum_relative_reduction):
blockers.append("insufficient_relative_reduction")
else:
blockers.append("missing_relative_reduction")
if baseline_mean is not None and candidate_mean is not None and baseline_sem is not None and candidate_sem is not None:
combined_uncertainty = float(np.hypot(baseline_sem, candidate_sem))
uncertainty_z_score = float((baseline_mean - candidate_mean) / max(combined_uncertainty, 1.0e-300))
if uncertainty_z_score < float(policy.minimum_uncertainty_z_score):
blockers.append("insufficient_uncertainty_separation")
else:
blockers.append("missing_uncertainty_z_score")
row = {
"label": label,
"case": stats.get("case"),
"ensemble_mean": candidate_mean,
"combined_sem": candidate_sem,
"combined_sem_rel": stats.get("combined_sem_rel"),
"n_reports": stats.get("n_reports"),
"relative_reduction": relative_reduction,
"uncertainty_z_score": uncertainty_z_score,
"admission_blockers": blockers,
"admitted": not blockers,
}
rows.append(row)
if not blockers:
admitted.append(row)
selected = None
if admitted:
selected = max(
admitted,
key=lambda row: (
float(row.get("relative_reduction") or 0.0),
float(row.get("uncertainty_z_score") or 0.0),
),
)
return {
"kind": "nonlinear_landscape_admission_report",
"claim_scope": (
"selected replicated nonlinear landscape admission; not a multi-coefficient "
"or multi-flux-tube turbulent-optimization claim"
),
"policy": policy.to_dict(),
"baseline": baseline_stats,
"candidates": rows,
"selected_candidate": selected,
"passed": selected is not None,
"next_action": (
"use selected direction for the next uncertainty-aware optimizer admission step"
if selected is not None
else "do not launch a broader optimizer from this landscape without more resolved nonlinear evidence"
),
}
[docs]
def build_reduced_nonlinear_audit_prelaunch_report(
*,
baseline_metric: float,
candidate_metric: float,
objective_sample_set: Any = None,
baseline_sample_statistics: Mapping[str, Any] | None = None,
candidate_sample_statistics: Mapping[str, Any] | None = None,
failed_reference_relative_reduction: float | None = None,
policy: VMECJAXReducedPrelaunchPolicy | None = None,
nonlinear_policy: VMECJAXNonlinearAuditPolicy | None = None,
) -> dict[str, Any]:
"""Gate reduced transport candidates before launching nonlinear audits.
This is intentionally conservative. A reduced nonlinear-window improvement
should exceed both an absolute release threshold and, when available, a
safety factor above a known failed-transfer reference before spending
another long GPU campaign.
"""
policy = policy or VMECJAXReducedPrelaunchPolicy()
nonlinear_policy = nonlinear_policy or VMECJAXNonlinearAuditPolicy()
baseline = _finite_float_or_none(baseline_metric)
candidate = _finite_float_or_none(candidate_metric)
failed_reference = _finite_float_or_none(failed_reference_relative_reduction)
blockers: list[str] = []
relative_reduction = None
if baseline is None:
blockers.append("missing_baseline_reduced_metric")
if candidate is None:
blockers.append("missing_candidate_reduced_metric")
if baseline is not None and candidate is not None:
relative_reduction = float((baseline - candidate) / max(abs(baseline), 1.0e-300))
threshold = float(policy.minimum_relative_reduction)
threshold_sources = {
"policy_minimum_relative_reduction": float(policy.minimum_relative_reduction),
"failed_reference_relative_reduction": failed_reference,
"failed_reference_safety_factor": float(policy.failed_reference_safety_factor),
"failed_reference_threshold": None,
}
if failed_reference is not None:
failed_threshold = float(policy.failed_reference_safety_factor) * failed_reference
threshold_sources["failed_reference_threshold"] = failed_threshold
threshold = max(threshold, failed_threshold)
if relative_reduction is None:
blockers.append("missing_relative_reduced_reduction")
elif relative_reduction < threshold:
blockers.append("insufficient_reduced_margin_for_nonlinear_audit")
sample_summary = transport_objective_sample_summary(
objective_sample_set,
policy=nonlinear_policy,
)
if bool(policy.require_sample_coverage) and not bool(sample_summary["passed"]):
blockers.extend(str(item) for item in sample_summary["blockers"])
cross_sample_rows = [
_sample_statistics_summary(
baseline_sample_statistics,
policy=policy,
role="baseline",
),
_sample_statistics_summary(
candidate_sample_statistics,
policy=policy,
role="candidate",
),
]
cross_sample_available = all(bool(row["available"]) for row in cross_sample_rows)
cross_sample_passed = (
None
if not cross_sample_available
else all(bool(row["passed"]) for row in cross_sample_rows)
)
if cross_sample_available and cross_sample_passed is False:
for row in cross_sample_rows:
blockers.extend(str(item) for item in row["blockers"])
return {
"kind": "vmec_jax_reduced_nonlinear_audit_prelaunch_report",
"claim_scope": (
"reduced-objective prelaunch guard only; passing this gate permits a "
"replicated nonlinear audit but does not promote a turbulence claim"
),
"policy": policy.to_dict(),
"nonlinear_policy": nonlinear_policy.to_dict(),
"metric_key": str(policy.metric_key),
"baseline_metric": baseline,
"candidate_metric": candidate,
"relative_reduced_reduction": relative_reduction,
"required_relative_reduced_reduction": threshold,
"threshold_sources": threshold_sources,
"objective_sample_summary": sample_summary,
"reduced_cross_sample_statistics": {
"available": cross_sample_available,
"passed": cross_sample_passed,
"rows": cross_sample_rows,
"claim_scope": (
"deterministic spread over the reduced surface/field-line/ky "
"objective grid; not stochastic nonlinear heat-flux uncertainty"
),
},
"passed": not blockers,
"blockers": blockers,
"gates": [
{
"metric": "reduced_margin_for_nonlinear_audit",
"passed": "insufficient_reduced_margin_for_nonlinear_audit" not in blockers
and "missing_relative_reduced_reduction" not in blockers,
"value": relative_reduction,
"threshold": threshold,
},
{
"metric": "multi_sample_objective_coverage",
"passed": bool(sample_summary["passed"]),
"value": int(sample_summary["sample_count"]),
"threshold": int(nonlinear_policy.minimum_sample_count),
},
{
"metric": "reduced_cross_sample_dispersion",
"passed": cross_sample_passed,
"value": [
row["cross_sample_sem_rel"]
for row in cross_sample_rows
if row["cross_sample_sem_rel"] is not None
],
"threshold": float(policy.maximum_cross_sample_sem_rel),
},
],
"next_action": (
"launch replicated long-window nonlinear audit only with baseline/candidate ensembles"
if not blockers
else (
"do not launch an expensive nonlinear audit; increase reduced-objective "
"margin or broaden the objective before spending GPU time"
)
),
}
[docs]
def build_nonlinear_campaign_admission_report(
*,
reduced_prelaunch_report: Mapping[str, Any],
landscape_admission_report: Mapping[str, Any],
policy: VMECJAXNonlinearCampaignPolicy | None = None,
) -> dict[str, Any]:
"""Gate the next nonlinear optimizer campaign from existing evidence.
This report intentionally promotes only a *campaign launch*. It requires a
reduced prelaunch pass and an uncertainty-separated replicated nonlinear
landscape point. It does not convert that point into a general
multi-coefficient turbulent-flux optimization result.
"""
policy = policy or VMECJAXNonlinearCampaignPolicy()
blockers: list[str] = []
gates: list[dict[str, Any]] = []
prelaunch_passed = bool(reduced_prelaunch_report.get("passed", False))
gates.append(
{
"metric": "reduced_prelaunch_gate",
"passed": prelaunch_passed,
"detail": reduced_prelaunch_report.get("blockers", []),
}
)
if bool(policy.require_reduced_prelaunch_passed) and not prelaunch_passed:
blockers.append("reduced_prelaunch_gate_failed")
sample_summary = reduced_prelaunch_report.get("objective_sample_summary")
sample_passed = (
bool(sample_summary.get("passed", False))
if isinstance(sample_summary, Mapping)
else False
)
gates.append(
{
"metric": "reduced_objective_sample_coverage",
"passed": sample_passed,
"value": (
int(sample_summary["sample_count"])
if isinstance(sample_summary, Mapping)
and sample_summary.get("sample_count") is not None
else None
),
"detail": (
sample_summary.get("blockers", [])
if isinstance(sample_summary, Mapping)
else "missing objective_sample_summary"
),
}
)
if not sample_passed:
blockers.append("reduced_objective_sample_coverage_failed")
cross_sample = reduced_prelaunch_report.get("reduced_cross_sample_statistics")
cross_sample_available = (
bool(cross_sample.get("available", False))
if isinstance(cross_sample, Mapping)
else False
)
cross_sample_passed = (
bool(cross_sample.get("passed", False))
if isinstance(cross_sample, Mapping)
and cross_sample.get("passed") is not None
else None
)
gates.append(
{
"metric": "reduced_cross_sample_dispersion",
"passed": cross_sample_passed,
"detail": (
cross_sample.get("rows", [])
if isinstance(cross_sample, Mapping)
else "missing reduced_cross_sample_statistics"
),
}
)
if bool(policy.require_reduced_cross_sample_gate):
if not cross_sample_available:
blockers.append("reduced_cross_sample_statistics_missing")
elif cross_sample_passed is not True:
blockers.append("reduced_cross_sample_dispersion_failed")
landscape_passed = bool(landscape_admission_report.get("passed", False))
gates.append(
{
"metric": "replicated_landscape_admission",
"passed": landscape_passed,
"detail": landscape_admission_report.get("next_action"),
}
)
if bool(policy.require_landscape_admission_passed) and not landscape_passed:
blockers.append("replicated_landscape_admission_failed")
selected = landscape_admission_report.get("selected_candidate")
selected_map: Mapping[str, Any] = selected if isinstance(selected, Mapping) else {}
if not selected_map:
blockers.append("missing_selected_landscape_candidate")
relative_reduction = _finite_float_or_none(selected_map.get("relative_reduction"))
z_score = _finite_float_or_none(selected_map.get("uncertainty_z_score"))
sem_rel = _finite_float_or_none(selected_map.get("combined_sem_rel"))
n_reports = int(selected_map.get("n_reports", 0) or 0)
candidate_gates = [
(
"landscape_relative_reduction",
relative_reduction is not None
and relative_reduction
>= float(policy.minimum_landscape_relative_reduction),
relative_reduction,
float(policy.minimum_landscape_relative_reduction),
"selected_landscape_reduction_too_small",
),
(
"landscape_uncertainty_separation",
z_score is not None
and z_score >= float(policy.minimum_landscape_uncertainty_z_score),
z_score,
float(policy.minimum_landscape_uncertainty_z_score),
"selected_landscape_uncertainty_separation_too_small",
),
(
"landscape_candidate_sem_rel",
sem_rel is not None and sem_rel <= float(policy.maximum_landscape_sem_rel),
sem_rel,
float(policy.maximum_landscape_sem_rel),
"selected_landscape_sem_rel_too_large",
),
(
"landscape_candidate_replicates",
n_reports >= int(policy.minimum_landscape_replicate_count),
n_reports,
int(policy.minimum_landscape_replicate_count),
"selected_landscape_insufficient_replicates",
),
]
for metric, passed, value, threshold, blocker in candidate_gates:
gates.append(
{
"metric": metric,
"passed": bool(passed),
"value": value,
"threshold": threshold,
}
)
if not passed:
blockers.append(blocker)
admitted = not blockers
return {
"kind": "vmec_jax_nonlinear_campaign_admission_report",
"claim_scope": (
"next nonlinear optimizer-campaign admission only; not a production "
"multi-coefficient turbulent-flux optimization claim"
),
"policy": policy.to_dict(),
"passed": admitted,
"campaign_admitted": admitted,
"blockers": blockers,
"gates": gates,
"selected_landscape_candidate": dict(selected_map) if selected_map else None,
"next_action": (
"launch a bounded multi-control optimizer campaign from the admitted landscape direction, "
"with matched baseline/candidate t=[350,700] replicated nonlinear audits before promotion"
if admitted
else (
"do not launch a broader nonlinear optimizer campaign; fix the reduced gate, "
"cross-sample dispersion, or replicated landscape uncertainty first"
)
),
}
[docs]
def candidate_transport_metric(
candidate: Mapping[str, Any],
*,
metric_keys: Sequence[str] = DEFAULT_TRANSPORT_METRIC_KEYS,
) -> dict[str, Any]:
"""Return the first finite transport metric found in a candidate summary."""
for key in tuple(str(item) for item in metric_keys):
value = _finite_float_or_none(candidate.get(key))
if value is not None:
return {
"available": True,
"value": value,
"source": key,
"uses_total_objective_proxy": key == "objective_final",
}
return {
"available": False,
"value": None,
"source": None,
"uses_total_objective_proxy": False,
}
def _finite_sequence(values: Any) -> tuple[float, ...]:
if values is None:
return ()
if isinstance(values, np.ndarray):
raw_values: Sequence[Any] = values.ravel().tolist()
elif isinstance(values, Sequence) and not isinstance(values, (str, bytes)):
raw_values = values
else:
raw_values = (values,)
out: list[float] = []
for value in raw_values:
finite = _finite_float_or_none(value)
if finite is not None:
out.append(finite)
return tuple(out)
def _sample_values(sample_set: Any, *names: str) -> tuple[float, ...]:
if sample_set is None:
return ()
for name in names:
if isinstance(sample_set, Mapping) and name in sample_set:
values = _finite_sequence(sample_set.get(name))
else:
values = _finite_sequence(getattr(sample_set, name, None))
if values:
return values
return ()
def _ky_values_single_grid_compatible(values: Sequence[float]) -> bool:
"""Return whether ``ky`` values can share the current single-``Ly`` grid."""
if not values:
return False
arr = np.asarray(values, dtype=float)
if arr.ndim != 1 or arr.size < 1 or not np.all(np.isfinite(arr)) or np.any(arr <= 0.0):
return False
base = float(np.min(arr))
ratios = arr / base
return bool(np.allclose(ratios, np.rint(ratios), rtol=5.0e-10, atol=5.0e-12))
[docs]
def transport_objective_sample_summary(
sample_set: Any,
*,
policy: VMECJAXNonlinearAuditPolicy | None = None,
) -> dict[str, Any]:
"""Summarize whether a transport objective has enough sample coverage.
The nonlinear audit that motivated this gate was a single reduced metric:
it improved locally but did not transfer to the replicated late-window
heat-flux mean. Multi-surface, multi-field-line, and multi-``k_y`` coverage
is therefore treated as an admission requirement for the next candidate.
"""
policy = policy or VMECJAXNonlinearAuditPolicy()
surfaces = _sample_values(sample_set, "surfaces", "torflux_values", "rho_values")
alphas = _sample_values(sample_set, "alphas", "alpha_values", "field_line_labels")
ky_values = _sample_values(sample_set, "ky_values", "kys", "ky")
surface_count = len(set(surfaces))
alpha_count = len(set(alphas))
ky_count = len(set(ky_values))
sample_count = surface_count * alpha_count * ky_count
blockers: list[str] = []
if sample_set is None:
blockers.append("missing_objective_sample_set")
if surface_count < int(policy.minimum_surface_count):
blockers.append("insufficient_surface_coverage")
if alpha_count < int(policy.minimum_alpha_count):
blockers.append("insufficient_field_line_coverage")
if ky_count < int(policy.minimum_ky_count):
blockers.append("insufficient_ky_coverage")
if ky_count and not _ky_values_single_grid_compatible(ky_values):
blockers.append("ky_values_not_single_grid_compatible")
if sample_count < int(policy.minimum_sample_count):
blockers.append("insufficient_total_sample_count")
return {
"surfaces": [float(item) for item in surfaces],
"alphas": [float(item) for item in alphas],
"ky_values": [float(item) for item in ky_values],
"surface_count": surface_count,
"alpha_count": alpha_count,
"ky_count": ky_count,
"sample_count": sample_count,
"passed": not blockers,
"blockers": blockers,
}
[docs]
def build_nonlinear_audit_redesign_report(
matched_comparison: Mapping[str, Any],
*,
objective_sample_set: Any = None,
policy: VMECJAXNonlinearAuditPolicy | None = None,
) -> dict[str, Any]:
"""Decide whether a matched nonlinear audit promotes or redesigns a candidate.
This is the fail-closed bridge between reduced VMEC-JAX transport admission
and expensive long-window nonlinear evidence. A candidate is promoted only
if the matched replicated nonlinear comparison passes, has a positive
uncertainty-separated reduction, and the reduced objective used enough
surface/field-line/``k_y`` samples to avoid a single-point overfit.
"""
policy = policy or VMECJAXNonlinearAuditPolicy()
stats = matched_comparison.get("statistics", {})
if not isinstance(stats, Mapping):
stats = {}
relative_reduction = _finite_float_or_none(stats.get("relative_reduction"))
z_score = _finite_float_or_none(stats.get("uncertainty_z_score"))
baseline = matched_comparison.get("baseline", {})
candidate = matched_comparison.get("candidate", {})
baseline_passed = bool(baseline.get("passed", False)) if isinstance(baseline, Mapping) else False
candidate_passed = bool(candidate.get("passed", False)) if isinstance(candidate, Mapping) else False
comparison_passed = bool(matched_comparison.get("passed", False))
nonlinear_blockers: list[str] = []
if not baseline_passed:
nonlinear_blockers.append("baseline_ensemble_failed")
if not candidate_passed:
nonlinear_blockers.append("candidate_ensemble_failed")
if relative_reduction is None:
nonlinear_blockers.append("missing_relative_reduction")
elif relative_reduction < float(policy.minimum_relative_reduction):
nonlinear_blockers.append("insufficient_matched_reduction")
if z_score is None:
nonlinear_blockers.append("missing_uncertainty_z_score")
elif z_score < float(policy.minimum_uncertainty_z_score):
nonlinear_blockers.append("insufficient_uncertainty_separation")
if not comparison_passed:
nonlinear_blockers.append("matched_comparison_not_passed")
sample_summary = transport_objective_sample_summary(objective_sample_set, policy=policy)
all_blockers = nonlinear_blockers + list(sample_summary["blockers"])
promoted = not all_blockers
recommended = {
"surfaces": [float(item) for item in policy.recommended_surfaces],
"alphas": [float(item) for item in policy.recommended_alphas],
"ky_values": [float(item) for item in policy.recommended_ky_values],
"sample_count": (
len(policy.recommended_surfaces)
* len(policy.recommended_alphas)
* len(policy.recommended_ky_values)
),
}
return {
"kind": "vmec_jax_nonlinear_transport_audit_redesign_report",
"policy": policy.to_dict(),
"matched_comparison_case": matched_comparison.get("case"),
"matched_comparison_passed": comparison_passed,
"nonlinear_audit_promoted": promoted,
"requires_objective_redesign": not promoted,
"nonlinear_audit_blockers": nonlinear_blockers,
"objective_sample_summary": sample_summary,
"blockers": all_blockers,
"recommended_sample_set": recommended,
"gates": [
{
"metric": "matched_replicated_late_window_reduction",
"passed": "insufficient_matched_reduction" not in nonlinear_blockers
and "missing_relative_reduction" not in nonlinear_blockers,
"value": relative_reduction,
"threshold": float(policy.minimum_relative_reduction),
},
{
"metric": "uncertainty_separated_reduction",
"passed": "insufficient_uncertainty_separation" not in nonlinear_blockers
and "missing_uncertainty_z_score" not in nonlinear_blockers,
"value": z_score,
"threshold": float(policy.minimum_uncertainty_z_score),
},
{
"metric": "multi_sample_objective_coverage",
"passed": bool(sample_summary["passed"]),
"value": int(sample_summary["sample_count"]),
"threshold": int(policy.minimum_sample_count),
},
],
"next_action": (
"candidate may be used as nonlinear turbulent-flux optimization evidence"
if promoted
else (
"redesign the reduced transport objective with the recommended multi-surface, "
"multi-field-line, multi-ky sample set; rerun projected admission; then repeat "
"the matched long-window nonlinear audit"
)
),
}
def _physical_gate_blockers(candidate: Mapping[str, Any], policy: VMECJAXTransportAdmissionPolicy) -> list[str]:
blockers: list[str] = []
gate_reported_passed = bool(candidate.get("gate_reported_passed", candidate.get("passed", False)))
gate_authoritative = bool(candidate.get("gate_is_authoritative", True))
if bool(policy.require_authoritative_gate) and not gate_authoritative:
blockers.append("non_authoritative_gate")
if not gate_reported_passed:
checks = candidate.get("gate_checks", {})
if isinstance(checks, Mapping):
failed = [
f"gate_{name}"
for name, passed in checks.items()
if passed is not None and not bool(passed)
]
blockers.extend(failed or ["gate_failed"])
else:
blockers.append("gate_failed")
if not bool(candidate.get("passed", False)):
if "gate_failed" not in blockers and not any(item.startswith("gate_") for item in blockers):
blockers.append("candidate_not_passed")
return blockers
def _relative_improvement(
baseline_value: float,
candidate_value: float,
*,
lower_is_better: bool,
) -> float:
signed = baseline_value - candidate_value if lower_is_better else candidate_value - baseline_value
scale = max(abs(baseline_value), 1.0e-300)
return float(signed / scale)
[docs]
def build_transport_admission_report(
summaries: Sequence[Mapping[str, Any]],
*,
policy: VMECJAXTransportAdmissionPolicy | None = None,
) -> dict[str, Any]:
"""Annotate and select VMEC-JAX transport candidates.
A transport candidate is admitted only when it passes the physical solved-WOUT
gate and improves the selected transport metric relative to the admitted
baseline. The baseline may be promoted only as a fallback audit target; it
never counts as a transport-optimization success.
"""
policy = policy or VMECJAXTransportAdmissionPolicy()
annotated: list[dict[str, Any]] = []
baseline: dict[str, Any] | None = None
for raw in summaries:
item = dict(raw)
metric = candidate_transport_metric(item, metric_keys=policy.metric_keys)
physical_blockers = _physical_gate_blockers(item, policy)
item["transport_metric"] = metric
item["physical_gate_blockers"] = physical_blockers
item["admission_blockers"] = list(physical_blockers)
item["relative_transport_improvement"] = None
item["admitted_for_transport_optimization"] = False
item["admitted_for_long_window_nonlinear_audit"] = False
annotated.append(item)
if baseline is None and bool(item.get("baseline")):
baseline = item
if baseline is None:
baseline = next((item for item in annotated if item.get("transport_weight") is None), None)
baseline_metric = cast(dict[str, Any], baseline.get("transport_metric")) if baseline else None
baseline_metric_value = (
_finite_float_or_none(baseline_metric.get("value"))
if isinstance(baseline_metric, Mapping)
else None
)
baseline_physical_ok = baseline is not None and not baseline.get("physical_gate_blockers")
if baseline is not None:
baseline["admitted_for_long_window_nonlinear_audit"] = bool(baseline_physical_ok)
admitted_transport: list[dict[str, Any]] = []
for item in annotated:
if bool(item.get("baseline")):
continue
if item.get("transport_weight") is None:
item["admission_blockers"].append("missing_transport_weight")
metric = cast(dict[str, Any], item["transport_metric"])
candidate_metric_value = _finite_float_or_none(metric.get("value"))
if candidate_metric_value is None:
item["admission_blockers"].append("missing_transport_metric")
if baseline_metric_value is None:
item["admission_blockers"].append("missing_baseline_transport_metric")
if candidate_metric_value is not None and baseline_metric_value is not None:
improvement = _relative_improvement(
baseline_metric_value,
candidate_metric_value,
lower_is_better=bool(policy.lower_is_better),
)
item["relative_transport_improvement"] = improvement
if improvement < float(policy.minimum_relative_improvement):
item["admission_blockers"].append("insufficient_transport_improvement")
item["admitted_for_transport_optimization"] = not item["admission_blockers"]
item["admitted_for_long_window_nonlinear_audit"] = bool(item["admitted_for_transport_optimization"])
if bool(item["admitted_for_transport_optimization"]):
admitted_transport.append(item)
promoted: dict[str, Any] | None
if admitted_transport:
promoted = max(
admitted_transport,
key=lambda item: (
float(item.get("transport_weight") or 0.0),
float(item.get("relative_transport_improvement") or 0.0),
),
)
elif bool(policy.allow_baseline_fallback) and baseline is not None and bool(baseline_physical_ok):
promoted = baseline
else:
promoted = None
return {
"kind": "vmec_jax_transport_admission_report",
"policy": policy.to_dict(),
"baseline_label": None if baseline is None else baseline.get("label"),
"baseline_transport_metric": baseline_metric,
"candidates": annotated,
"admitted_transport_candidates": [
item.get("label") for item in admitted_transport if item.get("label") is not None
],
"transport_candidate_admitted": bool(admitted_transport),
"promoted_candidate": promoted,
"passed": promoted is not None,
"next_action": (
"launch matched long-window nonlinear audits for the admitted transport candidate"
if admitted_transport
else (
"no transport candidate both preserved physical gates and improved the transport metric; "
"keep the QA-only baseline and use a constraint-preserving projection/admission method"
)
),
}
[docs]
def select_admitted_transport_candidate(
summaries: Sequence[Mapping[str, Any]],
*,
policy: VMECJAXTransportAdmissionPolicy | None = None,
) -> dict[str, Any] | None:
"""Return the promoted candidate from :func:`build_transport_admission_report`."""
report = build_transport_admission_report(summaries, policy=policy)
promoted = report.get("promoted_candidate")
return dict(promoted) if isinstance(promoted, Mapping) else None
__all__ = [
"DEFAULT_TRANSPORT_METRIC_KEYS",
"VMECJAXNonlinearAuditPolicy",
"VMECJAXNonlinearCampaignPolicy",
"VMECJAXReducedPrelaunchPolicy",
"VMECJAXTransportAdmissionPolicy",
"build_nonlinear_landscape_admission_report",
"build_nonlinear_campaign_admission_report",
"build_nonlinear_audit_redesign_report",
"build_reduced_nonlinear_audit_prelaunch_report",
"build_transport_admission_report",
"candidate_transport_metric",
"select_admitted_transport_candidate",
"transport_objective_sample_summary",
]