Source code for circle_bundles.analysis.quality

# analysis/quality.py
from __future__ import annotations

from dataclasses import dataclass
from typing import Dict, Iterable, List, Optional, Tuple

import numpy as np

from ..nerve.combinatorics import Edge, Tri


[docs] @dataclass class BundleQualityReport: """ Summary of diagnostic and quality metrics for a reconstructed bundle. This object is intended for *inspection and reporting*, not as an input to downstream algorithms. It aggregates several geometric, cohomological, and numerical indicators that help assess how well the data supports a coherent circle- (or O(2)-) bundle structure. Notes ----- - Users typically obtain this object via a bundle method (e.g. ``get_quality()``), rather than constructing it directly. - Individual fields may be None if the corresponding diagnostic was not computed or not applicable to the current bundle model. Fields ------ delta: Global transition inconsistency measure (smaller is better). cocycle_defect: Deviation of the recovered cocycle from being exactly closed. max_edge_rms, mean_edge_rms: Maximum and mean RMS angular transition error over edges. rms_angle_err: Per-edge RMS angular transition error map. Keys are canonicalized edges (i,j) with i<j and values are nonnegative floats. This is useful for weighting edges in visualizations/filtrations when a full TransitionReport is not retained. eps_align_geo, eps_align_euc: Worst-case alignment error (geodesic / Euclidean) for local trivializations. eps_align_geo_mean, eps_align_euc_mean: Mean alignment error (geodesic / Euclidean). alpha: Global scaling or regularization parameter used in alignment (if applicable). n_edges_estimated: Number of edges actually used for estimation. n_edges_requested: Number of edges originally requested. n_triangles: Number of triangles used in consistency checks. witness_err, witness_err_geo: Supremum witness error (chordal / geodesic). witness_err_mean, witness_err_geo_mean: Mean witness error (chordal / geodesic). cocycle_proj_dist: Distance from the cocycle to the nearest projected cocycle representative. """ delta: float cocycle_defect: float max_edge_rms: float mean_edge_rms: float # NEW: keep per-edge RMS for downstream weights/visualizations rms_angle_err: Optional[Dict[Edge, float]] = None eps_align_geo: Optional[float] = None eps_align_euc: Optional[float] = None eps_align_geo_mean: Optional[float] = None eps_align_euc_mean: Optional[float] = None alpha: Optional[float] = None n_edges_estimated: int = 0 n_edges_requested: int = 0 n_triangles: int = 0 witness_err: Optional[float] = None # chordal sup witness_err_geo: Optional[float] = None # geodesic sup (radians) witness_err_mean: Optional[float] = None # chordal mean witness_err_geo_mean: Optional[float] = None # geodesic mean (radians) cocycle_proj_dist: Optional[float] = None
# ---------------------------- # Circle helpers # ---------------------------- _TWO_PI = 2.0 * np.pi def h_dist_S1(angle_array: np.ndarray) -> Tuple[float, float]: """ Returns (h_euc, h_geo) where: h_geo = half the maximum gap on the circle (radians) h_euc = chordal version: 2 sin(h_geo/2) """ a = np.asarray(angle_array, dtype=float).reshape(-1) if a.size == 0: return float("inf"), float("inf") a = np.sort(a % _TWO_PI) gaps = np.diff(a) gaps = np.append(gaps, _TWO_PI - (a[-1] - a[0])) h_geo = float(np.max(gaps) / 2.0) # in [0, pi] h_euc = float(2.0 * np.sin(h_geo / 2.0)) return h_euc, h_geo def _s1_geodesic_from_unit_vectors(u: np.ndarray, v: np.ndarray) -> np.ndarray: """u,v: (...,2) unit vectors. Return geodesic distance (radians) in [0, pi].""" dot = np.sum(u * v, axis=-1) dot = np.clip(dot, -1.0, 1.0) return np.arccos(dot) def _eps_geo_to_euc(eps_geo: float) -> float: """Convert geodesic angle (radians) to chordal distance in R^2.""" return float(2.0 * np.sin(float(eps_geo) / 2.0)) def _canon_edge_weight_map(d: object) -> Optional[Dict[Edge, float]]: """ Best-effort conversion of an edge->weight mapping into canonical form. Accepts mappings whose keys are (i,j) pairs in any order, possibly as lists/tuples. Produces a dict keyed by Edge = (min(i,j), max(i,j)). Returns None if input is None or cannot be interpreted as a mapping. """ if d is None: return None try: items = dict(d).items() except Exception: return None out: Dict[Edge, float] = {} for k, w in items: try: a, b = k e: Edge = (int(a), int(b)) e = (e[0], e[1]) if e[0] <= e[1] else (e[1], e[0]) out[e] = float(w) except Exception: continue return out or None # ---------------------------- # Paper-consistent delta (existing, unchanged) # ---------------------------- def compute_delta_from_triples( U: np.ndarray, f: np.ndarray, triangles: Iterable[Tri], *, min_points: int = 5, use_euclidean: bool = True, fail_fast: bool = True, ) -> float: """ For each triangle (i,j,k): - compute h_dist_S1 on each vertex image set over the triple overlap - take MIN over vertices Then take MAX over triangles. This is the 'best-chart' triple-overlap surjectivity proxy used in your paper. """ U = np.asarray(U, dtype=bool) f = np.asarray(f, dtype=float) triangles = [tuple(sorted(map(int, t))) for t in triangles] if not triangles: return 0.0 vals: List[float] = [] for (i, j, k) in triangles: idx = U[i] & U[j] & U[k] m = int(idx.sum()) if m < min_points: if fail_fast: raise ValueError( f"Triple intersection too small for triangle {(i,j,k)}: " f"{m} points < min_points={min_points}." ) continue per_vertex = [] for v in (i, j, k): h_euc, h_geo = h_dist_S1(f[v, idx]) per_vertex.append(h_euc if use_euclidean else h_geo) vals.append(float(np.min(per_vertex))) return float(np.max(vals)) if vals else 0.0 # ---------------------------- # Cocycle defect (existing) # ---------------------------- def compute_O2_cocycle_defect( cocycle, triangles: Iterable[Tri], *, matrix_norm: str = "fro", ) -> float: triangles = [tuple(sorted(map(int, t))) for t in triangles] if not triangles: return 0.0 coc = cocycle.complete_orientations() def norm(M: np.ndarray) -> float: if matrix_norm == "fro": return float(np.linalg.norm(M, ord="fro")) if matrix_norm == "op": return float(np.linalg.norm(M, ord=2)) raise ValueError("matrix_norm must be 'fro' or 'op'.") worst = 0.0 for (i, j, k) in triangles: Oij = coc.Omega[(i, j)] Ojk = coc.Omega[(j, k)] Oki = coc.Omega[(k, i)] M = Oij @ Ojk @ Oki worst = max(worst, norm(M - np.eye(2))) return worst # ---------------------------- # epsilon alignment sup (geodesic), plus chordal conversion # ---------------------------- def compute_eps_alignment_stats( U: np.ndarray, f: np.ndarray, cocycle, edges: Iterable[Edge], *, min_points: int = 1, ) -> Tuple[Optional[float], Optional[float], Optional[float], Optional[float]]: """ Returns (eps_geo_sup, eps_geo_mean, eps_euc_sup, eps_euc_mean) geo: geodesic angle in radians in [0, pi] euc: chordal distance in R^2, 2 sin(geo/2) in [0, 2] """ U = np.asarray(U, dtype=bool) f = np.asarray(f, dtype=float) coc = cocycle.complete_orientations() edges = list(edges) geo_worst = 0.0 euc_worst = 0.0 got_any = False geo_sum = 0.0 euc_sum = 0.0 n = 0 for (j, k) in edges: if (j, k) not in coc.Omega: continue idx = np.where(U[j] & U[k])[0] if idx.size < min_points: continue O = coc.Omega[(j, k)] # maps k -> j vj = np.stack([np.cos(f[j, idx]), np.sin(f[j, idx])], axis=1) vk = np.stack([np.cos(f[k, idx]), np.sin(f[k, idx])], axis=1) vk_trans = (O @ vk.T).T dgeo = _s1_geodesic_from_unit_vectors(vj, vk_trans) # [0, pi] deuc = 2.0 * np.sin(dgeo / 2.0) # [0, 2] geo_worst = max(geo_worst, float(np.max(dgeo))) euc_worst = max(euc_worst, float(np.max(deuc))) geo_sum += float(np.sum(dgeo)) euc_sum += float(np.sum(deuc)) n += int(dgeo.size) got_any = True if (not got_any) or (n == 0): return None, None, None, None return ( float(geo_worst), float(geo_sum / n), float(euc_worst), float(euc_sum / n), ) def compute_alpha_from_eps_delta_euc( eps_euc: Optional[float], delta: float, ) -> Optional[float]: """ α = ε/(1-δ) where ε is the *Euclidean/chordal* alignment error in [0,2]. α=+inf if δ>=1. If eps_euc is None, returns None. """ if eps_euc is None: return None if float(delta) >= 1.0: return float("inf") return float(eps_euc) / (1.0 - float(delta)) # ---------------------------- # Main entry point # ---------------------------- def compute_bundle_quality( cover, local_triv, cocycle, transitions_report, *, edges: Optional[Iterable[Edge]] = None, triangles: Optional[Iterable[Tri]] = None, # delta settings (paper delta) delta_min_points: int = 5, delta_use_euclidean: bool = True, delta_fail_fast: bool = True, # cocycle defect settings cocycle_matrix_norm: str = "fro", # epsilon settings eps_min_points: int = 1, compute_witness: bool = False, ) -> BundleQualityReport: """ Compute standard diagnostics for a bundle pipeline run. Conventions ---------- - delta is computed either in chordal (Euclidean in R^2) or geodesic (radians), depending on delta_use_euclidean (default True => chordal). - eps alignment is computed in BOTH: * eps_align_euc / eps_align_euc_mean : chordal in [0,2] * eps_align_geo / eps_align_geo_mean : radians in [0,pi] - alpha is computed using the Euclidean/chordal epsilon: alpha = eps_align_euc / (1 - delta) (so if you set delta_use_euclidean=True, numerator/denominator are in the same style) """ if edges is None: edges_list = list(cover.nerve_edges()) else: edges_list = list(edges) if triangles is None: triangles_list = list(cover.nerve_triangles()) else: triangles_list = list(triangles) # paper delta delta = compute_delta_from_triples( cover.U, local_triv.f, triangles_list, min_points=delta_min_points, use_euclidean=delta_use_euclidean, fail_fast=delta_fail_fast, ) # cocycle triangle defect cocycle_defect = compute_O2_cocycle_defect( cocycle, triangles_list, matrix_norm=cocycle_matrix_norm, ) # edge RMS summaries (from transitions_report) max_edge_rms = float(getattr(transitions_report, "max_rms_angle_err", 0.0)) mean_edge_rms = float(getattr(transitions_report, "mean_rms_angle_err", 0.0)) rms_angle_err = _canon_edge_weight_map(getattr(transitions_report, "rms_angle_err", None)) # epsilon alignment stats: (geo sup, geo mean, euc sup, euc mean) eps_geo, eps_geo_mean, eps_euc, eps_euc_mean = compute_eps_alignment_stats( cover.U, local_triv.f, cocycle, edges_list, min_points=eps_min_points, ) # alpha computed using Euclidean epsilon alpha = compute_alpha_from_eps_delta_euc(eps_euc, float(delta)) n_edges_requested = int(getattr(transitions_report, "n_edges_requested", len(edges_list))) n_edges_estimated = int(getattr(transitions_report, "n_edges_estimated", len(getattr(cocycle, "Omega", {})))) # ------------------------------------------ # Optional witness diagnostics (Π(Ω) based) # ------------------------------------------ witness_err = None witness_err_geo = None witness_err_mean = None witness_err_geo_mean = None cocycle_proj_dist = None if compute_witness: # local import to avoid any import-cycle surprises from ..trivializations.bundle_map import ( build_true_frames, witness_error_stats, cocycle_projection_distance, ) Omega = getattr(cocycle, "Omega", None) if Omega is None: raise AttributeError("cocycle.Omega is missing; cannot compute witness diagnostics.") tf = build_true_frames( U=cover.U, pou=cover.pou, Omega=Omega, edges=edges_list, ) w_geo, w_geo_mean, w_c, w_c_mean = witness_error_stats( U=cover.U, f=local_triv.f, Omega_true=tf.Omega_true, edges=edges_list, ) witness_err_geo = w_geo witness_err_geo_mean = w_geo_mean witness_err = w_c witness_err_mean = w_c_mean cocycle_proj_dist = cocycle_projection_distance( U=cover.U, Omega_simplicial=Omega, Omega_true=tf.Omega_true, edges=edges_list, ) return BundleQualityReport( delta=float(delta), cocycle_defect=float(cocycle_defect), max_edge_rms=max_edge_rms, mean_edge_rms=mean_edge_rms, rms_angle_err=rms_angle_err, eps_align_geo=eps_geo, eps_align_euc=eps_euc, eps_align_geo_mean=eps_geo_mean, eps_align_euc_mean=eps_euc_mean, alpha=alpha, n_edges_estimated=n_edges_estimated, n_edges_requested=n_edges_requested, n_triangles=int(len(triangles_list)), witness_err=witness_err, witness_err_geo=witness_err_geo, witness_err_mean=witness_err_mean, witness_err_geo_mean=witness_err_geo_mean, cocycle_proj_dist=cocycle_proj_dist, ) def compute_bundle_quality_from_U( U: np.ndarray, *, pou: Optional[np.ndarray], local_triv, cocycle, transitions_report, edges: Iterable[Edge], triangles: Iterable[Tri], # delta settings (paper delta) delta_min_points: int = 5, delta_use_euclidean: bool = True, delta_fail_fast: bool = True, # cocycle defect settings cocycle_matrix_norm: str = "fro", # epsilon settings eps_min_points: int = 1, compute_witness: bool = False, ) -> BundleQualityReport: """ Cover-free version of compute_bundle_quality. You supply: - U (n_sets, n_samples) boolean matrix - pou (optional, for witness diagnostics) - local_triv.f (angles per patch per sample) - cocycle + transitions_report - explicit edges, triangles (as lists/iterables) """ U = np.asarray(U, dtype=bool) edges_list = list(edges) triangles_list = list(triangles) # paper delta delta = compute_delta_from_triples( U, local_triv.f, triangles_list, min_points=delta_min_points, use_euclidean=delta_use_euclidean, fail_fast=delta_fail_fast, ) # cocycle triangle defect cocycle_defect = compute_O2_cocycle_defect( cocycle, triangles_list, matrix_norm=cocycle_matrix_norm, ) # edge RMS summaries (from transitions_report) max_edge_rms = float(getattr(transitions_report, "max_rms_angle_err", 0.0)) mean_edge_rms = float(getattr(transitions_report, "mean_rms_angle_err", 0.0)) rms_angle_err = _canon_edge_weight_map(getattr(transitions_report, "rms_angle_err", None)) # epsilon alignment stats: (geo sup, geo mean, euc sup, euc mean) eps_geo, eps_geo_mean, eps_euc, eps_euc_mean = compute_eps_alignment_stats( U, local_triv.f, cocycle, edges_list, min_points=eps_min_points, ) # alpha computed using Euclidean epsilon alpha = compute_alpha_from_eps_delta_euc(eps_euc, float(delta)) n_edges_requested = int(getattr(transitions_report, "n_edges_requested", len(edges_list))) n_edges_estimated = int(getattr(transitions_report, "n_edges_estimated", len(getattr(cocycle, "Omega", {})))) # Optional witness diagnostics witness_err = None witness_err_geo = None witness_err_mean = None witness_err_geo_mean = None cocycle_proj_dist = None if compute_witness: from ..trivializations.bundle_map import ( build_true_frames, witness_error_stats, cocycle_projection_distance, ) Omega = getattr(cocycle, "Omega", None) if Omega is None: raise AttributeError("cocycle.Omega is missing; cannot compute witness diagnostics.") tf = build_true_frames( U=U, pou=pou, Omega=Omega, edges=edges_list, ) w_geo, w_geo_mean, w_c, w_c_mean = witness_error_stats( U=U, f=local_triv.f, Omega_true=tf.Omega_true, edges=edges_list, ) witness_err_geo = w_geo witness_err_geo_mean = w_geo_mean witness_err = w_c witness_err_mean = w_c_mean cocycle_proj_dist = cocycle_projection_distance( U=U, Omega_simplicial=Omega, Omega_true=tf.Omega_true, edges=edges_list, ) return BundleQualityReport( delta=float(delta), cocycle_defect=float(cocycle_defect), max_edge_rms=max_edge_rms, mean_edge_rms=mean_edge_rms, rms_angle_err=rms_angle_err, eps_align_geo=eps_geo, eps_align_euc=eps_euc, eps_align_geo_mean=eps_geo_mean, eps_align_euc_mean=eps_euc_mean, alpha=alpha, n_edges_estimated=n_edges_estimated, n_edges_requested=n_edges_requested, n_triangles=int(len(triangles_list)), witness_err=witness_err, witness_err_geo=witness_err_geo, witness_err_mean=witness_err_mean, witness_err_geo_mean=witness_err_geo_mean, cocycle_proj_dist=cocycle_proj_dist, )