Source code for divi.qprog.problems._binary

# SPDX-FileCopyrightText: 2025-2026 Qoro Quantum Ltd <divi@qoroquantum.de>
#
# SPDX-License-Identifier: Apache-2.0

"""Binary optimization (QUBO / HUBO) problem class for QAOA."""

from collections.abc import Callable, Hashable
from typing import Any, Literal

import dimod
import hybrid
import numpy as np
import scipy.sparse as sps
from dimod import BinaryQuadraticModel
from qiskit.quantum_info import SparsePauliOp

from divi.hamiltonians import (
    HUBOProblemTypes,
    IsingResult,
    QUBOProblemTypes,
    normalize_binary_polynomial_problem,
    qubo_to_ising,
    x_mixer,
)
from divi.qprog.problems import QAOAProblem


def _merge_substates(_, substates):
    """Merge two hybrid framework substates by stacking their sample sets."""
    a, b = substates
    return a.updated(subsamples=hybrid.hstack_samplesets(a.subsamples, b.subsamples))


def _sanitize_problem_input(qubo):
    """Normalize a QUBO input to (original, BinaryQuadraticModel) pair."""
    if isinstance(qubo, BinaryQuadraticModel):
        return qubo, qubo

    if isinstance(qubo, (np.ndarray, sps.spmatrix)):
        x, y = qubo.shape
        if x != y:
            raise ValueError("Only square matrices are supported.")

    if isinstance(qubo, np.ndarray):
        return qubo, dimod.BinaryQuadraticModel(qubo, vartype=dimod.Vartype.BINARY)

    if isinstance(qubo, sps.spmatrix):
        coo = sps.coo_matrix(qubo)
        return qubo, dimod.BinaryQuadraticModel(
            {(row, col): data for row, col, data in zip(coo.row, coo.col, coo.data)},
            vartype=dimod.Vartype.BINARY,
        )

    raise ValueError(f"Got an unsupported QUBO input format: {type(qubo)}")


