Source code for divi.qprog.algorithms._iterative_qaoa

# SPDX-FileCopyrightText: 2025-2026 Qoro Quantum Ltd <divi@qoroquantum.de>
#
# SPDX-License-Identifier: Apache-2.0

"""Iterative QAOA with parameter interpolation across increasing circuit depths.

This module implements the iterative interpolation strategy for QAOA described in
`arXiv:2504.01694 <https://arxiv.org/abs/2504.01694>`_. Instead of optimizing at a
fixed depth with random initialization, the algorithm starts at depth p=1, optimizes,
then interpolates the optimal parameters to warm-start at depth p+1, repeating until
a target depth or convergence criterion is met.

Three interpolation strategies are provided:

- **INTERP**: Linear interpolation (Zhou et al.)
- **FOURIER**: Fourier basis representation
- **CHEBYSHEV**: Chebyshev polynomial basis representation
"""

from collections.abc import Callable
from enum import Enum
from warnings import warn

import numpy as np
import numpy.typing as npt
from qiskit.circuit import ParameterVector

from ._qaoa import QAOA

# ---------------------------------------------------------------------------
# Interpolation strategies
# ---------------------------------------------------------------------------


[docs] class InterpolationStrategy(Enum): """Strategy for interpolating QAOA parameters from depth p to p+1.""" INTERP = "interp" """Linear interpolation (Zhou et al.).""" FOURIER = "fourier" """Fourier basis representation.""" CHEBYSHEV = "chebyshev" """Chebyshev polynomial basis representation."""
def _interp(u: npt.NDArray[np.float64]) -> npt.NDArray[np.float64]: """Linear interpolation from depth p to p+1 (Zhou et al.). Given a sequence u of length p, produce a sequence of length p+1 via: u'[j] = (j/p) * u[j-1] + (p - j)/p * u[j] with boundary conditions u[-1] = 0 and u[p] = 0. """ p = len(u) result = np.empty(p + 1, dtype=np.float64) for j in range(p + 1): left = u[j - 1] if j > 0 else 0.0 right = u[j] if j < p else 0.0 result[j] = (j / p) * left + (p - j) / p * right return result def _fourier( u: npt.NDArray[np.float64], n_basis_terms: int | None = None ) -> npt.NDArray[np.float64]: """Fourier (DCT-II) basis interpolation from depth p to p+1. Represents the p angles as k cosine coefficients using the DCT-II basis, then evaluates at p+1 grid points: u_j = sum_{l=0}^{k-1} a_l * cos(pi * l * (2j + 1) / (2p)) The DCT-II basis is orthogonal and well-conditioned for all p >= k. Args: u: Parameter sequence of length p. n_basis_terms: Number of basis terms. Defaults to min(p, 5). """ p = len(u) k = min(p, n_basis_terms) if n_basis_terms is not None else min(p, 5) # Build the DCT-II basis matrix at depth p: shape (p, k) j_grid = np.arange(p, dtype=np.float64) l_terms = np.arange(k, dtype=np.float64) basis_p = np.cos(np.outer(np.pi * (2 * j_grid + 1) / (2 * p), l_terms)) # Fit coefficients via least squares coeffs, *_ = np.linalg.lstsq(basis_p, u, rcond=None) # Evaluate at p+1 grid points p_new = p + 1 j_grid_new = np.arange(p_new, dtype=np.float64) basis_new = np.cos(np.outer(np.pi * (2 * j_grid_new + 1) / (2 * p_new), l_terms)) return basis_new @ coeffs def _chebyshev( u: npt.NDArray[np.float64], n_basis_terms: int | None = None ) -> npt.NDArray[np.float64]: """Chebyshev polynomial basis interpolation from depth p to p+1. Represents the p angles via k Chebyshev coefficients at Chebyshev nodes, then evaluates at p+1 nodes: u_j = sum_{l=0}^{k-1} c_l * T_l(x_j) x_j = cos(pi * (j + 0.5) / p) Args: u: Parameter sequence of length p. n_basis_terms: Number of Chebyshev terms. Defaults to min(p, 5). """ p = len(u) k = min(p, n_basis_terms) if n_basis_terms is not None else min(p, 5) # Chebyshev nodes at depth p j_grid = np.arange(p, dtype=np.float64) x_p = np.cos(np.pi * (j_grid + 0.5) / p) # Build Chebyshev basis matrix at depth p: shape (p, k) basis_p = np.empty((p, k), dtype=np.float64) for l in range(k): basis_p[:, l] = np.cos(l * np.arccos(x_p)) # Fit coefficients via least squares coeffs, *_ = np.linalg.lstsq(basis_p, u, rcond=None) # Chebyshev nodes at depth p+1 p_new = p + 1 j_grid_new = np.arange(p_new, dtype=np.float64) x_new = np.cos(np.pi * (j_grid_new + 0.5) / p_new) basis_new = np.empty((p_new, k), dtype=np.float64) for l in range(k): basis_new[:, l] = np.cos(l * np.arccos(x_new)) return (basis_new @ coeffs).astype(np.float64, copy=False) def interpolate_qaoa_params( params: npt.NDArray[np.float64], current_depth: int, strategy: InterpolationStrategy, n_basis_terms: int | None = None, ) -> npt.NDArray[np.float64]: """Interpolate QAOA parameters from depth p to depth p+1. Deinterleaves the flat parameter array into beta and gamma sequences, applies the chosen interpolation strategy independently to each, then reinterleaves into the flat layout expected by QAOA. Args: params: Flat 1D parameter array of length ``2 * current_depth`` with layout ``[beta_0, gamma_0, beta_1, gamma_1, ...]``. current_depth: Current circuit depth p. strategy: Interpolation strategy to use. n_basis_terms: Number of basis terms for FOURIER/CHEBYSHEV strategies. Ignored for INTERP. Defaults to ``min(p, 5)`` when ``None``. Returns: Flat 1D parameter array of length ``2 * (current_depth + 1)``. """ betas = params[0::2] gammas = params[1::2] interp_fn: Callable[..., npt.NDArray[np.float64]] if strategy == InterpolationStrategy.INTERP: new_betas = _interp(betas) new_gammas = _interp(gammas) elif strategy == InterpolationStrategy.FOURIER: new_betas = _fourier(betas, n_basis_terms) new_gammas = _fourier(gammas, n_basis_terms) elif strategy == InterpolationStrategy.CHEBYSHEV: new_betas = _chebyshev(betas, n_basis_terms) new_gammas = _chebyshev(gammas, n_basis_terms) else: raise ValueError(f"Unknown interpolation strategy: {strategy}") new_params = np.empty(2 * (current_depth + 1), dtype=np.float64) new_params[0::2] = new_betas new_params[1::2] = new_gammas return new_params # --------------------------------------------------------------------------- # IterativeQAOA # ---------------------------------------------------------------------------
[docs] class IterativeQAOA(QAOA): """Iterative QAOA with parameter interpolation across increasing depths. Instead of optimizing at a single fixed depth, this class iteratively increases the circuit depth from 1 to ``max_depth``, using the optimal parameters from depth p as a warm-start for depth p+1 via an interpolation strategy. After :meth:`run` completes, the instance represents the depth that achieved the best loss. All standard QAOA properties (``solution``, ``best_params``, ``best_loss``, ``get_top_solutions``) work as usual. Example:: iterative = IterativeQAOA( problem=MaxCutProblem(graph), max_depth=5, strategy=InterpolationStrategy.INTERP, max_iterations_per_depth=20, backend=backend, optimizer=ScipyOptimizer(ScipyMethod.COBYLA), ) iterative.run() print(iterative.best_depth) print(iterative.solution) Args: problem: A :class:`~divi.qprog.problems.QAOAProblem` instance providing the QAOA ingredients. max_depth: Maximum circuit depth to iterate up to. Defaults to 5. strategy: Interpolation strategy for warm-starting. Defaults to INTERP. n_basis_terms: Number of basis terms for FOURIER/CHEBYSHEV strategies. Ignored for INTERP. Defaults to ``min(p, 5)`` when ``None``. max_iterations_per_depth: Maximum optimization iterations per depth. Can be an integer (same for all depths) or a callable ``(depth) -> int`` for adaptive budgets. Defaults to 10. convergence_threshold: If set, stop iterating when the absolute improvement in loss between consecutive depths is below this value. **kwargs: All remaining QAOA keyword arguments (``backend``, ``optimizer``, ``initial_state``, etc.). """ _supports_fixed_param_scans = False def __init__( self, problem, *, max_depth: int = 5, strategy: InterpolationStrategy = InterpolationStrategy.INTERP, n_basis_terms: int | None = None, max_iterations_per_depth: int | Callable[[int], int] = 10, convergence_threshold: float | None = None, **kwargs, ): self._max_depth = max_depth self._strategy = strategy self._n_basis_terms = n_basis_terms self._max_iterations_per_depth = max_iterations_per_depth self._convergence_threshold = convergence_threshold self._depth_history: list[dict] = [] self._best_depth: int = 1 super().__init__( problem, n_layers=1, max_iterations=self._get_max_iters(1), **kwargs, ) @property def _expected_total_iterations(self) -> int: """Total expected iterations across all depths (for progress display).""" return sum(self._get_max_iters(d) for d in range(1, self._max_depth + 1)) def _get_max_iters(self, depth: int) -> int: if callable(self._max_iterations_per_depth): return self._max_iterations_per_depth(depth) return self._max_iterations_per_depth def _rebuild_for_depth(self, depth: int) -> None: """Rebuild parameters and pipelines for a new circuit depth.""" self.n_layers = depth betas = ParameterVector("β", depth) gammas = ParameterVector("γ", depth) # Replaces the parent QAOA._params so the meta-circuit factories # pick up the new layer count. self._params = np.array([[b, g] for b, g in zip(betas, gammas)], dtype=object) self._pipelines = self._build_pipelines() # Previous depth's factories embed wrong-shape DAGs; ``_cost_meta_cache`` # entries stay valid (keyed on ``params.size``). self._meta_circuit_factories = None def _reset_optimization_state(self) -> None: """Reset VQA optimization tracking state for a fresh run.""" self._losses_history = [] self._param_history = [] self._best_params = np.array([], dtype=np.float64) self._best_loss = float("inf") self._best_probs = {} self.current_iteration = 0 self.optimize_result = None self._stop_reason = None self.optimizer.reset()
[docs] def run( self, initial_params=None, perform_final_computation=True, checkpoint_config=None, **kwargs, ): """Run the iterative QAOA procedure across increasing depths. At each depth from 1 to ``max_depth``, the algorithm optimizes the QAOA parameters, then interpolates the best parameters to warm-start the next depth. After all depths are explored, the instance is restored to the depth that achieved the best overall loss. Args: initial_params: Ignored — each depth computes its own warm-start via interpolation of the previous depth's best parameters. Passing a non-None value emits a ``UserWarning``. perform_final_computation: Whether to run the final measurement at the best depth to extract the solution. Defaults to True. checkpoint_config: Forwarded to each inner ``super().run()``. **kwargs: Additional keyword arguments passed to the parent ``run()``. Returns: IterativeQAOA: Returns ``self`` for method chaining. """ if initial_params is not None: warn( "IterativeQAOA ignores `initial_params` — each depth computes its " "own warm-start via interpolation of the previous depth's best " "parameters. Use QAOA directly if you want to seed the first run.", UserWarning, stacklevel=2, ) depth_history: list[dict] = [] prev_best_params: npt.NDArray[np.float64] | None = None for depth in range(1, self._max_depth + 1): self.reporter.info(message=f"Depth {depth}/{self._max_depth}") self._rebuild_for_depth(depth) self._reset_optimization_state() self.max_iterations = self._get_max_iters(depth) depth_initial_params = None if depth > 1 and prev_best_params is not None: interpolated = interpolate_qaoa_params( prev_best_params, depth - 1, self._strategy, self._n_basis_terms, ) depth_initial_params = np.tile( interpolated, (self.optimizer.n_param_sets, 1) ) super().run( initial_params=depth_initial_params, perform_final_computation=False, checkpoint_config=checkpoint_config, **kwargs, ) depth_history.append( { "depth": depth, "best_loss": self._best_loss, "best_params": self._best_params.copy(), "n_iterations": self.current_iteration, } ) prev_best_params = self._best_params.copy() if ( self._convergence_threshold is not None and depth > 1 and abs(depth_history[-2]["best_loss"] - self._best_loss) < self._convergence_threshold ): break # Store history and find best depth self._depth_history = depth_history best_entry = min(depth_history, key=lambda d: d["best_loss"]) self._best_depth = best_entry["depth"] # Restore the instance to the best depth self._rebuild_for_depth(self._best_depth) self._best_params = best_entry["best_params"] self._best_loss = best_entry["best_loss"] if perform_final_computation: self.sample_solution(**kwargs) return self
@property def best_depth(self) -> int: """The circuit depth that achieved the best (lowest) loss.""" return self._best_depth @property def depth_history(self) -> list[dict]: """Per-depth optimization results. Each entry is a dict with keys: ``depth``, ``best_loss``, ``best_params``, ``n_iterations``. """ return self._depth_history.copy()