Source code for spectraxgk.nonlinear_gradient_followup

"""Targeted follow-up plans for nonlinear turbulence-gradient audits.

This module turns failed long-window central finite-difference artifacts into a
bounded run prescription.  It is deliberately conservative: extra replicas are
recommended only when the finite-difference response is resolved and local, but
the propagated gradient uncertainty is slightly too large.
"""

from __future__ import annotations

from dataclasses import asdict, dataclass
from pathlib import Path
from typing import Any, Mapping, Sequence
import math
import re


STATE_TO_RUN_STATE = {
    "baseline": "baseline",
    "plus": "plus_delta",
    "minus": "minus_delta",
}


[docs] @dataclass(frozen=True) class NonlinearGradientFollowupConfig: """Acceptance and cost controls for the follow-up planner.""" max_gradient_uncertainty_rel: float = 0.50 max_fd_asymmetry_rel: float = 0.50 min_fd_response_fraction: float = 0.03 sem_safety_factor: float = 1.10 max_extra_replicates_per_state: int = 4 default_nominal_timestep: float = 0.05 value_floor: float = 1.0e-12
[docs] @dataclass(frozen=True) class NonlinearGradientCandidateDesignConfig: """Conditioning limits for the next nonlinear-gradient campaign design.""" max_gradient_uncertainty_rel: float = 0.50 max_fd_asymmetry_rel: float = 0.50 max_window_mean_rel_spread: float = 0.15 max_window_sem_rel: float = 0.25 min_fd_response_fraction: float = 0.03 sem_safety_factor: float = 1.10 max_extra_replicates_per_state: int = 4 max_checked_bracket_scale: float = 1.50 locality_safety_factor: float = 0.95 value_floor: float = 1.0e-12
[docs] @dataclass(frozen=True) class NonlinearGradientVarianceReductionConfig: """Controls for paired-seed/control-variate nonlinear-gradient planning.""" max_paired_response_uncertainty_rel: float = 0.50 max_control_variate_uncertainty_rel: float = 0.50 min_control_variate_sem_reduction: float = 0.25 require_known_control_mean: bool = True sem_safety_factor: float = 1.10 min_common_pairs: int = 2 max_extra_paired_seeds: int = 4 value_floor: float = 1.0e-12
[docs] @dataclass(frozen=True) class NonlinearGradientControlVariateCampaignConfig: """Controls for an independent control-mean campaign.""" target_response_uncertainty_rel: float = 0.50 sem_safety_factor: float = 1.10 min_control_mean_pairs: int = 4 max_control_mean_pairs: int = 32 first_new_seed: int = 34 value_floor: float = 1.0e-12
[docs] @dataclass(frozen=True) class NonlinearGradientControlMeanGateConfig: """Acceptance limits for an independent control-mean estimate.""" target_response_uncertainty_rel: float = 0.50 min_control_mean_pairs: int = 4 require_state_ensembles_passed: bool = True value_floor: float = 1.0e-12
[docs] @dataclass(frozen=True) class NonlinearGradientCompositeControlConfig: """Controls for constructing the next composite nonlinear-gradient direction.""" max_gradient_uncertainty_rel: float = 1.00 max_fd_asymmetry_rel: float = 0.50 min_fd_response_fraction: float = 0.03 min_same_sign_fraction: float = 0.80 min_controls: int = 2 default_relative_delta: float = 0.02 max_weight_abs: float = 1.0 value_floor: float = 1.0e-12
[docs] @dataclass(frozen=True) class NonlinearGradientQLSeedScreenConfig: """Admission limits for quasilinear-seeded nonlinear-gradient controls.""" target_objectives: tuple[str, ...] = ( "mixing_length_heat_flux_proxy", "linear_heat_flux_weight", "gamma", ) primary_objective: str = "mixing_length_heat_flux_proxy" min_distinct_controls: int = 2 min_cases_per_control: int = 2 min_sign_consistency: float = 0.75 max_objective_rel_error: float = 0.02 min_abs_sensitivity: float = 1.0e-12 require_artifact_passed: bool = False
[docs] @dataclass(frozen=True) class NonlinearGradientStateControlRunbookConfig: """Admission limits for mapping VMEC-state controls to launchable inputs.""" min_mapped_controls: int = 2 max_mapping_condition_number: float = 1.0e6 max_mapping_relative_residual: float = 0.10 default_relative_delta: float = 0.02 require_mapping_passed: bool = True
def _finite_float(value: Any) -> float | None: try: out = float(value) except (TypeError, ValueError): return None return out if math.isfinite(out) else None def _finite_int(value: Any) -> int | None: try: out = int(value) except (TypeError, ValueError): return None return out if out >= 0 else None def _json_number(value: Any) -> float | int | None: number = _finite_float(value) if number is None: return None if isinstance(value, int): return value return float(number) def _metric(payload: Mapping[str, Any], *names: str) -> float | None: metrics = payload.get("metrics") if not isinstance(metrics, Mapping): metrics = {} conditioning = payload.get("conditioning") if isinstance(conditioning, Mapping): sources: tuple[Mapping[str, Any], ...] = (metrics, conditioning, payload) else: sources = (metrics, payload) for source in sources: for name in names: value = _finite_float(source.get(name)) if value is not None: return value return None def _nested_metric(payload: Mapping[str, Any], source_name: str, *names: str) -> float | None: source = payload.get(source_name) if not isinstance(source, Mapping): return None for name in names: value = _finite_float(source.get(name)) if value is not None: return value return None def _artifact_passed(payload: Mapping[str, Any]) -> bool: if bool(payload.get("passed", False)): return True nested = payload.get("nonlinear_turbulence_gradient_gate") return isinstance(nested, Mapping) and bool(nested.get("passed", False)) def _coefficient_label_from_parameter(parameter: Any) -> str | None: if not isinstance(parameter, str): return None match = re.fullmatch(r"\s*(rbc|rbs|zbc|zbs)_([+-]?\d+)_([+-]?\d+)\s*", parameter, re.IGNORECASE) if match is None: return None family, m_value, n_value = match.groups() return f"{family.upper()}({int(m_value)},{int(n_value)})" def _state_control_family(parameter_indices: Any) -> str | None: if not isinstance(parameter_indices, Mapping) or not parameter_indices: return None for family in ("Rcos", "Rsin", "Zcos", "Zsin", "RBC", "RBS", "ZBC", "ZBS"): if family in parameter_indices: return family return str(next(iter(parameter_indices))) def _label_from_row(row: Mapping[str, Any]) -> str | None: variant = row.get("variant") if isinstance(variant, Mapping): seed = variant.get("seed") if seed is not None: try: return f"seed{int(seed)}" except (TypeError, ValueError): pass for key in ("variant_label", "source_artifact", "summary_artifact", "path"): value = row.get(key) if not isinstance(value, str): continue label_source = Path(value).name if key != "variant_label" else value matches = re.findall(r"(seed[0-9]+|dt[0-9]+(?:p[0-9]+)?)", label_source) if matches: return matches[-1] return None def _seed_numbers(source_ensembles: Mapping[str, Any]) -> list[int]: seeds: list[int] = [] for raw in source_ensembles.values(): if not isinstance(raw, Mapping): continue rows = raw.get("rows") if not isinstance(rows, Sequence): continue for row in rows: if not isinstance(row, Mapping): continue label = _label_from_row(row) if label is None: continue match = re.fullmatch(r"seed([0-9]+)", label) if match: seeds.append(int(match.group(1))) return seeds def _replicate_count(source_ensembles: Mapping[str, Any]) -> int | None: counts: list[int] = [] for state in ("minus", "baseline", "plus"): raw = source_ensembles.get(state) if not isinstance(raw, Mapping): continue count = _finite_int(raw.get("n_reports")) if count is None: rows = raw.get("rows") count = len(rows) if isinstance(rows, Sequence) else None if count is not None: counts.append(count) return min(counts) if counts else None def _ensemble_stats_value(raw: Mapping[str, Any], name: str) -> float | None: statistics = raw.get("statistics") if isinstance(statistics, Mapping): value = _finite_float(statistics.get(name)) if value is not None: return value return _finite_float(raw.get(name)) def _ensemble_state_variance_report( source_ensembles: Mapping[str, Any], *, config: NonlinearGradientCandidateDesignConfig, ) -> dict[str, Any]: """Summarize which state limits a finite-difference variance-reduction plan.""" rows: list[dict[str, Any]] = [] for state in ("baseline", "plus", "minus"): raw = source_ensembles.get(state) if not isinstance(raw, Mapping): continue statistics = raw.get("statistics") n_reports = _finite_int(raw.get("n_reports")) if n_reports is None and isinstance(statistics, Mapping): n_reports = _finite_int(statistics.get("n_reports")) mean_rel_spread = _ensemble_stats_value(raw, "mean_rel_spread") combined_sem_rel = _ensemble_stats_value(raw, "combined_sem_rel") rows.append( { "state": state, "passed": bool(raw.get("passed", False)), "n_reports": n_reports, "mean_rel_spread": _json_number(mean_rel_spread), "combined_sem_rel": _json_number(combined_sem_rel), "spread_gate_passed": ( None if mean_rel_spread is None else bool(mean_rel_spread <= config.max_window_mean_rel_spread) ), "sem_gate_passed": ( None if combined_sem_rel is None else bool(combined_sem_rel <= config.max_window_sem_rel) ), } ) finite_spreads = [ (str(row["state"]), float(row["mean_rel_spread"])) for row in rows if row.get("mean_rel_spread") is not None ] limiting_state = None max_mean_rel_spread = None if finite_spreads: limiting_state, max_mean_rel_spread = max(finite_spreads, key=lambda item: item[1]) failed_spread_states = [ str(row["state"]) for row in rows if row.get("spread_gate_passed") is False ] failed_sem_states = [ str(row["state"]) for row in rows if row.get("sem_gate_passed") is False ] recommendation = "no replicated-window variance limiter identified" if failed_spread_states: recommendation = ( "target paired-seed or control-variate variance reduction for " f"{', '.join(failed_spread_states)} before adding blind replicas" ) elif failed_sem_states: recommendation = ( "target additional matched replicas for " f"{', '.join(failed_sem_states)} before changing the bracket" ) elif rows: recommendation = "state ensembles pass spread/SEM gates; focus on finite-difference conditioning" return { "state_rows": rows, "limiting_state": limiting_state, "max_mean_rel_spread": _json_number(max_mean_rel_spread), "failed_spread_states": failed_spread_states, "failed_sem_states": failed_sem_states, "recommendation": recommendation, } def _state_means_by_label(source_ensembles: Mapping[str, Any], state: str) -> dict[str, float]: raw = source_ensembles.get(state) if not isinstance(raw, Mapping): return {} rows = raw.get("rows") if not isinstance(rows, Sequence): return {} out: dict[str, float] = {} for row in rows: if not isinstance(row, Mapping): continue label = _label_from_row(row) value = _finite_float(row.get("late_mean")) if label is not None and value is not None: out[label] = value return out def _mean_and_sem(values: Sequence[float]) -> tuple[float | None, float | None]: finite = [float(value) for value in values if math.isfinite(float(value))] if not finite: return None, None mean = sum(finite) / len(finite) if len(finite) < 2: return mean, None variance = sum((value - mean) ** 2 for value in finite) / (len(finite) - 1) return mean, math.sqrt(variance / len(finite)) def _sample_covariance(x_values: Sequence[float], y_values: Sequence[float]) -> float | None: if len(x_values) != len(y_values) or len(x_values) < 2: return None x_mean = sum(x_values) / len(x_values) y_mean = sum(y_values) / len(y_values) return sum((x - x_mean) * (y - y_mean) for x, y in zip(x_values, y_values)) / (len(x_values) - 1) def _control_variate_candidate( *, name: str, response_samples: Sequence[float], control_samples: Sequence[float], response_mean: float | None, raw_sem: float | None, config: NonlinearGradientVarianceReductionConfig, ) -> dict[str, Any]: if len(response_samples) != len(control_samples) or len(response_samples) < 3: return { "name": name, "admissible": False, "blockers": ["insufficient_common_samples"], } control_mean, control_sem = _mean_and_sem(control_samples) response_variance = _sample_covariance(response_samples, response_samples) control_variance = _sample_covariance(control_samples, control_samples) covariance = _sample_covariance(response_samples, control_samples) blockers: list[str] = [] if ( response_mean is None or raw_sem is None or response_variance is None or control_variance is None or covariance is None or control_mean is None or control_variance <= config.value_floor ): blockers.append("degenerate_control_or_response") return { "name": name, "admissible": False, "blockers": blockers, } beta = covariance / control_variance adjusted_samples = [ response - beta * (control - control_mean) for response, control in zip(response_samples, control_samples) ] adjusted_mean, adjusted_sem = _mean_and_sem(adjusted_samples) sample_count = len(response_samples) adjusted_uncertainty_rel = None sem_reduction = None if adjusted_sem is not None: adjusted_uncertainty_rel = abs(adjusted_sem) / max(abs(response_mean), config.value_floor) sem_reduction = 1.0 - adjusted_sem / max(raw_sem, config.value_floor) correlation = covariance / math.sqrt(max(response_variance * control_variance, config.value_floor)) if adjusted_uncertainty_rel is None or adjusted_uncertainty_rel > config.max_control_variate_uncertainty_rel: blockers.append("control_variate_uncertainty_above_gate") if sem_reduction is None or sem_reduction < config.min_control_variate_sem_reduction: blockers.append("control_variate_sem_reduction_too_small") if config.require_known_control_mean: blockers.append("control_mean_not_independently_known") return { "name": name, "admissible": not blockers, "blockers": blockers, "n_samples": len(response_samples), "beta": _json_number(beta), "correlation": _json_number(correlation), "control_mean_sample": _json_number(control_mean), "control_sample_sem": _json_number(control_sem), "control_sample_std": _json_number(None if control_sem is None else control_sem * math.sqrt(sample_count)), "adjusted_response_mean": _json_number(adjusted_mean), "adjusted_response_sem": _json_number(adjusted_sem), "adjusted_response_sample_std": _json_number( None if adjusted_sem is None else adjusted_sem * math.sqrt(sample_count) ), "adjusted_response_uncertainty_rel": _json_number(adjusted_uncertainty_rel), "sem_reduction_fraction": _json_number(sem_reduction), "requires_independent_control_mean": bool(config.require_known_control_mean), }
[docs] def nonlinear_gradient_variance_reduction_plan( artifact: Mapping[str, Any], *, path: str | None = None, label: str | None = None, case: str = "nonlinear_turbulence_gradient_variance_reduction_plan", config: NonlinearGradientVarianceReductionConfig | None = None, ) -> dict[str, Any]: """Plan paired-seed/control-variate follow-up for a failed central-FD artifact. The plan uses common seed/timestep labels across ``plus`` and ``minus`` ensembles to estimate the uncertainty of paired finite-difference responses. It is a campaign-design artifact, not nonlinear-gradient evidence. """ cfg = config or NonlinearGradientVarianceReductionConfig() if cfg.max_paired_response_uncertainty_rel <= 0.0: raise ValueError("max_paired_response_uncertainty_rel must be positive") if cfg.max_control_variate_uncertainty_rel <= 0.0: raise ValueError("max_control_variate_uncertainty_rel must be positive") if cfg.min_control_variate_sem_reduction < 0.0: raise ValueError("min_control_variate_sem_reduction must be non-negative") if cfg.sem_safety_factor <= 0.0: raise ValueError("sem_safety_factor must be positive") if cfg.min_common_pairs < 1: raise ValueError("min_common_pairs must be positive") if cfg.max_extra_paired_seeds < 0: raise ValueError("max_extra_paired_seeds must be non-negative") source_ensembles_raw = artifact.get("source_ensembles") source_ensembles = source_ensembles_raw if isinstance(source_ensembles_raw, Mapping) else {} variance = _ensemble_state_variance_report( source_ensembles, config=NonlinearGradientCandidateDesignConfig( max_window_mean_rel_spread=0.15, max_window_sem_rel=0.25, ), ) plus = _state_means_by_label(source_ensembles, "plus") minus = _state_means_by_label(source_ensembles, "minus") baseline = _state_means_by_label(source_ensembles, "baseline") common_labels = sorted(set(plus).intersection(minus)) common_with_baseline = sorted(set(common_labels).intersection(baseline)) pair_rows: list[dict[str, Any]] = [] paired_differences: list[float] = [] for item in common_labels: diff = plus[item] - minus[item] paired_differences.append(diff) row: dict[str, Any] = { "label": item, "plus_mean": _json_number(plus[item]), "minus_mean": _json_number(minus[item]), "plus_minus_difference": _json_number(diff), } if item in baseline: row["baseline_mean"] = _json_number(baseline[item]) row["plus_baseline_difference"] = _json_number(plus[item] - baseline[item]) row["baseline_minus_difference"] = _json_number(baseline[item] - minus[item]) pair_rows.append(row) paired_mean, paired_sem = _mean_and_sem(paired_differences) paired_uncertainty_rel = None if paired_mean is not None and paired_sem is not None: paired_uncertainty_rel = abs(paired_sem) / max(abs(paired_mean), cfg.value_floor) control_candidates: list[dict[str, Any]] = [] if common_with_baseline: response_for_baseline = [plus[item] - minus[item] for item in common_with_baseline] baseline_control = [baseline[item] for item in common_with_baseline] midpoint_control = [0.5 * (plus[item] + minus[item]) for item in common_with_baseline] control_candidates.append( _control_variate_candidate( name="baseline_transport_common_mode", response_samples=response_for_baseline, control_samples=baseline_control, response_mean=paired_mean, raw_sem=paired_sem, config=cfg, ) ) control_candidates.append( _control_variate_candidate( name="plus_minus_midpoint_common_mode", response_samples=response_for_baseline, control_samples=midpoint_control, response_mean=paired_mean, raw_sem=paired_sem, config=cfg, ) ) apparent_candidates = [ row for row in control_candidates if "control_variate_uncertainty_above_gate" not in row.get("blockers", []) and "control_variate_sem_reduction_too_small" not in row.get("blockers", []) ] best_control_variate = None if control_candidates: def _candidate_sort_key(row: Mapping[str, Any]) -> tuple[float, float]: uncertainty = _finite_float(row.get("adjusted_response_uncertainty_rel")) reduction = _finite_float(row.get("sem_reduction_fraction")) return ( uncertainty if uncertainty is not None else float("inf"), -(reduction if reduction is not None else float("-inf")), ) best_control_variate = min( control_candidates, key=_candidate_sort_key, ) required_pairs = None extra_pairs = None if paired_uncertainty_rel is not None: scale = (paired_uncertainty_rel / cfg.max_paired_response_uncertainty_rel) ** 2 scale *= cfg.sem_safety_factor required_pairs = max(len(common_labels), int(math.ceil(len(common_labels) * scale))) extra_pairs = max(0, required_pairs - len(common_labels)) if len(common_labels) < cfg.min_common_pairs: action = "recover_or_add_matched_seed_pairs" recommendation = "common plus/minus seed labels are insufficient for paired finite differences" elif paired_uncertainty_rel is None: action = "add_matched_seed_pairs" recommendation = "paired response SEM cannot be estimated from fewer than two finite pairs" elif paired_uncertainty_rel <= cfg.max_paired_response_uncertainty_rel: action = "use_paired_seed_response_estimator" recommendation = "paired seed response uncertainty is within the target gate" elif apparent_candidates and cfg.require_known_control_mean: action = "estimate_control_mean_or_redesign_observable" recommendation = ( "a common-mode control variate reduces residual scatter, but its expectation is not " "independently known; estimate the control mean or redesign the observable before " "using it as a production uncertainty reducer" ) elif any(row.get("admissible") for row in control_candidates): action = "use_control_variate_response_estimator" recommendation = "control-variate response uncertainty is within the target gate" elif extra_pairs is not None and extra_pairs <= cfg.max_extra_paired_seeds: action = "add_matched_paired_seed_replicates" recommendation = "add bounded matched plus/minus seed pairs before changing the observable" else: action = "design_control_variate_or_new_observable" recommendation = ( "paired seed differences reduce common noise but are still too uncertain; " "design a control-variate observable or better-conditioned response before more GPU time" ) return { "kind": "nonlinear_turbulence_gradient_variance_reduction_plan", "claim_level": "campaign_design_not_gradient_evidence", "case": case, "path": path, "label": str(label or artifact.get("parameter_name") or path or case), "passed": action in { "use_paired_seed_response_estimator", "use_control_variate_response_estimator", }, "action": action, "recommendation": recommendation, "config": asdict(cfg), "variance_reduction": variance, "summary": { "common_pair_count": len(common_labels), "common_with_baseline_count": len(common_with_baseline), "paired_response_mean": _json_number(paired_mean), "paired_response_sem": _json_number(paired_sem), "paired_response_uncertainty_rel": _json_number(paired_uncertainty_rel), "required_pair_count": required_pairs, "extra_pair_count": extra_pairs, "best_control_variate": ( None if best_control_variate is None else str(best_control_variate.get("name")) ), }, "control_variate_candidates": control_candidates, "pair_rows": pair_rows, }
[docs] def nonlinear_gradient_control_variate_campaign_plan( variance_report: Mapping[str, Any], *, case: str = "nonlinear_turbulence_gradient_control_variate_campaign", candidate_name: str | None = None, config: NonlinearGradientControlVariateCampaignConfig | None = None, ) -> dict[str, Any]: """Design an independent control-mean campaign from a variance runbook. The input is the output of :func:`nonlinear_gradient_variance_reduction_plan`. The planner is intentionally fail-closed: a sample-centered control variate can motivate new runs, but the campaign is only considered launch-ready when an independent control-mean estimate can bring the combined uncertainty under the target gate within the configured run budget. """ cfg = config or NonlinearGradientControlVariateCampaignConfig() if cfg.target_response_uncertainty_rel <= 0.0: raise ValueError("target_response_uncertainty_rel must be positive") if cfg.sem_safety_factor <= 0.0: raise ValueError("sem_safety_factor must be positive") if cfg.min_control_mean_pairs < 1: raise ValueError("min_control_mean_pairs must be positive") if cfg.max_control_mean_pairs < cfg.min_control_mean_pairs: raise ValueError("max_control_mean_pairs must be at least min_control_mean_pairs") if cfg.first_new_seed < 0: raise ValueError("first_new_seed must be non-negative") summary_raw = variance_report.get("summary") summary = summary_raw if isinstance(summary_raw, Mapping) else {} response_mean = _finite_float(summary.get("paired_response_mean")) raw_response_uncertainty_rel = _finite_float(summary.get("paired_response_uncertainty_rel")) requested_candidate = candidate_name or str(summary.get("best_control_variate") or "") candidates_raw = variance_report.get("control_variate_candidates") candidates = [row for row in candidates_raw if isinstance(row, Mapping)] if isinstance(candidates_raw, Sequence) else [] if requested_candidate: candidate = next((row for row in candidates if str(row.get("name")) == requested_candidate), None) else: candidate = None if candidate is None and candidates: def _sort_key(row: Mapping[str, Any]) -> tuple[float, float]: uncertainty = _finite_float(row.get("adjusted_response_uncertainty_rel")) reduction = _finite_float(row.get("sem_reduction_fraction")) return ( uncertainty if uncertainty is not None else float("inf"), -(reduction if reduction is not None else float("-inf")), ) candidate = min(candidates, key=_sort_key) blockers: list[str] = [] if candidate is None: blockers.append("no_control_variate_candidate") candidate_name_out = None if candidate is None else str(candidate.get("name")) beta = _finite_float(None if candidate is None else candidate.get("beta")) residual_sem = _finite_float(None if candidate is None else candidate.get("adjusted_response_sem")) residual_uncertainty_rel = _finite_float( None if candidate is None else candidate.get("adjusted_response_uncertainty_rel") ) control_sample_std = _finite_float(None if candidate is None else candidate.get("control_sample_std")) control_sample_sem = _finite_float(None if candidate is None else candidate.get("control_sample_sem")) current_pair_count = _finite_int(summary.get("common_with_baseline_count") or summary.get("common_pair_count")) candidate_pair_count = _finite_int(None if candidate is None else candidate.get("n_samples")) current_pair_count = candidate_pair_count or current_pair_count candidate_blockers = list(candidate.get("blockers", [])) if isinstance(candidate, Mapping) else [] if response_mean is None or abs(response_mean) <= cfg.value_floor: blockers.append("degenerate_response_mean") if beta is None: blockers.append("missing_control_variate_beta") if residual_sem is None: blockers.append("missing_residual_sem") if control_sample_std is None or control_sample_std <= cfg.value_floor: blockers.append("missing_or_degenerate_control_sample_std") if current_pair_count is None or current_pair_count < 2: blockers.append("insufficient_existing_pairs") hard_candidate_blockers = [ str(item) for item in candidate_blockers if str(item) != "control_mean_not_independently_known" ] if hard_candidate_blockers: blockers.extend(f"candidate_{item}" for item in hard_candidate_blockers) target_abs_sem = None independent_control_pairs = None predicted_control_mean_sem = None predicted_control_contribution_sem = None predicted_combined_sem = None predicted_combined_uncertainty_rel = None residual_margin_sem = None if not blockers: assert response_mean is not None assert beta is not None assert residual_sem is not None assert control_sample_std is not None target_abs_sem = cfg.target_response_uncertainty_rel * max(abs(response_mean), cfg.value_floor) residual_margin_squared = target_abs_sem * target_abs_sem - residual_sem * residual_sem if residual_margin_squared <= 0.0: blockers.append("residual_sem_already_exceeds_target") else: residual_margin_sem = math.sqrt(residual_margin_squared) required = math.ceil( cfg.sem_safety_factor * (abs(beta) * control_sample_std / max(residual_margin_sem, cfg.value_floor)) ** 2 ) independent_control_pairs = max(cfg.min_control_mean_pairs, int(required)) predicted_control_mean_sem = control_sample_std / math.sqrt(independent_control_pairs) predicted_control_contribution_sem = abs(beta) * predicted_control_mean_sem predicted_combined_sem = math.sqrt(residual_sem * residual_sem + predicted_control_contribution_sem**2) predicted_combined_uncertainty_rel = predicted_combined_sem / max(abs(response_mean), cfg.value_floor) if independent_control_pairs > cfg.max_control_mean_pairs: blockers.append("control_mean_pair_budget_exceeded") if predicted_combined_uncertainty_rel > cfg.target_response_uncertainty_rel: blockers.append("predicted_uncertainty_above_target") action = ( "launch_independent_control_mean_campaign" if not blockers else "redesign_observable_or_raise_control_mean_budget" ) planned_pairs = [] if independent_control_pairs is not None: for offset in range(independent_control_pairs): seed = cfg.first_new_seed + offset planned_pairs.append( { "pair_index": offset + 1, "variant_label": f"seed{seed}", "plus_state": "plus_delta", "minus_state": "minus_delta", "control_observable": "0.5 * (Q_plus + Q_minus)", "response_observable": "Q_plus - Q_minus", } ) return { "kind": "nonlinear_turbulence_gradient_control_variate_campaign_plan", "claim_level": "pre_run_campaign_design_not_gradient_evidence", "case": case, "passed": action == "launch_independent_control_mean_campaign", "action": action, "config": asdict(cfg), "source_variance_report_case": variance_report.get("case"), "candidate_name": candidate_name_out, "candidate_blockers": candidate_blockers, "blockers": blockers, "summary": { "raw_response_uncertainty_rel": _json_number(raw_response_uncertainty_rel), "residual_uncertainty_rel": _json_number(residual_uncertainty_rel), "predicted_combined_uncertainty_rel": _json_number(predicted_combined_uncertainty_rel), "paired_response_mean": _json_number(response_mean), "target_abs_sem": _json_number(target_abs_sem), "residual_sem": _json_number(residual_sem), "residual_margin_sem": _json_number(residual_margin_sem), "control_sample_sem": _json_number(control_sample_sem), "control_sample_std": _json_number(control_sample_std), "control_variate_beta": _json_number(beta), "current_common_pair_count": current_pair_count, "required_independent_control_mean_pairs": independent_control_pairs, "planned_new_run_count": None if independent_control_pairs is None else 2 * independent_control_pairs, "predicted_control_mean_sem": _json_number(predicted_control_mean_sem), "predicted_control_contribution_sem": _json_number(predicted_control_contribution_sem), "predicted_combined_sem": _json_number(predicted_combined_sem), }, "planned_pairs": planned_pairs, "postprocess_contract": { "control_mean_estimator": "mean over independent 0.5 * (Q_plus + Q_minus) matched pairs", "response_estimator": ( "apply the screened beta to matched response samples using the independently " "estimated control mean; include residual SEM and beta^2 Var(control_mean)" ), "promotion_rule": ( "do not promote until output gates, replicated-window gates, control-mean " "uncertainty, and combined response uncertainty all pass" ), }, }
[docs] def nonlinear_gradient_control_mean_gate( variance_report: Mapping[str, Any], *, plus_ensemble: Mapping[str, Any], minus_ensemble: Mapping[str, Any], plus_path: str | None = None, minus_path: str | None = None, case: str = "nonlinear_turbulence_gradient_control_mean_gate", candidate_name: str | None = None, config: NonlinearGradientControlMeanGateConfig | None = None, ) -> dict[str, Any]: """Evaluate an independent control-mean estimate for a screened CV response.""" cfg = config or NonlinearGradientControlMeanGateConfig() if cfg.target_response_uncertainty_rel <= 0.0: raise ValueError("target_response_uncertainty_rel must be positive") if cfg.min_control_mean_pairs < 1: raise ValueError("min_control_mean_pairs must be positive") summary_raw = variance_report.get("summary") summary = summary_raw if isinstance(summary_raw, Mapping) else {} requested_candidate = candidate_name or str(summary.get("best_control_variate") or "") candidates_raw = variance_report.get("control_variate_candidates") candidates = [row for row in candidates_raw if isinstance(row, Mapping)] if isinstance(candidates_raw, Sequence) else [] candidate = ( next((row for row in candidates if str(row.get("name")) == requested_candidate), None) if requested_candidate else None ) if candidate is None and candidates: candidate = min( candidates, key=lambda row: ( _finite_float(row.get("adjusted_response_uncertainty_rel")) or float("inf"), -(_finite_float(row.get("sem_reduction_fraction")) or float("-inf")), ), ) blockers: list[str] = [] response_mean = _finite_float(summary.get("paired_response_mean")) beta = _finite_float(None if candidate is None else candidate.get("beta")) residual_sem = _finite_float(None if candidate is None else candidate.get("adjusted_response_sem")) residual_uncertainty_rel = _finite_float( None if candidate is None else candidate.get("adjusted_response_uncertainty_rel") ) if candidate is None: blockers.append("no_control_variate_candidate") if response_mean is None or abs(response_mean) <= cfg.value_floor: blockers.append("degenerate_response_mean") if beta is None: blockers.append("missing_control_variate_beta") if residual_sem is None: blockers.append("missing_residual_sem") if cfg.require_state_ensembles_passed: if not bool(plus_ensemble.get("passed", False)): blockers.append("plus_control_ensemble_failed") if not bool(minus_ensemble.get("passed", False)): blockers.append("minus_control_ensemble_failed") source = {"plus": plus_ensemble, "minus": minus_ensemble} plus = _state_means_by_label(source, "plus") minus = _state_means_by_label(source, "minus") common_labels = sorted(set(plus).intersection(minus)) control_samples = [0.5 * (plus[item] + minus[item]) for item in common_labels] response_samples = [plus[item] - minus[item] for item in common_labels] control_mean, control_sem = _mean_and_sem(control_samples) response_mean_independent, response_sem_independent = _mean_and_sem(response_samples) if len(common_labels) < cfg.min_control_mean_pairs: blockers.append("insufficient_control_mean_pairs") if control_sem is None: blockers.append("control_mean_sem_unavailable") control_contribution_sem = None combined_sem = None combined_uncertainty_rel = None if not blockers: assert beta is not None assert residual_sem is not None assert response_mean is not None assert control_sem is not None control_contribution_sem = abs(beta) * control_sem combined_sem = math.sqrt(residual_sem * residual_sem + control_contribution_sem * control_contribution_sem) combined_uncertainty_rel = combined_sem / max(abs(response_mean), cfg.value_floor) if combined_uncertainty_rel > cfg.target_response_uncertainty_rel: blockers.append("combined_response_uncertainty_above_target") pair_rows = [ { "label": item, "plus_mean": _json_number(plus[item]), "minus_mean": _json_number(minus[item]), "control_mean_sample": _json_number(0.5 * (plus[item] + minus[item])), "response_sample": _json_number(plus[item] - minus[item]), } for item in common_labels ] return { "kind": "nonlinear_turbulence_gradient_control_mean_gate", "claim_level": "independent_control_mean_uncertainty_gate_not_gradient_promotion", "case": case, "passed": not blockers, "candidate_name": None if candidate is None else str(candidate.get("name")), "blockers": blockers, "config": asdict(cfg), "source_variance_report_case": variance_report.get("case"), "plus_path": plus_path, "minus_path": minus_path, "summary": { "common_pair_count": len(common_labels), "paired_response_mean": _json_number(response_mean), "residual_sem": _json_number(residual_sem), "residual_uncertainty_rel": _json_number(residual_uncertainty_rel), "control_mean": _json_number(control_mean), "control_mean_sem": _json_number(control_sem), "control_contribution_sem": _json_number(control_contribution_sem), "combined_response_sem": _json_number(combined_sem), "combined_response_uncertainty_rel": _json_number(combined_uncertainty_rel), "independent_response_mean": _json_number(response_mean_independent), "independent_response_sem": _json_number(response_sem_independent), "control_variate_beta": _json_number(beta), }, "pair_rows": pair_rows, }
def _required_replicates( *, current_count: int | None, uncertainty_rel: float, config: NonlinearGradientFollowupConfig, ) -> int | None: if current_count is None or current_count <= 0: return None target = max(float(config.max_gradient_uncertainty_rel), float(config.value_floor)) scale = (float(uncertainty_rel) / target) ** 2 * float(config.sem_safety_factor) return max(current_count + 1, int(math.ceil(current_count * scale))) def _required_replicates_for_scaled_bracket( *, current_count: int | None, uncertainty_rel: float | None, bracket_scale: float, config: NonlinearGradientCandidateDesignConfig, ) -> int | None: if current_count is None or current_count <= 0: return None if uncertainty_rel is None or bracket_scale <= 0.0: return None target = max(float(config.max_gradient_uncertainty_rel), float(config.value_floor)) scale = (float(uncertainty_rel) / (target * float(bracket_scale))) ** 2 scale *= float(config.sem_safety_factor) return max(current_count, int(math.ceil(current_count * scale))) def _design_row( artifact: Mapping[str, Any], *, index: int, path: str | None, label: str | None, config: NonlinearGradientCandidateDesignConfig, ) -> dict[str, Any]: source_ensembles_raw = artifact.get("source_ensembles") source_ensembles = source_ensembles_raw if isinstance(source_ensembles_raw, Mapping) else {} variance_report = _ensemble_state_variance_report(source_ensembles, config=config) response_fraction = _metric(artifact, "response_fraction") asymmetry_rel = _metric(artifact, "fd_asymmetry_rel", "asymmetry_rel") uncertainty_rel = _metric( artifact, "gradient_uncertainty_rel", "gradient_relative_uncertainty", ) current_replicates = _replicate_count(source_ensembles) passed = _artifact_passed(artifact) response_ok = response_fraction is not None and response_fraction >= config.min_fd_response_fraction locality_ok = asymmetry_rel is not None and asymmetry_rel <= config.max_fd_asymmetry_rel uncertainty_ok = uncertainty_rel is not None and uncertainty_rel <= config.max_gradient_uncertainty_rel uncertainty_required_scale = None if uncertainty_rel is not None: uncertainty_required_scale = max(1.0, uncertainty_rel / max(config.max_gradient_uncertainty_rel, config.value_floor)) locality_scale_limit = None if asymmetry_rel is not None and asymmetry_rel > 0.0: locality_scale_limit = ( config.locality_safety_factor * config.max_fd_asymmetry_rel / asymmetry_rel ) elif asymmetry_rel == 0.0: locality_scale_limit = float("inf") usable_bracket_scale = 1.0 if locality_scale_limit is not None: usable_bracket_scale = max(1.0, min(config.max_checked_bracket_scale, locality_scale_limit)) elif uncertainty_required_scale is not None: usable_bracket_scale = min(config.max_checked_bracket_scale, uncertainty_required_scale) bracket_only_feasible = ( bool(response_ok) and bool(locality_ok) and uncertainty_required_scale is not None and locality_scale_limit is not None and uncertainty_required_scale <= min(config.max_checked_bracket_scale, locality_scale_limit) ) required_replicates_no_bracket = _required_replicates_for_scaled_bracket( current_count=current_replicates, uncertainty_rel=uncertainty_rel, bracket_scale=1.0, config=config, ) required_replicates_at_local_limit = _required_replicates_for_scaled_bracket( current_count=current_replicates, uncertainty_rel=uncertainty_rel, bracket_scale=usable_bracket_scale, config=config, ) extra_replicates_at_local_limit = None if required_replicates_at_local_limit is not None and current_replicates is not None: extra_replicates_at_local_limit = max(0, required_replicates_at_local_limit - current_replicates) if passed: action = "freeze_promoted_candidate" recommendation = "candidate already passes production gates; freeze provenance" elif not response_ok: action = "increase_checked_bracket_or_replace_control" recommendation = ( "the finite-difference response is below the resolved-response gate; " "run a checked bracket sweep or replace this control before adding replicas" ) elif not locality_ok: action = "shrink_or_replace_nonlocal_control" recommendation = ( "the finite-difference bracket is nonlocal; shrink the bracket or " "replace the control before spending replicas" ) elif uncertainty_ok: action = "inspect_pass_flag" recommendation = "scalar gates pass but the artifact did not promote; inspect metadata and provenance" elif variance_report["failed_spread_states"]: action = "design_variance_reduction_for_limiting_state" recommendation = str(variance_report["recommendation"]) elif bracket_only_feasible: action = "run_checked_larger_bracket" recommendation = ( "a bounded larger bracket can in principle resolve uncertainty while " "staying below the locality limit; run a short locality/response sweep first" ) elif extra_replicates_at_local_limit is not None and extra_replicates_at_local_limit <= config.max_extra_replicates_per_state: action = "add_limited_replicates_with_locality_cap" recommendation = ( "combine the largest locality-safe bracket with a bounded number of matched replicas" ) else: action = "design_better_conditioned_control_or_variance_reduction" recommendation = ( "bracket-only and bounded-replica fixes are not efficient; design a better-conditioned " "composite direction, variance-reduced observable, or checked response-larger bracket" ) return { "index": index, "label": str(label or artifact.get("parameter_name") or path or index), "path": path, "parameter_name": str(artifact.get("parameter_name") or ""), "passed": passed, "action": action, "recommendation": recommendation, "metrics": { "response_fraction": _json_number(response_fraction), "fd_asymmetry_rel": _json_number(asymmetry_rel), "gradient_uncertainty_rel": _json_number(uncertainty_rel), }, "gate_status": { "response_ok": bool(response_ok), "locality_ok": bool(locality_ok), "uncertainty_ok": bool(uncertainty_ok), }, "variance_reduction": variance_report, "current_replicates_per_state": current_replicates, "uncertainty_required_bracket_scale": _json_number(uncertainty_required_scale), "locality_safe_bracket_scale_limit": _json_number(locality_scale_limit), "usable_bracket_scale_for_estimate": _json_number(usable_bracket_scale), "bracket_only_feasible": bool(bracket_only_feasible), "estimated_required_replicates_no_bracket": required_replicates_no_bracket, "estimated_required_replicates_at_locality_limit": required_replicates_at_local_limit, "estimated_extra_replicates_at_locality_limit": extra_replicates_at_local_limit, }
[docs] def nonlinear_gradient_candidate_design_report( artifacts: Sequence[Mapping[str, Any]], *, paths: Sequence[str | None] | None = None, labels: Sequence[str | None] | None = None, case: str = "nonlinear_turbulence_gradient_candidate_design", config: NonlinearGradientCandidateDesignConfig | None = None, ) -> dict[str, Any]: """Return the next-campaign design implied by failed production gates. The report estimates whether a failed central-FD candidate can be rescued by a larger bracket, by bounded extra replicas, or whether the next campaign should instead change the control/observable. The estimates use the usual ``1/sqrt(N)`` SEM scaling and the local finite-difference assumption that response grows approximately linearly with bracket size before the asymmetry gate is hit. """ cfg = config or NonlinearGradientCandidateDesignConfig() if cfg.max_gradient_uncertainty_rel <= 0.0: raise ValueError("max_gradient_uncertainty_rel must be positive") if cfg.max_fd_asymmetry_rel <= 0.0: raise ValueError("max_fd_asymmetry_rel must be positive") if cfg.max_window_mean_rel_spread <= 0.0: raise ValueError("max_window_mean_rel_spread must be positive") if cfg.max_window_sem_rel <= 0.0: raise ValueError("max_window_sem_rel must be positive") if cfg.min_fd_response_fraction <= 0.0: raise ValueError("min_fd_response_fraction must be positive") if cfg.sem_safety_factor <= 0.0: raise ValueError("sem_safety_factor must be positive") if cfg.max_extra_replicates_per_state < 0: raise ValueError("max_extra_replicates_per_state must be non-negative") if cfg.max_checked_bracket_scale < 1.0: raise ValueError("max_checked_bracket_scale must be at least one") if cfg.locality_safety_factor <= 0.0: raise ValueError("locality_safety_factor must be positive") path_list = list(paths or [None] * len(artifacts)) label_list = list(labels or [None] * len(artifacts)) if len(path_list) != len(artifacts): raise ValueError("paths length must match artifacts") if len(label_list) != len(artifacts): raise ValueError("labels length must match artifacts") candidates = [ _design_row(artifact, index=index, path=path, label=label, config=cfg) for index, (artifact, path, label) in enumerate(zip(artifacts, path_list, label_list)) ] promoted = [row for row in candidates if row["action"] == "freeze_promoted_candidate"] bracket_ready = [row for row in candidates if row["action"] == "run_checked_larger_bracket"] replica_ready = [row for row in candidates if row["action"] == "add_limited_replicates_with_locality_cap"] variance_limited = [row for row in candidates if row["action"] == "design_variance_reduction_for_limiting_state"] replacement = [ row for row in candidates if row["action"] in { "design_better_conditioned_control_or_variance_reduction", "design_variance_reduction_for_limiting_state", "increase_checked_bracket_or_replace_control", "shrink_or_replace_nonlocal_control", } ] if promoted: next_action = "freeze promoted candidate provenance" elif bracket_ready: next_action = "run a bounded bracket/locality sweep before new long windows" elif variance_limited: next_action = "target paired-seed or control-variate variance reduction for limiting states" elif replica_ready: next_action = "combine locality-capped bracket scale with bounded matched replicas" elif replacement: next_action = "design a better-conditioned control or variance-reduced observable before more GPU replicas" else: next_action = "inspect candidate metadata before designing new runs" return { "kind": "nonlinear_turbulence_gradient_candidate_design_report", "claim_level": "campaign_design_not_gradient_evidence", "case": case, "passed": bool(promoted), "next_action": next_action, "config": asdict(cfg), "summary": { "candidate_count": len(candidates), "promoted_candidate_count": len(promoted), "bracket_ready_count": len(bracket_ready), "replica_ready_count": len(replica_ready), "variance_limited_count": len(variance_limited), "replacement_or_variance_reduction_count": len(replacement), }, "candidates": candidates, }
def _composite_control_row( artifact: Mapping[str, Any], *, index: int, path: str | None, label: str | None, config: NonlinearGradientCompositeControlConfig, ) -> dict[str, Any]: parameter_name = str(artifact.get("parameter_name") or label or path or f"candidate_{index}") coefficient = _coefficient_label_from_parameter(artifact.get("parameter_name")) response_fraction = _metric(artifact, "response_fraction") asymmetry_rel = _metric(artifact, "fd_asymmetry_rel", "asymmetry_rel") uncertainty_rel = _metric( artifact, "gradient_uncertainty_rel", "gradient_relative_uncertainty", ) central_gradient = _metric( artifact, "central_gradient", "central_fd_dq_dparameter", "central_fd_dq_dtprim", ) paired_uncertainty_rel = _nested_metric( artifact, "paired_replicate_diagnostics", "central_gradient_uncertainty_rel", ) same_sign_fraction = _nested_metric( artifact, "paired_replicate_diagnostics", "same_sign_fraction", ) response_ok = response_fraction is not None and response_fraction >= config.min_fd_response_fraction locality_ok = asymmetry_rel is not None and asymmetry_rel <= config.max_fd_asymmetry_rel uncertainty_ok = uncertainty_rel is not None and uncertainty_rel <= config.max_gradient_uncertainty_rel same_sign_ok = same_sign_fraction is None or same_sign_fraction >= config.min_same_sign_fraction gradient_ok = central_gradient is not None and abs(central_gradient) > config.value_floor coefficient_ok = coefficient is not None admissible = bool( coefficient_ok and gradient_ok and response_ok and locality_ok and uncertainty_ok and same_sign_ok ) blockers: list[str] = [] if not coefficient_ok: blockers.append("parameter_not_vmec_boundary_coefficient") if not gradient_ok: blockers.append("missing_or_zero_central_gradient") if not response_ok: blockers.append("unresolved_heat_flux_response") if not locality_ok: blockers.append("nonlocal_finite_difference_bracket") if not uncertainty_ok: blockers.append("gradient_uncertainty_too_large") if not same_sign_ok: blockers.append("paired_replicate_sign_not_robust") descent_gradient = None if central_gradient is None else -float(central_gradient) return { "index": index, "label": str(label or parameter_name), "path": path, "parameter_name": parameter_name, "coefficient": coefficient, "admissible_for_composite_direction": admissible, "blockers": blockers, "metrics": { "central_gradient": _json_number(central_gradient), "descent_gradient": _json_number(descent_gradient), "response_fraction": _json_number(response_fraction), "fd_asymmetry_rel": _json_number(asymmetry_rel), "gradient_uncertainty_rel": _json_number(uncertainty_rel), "paired_gradient_uncertainty_rel": _json_number(paired_uncertainty_rel), "same_sign_fraction": _json_number(same_sign_fraction), }, "gate_status": { "coefficient_ok": coefficient_ok, "gradient_ok": gradient_ok, "response_ok": bool(response_ok), "locality_ok": bool(locality_ok), "uncertainty_ok": bool(uncertainty_ok), "same_sign_ok": bool(same_sign_ok), }, }
[docs] def nonlinear_gradient_composite_control_report( artifacts: Sequence[Mapping[str, Any]], *, paths: Sequence[str | None] | None = None, labels: Sequence[str | None] | None = None, case: str = "nonlinear_turbulence_gradient_composite_control_design", config: NonlinearGradientCompositeControlConfig | None = None, ) -> dict[str, Any]: """Design a normalized VMEC-boundary direction from resolved FD candidates. This is a launch-planning gate, not nonlinear-gradient evidence. The returned controls are the steepest-descent direction in the subspace of candidates that already pass locality, response, uncertainty, coefficient, and paired-sign checks. If fewer than ``min_controls`` survive, the report fails closed and provides exact blockers instead of producing a misleading multi-coefficient launch recommendation. """ cfg = config or NonlinearGradientCompositeControlConfig() if cfg.max_gradient_uncertainty_rel <= 0.0: raise ValueError("max_gradient_uncertainty_rel must be positive") if cfg.max_fd_asymmetry_rel <= 0.0: raise ValueError("max_fd_asymmetry_rel must be positive") if cfg.min_fd_response_fraction <= 0.0: raise ValueError("min_fd_response_fraction must be positive") if cfg.min_same_sign_fraction <= 0.0 or cfg.min_same_sign_fraction > 1.0: raise ValueError("min_same_sign_fraction must be in (0, 1]") if cfg.min_controls < 1: raise ValueError("min_controls must be at least one") if cfg.default_relative_delta <= 0.0: raise ValueError("default_relative_delta must be positive") if cfg.max_weight_abs <= 0.0: raise ValueError("max_weight_abs must be positive") path_list = list(paths or [None] * len(artifacts)) label_list = list(labels or [None] * len(artifacts)) if len(path_list) != len(artifacts): raise ValueError("paths length must match artifacts") if len(label_list) != len(artifacts): raise ValueError("labels length must match artifacts") rows = [ _composite_control_row(artifact, index=index, path=path, label=label, config=cfg) for index, (artifact, path, label) in enumerate(zip(artifacts, path_list, label_list)) ] admissible_rows = [row for row in rows if bool(row["admissible_for_composite_direction"])] max_abs_descent = max( (abs(float(row["metrics"]["descent_gradient"])) for row in admissible_rows), default=0.0, ) controls: list[dict[str, Any]] = [] if max_abs_descent > cfg.value_floor: for row in admissible_rows: descent = float(row["metrics"]["descent_gradient"]) weight = cfg.max_weight_abs * descent / max_abs_descent controls.append( { "parameter_name": row["parameter_name"], "coefficient": row["coefficient"], "weight": _json_number(weight), "control_argument": f"{row['coefficient']}:{weight:.12g}", "source_label": row["label"], "source_path": row["path"], } ) launch_ready = len(controls) >= cfg.min_controls if launch_ready: control_args = " ".join(f"--control {control['control_argument']}" for control in controls) command_template = ( "python tools/write_vmec_boundary_profile_perturbation_inputs.py " "--baseline-input <input.vmec> " "--out-dir docs/_static/<case>_composite_direction " f"--case {case} " f"{control_args} " f"--relative-delta {cfg.default_relative_delta:.12g}" ) next_action = "launch a checked VMEC profile-direction bracket sweep before long nonlinear windows" elif controls: command_template = None next_action = ( "only one admissible control remains; screen additional local/resolved controls " "or explicitly run a single-control bracket check before a long campaign" ) else: command_template = None next_action = "no admissible controls; screen new VMEC-boundary directions before nonlinear GPU runs" return { "kind": "nonlinear_turbulence_gradient_composite_control_design", "claim_level": "composite_control_launch_plan_not_gradient_evidence", "case": case, "passed": bool(launch_ready), "next_action": next_action, "config": asdict(cfg), "summary": { "candidate_count": len(rows), "admissible_control_count": len(controls), "required_control_count": cfg.min_controls, "launch_ready": bool(launch_ready), }, "controls": controls, "write_profile_direction_command_template": command_template, "candidates": rows, }
def _ql_seed_rows( artifact: Mapping[str, Any], *, index: int, path: str | None, label: str | None, config: NonlinearGradientQLSeedScreenConfig, ) -> list[dict[str, Any]]: objective_gates = artifact.get("objective_gates") if not isinstance(objective_gates, Sequence): return [] artifact_passed = bool(artifact.get("passed", False)) case_name = str(artifact.get("case_name") or label or path or f"artifact_{index}") parameter_indices = artifact.get("parameter_indices") source_family = _state_control_family(parameter_indices) rows: list[dict[str, Any]] = [] for gate_index, gate in enumerate(objective_gates): if not isinstance(gate, Mapping): continue objective = str(gate.get("objective") or "") if objective not in config.target_objectives: continue parameter = str(gate.get("parameter") or "") implicit = _finite_float(gate.get("implicit")) finite_difference = _finite_float(gate.get("finite_difference")) rel_error = _finite_float(gate.get("rel_error")) gate_passed = bool(gate.get("passed", False)) sensitivity_resolved = implicit is not None and abs(implicit) >= config.min_abs_sensitivity rel_error_ok = rel_error is not None and rel_error <= config.max_objective_rel_error accepted = bool( parameter and sensitivity_resolved and rel_error_ok and gate_passed and (artifact_passed or not config.require_artifact_passed) ) blockers: list[str] = [] if not parameter: blockers.append("missing_parameter_name") if not sensitivity_resolved: blockers.append("unresolved_objective_sensitivity") if not rel_error_ok: blockers.append("ad_fd_relative_error_too_large") if not gate_passed: blockers.append("objective_gate_failed") if config.require_artifact_passed and not artifact_passed: blockers.append("source_artifact_failed") direction = None if implicit is None else -math.copysign(1.0, implicit) rows.append( { "artifact_index": index, "gate_index": gate_index, "label": str(label or case_name), "path": path, "case_name": case_name, "source_kind": str(artifact.get("kind", "")), "source_artifact_passed": artifact_passed, "state_parameter": parameter, "state_control_family": source_family, "parameter_indices": parameter_indices if isinstance(parameter_indices, Mapping) else None, "objective": objective, "accepted_objective_gate": accepted, "blockers": blockers, "metrics": { "implicit_sensitivity": _json_number(implicit), "finite_difference_sensitivity": _json_number(finite_difference), "relative_error": _json_number(rel_error), "descent_direction_sign": _json_number(direction), }, } ) return rows def _sign_consistency(values: Sequence[float], *, value_floor: float) -> tuple[float | None, float | None]: signs = [math.copysign(1.0, value) for value in values if abs(value) > value_floor] if not signs: return None, None positive = sum(1 for sign in signs if sign > 0.0) negative = len(signs) - positive dominant = 1.0 if positive >= negative else -1.0 return dominant, max(positive, negative) / len(signs)
[docs] def nonlinear_gradient_ql_seed_screen_report( artifacts: Sequence[Mapping[str, Any]], *, paths: Sequence[str | None] | None = None, labels: Sequence[str | None] | None = None, case: str = "nonlinear_turbulence_gradient_ql_seed_screen", config: NonlinearGradientQLSeedScreenConfig | None = None, ) -> dict[str, Any]: """Screen QL/linear sensitivity artifacts before nonlinear-gradient runs. The report groups full-chain VMEC/Boozer sensitivity rows by state parameter and admits a control only when the primary objective sensitivity is AD/FD-consistent, resolved, sign-consistent across enough artifacts, and tied to a distinct VMEC-state control. The output is deliberately a planning artifact: VMEC-state controls are not assumed to be patchable ``RBC/ZBS`` input-file coefficients. """ cfg = config or NonlinearGradientQLSeedScreenConfig() if not cfg.target_objectives: raise ValueError("target_objectives must be non-empty") if cfg.primary_objective not in cfg.target_objectives: raise ValueError("primary_objective must be included in target_objectives") if cfg.min_distinct_controls < 1: raise ValueError("min_distinct_controls must be at least one") if cfg.min_cases_per_control < 1: raise ValueError("min_cases_per_control must be at least one") if cfg.min_sign_consistency <= 0.0 or cfg.min_sign_consistency > 1.0: raise ValueError("min_sign_consistency must be in (0, 1]") if cfg.max_objective_rel_error < 0.0: raise ValueError("max_objective_rel_error must be non-negative") if cfg.min_abs_sensitivity <= 0.0: raise ValueError("min_abs_sensitivity must be positive") path_list = list(paths or [None] * len(artifacts)) label_list = list(labels or [None] * len(artifacts)) if len(path_list) != len(artifacts): raise ValueError("paths length must match artifacts") if len(label_list) != len(artifacts): raise ValueError("labels length must match artifacts") objective_rows: list[dict[str, Any]] = [] for index, (artifact, path, label) in enumerate(zip(artifacts, path_list, label_list)): objective_rows.extend(_ql_seed_rows(artifact, index=index, path=path, label=label, config=cfg)) grouped: dict[str, list[dict[str, Any]]] = {} for row in objective_rows: if row["objective"] == cfg.primary_objective: grouped.setdefault(str(row["state_parameter"]), []).append(row) controls: list[dict[str, Any]] = [] for parameter, rows in sorted(grouped.items()): accepted_rows = [row for row in rows if bool(row["accepted_objective_gate"])] sensitivities = [ float(row["metrics"]["implicit_sensitivity"]) for row in accepted_rows if row["metrics"]["implicit_sensitivity"] is not None ] dominant_sign, sign_fraction = _sign_consistency( sensitivities, value_floor=cfg.min_abs_sensitivity, ) n_cases = len({str(row["case_name"]) for row in accepted_rows}) enough_cases = n_cases >= cfg.min_cases_per_control sign_ok = sign_fraction is not None and sign_fraction >= cfg.min_sign_consistency admitted = bool(enough_cases and sign_ok) blockers: list[str] = [] if not accepted_rows: blockers.append("no_accepted_primary_objective_rows") if not enough_cases: blockers.append("insufficient_case_coverage") if not sign_ok: blockers.append("cross_artifact_sign_not_consistent") direction = None if dominant_sign is None else -dominant_sign mean_abs_sensitivity = None if sensitivities: mean_abs_sensitivity = sum(abs(value) for value in sensitivities) / len(sensitivities) controls.append( { "state_parameter": parameter, "state_control_family": accepted_rows[0].get("state_control_family") if accepted_rows else None, "admitted_for_nonlinear_screen": admitted, "blockers": blockers, "primary_objective": cfg.primary_objective, "n_accepted_rows": len(accepted_rows), "n_cases": n_cases, "dominant_sensitivity_sign": _json_number(dominant_sign), "descent_direction_sign": _json_number(direction), "sign_consistency_fraction": _json_number(sign_fraction), "mean_abs_sensitivity": _json_number(mean_abs_sensitivity), "state_control_argument": None if direction is None else f"{parameter}:{direction:.12g}", "source_rows": [ { "case_name": row["case_name"], "path": row["path"], "source_artifact_passed": row["source_artifact_passed"], "implicit_sensitivity": row["metrics"]["implicit_sensitivity"], "relative_error": row["metrics"]["relative_error"], } for row in accepted_rows ], } ) admitted_controls = [row for row in controls if bool(row["admitted_for_nonlinear_screen"])] passed = len(admitted_controls) >= cfg.min_distinct_controls if passed: next_action = "build checked short-bracket nonlinear-gradient screens for admitted VMEC-state controls" elif controls: next_action = ( "generate additional QL/linear sensitivity artifacts for distinct VMEC-state controls " "before nonlinear GPU campaigns" ) else: next_action = "no usable QL/linear sensitivity rows; generate full-chain VMEC/Boozer gradient artifacts first" return { "kind": "nonlinear_turbulence_gradient_ql_seed_screen", "claim_level": "ql_seeded_control_screen_not_nonlinear_gradient_evidence", "case": case, "passed": bool(passed), "next_action": next_action, "config": asdict(cfg), "summary": { "artifact_count": len(artifacts), "objective_row_count": len(objective_rows), "control_count": len(controls), "admitted_control_count": len(admitted_controls), "required_distinct_controls": cfg.min_distinct_controls, }, "admitted_controls": admitted_controls, "controls": controls, "objective_rows": objective_rows, "scope_note": ( "Rows describe vmec_jax state controls. They are not direct VMEC " "input-file RBC/ZBS coefficients unless a separate mapping artifact " "establishes that relation." ), }
def _mapping_control_rows(mapping_artifacts: Sequence[Mapping[str, Any]]) -> dict[str, dict[str, Any]]: rows_by_parameter: dict[str, dict[str, Any]] = {} for artifact_index, artifact in enumerate(mapping_artifacts): artifact_passed = bool(artifact.get("passed", False)) raw_rows = artifact.get("controls") if not isinstance(raw_rows, Sequence): raw_rows = artifact.get("mappings") if not isinstance(raw_rows, Sequence): continue for row_index, raw_row in enumerate(raw_rows): if not isinstance(raw_row, Mapping): continue parameter = raw_row.get("state_parameter") if not isinstance(parameter, str) or not parameter: parameter = raw_row.get("state_control") if not isinstance(parameter, str) or not parameter: continue candidate = { "artifact_index": artifact_index, "row_index": row_index, "state_parameter": parameter, "input_control_argument": raw_row.get("input_control_argument"), "input_direction": raw_row.get("input_direction"), "input_parameter": raw_row.get("input_parameter"), "passed": bool(raw_row.get("passed", False)) and artifact_passed, "row_passed": bool(raw_row.get("passed", False)), "artifact_passed": artifact_passed, "condition_number": _json_number( _metric(raw_row, "condition_number", "mapping_condition_number") ), "relative_residual": _json_number( _metric(raw_row, "relative_residual", "mapping_relative_residual") ), "dominant_response_sign": _json_number( _metric(raw_row, "dominant_response_sign", "response_sign") ), "source_kind": str(artifact.get("kind", "")), "source_case": artifact.get("case") or artifact.get("case_name"), } current = rows_by_parameter.get(parameter) if current is None or ( bool(candidate["passed"]) and not bool(current.get("passed", False)) ): rows_by_parameter[parameter] = candidate return rows_by_parameter
[docs] def nonlinear_gradient_state_control_runbook_report( ql_seed_screen: Mapping[str, Any], *, mapping_artifacts: Sequence[Mapping[str, Any]] = (), case: str = "nonlinear_gradient_state_control_runbook", config: NonlinearGradientStateControlRunbookConfig | None = None, ) -> dict[str, Any]: """Build a fail-closed launch runbook for VMEC-state nonlinear-gradient controls. The QL seed screen operates on internal ``vmec_jax`` state coordinates. Nonlinear campaigns, however, need perturbable input directions that can be written to VMEC inputs and re-equilibrated. This report joins the admitted state controls to an explicit state-to-input mapping artifact and refuses to produce launch commands until the mapping is conditioned and complete. """ cfg = config or NonlinearGradientStateControlRunbookConfig() if cfg.min_mapped_controls < 1: raise ValueError("min_mapped_controls must be at least one") if cfg.max_mapping_condition_number <= 0.0: raise ValueError("max_mapping_condition_number must be positive") if cfg.max_mapping_relative_residual < 0.0: raise ValueError("max_mapping_relative_residual must be non-negative") if cfg.default_relative_delta <= 0.0: raise ValueError("default_relative_delta must be positive") admitted_raw = ql_seed_screen.get("admitted_controls") admitted = admitted_raw if isinstance(admitted_raw, Sequence) else () ql_kind = ql_seed_screen.get("kind") ql_screen_usable = ( ql_kind == "nonlinear_turbulence_gradient_ql_seed_screen" and bool(ql_seed_screen.get("passed", False)) ) mapping_by_parameter = _mapping_control_rows(mapping_artifacts) control_rows: list[dict[str, Any]] = [] mapped_controls: list[dict[str, Any]] = [] for raw_control in admitted: if not isinstance(raw_control, Mapping): continue parameter = raw_control.get("state_parameter") if not isinstance(parameter, str) or not parameter: continue mapping = mapping_by_parameter.get(parameter) blockers: list[str] = [] if ql_kind != "nonlinear_turbulence_gradient_ql_seed_screen": blockers.append("invalid_ql_seed_screen_kind") if not bool(ql_seed_screen.get("passed", False)): blockers.append("ql_seed_screen_failed") if mapping is None: blockers.append("missing_state_to_input_mapping") mapping_passed = False condition_number = None relative_residual = None else: condition_number = _finite_float(mapping.get("condition_number")) relative_residual = _finite_float(mapping.get("relative_residual")) mapping_passed = bool(mapping.get("passed", False)) or not cfg.require_mapping_passed if cfg.require_mapping_passed and not bool(mapping.get("artifact_passed", False)): blockers.append("mapping_artifact_failed") if cfg.require_mapping_passed and not bool(mapping.get("passed", False)): if "mapping_artifact_failed" not in blockers: blockers.append("mapping_artifact_failed") if condition_number is None: blockers.append("missing_mapping_condition_number") elif condition_number > cfg.max_mapping_condition_number: blockers.append("mapping_condition_number_too_large") if relative_residual is None: blockers.append("missing_mapping_relative_residual") elif relative_residual > cfg.max_mapping_relative_residual: blockers.append("mapping_relative_residual_too_large") input_control = None if mapping is None else mapping.get("input_control_argument") if mapping is not None and not input_control: blockers.append("missing_input_control_argument") mapped = bool(ql_screen_usable and mapping is not None and mapping_passed and not blockers and input_control) row = { "state_parameter": parameter, "state_control_argument": raw_control.get("state_control_argument"), "descent_direction_sign": raw_control.get("descent_direction_sign"), "mapping_ready": mapped, "blockers": blockers, "input_control_argument": input_control, "input_direction": None if mapping is None else mapping.get("input_direction"), "input_parameter": None if mapping is None else mapping.get("input_parameter"), "condition_number": _json_number(condition_number), "relative_residual": _json_number(relative_residual), "short_bracket_command_fragment": None if not mapped else ( f"--control {input_control} " f"--relative-delta {cfg.default_relative_delta:.12g}" ), } if mapped: mapped_controls.append(row) control_rows.append(row) passed = len(mapped_controls) >= cfg.min_mapped_controls if passed: next_action = ( "write checked short-bracket nonlinear-gradient launch manifests for mapped VMEC input directions" ) elif control_rows: next_action = ( "build a VMEC-state-to-input mapping artifact before launching nonlinear-gradient campaigns" ) else: next_action = "run the QL seed screen first; no admitted VMEC-state controls were provided" return { "kind": "nonlinear_gradient_state_control_runbook", "claim_level": "state_to_input_mapping_gate_not_nonlinear_gradient_evidence", "case": case, "passed": bool(passed), "next_action": next_action, "config": asdict(cfg), "summary": { "admitted_state_control_count": len(control_rows), "mapped_control_count": len(mapped_controls), "required_mapped_control_count": cfg.min_mapped_controls, "mapping_artifact_count": len(mapping_artifacts), "ql_seed_screen_usable": bool(ql_screen_usable), }, "mapped_controls": mapped_controls, "controls": control_rows, "mapping_protocol": [ "select perturbable VMEC input coefficients or profile directions", "solve baseline/plus/minus equilibria with vmec_jax", "measure the induced VMEC-state response in the admitted state-control basis", "accept the mapping only if the local Jacobian is conditioned and residual-bounded", "only then launch checked short-bracket nonlinear-gradient screens", ], "scope_note": ( "A passed QL seed screen is upstream evidence only. This runbook " "requires an explicit state-to-input mapping before any long-window " "nonlinear turbulence-gradient or optimization claim." ), }
def _planned_matched_runs( *, source_ensembles: Mapping[str, Any], extra_replicates_per_state: int, config: NonlinearGradientFollowupConfig, ) -> list[dict[str, Any]]: if extra_replicates_per_state <= 0: return [] seeds = _seed_numbers(source_ensembles) next_seed = max(seeds) + 1 if seeds else 31 runs: list[dict[str, Any]] = [] for offset in range(extra_replicates_per_state): seed = next_seed + offset for source_state, run_state in STATE_TO_RUN_STATE.items(): if source_state not in source_ensembles: continue runs.append( { "state": run_state, "variant_axis": "seed", "variant_label": f"seed{seed}", "seed": seed, "timestep": float(config.default_nominal_timestep), "reason": ( "independent nominal-timestep replicate to reduce " "central finite-difference gradient uncertainty" ), } ) return runs
[docs] def nonlinear_gradient_followup_plan( artifacts: Sequence[Mapping[str, Any]], *, paths: Sequence[str | None] | None = None, labels: Sequence[str | None] | None = None, case: str = "nonlinear_turbulence_gradient_followup", config: NonlinearGradientFollowupConfig | None = None, ) -> dict[str, Any]: """Build a bounded, fail-closed follow-up plan from gradient artifacts.""" cfg = config or NonlinearGradientFollowupConfig() if cfg.max_extra_replicates_per_state < 0: raise ValueError("max_extra_replicates_per_state must be non-negative") if cfg.sem_safety_factor <= 0.0: raise ValueError("sem_safety_factor must be positive") if cfg.max_gradient_uncertainty_rel <= 0.0: raise ValueError("max_gradient_uncertainty_rel must be positive") path_list = list(paths or [None] * len(artifacts)) label_list = list(labels or [None] * len(artifacts)) if len(path_list) != len(artifacts): raise ValueError("paths length must match artifacts") if len(label_list) != len(artifacts): raise ValueError("labels length must match artifacts") candidate_actions: list[dict[str, Any]] = [] all_runs: list[dict[str, Any]] = [] for index, (artifact, path, label) in enumerate(zip(artifacts, path_list, label_list)): source_ensembles_raw = artifact.get("source_ensembles") source_ensembles = ( source_ensembles_raw if isinstance(source_ensembles_raw, Mapping) else {} ) response_fraction = _metric(artifact, "response_fraction") asymmetry_rel = _metric(artifact, "fd_asymmetry_rel", "asymmetry_rel") uncertainty_rel = _metric( artifact, "gradient_uncertainty_rel", "gradient_relative_uncertainty", ) current_replicates = _replicate_count(source_ensembles) passed = _artifact_passed(artifact) response_ok = ( response_fraction is not None and response_fraction >= float(cfg.min_fd_response_fraction) ) locality_ok = ( asymmetry_rel is not None and asymmetry_rel <= float(cfg.max_fd_asymmetry_rel) ) uncertainty_ok = ( uncertainty_rel is not None and uncertainty_rel <= float(cfg.max_gradient_uncertainty_rel) ) action = "inspect_artifact" recommendation = "missing required metrics; inspect the artifact before spending GPU time" required_replicates = None extra_replicates = 0 planned_runs: list[dict[str, Any]] = [] if passed: action = "freeze_promoted_candidate" recommendation = "candidate already passes production gates; freeze provenance" elif not response_ok: action = "replace_control_or_increase_checked_bracket" recommendation = ( "finite-difference response is not resolved; change the control or " "perform a checked bracket sweep before adding replicas" ) elif not locality_ok: action = "shrink_bracket_or_replace_control" recommendation = ( "finite-difference bracket is nonlocal; shrink the perturbation or " "replace the control before adding replicas" ) elif not uncertainty_ok and uncertainty_rel is not None: required_replicates = _required_replicates( current_count=current_replicates, uncertainty_rel=uncertainty_rel, config=cfg, ) if required_replicates is None: action = "recover_replicate_metadata" recommendation = ( "uncertainty is marginal but the artifact lacks replicate counts; " "recover ensemble metadata before launching runs" ) else: extra_replicates = max(0, required_replicates - int(current_replicates or 0)) extra_replicates = min(extra_replicates, int(cfg.max_extra_replicates_per_state)) action = "add_matched_nominal_seed_replicates" recommendation = ( "response and locality pass, but uncertainty is marginal; add only " "the matched independent replicas needed by the 1/sqrt(N) estimate" ) planned_runs = _planned_matched_runs( source_ensembles=source_ensembles, extra_replicates_per_state=extra_replicates, config=cfg, ) else: action = "no_followup_needed" recommendation = "all scalar gates are satisfied, but artifact was not marked passed" all_runs.extend( { **run, "candidate_index": index, "candidate_label": str(label or artifact.get("parameter_name") or path or index), } for run in planned_runs ) candidate_actions.append( { "index": index, "label": str(label or artifact.get("parameter_name") or path or index), "path": path, "parameter_name": str(artifact.get("parameter_name") or ""), "passed": passed, "action": action, "recommendation": recommendation, "metrics": { "response_fraction": _json_number(response_fraction), "fd_asymmetry_rel": _json_number(asymmetry_rel), "gradient_uncertainty_rel": _json_number(uncertainty_rel), }, "current_replicates_per_state": current_replicates, "estimated_required_replicates_per_state": required_replicates, "extra_replicates_per_state": extra_replicates, "planned_run_count": len(planned_runs), "planned_runs": planned_runs, } ) promoted = [row for row in candidate_actions if row["action"] == "freeze_promoted_candidate"] runnable = [row for row in candidate_actions if row["planned_run_count"]] nonlocal_rows = [row for row in candidate_actions if row["action"] == "shrink_bracket_or_replace_control"] unresolved_rows = [ row for row in candidate_actions if row["action"] == "replace_control_or_increase_checked_bracket" ] if promoted: next_action = "freeze the promoted candidate and do not launch more follow-up runs" elif runnable: next_action = "launch the bounded matched-replicate follow-up for the listed local noisy candidate" elif nonlocal_rows: next_action = "run a smaller-bracket or replacement-control sweep before adding replicas" elif unresolved_rows: next_action = "choose controls with a resolved heat-flux response before adding replicas" else: next_action = "inspect artifacts; no safe production follow-up was inferred" return { "kind": "nonlinear_turbulence_gradient_followup_plan", "claim_level": "campaign_planning_not_gradient_evidence", "case": case, "passed": bool(promoted), "next_action": next_action, "config": asdict(cfg), "summary": { "candidate_count": len(candidate_actions), "promoted_candidate_count": len(promoted), "runnable_followup_candidate_count": len(runnable), "planned_run_count": len(all_runs), }, "candidate_actions": candidate_actions, "planned_runs": all_runs, }