Source code for spectraxgk.stellarator_objective_portfolio

"""Backend-free reduced objective portfolios for stellarator optimization.

This module only reduces already-evaluated objective rows.  It intentionally
does not import VMEC, Boozer, or solver backends so the same contract can be
used by fast CI fixtures and by production VMEC/Boozer objective drivers after
they have built a per-surface/per-alpha/per-ky objective table.
"""

from __future__ import annotations

from collections.abc import Callable
from dataclasses import asdict, dataclass
from typing import Any, Literal, cast

import jax.numpy as jnp
import numpy as np

from spectraxgk.autodiff_validation import autodiff_finite_difference_report, covariance_diagnostics


PortfolioReduction = Literal["weighted_mean", "mean", "max"]

_GROWTH_OBJECTIVE_NAMES = frozenset(("gamma", "growth", "growth_rate"))
_QUASILINEAR_OBJECTIVE_NAMES = frozenset(
    (
        "quasilinear_flux",
        "quasilinear_heat_flux",
        "mixing_length_heat_flux_proxy",
        "linear_heat_flux_weight",
    )
)


[docs] @dataclass(frozen=True) class StellaratorObjectivePortfolioContract: """Static shape/weight contract for a reduced objective portfolio.""" n_surfaces: int n_alphas: int n_ky: int n_objectives: int reduction: PortfolioReduction uses_sample_weights: bool uses_separable_sample_weights: bool uses_objective_weights: bool @property def row_shape(self) -> tuple[int, int, int, int]: """Expected objective-table shape ``(surface, alpha, ky, objective)``.""" return (self.n_surfaces, self.n_alphas, self.n_ky, self.n_objectives) @property def sample_shape(self) -> tuple[int, int, int]: """Expected sample-weight shape ``(surface, alpha, ky)``.""" return (self.n_surfaces, self.n_alphas, self.n_ky) @property def n_samples(self) -> int: """Number of surface/alpha/ky samples in the portfolio.""" return self.n_surfaces * self.n_alphas * self.n_ky
[docs] def to_dict(self) -> dict[str, object]: """Return a JSON-friendly representation.""" payload = asdict(self) payload["row_shape"] = list(self.row_shape) payload["sample_shape"] = list(self.sample_shape) payload["n_samples"] = int(self.n_samples) return payload
[docs] @dataclass(frozen=True) class ReducedPortfolioArtifactGuardConfig: """Requirements for promoting real VMEC/Boozer reduced-portfolio rows.""" min_alphas: int = 2 min_ky: int = 2 min_objectives: int = 1 min_boozer_mode: int = 21 require_growth_objective: bool = True require_quasilinear_objective: bool = True require_vmec_paths: bool = True value_rtol: float = 1.0e-8 value_atol: float = 1.0e-8 def __post_init__(self) -> None: if int(self.min_alphas) < 1: raise ValueError("min_alphas must be >= 1") if int(self.min_ky) < 1: raise ValueError("min_ky must be >= 1") if int(self.min_objectives) < 1: raise ValueError("min_objectives must be >= 1") if int(self.min_boozer_mode) < 1: raise ValueError("min_boozer_mode must be >= 1") if float(self.value_rtol) < 0.0 or float(self.value_atol) < 0.0: raise ValueError("value tolerances must be non-negative")
[docs] def to_dict(self) -> dict[str, object]: """Return a JSON-friendly representation.""" return asdict(self)
def _is_real_numeric_dtype(dtype: jnp.dtype) -> bool: return bool(jnp.issubdtype(dtype, jnp.number) and not jnp.issubdtype(dtype, jnp.complexfloating)) def _floating_dtype(*arrays: jnp.ndarray) -> jnp.dtype: return jnp.result_type(*(arrays + (jnp.asarray(1.0),))) def _objective_rows(objective_rows: Any) -> jnp.ndarray: rows = jnp.asarray(objective_rows) if int(rows.ndim) != 4: raise ValueError("objective_rows must have shape (n_surface, n_alpha, n_ky, n_objective)") if any(int(size) < 1 for size in rows.shape): raise ValueError("objective_rows dimensions must all be positive") if not _is_real_numeric_dtype(rows.dtype): raise TypeError("objective_rows must be a real numeric array") return rows def _parameter_vector(params: Any) -> jnp.ndarray: p = jnp.asarray(params) if int(p.ndim) != 1: raise ValueError("params must be a one-dimensional vector") if int(p.shape[0]) < 1: raise ValueError("params must contain at least one parameter") if not _is_real_numeric_dtype(p.dtype): raise TypeError("params must be a real numeric vector") return jnp.asarray(p, dtype=_floating_dtype(p)) def _concrete_numpy_array(value: jnp.ndarray | Any) -> np.ndarray | None: try: return np.asarray(value, dtype=float) except Exception as exc: # pragma: no cover - exercised by JAX tracers under jit/grad. class_name = type(exc).__name__ if "Tracer" in class_name or "Concretization" in class_name: return None raise def _validate_concrete_weights(weights: jnp.ndarray | Any, *, name: str) -> None: concrete = _concrete_numpy_array(weights) if concrete is None: return if not np.all(np.isfinite(concrete)): raise ValueError(f"{name} must be finite") if np.any(concrete < 0.0): raise ValueError(f"{name} must be non-negative") if float(np.sum(concrete)) <= 0.0: raise ValueError(f"{name} must have positive sum") def _normalized_vector( weights: Any | None, *, size: int, name: str, dtype: jnp.dtype, ) -> jnp.ndarray: if weights is None: return jnp.full((int(size),), 1.0 / float(size), dtype=dtype) array = jnp.asarray(weights, dtype=dtype) if int(array.ndim) != 1 or int(array.shape[0]) != int(size): raise ValueError(f"{name} must be a length-{int(size)} vector") _validate_concrete_weights(array, name=name) return array / jnp.sum(array) def _normalized_sample_weights( objective_rows: jnp.ndarray, *, sample_weights: Any | None, surface_weights: Any | None, alpha_weights: Any | None, ky_weights: Any | None, dtype: jnp.dtype, ) -> jnp.ndarray: n_surface, n_alpha, n_ky, _n_objective = (int(size) for size in objective_rows.shape) has_sample_weights = sample_weights is not None has_axis_weights = any(weight is not None for weight in (surface_weights, alpha_weights, ky_weights)) if has_sample_weights and has_axis_weights: raise ValueError("provide either sample_weights or separable surface/alpha/ky weights, not both") if has_sample_weights: array = jnp.asarray(sample_weights, dtype=dtype) if tuple(int(size) for size in array.shape) != (n_surface, n_alpha, n_ky): raise ValueError("sample_weights must have shape (n_surface, n_alpha, n_ky)") _validate_concrete_weights(array, name="sample_weights") return array / jnp.sum(array) surface = _normalized_vector(surface_weights, size=n_surface, name="surface_weights", dtype=dtype) alpha = _normalized_vector(alpha_weights, size=n_alpha, name="alpha_weights", dtype=dtype) ky = _normalized_vector(ky_weights, size=n_ky, name="ky_weights", dtype=dtype) return surface[:, None, None] * alpha[None, :, None] * ky[None, None, :]
[docs] def portfolio_sample_weight_tensor( objective_rows: Any, *, sample_weights: Any | None = None, surface_weights: Any | None = None, alpha_weights: Any | None = None, ky_weights: Any | None = None, ) -> jnp.ndarray: """Return normalized sample weights with shape ``(surface, alpha, ky)``.""" rows = _objective_rows(objective_rows) dtype = _floating_dtype(rows) return _normalized_sample_weights( rows, sample_weights=sample_weights, surface_weights=surface_weights, alpha_weights=alpha_weights, ky_weights=ky_weights, dtype=dtype, )
[docs] def portfolio_objective_weight_vector( objective_rows: Any, *, objective_weights: Any | None = None, ) -> jnp.ndarray: """Return normalized objective-column weights.""" rows = _objective_rows(objective_rows) dtype = _floating_dtype(rows) return _normalized_vector( objective_weights, size=int(rows.shape[-1]), name="objective_weights", dtype=dtype, )
[docs] def validate_objective_portfolio_contract( objective_rows: Any, *, sample_weights: Any | None = None, surface_weights: Any | None = None, alpha_weights: Any | None = None, ky_weights: Any | None = None, objective_weights: Any | None = None, reduction: PortfolioReduction = "weighted_mean", ) -> StellaratorObjectivePortfolioContract: """Validate static row/weight contracts and return portfolio metadata. Concrete weights must be finite, non-negative, and have positive sum. Under JAX tracing, value-level weight checks are deferred to the caller, but shape contracts remain enforced from static array shapes. """ if reduction not in ("weighted_mean", "mean", "max"): raise ValueError("reduction must be one of 'weighted_mean', 'mean', or 'max'") if reduction == "mean" and any( weight is not None for weight in (sample_weights, surface_weights, alpha_weights, ky_weights, objective_weights) ): raise ValueError("mean reduction does not accept weights; use weighted_mean") if reduction == "max" and any( weight is not None for weight in (sample_weights, surface_weights, alpha_weights, ky_weights) ): raise ValueError("max reduction does not accept sample weights") rows = _objective_rows(objective_rows) _ = portfolio_sample_weight_tensor( rows, sample_weights=sample_weights if reduction == "weighted_mean" else None, surface_weights=surface_weights if reduction == "weighted_mean" else None, alpha_weights=alpha_weights if reduction == "weighted_mean" else None, ky_weights=ky_weights if reduction == "weighted_mean" else None, ) _ = portfolio_objective_weight_vector( rows, objective_weights=objective_weights if reduction in ("weighted_mean", "max") else None, ) n_surface, n_alpha, n_ky, n_objective = (int(size) for size in rows.shape) return StellaratorObjectivePortfolioContract( n_surfaces=n_surface, n_alphas=n_alpha, n_ky=n_ky, n_objectives=n_objective, reduction=reduction, uses_sample_weights=sample_weights is not None and reduction == "weighted_mean", uses_separable_sample_weights=any( weight is not None for weight in (surface_weights, alpha_weights, ky_weights) ) and reduction == "weighted_mean", uses_objective_weights=objective_weights is not None and reduction in ("weighted_mean", "max"), )
def _conditioning_gate( jacobian: np.ndarray, *, min_rank: int | None, condition_number_limit: float, ) -> dict[str, object]: jac = np.asarray(jacobian, dtype=float) if jac.ndim != 2: raise ValueError("jacobian must be a two-dimensional array") limit = float(condition_number_limit) if limit <= 0.0: raise ValueError("condition_number_limit must be positive") expected_rank = int(min_rank) if min_rank is not None else int(min(jac.shape)) if expected_rank < 1: raise ValueError("min_rank must be >= 1") if expected_rank > int(min(jac.shape)): raise ValueError("min_rank cannot exceed min(jacobian.shape)") finite = bool(np.all(np.isfinite(jac))) singular_values = np.linalg.svd(jac, compute_uv=False) if finite else np.asarray([], dtype=float) rank = int(np.linalg.matrix_rank(jac)) if finite else 0 if singular_values.size == 0 or float(singular_values[-1]) <= 0.0: condition_number = float("inf") smallest = 0.0 else: smallest = float(singular_values[-1]) condition_number = float(singular_values[0] / singular_values[-1]) passed = bool(finite and rank >= expected_rank and condition_number <= limit) return { "passed": passed, "finite_jacobian": finite, "sensitivity_map_rank": rank, "min_rank": expected_rank, "rank_deficiency": int(max(expected_rank - rank, 0)), "jacobian_condition_number": condition_number, "condition_number_limit": limit, "smallest_singular_value": smallest, "singular_values": singular_values.tolist(), }
[docs] def objective_portfolio_sensitivity_report( objective_row_fn: Callable[[jnp.ndarray], Any], params: Any, *, sample_weights: Any | None = None, surface_weights: Any | None = None, alpha_weights: Any | None = None, ky_weights: Any | None = None, objective_weights: Any | None = None, reduction: PortfolioReduction = "weighted_mean", step: float = 1.0e-4, rtol: float = 1.0e-4, atol: float = 1.0e-6, min_rank: int | None = None, condition_number_limit: float = 1.0e8, covariance_regularization: float = 1.0e-9, workers: int = 1, parallel_executor: str = "thread", ) -> dict[str, object]: """AD/FD and conditioning report for a reduced objective-row portfolio. ``objective_row_fn`` is the backend boundary: production callers can wire a VMEC/Boozer/quasilinear row builder into this gate while tests can use a cheap fixture. The report checks both the final scalar reduction and the unreduced row sensitivity map so a passing scalar gradient cannot hide a rank-deficient or badly conditioned objective table. """ p = _parameter_vector(params) def row_table(x: jnp.ndarray) -> jnp.ndarray: rows = _objective_rows(objective_row_fn(x)) return rows.astype(_floating_dtype(x, rows)) base_rows = row_table(p) contract = validate_objective_portfolio_contract( base_rows, sample_weights=sample_weights, surface_weights=surface_weights, alpha_weights=alpha_weights, ky_weights=ky_weights, objective_weights=objective_weights, reduction=reduction, ) def scalar_fn(x: jnp.ndarray) -> jnp.ndarray: return aggregate_objective_portfolio( row_table(x), sample_weights=sample_weights, surface_weights=surface_weights, alpha_weights=alpha_weights, ky_weights=ky_weights, objective_weights=objective_weights, reduction=reduction, validate=True, ) def row_vector_fn(x: jnp.ndarray) -> jnp.ndarray: return jnp.ravel(row_table(x)) scalar_gradient_gate = autodiff_finite_difference_report( scalar_fn, p, step=step, rtol=rtol, atol=atol, workers=workers, parallel_executor=parallel_executor, ) row_jacobian_gate = autodiff_finite_difference_report( row_vector_fn, p, step=step, rtol=rtol, atol=atol, workers=workers, parallel_executor=parallel_executor, ) row_jacobian = np.asarray(row_jacobian_gate["jacobian_ad"], dtype=float) row_residual = np.asarray(row_vector_fn(p), dtype=float) conditioning_gate = _conditioning_gate( row_jacobian, min_rank=min_rank, condition_number_limit=condition_number_limit, ) covariance = covariance_diagnostics( row_jacobian, row_residual, regularization=covariance_regularization, ) covariance["source"] = "objective_portfolio_rows" return { "kind": "objective_portfolio_sensitivity_report", "passed": bool( scalar_gradient_gate["passed"] and row_jacobian_gate["passed"] and conditioning_gate["passed"] ), "portfolio_contract": contract.to_dict(), "parameter_count": int(p.size), "base_value": float(scalar_fn(p)), "base_row_norm": float(jnp.linalg.norm(row_vector_fn(p))), "scalar_gradient_gate": scalar_gradient_gate, "row_jacobian_gate": row_jacobian_gate, "conditioning_gate": conditioning_gate, "covariance": covariance, }
def _artifact_text(payload: dict[str, Any], *keys: str) -> str: return " ".join(str(payload.get(key, "")) for key in keys).lower() def _as_finite_float(value: Any, *, name: str) -> float: try: result = float(value) except (TypeError, ValueError) as exc: raise ValueError(f"{name} must be numeric") from exc if not np.isfinite(result): raise ValueError(f"{name} must be finite") return result def _finite_nested(value: Any) -> bool: try: array = np.asarray(value, dtype=float) except (TypeError, ValueError): return False return bool(array.size > 0 and np.all(np.isfinite(array))) def _sample_key(sample: dict[str, Any]) -> tuple[str, float, float]: surface = sample.get("surface_index", "__mid_surface__") surface_key = "__mid_surface__" if surface is None else str(surface) alpha = _as_finite_float(sample.get("alpha"), name="sample alpha") ky_value = sample.get("ky", sample.get("ky_index", sample.get("selected_ky_index"))) ky = _as_finite_float(ky_value, name="sample ky") return surface_key, alpha, ky def _axis_indices(samples: list[dict[str, Any]]) -> tuple[list[str], list[float], list[float]]: surfaces = sorted({_sample_key(sample)[0] for sample in samples}) alphas = sorted({_sample_key(sample)[1] for sample in samples}) kys = sorted({_sample_key(sample)[2] for sample in samples}) return surfaces, alphas, kys def _artifact_sample_values(payload: dict[str, Any]) -> tuple[list[dict[str, Any]], np.ndarray]: samples_raw = payload.get("samples") if not isinstance(samples_raw, list) or not samples_raw: raise ValueError("payload must contain a non-empty samples list") samples = [item for item in samples_raw if isinstance(item, dict)] if len(samples) != len(samples_raw): raise ValueError("all samples must be dictionaries") values = np.asarray(payload.get("base_sample_values"), dtype=float) if values.ndim != 1 or int(values.shape[0]) != len(samples): raise ValueError("base_sample_values must be a length-n_samples vector") if not np.all(np.isfinite(values)): raise ValueError("base_sample_values must be finite") return samples, values def _artifact_sample_value_tensor( payload: dict[str, Any], ) -> tuple[np.ndarray, np.ndarray, list[str], list[float], list[float]]: samples, values = _artifact_sample_values(payload) surfaces, alphas, kys = _axis_indices(samples) shape = (len(surfaces), len(alphas), len(kys)) table = np.full(shape, np.nan, dtype=float) weights = np.full(shape, np.nan, dtype=float) seen: set[tuple[int, int, int]] = set() surface_index = {value: index for index, value in enumerate(surfaces)} alpha_index = {value: index for index, value in enumerate(alphas)} ky_index = {value: index for index, value in enumerate(kys)} for sample, value in zip(samples, values, strict=True): surface, alpha, ky = _sample_key(sample) idx = (surface_index[surface], alpha_index[alpha], ky_index[ky]) if idx in seen: raise ValueError("samples must not contain duplicate surface/alpha/ky rows") seen.add(idx) table[idx] = value weights[idx] = _as_finite_float(sample.get("weight", 1.0), name="sample weight") if len(seen) != int(np.prod(shape)) or not np.all(np.isfinite(table)): raise ValueError("samples must form a complete rectangular surface/alpha/ky table") if not np.all(np.isfinite(weights)) or np.any(weights < 0.0) or float(np.sum(weights)) <= 0.0: raise ValueError("sample weights must be finite, non-negative, and have positive sum") return table[..., None], weights, surfaces, alphas, kys def _artifact_full_objective_table(payload: dict[str, Any], *, n_samples: int) -> np.ndarray: table = np.asarray(payload.get("base_objective_table"), dtype=float) if table.ndim != 2: raise ValueError("base_objective_table must be a two-dimensional array") if int(table.shape[0]) != int(n_samples): raise ValueError("base_objective_table row count must match samples") if not np.all(np.isfinite(table)): raise ValueError("base_objective_table must be finite") return table def _artifact_objective_name_gate( objective_names: list[str], *, config: ReducedPortfolioArtifactGuardConfig, ) -> dict[str, object]: lowered = {name.lower() for name in objective_names} has_growth = bool(lowered & _GROWTH_OBJECTIVE_NAMES) has_quasilinear = bool(lowered & _QUASILINEAR_OBJECTIVE_NAMES) passed = bool( len(objective_names) >= int(config.min_objectives) and (has_growth or not config.require_growth_objective) and (has_quasilinear or not config.require_quasilinear_objective) ) return { "passed": passed, "objective_names": objective_names, "n_objectives": len(objective_names), "min_objectives": int(config.min_objectives), "has_growth_objective": has_growth, "has_quasilinear_objective": has_quasilinear, "requires_growth_objective": bool(config.require_growth_objective), "requires_quasilinear_objective": bool(config.require_quasilinear_objective), } def _artifact_provenance_gate( payload: dict[str, Any], *, config: ReducedPortfolioArtifactGuardConfig, ) -> dict[str, object]: options_raw = payload.get("options") options: dict[str, Any] = options_raw if isinstance(options_raw, dict) else {} mboz = int(payload.get("mboz") or options.get("mboz") or 0) nboz = int(payload.get("nboz") or options.get("nboz") or 0) text = _artifact_text(payload, "kind", "artifact_kind", "source_scope", "claim_scope", "builder") has_vmec_boozer_scope = "vmec_boozer" in text or "vmec/boozer" in text input_path = str(payload.get("input_path", "")) wout_path = str(payload.get("wout_path", "")) has_paths = bool(input_path and wout_path) mode_gate = bool(mboz >= int(config.min_boozer_mode) and nboz >= int(config.min_boozer_mode)) return { "passed": bool( has_vmec_boozer_scope and mode_gate and (has_paths or not config.require_vmec_paths) ), "has_vmec_boozer_scope": has_vmec_boozer_scope, "requires_vmec_paths": bool(config.require_vmec_paths), "has_input_and_wout_paths": has_paths, "input_path": input_path, "wout_path": wout_path, "mboz": mboz, "nboz": nboz, "min_boozer_mode": int(config.min_boozer_mode), } def _artifact_claim_scope_gate(payload: dict[str, Any]) -> dict[str, object]: objective = str(payload.get("objective", payload.get("objective_kind", ""))).lower() text = _artifact_text(payload, "claim_scope", "next_action") has_safe_disclaimer = any( phrase in text for phrase in ( "not a nonlinear", "not a production nonlinear", "not an optimized-equilibrium nonlinear", "nonlinear transport optimization still requires", ) ) no_nonlinear_objective = objective not in { "nonlinear_heat_flux", "nonlinear_window_heat_flux_mean", "production_nonlinear_heat_flux", } no_trace = payload.get("nonlinear_trace") in (None, {}, []) return { "passed": bool(no_nonlinear_objective and no_trace and has_safe_disclaimer), "objective": objective, "nonlinear_trace_absent": no_trace, "has_nonproduction_disclaimer": has_safe_disclaimer, } def _artifact_fd_gate(payload: dict[str, Any]) -> dict[str, object]: finite_fields = { name: _finite_nested(payload.get(name)) for name in ( "minus_sample_values", "base_sample_values", "plus_sample_values", "minus_objective_table", "base_objective_table", "plus_objective_table", ) } scalar_fields = { name: bool(np.isfinite(float(payload.get(name, np.nan)))) for name in ( "base_value", "minus_value", "plus_value", "central_derivative", "response_abs", "curvature_ratio", ) } passed = bool( payload.get("passed", False) and payload.get("finite_values", True) and payload.get("finite_difference_consistent", False) and payload.get("response_resolved", True) and all(finite_fields.values()) and all(scalar_fields.values()) ) return { "passed": passed, "artifact_passed": bool(payload.get("passed", False)), "finite_difference_consistent": bool(payload.get("finite_difference_consistent", False)), "response_resolved": bool(payload.get("response_resolved", True)), "finite_array_fields": finite_fields, "finite_scalar_fields": scalar_fields, } def _gradient_artifact_gate(gradient_artifacts: list[dict[str, Any]]) -> dict[str, object]: objective_names: set[str] = set() n_gates = 0 n_passed = 0 finite = True for artifact in gradient_artifacts: for gate in artifact.get("objective_gates", []) if isinstance(artifact, dict) else []: if not isinstance(gate, dict): finite = False continue n_gates += 1 objective_names.add(str(gate.get("objective", "")).lower()) n_passed += int(bool(gate.get("passed", False))) finite = bool(finite and all(_finite_nested(gate.get(name)) for name in ("implicit", "finite_difference", "abs_error", "rel_error"))) has_growth = bool(objective_names & _GROWTH_OBJECTIVE_NAMES) has_quasilinear = bool(objective_names & _QUASILINEAR_OBJECTIVE_NAMES) passed = bool(n_gates > 0 and n_passed == n_gates and finite and has_growth and has_quasilinear) return { "passed": passed, "n_gradient_artifacts": len(gradient_artifacts), "n_objective_gates": n_gates, "n_passed_objective_gates": n_passed, "finite_ad_fd_values": finite, "objective_names": sorted(objective_names), "has_growth_ad_fd_gate": has_growth, "has_quasilinear_ad_fd_gate": has_quasilinear, }
[docs] def reduced_portfolio_artifact_guard_report( row_artifact: dict[str, Any], *, gradient_artifacts: list[dict[str, Any]] | tuple[dict[str, Any], ...] = (), config: ReducedPortfolioArtifactGuardConfig | None = None, ) -> dict[str, object]: """Validate a real VMEC/Boozer reduced-portfolio artifact before promotion. The guard is backend-free: it consumes already-generated JSON payloads, rebuilds a ``(surface, alpha, ky, objective)`` reducer table from real VMEC/Boozer sample rows, and checks that provenance, coverage, FD/AD diagnostics, and nonlinear-claim boundaries are explicit. """ cfg = config or ReducedPortfolioArtifactGuardConfig() sample_rows, sample_weights, surfaces, alphas, kys = _artifact_sample_value_tensor(row_artifact) samples, _values = _artifact_sample_values(row_artifact) full_table = _artifact_full_objective_table(row_artifact, n_samples=len(samples)) objective_names = [str(item) for item in row_artifact.get("objective_names", [])] objective_name_gate = _artifact_objective_name_gate(objective_names, config=cfg) reduction = str(row_artifact.get("reduction", "weighted_mean")) if reduction not in ("weighted_mean", "mean", "max"): raise ValueError("artifact reduction must be weighted_mean, mean, or max") reducer_reduction = cast(PortfolioReduction, reduction) if reduction == "weighted_mean": reduced_value = float( aggregate_objective_portfolio( sample_rows, sample_weights=sample_weights, reduction=reducer_reduction, ) ) contract = validate_objective_portfolio_contract( sample_rows, sample_weights=sample_weights, reduction=reducer_reduction, ) else: reduced_value = float(aggregate_objective_portfolio(sample_rows, reduction=reducer_reduction)) contract = validate_objective_portfolio_contract(sample_rows, reduction=reducer_reduction) artifact_value = _as_finite_float(row_artifact.get("base_value"), name="base_value") reducer_matches = bool(np.isclose(reduced_value, artifact_value, rtol=cfg.value_rtol, atol=cfg.value_atol)) coverage_gate = { "passed": bool(len(alphas) >= int(cfg.min_alphas) and len(kys) >= int(cfg.min_ky)), "surface_labels": surfaces, "alphas": alphas, "ky_values": kys, "n_surfaces": len(surfaces), "n_alphas": len(alphas), "n_ky": len(kys), "n_samples": len(samples), "min_alphas": int(cfg.min_alphas), "min_ky": int(cfg.min_ky), } full_table_gate = { "passed": bool( full_table.shape[0] == len(samples) and full_table.shape[1] >= int(cfg.min_objectives) and np.all(np.isfinite(full_table)) ), "shape": list(full_table.shape), "finite": bool(np.all(np.isfinite(full_table))), } reducer_gate = { "passed": reducer_matches, "reduction": reduction, "reduced_base_value": reduced_value, "artifact_base_value": artifact_value, "abs_error": float(abs(reduced_value - artifact_value)), "rtol": float(cfg.value_rtol), "atol": float(cfg.value_atol), "contract": contract.to_dict(), } provenance_gate = _artifact_provenance_gate(row_artifact, config=cfg) claim_scope_gate = _artifact_claim_scope_gate(row_artifact) fd_gate = _artifact_fd_gate(row_artifact) ad_fd_gate = _gradient_artifact_gate(list(gradient_artifacts)) passed = bool( provenance_gate["passed"] and coverage_gate["passed"] and full_table_gate["passed"] and objective_name_gate["passed"] and reducer_gate["passed"] and fd_gate["passed"] and ad_fd_gate["passed"] and claim_scope_gate["passed"] ) return { "kind": "vmec_boozer_reduced_portfolio_artifact_guard", "passed": passed, "claim_scope": ( "artifact-level guard for real VMEC/Boozer reduced growth/QL portfolio rows; " "not a production nonlinear turbulent-transport optimization claim" ), "config": cfg.to_dict(), "provenance_gate": provenance_gate, "coverage_gate": coverage_gate, "full_objective_table_gate": full_table_gate, "objective_name_gate": objective_name_gate, "portfolio_reducer_gate": reducer_gate, "finite_difference_gate": fd_gate, "ad_fd_gradient_gate": ad_fd_gate, "claim_scope_gate": claim_scope_gate, "next_action": ( "Use this guard to admit reduced VMEC/Boozer growth/QL portfolio artifacts. " "Production nonlinear claims still require held-out optimized-equilibrium " "nonlinear-window ensembles and transport audits." ), }
[docs] def aggregate_objective_portfolio( objective_rows: Any, *, sample_weights: Any | None = None, surface_weights: Any | None = None, alpha_weights: Any | None = None, ky_weights: Any | None = None, objective_weights: Any | None = None, reduction: PortfolioReduction = "weighted_mean", validate: bool = True, ) -> jnp.ndarray: """Reduce a ``(surface, alpha, ky, objective)`` table to one scalar. ``weighted_mean`` normalizes both sample and objective weights to unit sum, making the scalar invariant to the caller's absolute weight scale. ``mean`` is the unweighted mean over every table entry. ``max`` returns the worst-case objective-weighted sample and is intended for diagnostics rather than smooth gradient-based optimization. """ if validate: validate_objective_portfolio_contract( objective_rows, sample_weights=sample_weights, surface_weights=surface_weights, alpha_weights=alpha_weights, ky_weights=ky_weights, objective_weights=objective_weights, reduction=reduction, ) rows = _objective_rows(objective_rows) dtype = _floating_dtype(rows) values = rows.astype(dtype) if reduction == "mean": return jnp.mean(values) if reduction == "weighted_mean": sample = _normalized_sample_weights( values, sample_weights=sample_weights, surface_weights=surface_weights, alpha_weights=alpha_weights, ky_weights=ky_weights, dtype=dtype, ) objective = _normalized_vector( objective_weights, size=int(values.shape[-1]), name="objective_weights", dtype=dtype, ) return jnp.sum(values * sample[..., None] * objective) if reduction == "max": objective = _normalized_vector( objective_weights, size=int(values.shape[-1]), name="objective_weights", dtype=dtype, ) return jnp.max(jnp.sum(values * objective, axis=-1)) raise ValueError("reduction must be one of 'weighted_mean', 'mean', or 'max'")
__all__ = [ "PortfolioReduction", "ReducedPortfolioArtifactGuardConfig", "StellaratorObjectivePortfolioContract", "aggregate_objective_portfolio", "objective_portfolio_sensitivity_report", "portfolio_objective_weight_vector", "portfolio_sample_weight_tensor", "reduced_portfolio_artifact_guard_report", "validate_objective_portfolio_contract", ]