# SPDX-FileCopyrightText: 2026 Qoro Quantum Ltd <divi@qoroquantum.de>
#
# SPDX-License-Identifier: Apache-2.0
from collections.abc import Callable
from typing import Literal
from warnings import warn
import numpy as np
import numpy.typing as npt
from qiskit.circuit import ParameterVector
from qiskit.converters import circuit_to_dag
from qiskit.quantum_info import SparsePauliOp
from divi.circuits import MetaCircuit
from divi.hamiltonians import (
BinaryPolynomialProblem,
HUBOProblemTypes,
QUBOProblemTypes,
compile_problem,
normalize_binary_polynomial_problem,
)
from divi.hamiltonians._polynomial import _evaluate_binary_polynomial
from divi.pipeline import CircuitPipeline
from divi.pipeline.stages import CircuitSpecStage, ParameterBindingStage, PCECostStage
from divi.qprog.algorithms import VQE
from divi.qprog.algorithms._numba_kernels import _popcount_parity_jit
from divi.qprog.variational_quantum_algorithm import SolutionEntry
def _fast_popcount_parity(arr_input: npt.NDArray[np.integer]) -> npt.NDArray[np.uint8]:
"""
Vectorized calculation of (popcount % 2) for an array of integers.
Uses a Numba JIT kernel with XOR-fold bit manipulation.
"""
return _popcount_parity_jit(arr_input.astype(np.uint64))
def _aggregate_param_group(
param_group: list[tuple[str, dict[str, int]]],
) -> tuple[list[str], npt.NDArray[np.float64], float]:
"""Aggregate a parameter group into states, counts, and total shots."""
shots_dict: dict[str, int] = {}
for _, histogram in param_group:
for bitstring, count in histogram.items():
shots_dict[bitstring] = shots_dict.get(bitstring, 0) + count
state_strings = list(shots_dict.keys())
counts = np.array(list(shots_dict.values()), dtype=float)
total_shots = counts.sum()
return state_strings, counts, float(total_shots)
def _decode_parities(
state_strings: list[str], variable_masks_u64: npt.NDArray[np.uint64]
) -> npt.NDArray[np.uint8]:
"""Decode bitstring parities using the precomputed variable masks."""
states = np.array([int(s, 2) for s in state_strings], dtype=np.uint64)
overlaps = variable_masks_u64[:, None] & states[None, :]
return _fast_popcount_parity(overlaps)
def _setup_dense_encoding(
n_vars: int, n_qubits: int | None
) -> tuple[int, npt.NDArray[np.uint64]]:
"""Compute n_qubits and variable masks for dense (logarithmic) encoding."""
min_qubits = int(np.ceil(np.log2(n_vars + 1)))
if n_qubits is not None and n_qubits < min_qubits:
raise ValueError(
"n_qubits must be >= ceil(log2(N + 1)) to represent all variables. "
f"Got n_qubits={n_qubits}, minimum={min_qubits}."
)
if n_qubits is not None and n_qubits > min_qubits:
warn(
"n_qubits exceeds the minimum required; extra qubits increase circuit "
"size and can add noise without representing more variables.",
UserWarning,
)
n_q = n_qubits if n_qubits is not None else min_qubits
masks = np.arange(1, n_vars + 1, dtype=np.uint64)
return n_q, masks
def _setup_poly_encoding(
n_vars: int, n_qubits: int | None
) -> tuple[int, npt.NDArray[np.uint64]]:
"""Compute n_qubits and variable masks for poly (weight-1 & 2) encoding."""
discriminant = 1 + 8 * n_vars
min_qubits = int(np.ceil((-1 + np.sqrt(discriminant)) / 2))
if n_qubits is not None and n_qubits < min_qubits:
raise ValueError(
f"n_qubits must be >= {min_qubits} for poly encoding with {n_vars} vars. "
f"Got n_qubits={n_qubits}."
)
if n_qubits is not None and n_qubits > min_qubits:
warn(
"n_qubits exceeds the minimum required; extra qubits increase circuit "
"size and can add noise without representing more variables.",
UserWarning,
)
n_q = n_qubits if n_qubits is not None else min_qubits
masks = []
for i in range(n_q):
masks.append(1 << i)
for i in range(n_q):
for j in range(i + 1, n_q):
masks.append((1 << i) | (1 << j))
return n_q, np.array(masks[:n_vars], dtype=np.uint64)
def _masks_to_ham_ops(variable_masks_u64: npt.NDArray[np.uint64], n_qubits: int) -> str:
"""Convert variable masks to semicolon-separated Pauli strings for expval backend.
Each variable's parity is <Z on qubits in mask>. For mask with bit i set, include Z
at wire i. Big Endian: qubit 0 is leftmost.
"""
terms = []
for mask in variable_masks_u64:
m = int(mask)
paulis = ["I"] * n_qubits
for i in range(n_qubits):
if (m >> i) & 1:
paulis[i] = "Z"
terms.append("".join(paulis))
return ";".join(terms)
[docs]
class PCE(VQE):
"""
Generalized Pauli Correlation Encoding (PCE) VQE.
Encodes an N-variable QUBO into qubits by mapping each variable to a parity
(Pauli-Z correlation) of the measured bitstring. Qubit scaling depends on
`encoding_type`: O(log2(N)) for dense, O(sqrt(N)) for poly. The algorithm
uses the measurement distribution to estimate these parities, applies a
smooth relaxation when `alpha` is small, and evaluates the classical QUBO
objective: E = x.T @ Q @ x. For larger `alpha`, it switches to a discrete
objective (CVaR over sampled energies) for harder convergence.
"""
def __init__(
self,
problem: QUBOProblemTypes | HUBOProblemTypes,
n_qubits: int | None = None,
alpha: float = 2.0,
encoding_type: Literal["dense", "poly"] = "dense",
decode_parities_fn: (
Callable[[list[str], npt.NDArray[np.uint64]], npt.NDArray[np.uint8]] | None
) = None,
**kwargs,
):
"""
Args:
problem (QUBOProblemTypes | HUBOProblemTypes): Binary polynomial
objective to minimize. Supports QUBO and HUBO inputs.
n_qubits (int | None): Optional override. Must be >= minimum for the
chosen encoding (ceil(log2(N + 1)) for dense; solve n(n+1)/2 >= N
for poly). Larger values raise a warning for both encodings.
alpha (float): Scaling factor for the tanh() activation. Higher = harder
binary constraints, Lower = smoother gradient.
encoding_type (Literal["dense", "poly"]): "dense" (logarithmic qubits,
default) or "poly" (each variable maps to parity of 1 or 2 qubits).
decode_parities_fn (Callable | None): Optional custom decoder for mapping
encoded bitstrings to parity arrays. Signature:
(state_strings, variable_masks_u64) -> parities. Defaults to the
built-in parity decoder.
**kwargs: Additional arguments passed to VQE (e.g. ansatz, backend).
"""
self.problem: BinaryPolynomialProblem = normalize_binary_polynomial_problem(
problem
)
self.n_vars = self.problem.n_vars
self.alpha = alpha
self.encoding_type = encoding_type
self._use_soft_objective = self.alpha < 5.0
self._final_vector: npt.NDArray[np.integer] | None = None
self._decode_parities_fn = decode_parities_fn or _decode_parities
self._compiled_problem = compile_problem(self.problem)
if kwargs.get("qem_protocol") is not None:
raise ValueError("PCE does not currently support qem_protocol.")
if self.encoding_type == "dense":
self.n_qubits, self._variable_masks_u64 = _setup_dense_encoding(
self.n_vars, n_qubits
)
elif self.encoding_type == "poly":
self.n_qubits, self._variable_masks_u64 = _setup_poly_encoding(
self.n_vars, n_qubits
)
else:
raise ValueError(f"Unknown encoding_type: {self.encoding_type}")
# Placeholder Hamiltonian required by VQE; we care about the measurement
# probability distribution, and Z-basis measurements provide it.
placeholder_hamiltonian = SparsePauliOp.from_sparse_list(
[("Z", [i], 1.0) for i in range(self.n_qubits)],
num_qubits=self.n_qubits,
)
# ``grouping_strategy`` does not affect PCE's cost pipeline
# (PCECostStage replaces MeasurementStage there), but it still
# configures the measurement pipeline used by ``sample_solution``,
# so it flows through unchanged.
super().__init__(hamiltonian=placeholder_hamiltonian, **kwargs)
def _build_pipelines(self) -> dict:
"""Build the PCE-specific cost and measurement pipelines."""
# PCECostStage is a standalone BundleStage (not a MeasurementStage
# subclass) that emits one "measure all qubits" QASM per circuit
# spec and computes the nonlinear binary-polynomial objective from
# raw shot histograms. QEMStage is intentionally excluded.
cost_pipeline = CircuitPipeline(
stages=[
CircuitSpecStage(),
PCECostStage(
problem=self.problem,
alpha=self.alpha,
use_soft_objective=self._use_soft_objective,
decode_parities_fn=self._decode_parities_fn,
variable_masks_u64=self._variable_masks_u64,
),
ParameterBindingStage(),
]
)
return {
"cost": cost_pipeline,
"measurement": self._build_measurement_pipeline(),
}
def _evaluate_cost_param_sets(
self, param_sets: np.ndarray, **kwargs
) -> dict[int, float]:
"""Evaluate the cost pipeline for the provided parameter sets."""
if not self._use_soft_objective and self.backend.supports_expval:
raise ValueError(
"PCE with alpha >= 5.0 (hard CVaR mode) requires shot histograms and "
"cannot use expectation-value backends. Use a sampling backend or set "
"force_sampling=True in JobConfig when using QoroService."
)
return super()._evaluate_cost_param_sets(param_sets, **kwargs)
def _create_meta_circuit_factories(self) -> dict[str, MetaCircuit]:
"""Create meta-circuit factories, handling the edge case of zero parameters."""
n_params = self.ansatz.n_params_per_layer(
self.n_qubits, n_electrons=self.n_electrons
)
weights = np.array(
[ParameterVector(f"w_{i}", n_params) for i in range(self.n_layers)],
dtype=object,
)
ansatz_qc = self.ansatz.build(
weights,
n_qubits=self.n_qubits,
n_layers=self.n_layers,
n_electrons=self.n_electrons,
)
dag = circuit_to_dag(ansatz_qc)
flat_params = tuple(weights.flatten())
return {
"cost_circuit": MetaCircuit(
circuit_bodies=(((), dag),),
parameters=flat_params,
observable=self.cost_hamiltonian,
precision=self._precision,
),
"meas_circuit": MetaCircuit(
circuit_bodies=(((), dag),),
parameters=flat_params,
measured_wires=tuple(range(self.n_qubits)),
precision=self._precision,
),
}
[docs]
def sample_solution(
self,
params: npt.NDArray[np.float64] | None = None,
**kwargs,
) -> "PCE":
"""Compute the final eigenstate and decode it into a PCE vector."""
super().sample_solution(params, **kwargs)
if self._eigenstate is None:
self._final_vector = None
return self
best_bitstring = "".join(str(x) for x in self._eigenstate)
parities = self._decode_parities_fn(
[best_bitstring], self._variable_masks_u64
).flatten()
self._final_vector = 1 - parities
return self
[docs]
def get_top_solutions(
self,
n: int = 10,
*,
min_prob: float = 0.0,
include_decoded: bool = False,
sort_by: Literal["prob", "energy"] = "prob",
) -> list[SolutionEntry]:
"""Get the top-N solutions with decoded QUBO variable assignments.
This method overrides the base implementation to decode encoded qubit states
into actual QUBO variable assignments. The bitstrings in the probability
distribution represent encoded qubit states (O(log2(N)) qubits), not the
decoded QUBO solutions (N variables).
Args:
n (int): Maximum number of solutions to return. Must be non-negative.
If n is 0 or negative, returns an empty list. If n exceeds the
number of available solutions (after filtering), returns all
available solutions. Defaults to 10.
min_prob (float): Minimum probability threshold for including solutions.
Only solutions with probability >= min_prob will be included.
Must be in range [0.0, 1.0]. Defaults to 0.0 (no filtering).
include_decoded (bool): Whether to populate the `decoded` field of
each SolutionEntry with the numpy array representation of the
QUBO solution. If False, the decoded field will be None.
Defaults to False.
sort_by: Sort order for the returned solutions.
``"prob"`` (default): descending by probability.
``"energy"``: ascending by objective energy. When set, the
``energy`` field of each ``SolutionEntry`` is populated.
Returns:
list[SolutionEntry]: List of solution entries sorted according to
``sort_by``, with decoded bitstring for deterministic tie-breaking.
The `bitstring` field contains the decoded QUBO solution as a
binary string (e.g., "01011" for 5 variables), not the encoded
qubit state. Returns an empty list if no probability distribution
is available or n <= 0.
Raises:
RuntimeError: If probability distribution is not available because
optimization has not been run or final computation was not performed.
ValueError: If min_prob is not in range [0.0, 1.0], n is negative,
or sort_by is not one of ``"prob"`` or ``"energy"``.
"""
# Validate inputs
if n < 0:
raise ValueError(f"n must be non-negative, got {n}")
if not (0.0 <= min_prob <= 1.0):
raise ValueError(f"min_prob must be in range [0.0, 1.0], got {min_prob}")
if sort_by not in ("prob", "energy"):
raise ValueError(f"sort_by must be 'prob' or 'energy', got '{sort_by}'")
# Handle edge case: n == 0
if n == 0:
return []
# Require probability distribution to exist
if not self._best_probs:
raise RuntimeError(
"No probability distribution available. The final computation step "
"must be performed to compute the probability distribution. "
"Call run(perform_final_computation=True) to execute optimization "
"and compute the distribution."
)
# Extract the probability distribution (nested by parameter set)
# _best_probs structure: {tag: {bitstring: prob}}
probs_dict = next(iter(self._best_probs.values()))
# Filter by minimum probability
filtered = [(bs, prob) for bs, prob in probs_dict.items() if prob >= min_prob]
# Decode all filtered encoded qubit states to QUBO variable assignments
encoded_bitstrings = [bs for bs, _ in filtered]
decoded_parities = self._decode_parities_fn(
encoded_bitstrings, self._variable_masks_u64
)
# decoded_parities shape: (n_vars, n_states), transpose to (n_states, n_vars)
decoded_qubo_solutions = (1 - decoded_parities).T
# Build full result list with decoded solutions and optional energy
compute_energy = sort_by == "energy"
result = []
for (encoded_bitstring, prob), decoded_solution in zip(
filtered, decoded_qubo_solutions
):
decoded_bitstring = "".join(str(int(x)) for x in decoded_solution)
energy = (
float(
_evaluate_binary_polynomial(
decoded_solution.astype(float),
self.problem,
_compiled=self._compiled_problem,
)
)
if compute_energy
else None
)
result.append(
SolutionEntry(
bitstring=decoded_bitstring,
prob=prob,
decoded=(
decoded_solution.astype(np.int32) if include_decoded else None
),
energy=energy,
)
)
# Sort and take top n
if sort_by == "energy":
result.sort(key=lambda e: (e.energy, e.bitstring))
else:
result.sort(key=lambda e: (-e.prob, e.bitstring))
return result[:n]
@property
def solution(self) -> npt.NDArray[np.integer] | dict:
"""Return the most-probable decoded assignment from the final measurement.
.. note::
This returns the assignment corresponding to the **highest-probability**
encoded bitstring, which may not be the lowest-energy solution.
For energy-ranked solutions, use
:meth:`get_top_solutions(sort_by="energy") <get_top_solutions>` instead.
Returns:
For QUBO problems, a binary 0/1 NumPy array. For HUBO problems
with non-integer variable names, a dictionary mapping variable
names to binary values.
Raises:
RuntimeError: If ``run()`` has not been called yet.
"""
if self._final_vector is None:
raise RuntimeError("Run the VQE optimization first.")
warn(
"PCE.solution returns the decoded assignment of the most-probable "
"encoded bitstring. Because PCE operates in a compressed qubit space "
"(O(log2(N)) qubits for N variables), the most-probable encoded state "
"does not necessarily decode to the lowest-energy QUBO solution. "
"Use get_top_solutions(sort_by='energy') for energy-ranked results.",
stacklevel=2,
)
vo = self.problem.variable_order
if vo != tuple(range(self.problem.n_vars)):
return dict(zip(vo, self._final_vector))
return self._final_vector