Source code for divi.hamiltonians._polynomial

# SPDX-FileCopyrightText: 2025-2026 Qoro Quantum Ltd <divi@qoroquantum.de>
#
# SPDX-License-Identifier: Apache-2.0

"""Binary polynomial types, normalization, and JIT-compiled evaluation kernels.

Handles HUBO/QUBO inputs, normalizes them to :class:`BinaryPolynomialProblem`,
and provides compiled evaluators used by both ``divi.qprog.algorithms._pce``
and the ``PCECostStage`` pipeline stage.
"""

from collections.abc import Hashable
from typing import Any

import dimod
import numba
import numpy as np
import numpy.typing as npt
import scipy.sparse as sps

from ._types import (
    BinaryPolynomialProblem,
    CompiledBinaryPolynomial,
    HUBOProblemTypes,
    HUBOTerm,
    QUBOProblemTypes,
)


[docs] def qubo_to_matrix(qubo: QUBOProblemTypes) -> np.ndarray | sps.spmatrix: """Convert supported QUBO inputs to a square matrix. Args: qubo: QUBO input as list, ndarray, sparse matrix, or BinaryQuadraticModel. Returns: Square QUBO matrix as a dense ndarray or sparse matrix. Raises: ValueError: If the input cannot be converted to a square matrix or the BinaryQuadraticModel is not binary. """ if isinstance(qubo, dimod.BinaryQuadraticModel): if qubo.vartype != dimod.Vartype.BINARY: raise ValueError( f"BinaryQuadraticModel must have vartype='BINARY', got {qubo.vartype}" ) variables = list(qubo.variables) var_to_idx = {v: i for i, v in enumerate(variables)} matrix = np.diag([qubo.linear.get(v, 0) for v in variables]) for (u, v), coeff in qubo.quadratic.items(): i, j = var_to_idx[u], var_to_idx[v] matrix[i, j] = matrix[j, i] = coeff / 2 return matrix if isinstance(qubo, list): qubo = np.asarray(qubo) if isinstance(qubo, np.ndarray): if qubo.ndim != 2 or qubo.shape[0] != qubo.shape[1]: raise ValueError( "Invalid QUBO matrix." f" Got array of shape {qubo.shape}." " Must be a square matrix." ) return qubo if sps.isspmatrix(qubo): if qubo.shape[0] != qubo.shape[1]: raise ValueError( "Invalid QUBO matrix." f" Got sparse matrix of shape {qubo.shape}." " Must be a square matrix." ) return qubo raise ValueError(f"Unsupported QUBO type: {type(qubo)}")
def _default_variable_order(variables: set[Hashable]) -> tuple[Hashable, ...]: """Build deterministic variable order for mixed, potentially incomparable labels. Sorts integers numerically (0, 1, 2, ..., 10, 11) rather than lexicographically ("0", "1", "10", "11", ..., "2"). Non-integer labels fall back to ``repr``-based ordering. """ int_vars = [v for v in variables if isinstance(v, int)] if len(int_vars) == len(variables): return tuple(sorted(int_vars)) return tuple(sorted(variables, key=repr)) def _normalize_hubo_term_key(term: Any) -> HUBOTerm: """Validate and normalize a HUBO term key to a tuple.""" if isinstance(term, tuple): term_tuple = term elif isinstance(term, frozenset): term_tuple = tuple(term) else: raise ValueError( "HUBO term keys must be tuples (or frozensets from BinaryPolynomial internals), " f"got {type(term)}" ) if len(set(term_tuple)) != len(term_tuple): raise ValueError( f"Invalid HUBO term {term_tuple}: duplicate variables in a monomial are not allowed." ) for variable in term_tuple: if not isinstance(variable, Hashable): raise ValueError(f"HUBO variable must be hashable, got {type(variable)}") return term_tuple
[docs] def hubo_to_binary_polynomial(hubo: HUBOProblemTypes) -> dimod.BinaryPolynomial: """Convert HUBO input to a dimod BinaryPolynomial with BINARY vartype.""" if isinstance(hubo, dimod.BinaryPolynomial): if hubo.vartype != dimod.Vartype.BINARY: raise ValueError( f"BinaryPolynomial must have vartype='BINARY', got {hubo.vartype}" ) return hubo if isinstance(hubo, dict): poly_terms: dict[frozenset[Hashable], float] = {} for raw_term, coeff in hubo.items(): term = _normalize_hubo_term_key(raw_term) key = frozenset(term) poly_terms[key] = poly_terms.get(key, 0.0) + float(coeff) return dimod.BinaryPolynomial(poly_terms, dimod.Vartype.BINARY) raise ValueError(f"Unsupported HUBO type: {type(hubo)}")
[docs] def qubo_to_binary_polynomial(qubo: QUBOProblemTypes) -> dimod.BinaryPolynomial: """Convert supported QUBO inputs to a binary polynomial.""" if isinstance(qubo, dimod.BinaryQuadraticModel): if qubo.vartype != dimod.Vartype.BINARY: raise ValueError( f"BinaryQuadraticModel must have vartype='BINARY', got {qubo.vartype}" ) poly_terms: dict[frozenset[Hashable], float] = { frozenset({var}): float(coeff) for var, coeff in qubo.linear.items() if coeff != 0 } for (u, v), coeff in qubo.quadratic.items(): key = frozenset({u, v}) poly_terms[key] = poly_terms.get(key, 0.0) + float(coeff) if qubo.offset != 0: poly_terms[frozenset()] = float(qubo.offset) return dimod.BinaryPolynomial(poly_terms, dimod.Vartype.BINARY) matrix = qubo_to_matrix(qubo) coo = sps.coo_matrix(matrix) rows, cols, values = coo.row, coo.col, coo.data n_vars = matrix.shape[0] poly_terms: dict[frozenset[Hashable], float] = {} for i, j, coeff in zip(rows, cols, values): if coeff == 0: continue key = frozenset({int(i)}) if i == j else frozenset({int(i), int(j)}) poly_terms[key] = poly_terms.get(key, 0.0) + float(coeff) for idx in range(n_vars): poly_terms.setdefault(frozenset({idx}), 0.0) return dimod.BinaryPolynomial(poly_terms, dimod.Vartype.BINARY)
[docs] def normalize_binary_polynomial_problem( problem: QUBOProblemTypes | HUBOProblemTypes, *, variable_order: tuple[Hashable, ...] | list[Hashable] | None = None, ) -> BinaryPolynomialProblem: """Normalize QUBO/HUBO input into canonical binary-polynomial representation.""" if isinstance(problem, QUBOProblemTypes): polynomial = qubo_to_binary_polynomial(problem) else: polynomial = hubo_to_binary_polynomial(problem) variables = set(polynomial.variables) if variable_order is None: resolved_order = _default_variable_order(variables) else: resolved_order = tuple(variable_order) if len(set(resolved_order)) != len(resolved_order): raise ValueError("variable_order must not contain duplicates.") if set(resolved_order) != variables: raise ValueError( "variable_order must contain exactly the variables present in the problem." ) variable_to_idx = {var: idx for idx, var in enumerate(resolved_order)} constant = float(polynomial.get(frozenset(), 0.0)) return BinaryPolynomialProblem( polynomial=polynomial, variable_order=resolved_order, variable_to_idx=variable_to_idx, constant=constant, )
# --------------------------------------------------------------------------- # JIT-compiled polynomial evaluation kernels # # Shared math over ``BinaryPolynomialProblem``. Consumed by both # ``divi.qprog.algorithms._pce`` and the ``PCECostStage`` pipeline stage. # ---------------------------------------------------------------------------
[docs] def compile_problem( problem: BinaryPolynomialProblem, ) -> CompiledBinaryPolynomial: """Pre-extract polynomial terms into Numba-friendly CSR-style arrays. Returns: Tuple of ``(term_indices, term_offsets, coeffs, constant)`` where: - *term_indices* — flat ``int32`` array of variable indices for all terms - *term_offsets* — ``int32`` array of length ``n_terms + 1``; term *t* uses ``term_indices[term_offsets[t]:term_offsets[t+1]]`` - *coeffs* — ``float64`` coefficient per non-constant term - *constant* — sum of constant (empty-key) terms """ constant = 0.0 offsets = [0] all_indices: list[int] = [] all_coeffs: list[float] = [] for term, coeff in problem.terms.items(): c = float(coeff) if c == 0.0: continue if len(term) == 0: constant += c continue indices = [problem.variable_to_idx[var] for var in term] all_indices.extend(indices) offsets.append(len(all_indices)) all_coeffs.append(c) term_indices = ( np.array(all_indices, dtype=np.int32) if all_indices else np.empty(0, dtype=np.int32) ) term_offsets = np.array(offsets, dtype=np.int32) coeffs = ( np.array(all_coeffs, dtype=np.float64) if all_coeffs else np.empty(0, dtype=np.float64) ) return term_indices, term_offsets, coeffs, constant
@numba.njit(cache=True) def _eval_poly_1d_jit( x_vals: npt.NDArray[np.float64], term_indices: npt.NDArray[np.int32], term_offsets: npt.NDArray[np.int32], coeffs: npt.NDArray[np.float64], constant: float, ) -> float: """Evaluate binary polynomial for a single variable assignment (1D).""" n_terms = len(coeffs) energy = constant for t in range(n_terms): start = term_offsets[t] end = term_offsets[t + 1] degree = end - start c = coeffs[t] if degree == 1: energy += c * x_vals[term_indices[start]] ** 2 else: prod = 1.0 for k in range(start, end): prod *= x_vals[term_indices[k]] energy += c * prod return energy @numba.njit(cache=True, parallel=True) def _eval_poly_2d_jit( x_vals: npt.NDArray[np.float64], term_indices: npt.NDArray[np.int32], term_offsets: npt.NDArray[np.int32], coeffs: npt.NDArray[np.float64], constant: float, ) -> npt.NDArray[np.float64]: """Evaluate binary polynomial for batched variable assignments (2D). Uses ``prange`` to parallelise over states. Each thread computes the full polynomial for a subset of states independently — no shared writes. On single-core or constrained environments, ``prange`` degrades to sequential execution with no overhead beyond a thread-pool check. SIMD auto-vectorisation alone is insufficient here because the inner gather pattern (``x_vals[term_indices[k], s]``) defeats LLVM's vectoriser. ``prange`` provides the needed throughput scaling. Args: x_vals: Shape ``(n_vars, n_states)``. Returns: Energy array of shape ``(n_states,)``. """ n_terms = len(coeffs) n_states = x_vals.shape[1] energies = np.empty(n_states, dtype=np.float64) # pyrefly: ignore[not-iterable] # numba.prange is iterable inside @njit for s in numba.prange(n_states): e = constant for t in range(n_terms): start = term_offsets[t] end = term_offsets[t + 1] degree = end - start c = coeffs[t] if degree == 1: e += c * x_vals[term_indices[start], s] ** 2 else: prod = 1.0 for k in range(start, end): prod *= x_vals[term_indices[k], s] e += c * prod energies[s] = e return energies @numba.njit(cache=True) def _compute_hard_cvar_energy_jit( x_vals: npt.NDArray[np.float64], counts: npt.NDArray[np.float64], total_shots: float, alpha_cvar: float, term_indices: npt.NDArray[np.int32], term_offsets: npt.NDArray[np.int32], coeffs: npt.NDArray[np.float64], constant: float, ) -> float: """Fused CVaR energy: evaluate polynomial + sort + accumulate in one kernel. Equivalent to calling ``_eval_poly_2d_jit`` followed by the CVaR partial-sort accumulation, but avoids materialising intermediate NumPy arrays for ``argsort``, ``cumsum``, and ``searchsorted``. Args: x_vals: Shape ``(n_vars, n_states)`` — binary variable assignments. counts: Shape ``(n_states,)`` — shot counts per state. total_shots: Sum of *counts*. alpha_cvar: CVaR tail fraction. term_indices, term_offsets, coeffs, constant: compiled problem arrays. """ len(coeffs) n_states = x_vals.shape[1] # Note: _eval_poly_2d_jit uses ``parallel=True`` / ``prange``, so this # call crosses an implicit threading boundary. Numba's thread pool is # reused (not re-spawned) and degrades to sequential on single-core. energies = _eval_poly_2d_jit(x_vals, term_indices, term_offsets, coeffs, constant) sorted_indices = np.argsort(energies) cutoff_count = int(np.ceil(alpha_cvar * total_shots)) cvar_energy = 0.0 count_sum = 0.0 for i in range(n_states): idx = sorted_indices[i] c = counts[idx] if count_sum + c <= cutoff_count: cvar_energy += energies[idx] * c count_sum += c else: remaining = cutoff_count - count_sum cvar_energy += energies[idx] * remaining count_sum += remaining break return cvar_energy / cutoff_count def _evaluate_binary_polynomial( x_vals: npt.NDArray[np.float64], problem: BinaryPolynomialProblem, _compiled: CompiledBinaryPolynomial | None = None, ) -> npt.NDArray[np.float64] | float: """Evaluate binary polynomial energy for one or many assignments. Degree-1 terms are evaluated as ``c * x_i²`` rather than ``c * x_i`` to undo the linearization (``x_i² → x_i``) applied during polynomial normalization. This is a no-op for binary values (``x² = x``) but produces correct energies for continuous soft-relaxed values. Args: x_vals: Variable assignments. Shape ``(n_vars,)`` for one assignment or ``(n_vars, n_states)`` for many. problem: Canonical binary polynomial problem. _compiled: Pre-compiled CSR arrays from :func:`compile_problem`. When provided the Numba JIT kernel is used instead of the Python loop. """ if _compiled is not None: term_indices, term_offsets, coeffs, constant = _compiled x = np.ascontiguousarray(x_vals, dtype=np.float64) if x.ndim == 1: return float( _eval_poly_1d_jit(x, term_indices, term_offsets, coeffs, constant) ) return _eval_poly_2d_jit(x, term_indices, term_offsets, coeffs, constant) is_single = x_vals.ndim == 1 energy = 0.0 if is_single else np.zeros(x_vals.shape[1], dtype=np.float64) for term, coeff in problem.terms.items(): coeff = float(coeff) if coeff == 0: continue if len(term) == 0: energy = energy + coeff continue indices = [problem.variable_to_idx[var] for var in term] if len(term) == 1: # De-linearise: evaluate as c * x_i² instead of c * x_i. idx = indices[0] monomial = x_vals[idx] ** 2 if is_single else x_vals[idx, :] ** 2 elif is_single: monomial = np.prod(x_vals[indices]) else: monomial = np.prod(x_vals[indices, :], axis=0) energy = energy + (coeff * monomial) if is_single: return float(energy) return np.asarray(energy, dtype=np.float64)