[docs] class BinaryOptimizationProblem(QAOAProblem): """Generic QUBO or HUBO problem for QAOA. Wraps a binary optimization problem expressed as either a quadratic form (QUBO) or higher-order polynomial (HUBO), normalizes it to a canonical :class:`~divi.hamiltonians.BinaryPolynomialProblem`, and exposes the QAOA building blocks: cost Hamiltonian (via Ising conversion), standard X-mixer, and ground-state initial superposition. Accepted inputs (anything :class:`dimod.BinaryQuadraticModel` or :class:`dimod.BinaryPolynomial` accepts, plus matrices): - ``np.ndarray`` / ``scipy.sparse.spmatrix`` — square QUBO matrix. - :class:`dimod.BinaryQuadraticModel` — quadratic form with named variables. - ``dict`` with tuple keys — HUBO terms, e.g. ``{(0,): -1.0, (0, 1, 2): 2.0}``. - :class:`dimod.BinaryPolynomial` — polynomial of arbitrary degree. Ising-conversion strategies selected via ``hamiltonian_builder``: - ``"native"`` (default): translates each polynomial term into a Pauli-Z product. Exact, but high-degree terms produce many-body interactions that some simulators handle slowly. - ``"quadratized"``: introduces auxiliary qubits and penalty terms so every interaction becomes two-body. Penalty magnitude is ``quadratization_strength``; ``None`` picks ``2 * max(|hubo coeff|)``. Optionally accepts a ``dimod.hybrid`` decomposer/composer pair to enable partitioned solving via :meth:`decompose`. Without one, the decomposition-related methods raise ``RuntimeError``. Args: problem: QUBO matrix, BQM, HUBO dict, or BinaryPolynomial. hamiltonian_builder: ``"native"`` (default) or ``"quadratized"``. quadratization_strength: Penalty strength for the quadratized builder. ``None`` (default) auto-picks ``2 * max(|hubo coeff|)``. Ignored when ``hamiltonian_builder="native"``. The auto default is sized against a single worst-case term and may under-penalise dense HUBOs where many constraints can be violated simultaneously; pass an explicit value (or raise the multiplier on :class:`~divi.hamiltonians.QuadratizedIsingConverter`) for such instances. decomposer: Optional ``hybrid.traits.ProblemDecomposer`` that enables :meth:`decompose`. composer: Optional ``hybrid.traits.SubsamplesComposer`` for recombining sub-solutions; defaults to ``hybrid.SplatComposer``. Examples: >>> import numpy as np >>> from divi.qprog.problems import BinaryOptimizationProblem >>> Q = np.array([[-1.0, 2.0], [0.0, -1.0]]) >>> problem = BinaryOptimizationProblem(Q) >>> problem.cost_hamiltonian # ready for QAOA """ def __init__( self, problem: QUBOProblemTypes | HUBOProblemTypes, *, hamiltonian_builder: Literal["native", "quadratized"] = "native", quadratization_strength: float | None = None, decomposer: hybrid.traits.ProblemDecomposer | None = None, composer: hybrid.traits.SubsamplesComposer | None = None, ): if hamiltonian_builder not in ("native", "quadratized"): raise ValueError( "hamiltonian_builder must be either 'native' or 'quadratized'." ) self._raw_problem = problem self._canonical_problem = normalize_binary_polynomial_problem(problem) self._hamiltonian_builder: Literal["native", "quadratized"] = ( hamiltonian_builder ) self._quadratization_strength = quadratization_strength self._ising_cache: IsingResult | None = None self._mixer_cache: SparsePauliOp | None = None # Decomposition support (optional) self._decomposer = decomposer self._bqm: dimod.BinaryQuadraticModel | None if decomposer is not None: _, self._bqm = _sanitize_problem_input(problem) self._partitioning = hybrid.Unwind(decomposer) self._aggregating = hybrid.Reduce(hybrid.Lambda(_merge_substates)) | ( composer or hybrid.SplatComposer() ) else: self._bqm = None self._variable_maps = {} self._trivial_program_ids = set() self._bqm_subproblem_states = {} @property def _ising(self) -> IsingResult: """Cached Ising conversion of the canonical polynomial.""" if self._ising_cache is None: self._ising_cache = qubo_to_ising( self._raw_problem, hamiltonian_builder=self._hamiltonian_builder, quadratization_strength=self._quadratization_strength, ) return self._ising_cache @property def cost_hamiltonian(self) -> SparsePauliOp: """Cost Hamiltonian derived from the Ising conversion of the QUBO/HUBO.""" return self._ising.cost_hamiltonian @property def mixer_hamiltonian(self) -> SparsePauliOp: """Standard X-mixer over all qubits in the Ising encoding.""" if self._mixer_cache is None: self._mixer_cache = x_mixer(self._ising.n_qubits) return self._mixer_cache @property def loss_constant(self) -> float: """Constant offset from Ising conversion, added back to expectation values.""" return self._ising.loss_constant @property def decode_fn(self) -> Callable[[str], Any]: """Decode a measurement bitstring to the original variable assignment. For problems built from named variables (e.g. a BQM with non-integer keys), the result is a ``dict`` mapping the original variable names to their bit values. For integer-indexed problems, returns the encoding's raw bitstring projection. """ base_decode = self._ising.encoding.decode_fn vo = self._canonical_problem.variable_order if vo != tuple(range(self._canonical_problem.n_vars)): def _decode_with_names(bitstring: str) -> dict | None: decoded = base_decode(bitstring) if decoded is None: return None return dict(zip(vo, decoded)) return _decode_with_names return base_decode @property def metadata(self) -> dict[str, Any]: """Encoding metadata from the Ising conversion (e.g. quadratization aux info).""" return self._ising.encoding.metadata or {} @property def canonical_problem(self): """The normalized ``BinaryPolynomialProblem``.""" return self._canonical_problem @property def raw_problem(self): """The original QUBO/HUBO input passed at construction time.""" return self._raw_problem
[docs] def decompose(self) -> dict[Hashable, QAOAProblem]: """Partition the problem using the configured ``hybrid`` decomposer. Each non-trivial partition becomes its own :class:`BinaryOptimizationProblem` keyed by ``(name, size)``. Partitions with no interactions are tracked internally and skipped during composition. Raises: ValueError: If no decomposer was provided at construction. """ if self._decomposer is None or self._bqm is None: raise ValueError( "Cannot decompose: no decomposer was provided at construction." ) self._bqm_subproblem_states = {} self._variable_maps = {} self._trivial_program_ids = set() init_state = hybrid.State.from_problem(self._bqm) _bqm_partitions = self._partitioning.run(init_state).result() all_variables = list(self._bqm.variables) var_to_global_idx = {v: i for i, v in enumerate(all_variables)} sub_problems: dict[Hashable, QAOAProblem] = {} for i, partition in enumerate(_bqm_partitions): if i > 0: del partition["problem"] prog_id = (f"P{i}", len(partition.subproblem)) self._bqm_subproblem_states[prog_id] = partition self._variable_maps[prog_id] = [ var_to_global_idx[v] for v in partition.subproblem.variables ] if partition.subproblem.num_interactions == 0: self._trivial_program_ids.add(prog_id) continue ldata, (irow, icol, qdata), _ = partition.subproblem.to_numpy_vectors( partition.subproblem.variables ) coo_mat = sps.coo_matrix( ( np.r_[ldata, qdata], ( np.r_[np.arange(len(ldata)), icol], np.r_[np.arange(len(ldata)), irow], ), ), shape=(len(ldata), len(ldata)), ) sub_problems[prog_id] = BinaryOptimizationProblem(coo_mat) return sub_problems
[docs] def initial_solution_size(self) -> int: """Number of variables in the global solution vector. Equals the number of variables in the underlying BQM. Only defined when a decomposer was provided at construction. Raises: RuntimeError: If no decomposer was provided. """ if self._bqm is None: raise RuntimeError( "initial_solution_size requires a decomposer to have been " "provided at construction." ) return len(self._bqm.variables)
[docs] def extend_solution( self, current_solution: list[int], prog_id: Hashable, candidate_decoded: list[int], ) -> list[int]: """Splice a sub-problem's decoded bits into the global solution. Returns a new list with values at positions corresponding to ``prog_id``'s variables overwritten by ``candidate_decoded``. """ extended = list(current_solution) global_indices = self._variable_maps[prog_id] for local_idx, global_idx in enumerate(global_indices): extended[global_idx] = int(candidate_decoded[local_idx]) return extended
[docs] def evaluate_global_solution(self, solution: list[int]) -> float: """Energy of a global bit assignment under the underlying BQM. Raises: RuntimeError: If no decomposer was provided at construction. """ if self._bqm is None: raise RuntimeError( "evaluate_global_solution requires a decomposer to have been " "provided at construction." ) variables = list(self._bqm.variables) sample = dict(zip(variables, solution)) return float(self._bqm.energy(sample))
[docs] def postprocess_candidates( self, candidates: list[tuple[float, list[int]]], *, strict: bool = False ) -> list[tuple[np.ndarray, float]]: """Run global candidate solutions through the configured composer. Returns: Tuples ``(composed_solution, energy)`` where ``composed_solution`` is an ``int32`` ndarray of bits and ``energy`` is the objective value computed by the composer. """ composed = [self._compose_solution(sol) for _score, sol in candidates] composed.sort(key=lambda entry: entry[1]) return composed
def _compose_solution(self, solution): """Run a single solution through the hybrid composer pipeline.""" states_copy = {} for prog_id, bqm_subproblem_state in self._bqm_subproblem_states.items(): if prog_id in self._trivial_program_ids: var_to_val = {v: 0 for v in bqm_subproblem_state.subproblem.variables} else: variables = list(bqm_subproblem_state.subproblem.variables) global_indices = self._variable_maps[prog_id] var_to_val = { v: solution[gi] for v, gi in zip(variables, global_indices) } sample_set = dimod.SampleSet.from_samples( dimod.as_samples(var_to_val), "BINARY", 0 ) states_copy[prog_id] = bqm_subproblem_state.updated(subsamples=sample_set) states = hybrid.States(*list(states_copy.values())) final_state = self._aggregating.run(states).result() sol, energy, _ = final_state.samples.record[0] return np.array(sol, dtype=np.int32), float(energy)