# SPDX-FileCopyrightText: 2025-2026 Qoro Quantum Ltd <divi@qoroquantum.de>
#
# SPDX-License-Identifier: Apache-2.0
import logging
import pickle
from abc import abstractmethod
from datetime import datetime
from pathlib import Path
from queue import Queue
from typing import Any, ClassVar, Literal, NamedTuple, TypeAlias
from warnings import warn
import numpy as np
import numpy.typing as npt
from pydantic import BaseModel, ConfigDict, Field, field_serializer, field_validator
from qiskit.quantum_info import SparsePauliOp
from scipy.optimize import OptimizeResult
from divi.backends import CircuitRunner
from divi.circuits import MetaCircuit
from divi.exceptions import ExecutionCancelledError
from divi.pipeline import CircuitPipeline, GroupingStrategy, PipelineEnv, Stage
from divi.pipeline.stages import (
CircuitSpecStage,
MeasurementStage,
ParameterBindingStage,
PauliTwirlStage,
QEMStage,
)
from divi.qprog import ObservableMeasuringMixin
from divi.qprog.checkpointing import (
PROGRAM_STATE_FILE,
CheckpointConfig,
_atomic_write,
_ensure_checkpoint_dir,
_get_checkpoint_subdir_path,
_load_and_validate_pydantic_model,
resolve_checkpoint_path,
)
from divi.qprog.early_stopping import EarlyStopping, StopReason
from divi.qprog.optimizers import (
MonteCarloOptimizer,
Optimizer,
PymooOptimizer,
ScipyMethod,
ScipyOptimizer,
)
from divi.qprog.quantum_program import QuantumProgram
from divi.reporting import TerminalStatus
from divi.viz import ProgramViz
logger = logging.getLogger(__name__)
PARAM_SET_AXIS = "param_set"
def _extract_param_set_idx(key: tuple) -> int:
"""Extract the param_set index from a pipeline result key."""
for axis_name, idx in key:
if axis_name == PARAM_SET_AXIS:
return idx
raise KeyError(f"No '{PARAM_SET_AXIS}' axis found in pipeline result key: {key}")
_RUN_INSTRUCTION = "Call run() to execute the optimization."
ParamHistoryMode: TypeAlias = Literal["all_evaluated", "best_per_iteration"]
[docs]
class SolutionEntry(NamedTuple):
"""A solution entry with bitstring, probability, and optional decoded value.
Args:
bitstring: Binary string representing a computational basis state.
prob: Measured probability in range [0.0, 1.0].
decoded: Optional problem-specific decoded representation. Defaults to None.
energy: Optional objective energy for this solution. Defaults to None.
"""
bitstring: str
prob: float
decoded: Any | None = None
energy: float | None = None
class SubclassState(BaseModel):
"""Container for subclass-specific state."""
data: dict[str, Any] = Field(default_factory=dict)
class OptimizerConfig(BaseModel):
"""Configuration for reconstructing an optimizer."""
type: str
config: dict[str, Any] = Field(default_factory=dict)
class ProgramState(BaseModel):
"""Pydantic model for VariationalQuantumAlgorithm state."""
model_config = ConfigDict(from_attributes=True, populate_by_name=True)
# Metadata
program_type: str = Field(validation_alias="_serialized_program_type")
version: str = "1.0"
timestamp: str = Field(default_factory=lambda: datetime.now().isoformat())
# Core Algorithm State (mapped to private attributes)
current_iteration: int
max_iterations: int
losses_history: list[dict[str, float]] = Field(validation_alias="_losses_history")
param_history: list[list[list[float]]] = Field(
default_factory=list, validation_alias="_param_history"
)
best_loss: float = Field(validation_alias="_best_loss")
best_probs: dict[str, float] = Field(validation_alias="_best_probs")
total_circuit_count: int = Field(validation_alias="_total_circuit_count")
total_run_time: float = Field(validation_alias="_total_run_time")
seed: int | None = Field(validation_alias="_seed")
stop_reason: str | None = Field(
default=None, validation_alias="_serialized_stop_reason"
)
grouping_strategy: str | None = Field(validation_alias="_grouping_strategy")
# Arrays
best_params: list[float] | None = Field(
default=None, validation_alias="_best_params"
)
final_params: list[float] | None = Field(
default=None, validation_alias="_final_params"
)
# Complex State (mapped to new adapter properties)
rng_state_bytes: bytes | None = Field(
default=None, validation_alias="_serialized_rng_state"
)
optimizer_config: OptimizerConfig = Field(
validation_alias="_serialized_optimizer_config"
)
subclass_state: SubclassState = Field(validation_alias="_serialized_subclass_state")
@field_serializer("rng_state_bytes")
def serialize_bytes(self, v: bytes | None, _info):
return v.hex() if v is not None else None
@field_validator("rng_state_bytes", mode="before")
@classmethod
def validate_bytes(cls, v):
return bytes.fromhex(v) if isinstance(v, str) else v
@field_validator("param_history", mode="before")
@classmethod
def normalize_param_history(cls, v):
"""Accept nested lists or per-iteration ndarray snapshots from disk or program."""
if not v:
return []
rows: list[list[list[float]]] = []
for item in v:
arr = np.asarray(item, dtype=np.float64)
rows.append(arr.tolist())
return rows
@field_serializer("best_params", "final_params")
def serialize_arrays(self, v: npt.NDArray | list | None, _info):
if isinstance(v, np.ndarray):
return v.tolist()
return v
def restore(self, program: "VariationalQuantumAlgorithm") -> None:
"""Apply this state object back to a program instance."""
# 1. Bulk restore standard attributes
for name, field in self.__class__.model_fields.items():
alias = field.validation_alias
target_attr = alias if isinstance(alias, str) else name
# Skip adapter properties (they are read-only / calculated)
if target_attr.startswith("_serialized_"):
continue
val = getattr(self, name)
if target_attr == "_param_history" and val is not None:
val = [np.asarray(block, dtype=np.float64) for block in val]
# Handle numpy conversion
elif "params" in target_attr and val is not None:
val = np.array(val)
if hasattr(program, target_attr):
setattr(program, target_attr, val)
# 2. Restore complex state
if self.rng_state_bytes:
program._rng.bit_generator.state = pickle.loads(self.rng_state_bytes)
program._load_subclass_state(self.subclass_state.data)
def _compute_parameter_shift_mask(n_params: int) -> npt.NDArray[np.float64]:
"""
Generate a binary matrix mask for the parameter shift rule.
This mask is used to determine the shifts to apply to each parameter
when computing gradients via the parameter shift rule in quantum algorithms.
Args:
n_params (int): The number of parameters in the quantum circuit.
Returns:
npt.NDArray[np.float64]: A (2 * n_params, n_params) matrix where each row encodes
the shift to apply to each parameter for a single evaluation.
The values are multiples of 0.5 * pi, with alternating signs.
"""
mask_arr = 1 << np.arange(n_params)
binary_matrix = ((mask_arr[:, np.newaxis] & (1 << np.arange(n_params))) > 0).astype(
np.float64
)
binary_matrix = binary_matrix.repeat(2, axis=0)
binary_matrix[1::2] *= -1
binary_matrix *= 0.5 * np.pi
return binary_matrix
[docs]
class VariationalQuantumAlgorithm(ObservableMeasuringMixin, QuantumProgram):
"""Base class for variational quantum algorithms.
This class provides the foundation for implementing variational quantum
algorithms in Divi. It handles circuit execution, parameter optimization,
and result management for algorithms that optimize parameterized quantum
circuits to minimize cost functions.
Variational algorithms work by:
1. Generating parameterized quantum circuits
2. Executing circuits on quantum hardware/simulators
3. Computing expectation values of cost Hamiltonians
4. Using classical optimizers to update parameters
5. Iterating until convergence
Attributes:
_losses_history: History of loss values during optimization.
_param_history: Raw per-callback parameter batches;
use :meth:`param_history` to read copies with optional filtering.
_final_params: Final optimized parameters.
_best_params: Parameters that achieved the best loss.
_best_loss (float): Best loss achieved during optimization.
_circuits: Generated quantum circuits.
_total_circuit_count (int): Total number of circuits executed.
_total_run_time (float): Total execution time in seconds.
_seed: Random seed for parameter initialization.
_rng: Random number generator.
_grouping_strategy: Strategy for grouping quantum operations.
_qem_protocol: Quantum error mitigation protocol.
_cancellation_event: Event for graceful termination.
_meta_circuit_factories (dict): Lazily-built mapping of circuit names to MetaCircuit factories.
"""
# Subclass-populated declarations.
#
# ``_supports_fixed_param_scans`` defaults to True; override to False for
# VQAs whose parameter space varies during optimization (e.g. depth
# schedules) so ``divi.viz`` fixed-parameter scans reject them. The rest
# have no default — each concrete VQA must assign them during ``__init__``
# (or override as a property) or the corresponding methods will raise
# AttributeError.
_supports_fixed_param_scans: ClassVar[bool] = True
current_iteration: int
n_layers: int
loss_constant: float
cost_hamiltonian: SparsePauliOp
"""The cost Hamiltonian for the variational problem."""
_grouping_strategy: GroupingStrategy
_best_params: npt.NDArray[np.float64]
_final_params: npt.NDArray[np.float64]
_meta_circuit_factories: dict[str, MetaCircuit] | None
def __init__(
self,
backend: CircuitRunner,
optimizer: Optimizer | None = None,
seed: int | None = None,
progress_queue: Queue | None = None,
early_stopping: EarlyStopping | None = None,
**kwargs,
):
"""Initialize the VariationalQuantumAlgorithm.
This constructor is specifically designed for hybrid quantum-classical
variational algorithms. The instance variables `n_layers` and
`n_params_per_layer` must be set by subclasses, where:
- `n_layers` is the number of layers in the quantum circuit.
- `n_params_per_layer` is the number of parameters per layer.
For exotic variational algorithms where these variables may not be applicable,
the `_initialize_param_sets` method should be overridden to generate the
starting parameters for a fresh optimization run.
Args:
backend (CircuitRunner): Quantum circuit execution backend.
optimizer (Optimizer | None): The optimizer to use for parameter optimization.
Defaults to MonteCarloOptimizer().
seed (int | None): Random seed for parameter initialization. Defaults to None.
progress_queue (Queue | None): Queue for progress reporting. Defaults to None.
early_stopping (EarlyStopping | None): Early stopping controller. When
provided, the optimization loop will be halted if any of the
configured criteria are met (e.g. patience exceeded, gradient
below threshold, cost variance settled). Defaults to None.
Keyword Args:
grouping_strategy (str): Strategy for partitioning Hamiltonian terms
into compatible measurement groups; one circuit is executed per
group. Options: ``"qwc"`` (qubit-wise-commuting — most
compact), ``"wires"`` (group by support wires), or ``None``
(one circuit per term). Defaults to ``"qwc"``.
shot_distribution (str or callable, optional): Focus the backend's
shot budget on the Hamiltonian terms that matter most.
Without this option, every measurement group is sampled with
the backend's full shot count, even tiny terms with little
impact on the final energy. With ``shot_distribution`` set,
the same total budget is split across groups according to
their importance — reducing variance without spending more
shots.
Available strategies:
- ``"uniform"`` — equal split across groups.
- ``"weighted"`` — proportional to per-group coefficient L1
norm; dominant Hamiltonian terms get more shots.
- ``"weighted_random"`` — multinomial sample of the same
probabilities; may drop more low-weight groups than the
deterministic ``"weighted"`` for the same budget.
- A callable ``(group_l1_norms, total_shots) -> per_group_shots``
for fully custom allocation.
Example::
vqe = MyVQA(
backend=QiskitSimulator(shots=1000),
shot_distribution="weighted",
)
vqe.run()
Only valid when sampling is actually used. Setting it on a
backend that computes expectation values analytically
(``grouping_strategy="_backend_expval"``) is rejected because
shots are ignored in that mode. Defaults to ``None`` (every
group receives the full shot budget).
precision (int): Forwarded to
:class:`~divi.qprog.QuantumProgram` — decimal places for
numeric parameter values in QASM conversion. Higher values
produce longer QASM strings (more data sent to cloud
backends); lower values trade resolution for compactness.
Defaults to :data:`~divi.circuits.DEFAULT_PRECISION`.
decode_solution_fn: Function to decode bitstrings
into problem-specific solution representations. Called during final computation
and when `get_top_solutions(include_decoded=True)` is used. The function should
take a binary string (e.g., "0101") and return a decoded representation
(e.g., a list of indices, numpy array, or custom object). Defaults to
`lambda bitstring: bitstring` (identity function).
"""
program_id = kwargs.pop("program_id", None)
decode_solution_fn = kwargs.pop(
"decode_solution_fn", lambda bitstring: bitstring
)
super().__init__(
backend=backend,
seed=seed,
progress_queue=progress_queue,
program_id=program_id,
**kwargs,
)
# --- Optimization Results & History ---
self._losses_history = []
self._param_history: list[npt.NDArray[np.float64]] = []
self._best_params = np.array([], dtype=np.float64)
self._final_params = np.array([], dtype=np.float64)
self._best_loss = float("inf")
self._best_probs = {}
self.optimize_result: OptimizeResult | None = None
"""Raw result object returned by the underlying optimizer, or ``None``
before :meth:`run` is called.
Always populated after :meth:`run` completes. When optimization
converges normally, ``success`` is ``True``. When early stopping
or cancellation terminates the run, ``success`` is ``False`` and the
``message`` field describes the reason.
See :class:`scipy.optimize.OptimizeResult` for the full specification.
"""
# --- Random Number Generation ---
self._rng = np.random.default_rng(self._seed)
# --- Optimizer Configuration ---
self.optimizer = optimizer if optimizer is not None else MonteCarloOptimizer()
# --- Early Stopping ---
self._early_stopping = early_stopping
self._stop_reason: StopReason | None = None
# --- Solution Decoding ---
self._decode_solution_fn = decode_solution_fn
# --- Circuit Factory & Templates ---
self._meta_circuit_factories = None
def _has_run_optimization(self) -> bool:
"""Check if optimization has been run at least once.
Returns:
bool: True if optimization has been run, False otherwise.
"""
return len(self._losses_history) > 0
[docs]
def has_results(self) -> bool:
return self._has_run_optimization()
@property
def stop_reason(self) -> StopReason | None:
"""Reason the optimization was stopped early, or ``None``.
Returns:
StopReason | None: The :class:`~divi.qprog.early_stopping.StopReason`
that triggered early stopping, or ``None`` if optimization
completed normally or has not been run yet.
"""
return self._stop_reason
@property
def losses_history(self) -> list[dict]:
"""Get a copy of the optimization loss history.
Each entry is a dictionary mapping parameter indices to loss values.
Returns:
list[dict]: Copy of the loss history. Modifications to this list
will not affect the internal state.
"""
if not self._has_run_optimization():
warn(
"losses_history is empty. Optimization has not been run yet. "
f"{_RUN_INSTRUCTION}",
UserWarning,
stacklevel=2,
)
return self._losses_history.copy()
[docs]
def param_history(
self,
mode: ParamHistoryMode = "all_evaluated",
) -> list[npt.NDArray[np.float64]]:
"""Parameter vectors recorded at each optimization callback.
Args:
mode: Which rows to return for each iteration:
* ``"all_evaluated"`` — full batch from the callback, shape
``(n_param_sets, n_params)`` per iteration (mirrors
:attr:`losses_history` population layout).
* ``"best_per_iteration"`` — single best member by loss for
that iteration, shape ``(1, n_params)`` per iteration.
Returns:
list[npt.NDArray[np.float64]]: One array per completed callback.
Use ``numpy.vstack(...)`` for a 2D sample matrix (e.g. PCA).
Raises:
RuntimeError: If internal loss and parameter histories are out of sync.
"""
if not self._has_run_optimization():
warn(
"Parameter history is unavailable because optimization has not "
f"been run yet. {_RUN_INSTRUCTION}",
UserWarning,
stacklevel=2,
)
return []
if mode == "all_evaluated":
return [row.copy() for row in self._param_history]
if len(self._losses_history) != len(self._param_history):
raise RuntimeError(
"losses_history and _param_history length mismatch; cannot select "
"best_per_iteration rows."
)
best_blocks: list[npt.NDArray[np.float64]] = []
for loss_dict, block in zip(
self._losses_history, self._param_history, strict=True
):
arr = np.atleast_2d(np.asarray(block, dtype=np.float64))
n_rows = arr.shape[0]
best_idx = min(
range(n_rows),
key=lambda j: float(loss_dict[str(j)]),
)
best_blocks.append(arr[best_idx : best_idx + 1].copy())
return best_blocks
@property
def min_losses_per_iteration(self) -> list[float]:
"""Get the minimum loss value for each iteration.
Returns a list where each element is the minimum (best) loss value
across all parameter sets for that iteration.
Returns:
list[float]: List of minimum loss values, one per iteration.
"""
if not self._has_run_optimization():
warn(
"min_losses_per_iteration is empty. Optimization has not been run yet. "
f"{_RUN_INSTRUCTION}",
UserWarning,
stacklevel=2,
)
return [min(loss_dict.values()) for loss_dict in self._losses_history]
@property
def final_params(self) -> npt.NDArray[np.float64]:
"""Get a copy of the final optimized parameters.
Returns:
npt.NDArray[np.float64]: Copy of the final parameters. Modifications to this array
will not affect the internal state.
"""
if len(self._final_params) == 0 or not self._has_run_optimization():
warn(
"final_params is not available. Optimization has not been run yet. "
f"{_RUN_INSTRUCTION}",
UserWarning,
stacklevel=2,
)
return self._final_params.copy()
@property
def best_params(self) -> npt.NDArray[np.float64]:
"""Get a copy of the parameters that achieved the best (lowest) loss.
Returns:
npt.NDArray[np.float64]: Copy of the best parameters. Modifications to this array
will not affect the internal state.
"""
if len(self._best_params) == 0 or not self._has_run_optimization():
warn(
"best_params is not available. Optimization has not been run yet. "
f"{_RUN_INSTRUCTION}",
UserWarning,
stacklevel=2,
)
return self._best_params.copy()
@property
def best_loss(self) -> float:
"""Get the best loss achieved so far.
Returns:
float: The best loss achieved so far.
"""
if not self._has_run_optimization():
warn(
"best_loss has not been computed yet. Optimization has not been run. "
f"{_RUN_INSTRUCTION}",
UserWarning,
stacklevel=2,
)
elif self._best_loss == float("inf"):
# Defensive check: if optimization ran but best_loss is still inf, something is wrong
raise RuntimeError(
"best_loss is still infinite after optimization. This indicates a problem "
"with the optimization process. The optimization callback may not have executed "
"correctly, or all computed losses were infinite."
)
return self._best_loss
@property
def viz(self):
"""Access visualization helpers for this variational program.
The returned object exposes a thin convenience wrapper over the
standalone :mod:`divi.viz` API, so scans can be written either as
``divi.viz.scan_1d(program, ...)`` or ``program.viz.scan_1d(...)``.
Returns:
ProgramViz: Convenience wrapper bound to this program instance.
"""
return ProgramViz(self)
@property
def best_probs(self) -> dict[str, dict[str, float]]:
"""Get normalized probabilities for the best parameters.
This property provides access to the probability distribution computed
by running measurement circuits with the best parameters found during
optimization. The distribution maps bitstrings (computational basis states)
to their measured probabilities.
The probabilities are normalized and have deterministic ordering when
iterated (dictionary insertion order is preserved in Python 3.7+).
Returns:
dict[str, dict[str, float]]: Dictionary mapping parameter-set keys to
bitstring probability dictionaries. Bitstrings are binary strings
(e.g., "0101"), values are probabilities in range [0.0, 1.0].
Returns an empty dict if final computation has not been performed.
Raises:
RuntimeError: If attempting to access probabilities before running
the algorithm with final computation enabled.
Note:
To populate this distribution, you must run the algorithm with
`perform_final_computation=True` (the default):
>>> program.run(perform_final_computation=True)
>>> probs = program.best_probs
Example:
>>> program.run()
>>> probs = program.best_probs
>>> for bitstring, prob in probs.items():
... print(f"{bitstring}: {prob:.2%}")
0101: 42.50%
1010: 31.20%
...
"""
if not self._best_probs:
warn(
"best_probs is empty. Either optimization has not been run yet, "
f"or final computation was not performed. {_RUN_INSTRUCTION}",
UserWarning,
stacklevel=2,
)
return self._best_probs.copy()
[docs]
def get_top_solutions(
self, n: int = 10, *, min_prob: float = 0.0, include_decoded: bool = False
) -> list[SolutionEntry]:
"""Get the top-N solutions sorted by probability.
This method extracts the most probable solutions from the measured
probability distribution. Solutions are sorted by probability (descending)
with deterministic tie-breaking using lexicographic ordering of bitstrings.
Args:
n (int): Maximum number of solutions to return. Must be non-negative.
If n is 0 or negative, returns an empty list. If n exceeds the
number of available solutions (after filtering), returns all
available solutions. Defaults to 10.
min_prob (float): Minimum probability threshold for including solutions.
Only solutions with probability >= min_prob will be included.
Must be in range [0.0, 1.0]. Defaults to 0.0 (no filtering).
include_decoded (bool): Whether to populate the `decoded` field of
each SolutionEntry by calling the `decode_solution_fn` provided
in the constructor. If False, the decoded field will be None.
Defaults to False.
Returns:
list[SolutionEntry]: List of solution entries sorted by probability
(descending), then by bitstring (lexicographically ascending)
for deterministic tie-breaking. Returns an empty list if no
probability distribution is available or n <= 0.
Raises:
RuntimeError: If probability distribution is not available because
optimization has not been run or final computation was not performed.
ValueError: If min_prob is not in range [0.0, 1.0] or n is negative.
Note:
The probability distribution must be computed by running the algorithm
with `perform_final_computation=True` (the default):
>>> program.run(perform_final_computation=True)
>>> top_10 = program.get_top_solutions(n=10)
Example:
>>> # Get top 5 solutions with probability >= 5%
>>> program.run()
>>> solutions = program.get_top_solutions(n=5, min_prob=0.05)
>>> for sol in solutions:
... print(f"{sol.bitstring}: {sol.prob:.2%}")
1010: 42.50%
0101: 31.20%
1100: 15.30%
0011: 8.50%
1111: 2.50%
>>> # Get solutions with decoding
>>> solutions = program.get_top_solutions(n=3, include_decoded=True)
>>> for sol in solutions:
... print(f"{sol.bitstring} -> {sol.decoded}")
1010 -> [0, 2]
0101 -> [1, 3]
...
"""
# Validate inputs
if n < 0:
raise ValueError(f"n must be non-negative, got {n}")
if not (0.0 <= min_prob <= 1.0):
raise ValueError(f"min_prob must be in range [0.0, 1.0], got {min_prob}")
# Handle edge case: n == 0
if n == 0:
return []
# Require probability distribution to exist
if not self._best_probs:
raise RuntimeError(
"No probability distribution available. The final computation step "
"must be performed to compute the probability distribution. "
"Call run(perform_final_computation=True) to execute optimization "
"and compute the distribution."
)
# Extract the probability distribution (nested by parameter set)
# _best_probs structure: {tag: {bitstring: prob}}
probs_dict = next(iter(self._best_probs.values()))
# Filter by minimum probability and get top n sorted by probability (descending),
# then bitstring (ascending) for deterministic tie-breaking
top_items = sorted(
filter(
lambda bitstring_prob: bitstring_prob[1] >= min_prob, probs_dict.items()
),
key=lambda bitstring_prob: (-bitstring_prob[1], bitstring_prob[0]),
)[:n]
# Build result list (decode on demand)
return [
SolutionEntry(
bitstring=bitstring,
prob=prob,
decoded=(
self._decode_solution_fn(bitstring) if include_decoded else None
),
)
for bitstring, prob in top_items
]
# --- Serialization Adapters (For Pydantic) ---
@property
def _serialized_program_type(self) -> str:
return type(self).__name__
@property
def _serialized_rng_state(self) -> bytes:
return pickle.dumps(self._rng.bit_generator.state)
@property
def _serialized_optimizer_config(self) -> OptimizerConfig:
config_dict = self.optimizer.get_config()
return OptimizerConfig(type=config_dict.pop("type"), config=config_dict)
@property
def _serialized_subclass_state(self) -> SubclassState:
return SubclassState(data=self._save_subclass_state())
@property
def _serialized_stop_reason(self) -> str | None:
return self._stop_reason.value if self._stop_reason is not None else None
@property
def meta_circuit_factories(self) -> dict[str, MetaCircuit]:
"""Get the meta-circuit factories used by this program.
Returns:
dict[str, MetaCircuit]: Dictionary mapping circuit names to their
MetaCircuit factories.
"""
# Lazy initialization: each instance has its own _meta_circuit_factories.
# Note: When used with ProgramEnsemble, meta_circuit_factories is initialized sequentially
# in the main thread before parallel execution to avoid thread-safety issues.
if self._meta_circuit_factories is None:
self._meta_circuit_factories = self._create_meta_circuit_factories()
return self._meta_circuit_factories
@abstractmethod
def _create_meta_circuit_factories(self) -> dict[str, MetaCircuit]:
pass
@property
@abstractmethod
def n_params_per_layer(self) -> int:
"""Number of trainable parameters per ansatz layer.
Used by the base class to compute the total parameter count as
``n_layers * n_params_per_layer``.
"""
def _save_subclass_state(self) -> dict[str, Any]:
"""Hook method for subclasses to save additional state.
Override to return a dictionary of state variables that should be
included in the checkpoint. Default returns an empty dict.
Returns:
dict[str, Any]: Dictionary of subclass-specific state.
"""
return {}
def _load_subclass_state(self, state: dict[str, Any]) -> None:
"""Hook method for subclasses to load additional state.
Override to restore state variables from the checkpoint dictionary.
Default is a no-op.
Args:
state (dict[str, Any]): Dictionary of subclass-specific state.
"""
def _get_optimizer_config(self) -> OptimizerConfig:
"""Extract optimizer configuration for checkpoint reconstruction.
Returns:
OptimizerConfig: Configuration object for the current optimizer.
Raises:
NotImplementedError: If the optimizer does not support state saving.
"""
config_dict = self.optimizer.get_config()
return OptimizerConfig(
type=config_dict.pop("type"),
config=config_dict,
)
[docs]
def save_state(self, checkpoint_config: CheckpointConfig) -> Path:
"""Save the program state to a checkpoint directory."""
if self.current_iteration == 0 and len(self._losses_history) == 0:
raise RuntimeError("Cannot save checkpoint: optimization has not been run.")
if checkpoint_config.checkpoint_dir is None:
raise ValueError(
"checkpoint_config.checkpoint_dir must be a non-None Path."
)
main_dir = _ensure_checkpoint_dir(checkpoint_config.checkpoint_dir)
checkpoint_path = _get_checkpoint_subdir_path(main_dir, self.current_iteration)
checkpoint_path.mkdir(parents=True, exist_ok=True)
# 1. Save optimizer
self.optimizer.save_state(checkpoint_path)
# 2. Save Program State (Pydantic pulls data via validation_aliases)
state = ProgramState.model_validate(self)
state_file = checkpoint_path / PROGRAM_STATE_FILE
_atomic_write(state_file, state.model_dump_json(indent=2))
return checkpoint_path
[docs]
@classmethod
def load_state(
cls,
checkpoint_dir: Path | str,
backend: CircuitRunner,
subdirectory: str | None = None,
**kwargs,
) -> "VariationalQuantumAlgorithm":
"""Load program state from a checkpoint directory."""
checkpoint_path = resolve_checkpoint_path(checkpoint_dir, subdirectory)
state_file = checkpoint_path / PROGRAM_STATE_FILE
# 1. Load Pydantic Model
state = _load_and_validate_pydantic_model(
state_file,
ProgramState,
required_fields=["program_type", "current_iteration"],
)
# 2. Reconstruct Optimizer
opt_config = state.optimizer_config
if opt_config.type == "MonteCarloOptimizer":
optimizer = MonteCarloOptimizer.load_state(checkpoint_path)
elif opt_config.type == "PymooOptimizer":
optimizer = PymooOptimizer.load_state(checkpoint_path)
else:
raise ValueError(f"Unsupported optimizer type: {opt_config.type}")
# 3. Create Instance
program = cls(backend=backend, optimizer=optimizer, seed=state.seed, **kwargs)
# 4. Restore State
state.restore(program)
return program
[docs]
def get_expected_param_shape(self) -> tuple[int, int]:
"""
Get the expected shape for initial parameters.
Returns:
tuple[int, int]: Shape (n_param_sets, n_layers * n_params_per_layer) that
initial parameters should have for this quantum program.
"""
return (self.optimizer.n_param_sets, self.n_layers * self.n_params_per_layer)
def _validate_initial_params(self, params: npt.NDArray[np.float64]):
"""
Validate user-provided initial parameters.
Args:
params (npt.NDArray[np.float64]): Parameters to validate.
Raises:
ValueError: If parameters have incorrect shape.
"""
expected_shape = self.get_expected_param_shape()
if params.shape != expected_shape:
raise ValueError(
f"Initial parameters must have shape {expected_shape}, "
f"got {params.shape}"
)
def _initialize_param_sets(self) -> npt.NDArray[np.float64]:
"""Generate fresh parameter sets for a new optimization run."""
total_params = self.n_layers * self.n_params_per_layer
return self._rng.uniform(
0, 2 * np.pi, (self.optimizer.n_param_sets, total_params)
)
def _optimizer_has_resume_state(self) -> bool:
"""Return True when the optimizer already carries resumable state."""
if isinstance(self.optimizer, MonteCarloOptimizer):
return self.optimizer._has_checkpoint
if isinstance(self.optimizer, PymooOptimizer):
return self.optimizer._curr_algorithm_obj is not None
return False
def _resolve_initial_param_sets(
self, initial_params: npt.NDArray[np.float64] | None
) -> npt.NDArray[np.float64] | None:
"""Resolve the initial parameter sets for a fresh or resumed run."""
if initial_params is not None and self._optimizer_has_resume_state():
raise ValueError(
"initial_params cannot be provided when resuming from optimizer state. "
"Load a fresh program instance or reset the optimizer first."
)
if initial_params is not None:
validated = np.atleast_2d(initial_params)
self._validate_initial_params(validated)
return validated.copy()
if self._optimizer_has_resume_state():
return None
return self._initialize_param_sets()
# ------------------------------------------------------------------ #
# Pipeline builders
# ------------------------------------------------------------------ #
@property
def _cost_pipeline(self) -> CircuitPipeline:
"""The cost-evaluation pipeline built by :meth:`_build_pipelines`."""
return self._pipelines["cost"]
@property
def _measurement_pipeline(self) -> CircuitPipeline | None:
"""The final-measurement pipeline, or ``None`` if this algorithm has none."""
return self._pipelines.get("measurement")
def _get_initial_spec(self, name: str) -> Any:
"""Resolve the initial spec for pipeline ``name`` (lazy).
Default VQA mapping:
* ``"cost"`` → ``meta_circuit_factories["cost_circuit"]``
* ``"measurement"`` → ``meta_circuit_factories["meas_circuit"]``,
falling back to ``"cost_circuit"``.
Subclasses override to change either entry (e.g. QAOA's cost
pipeline uses a Hamiltonian, not a MetaCircuit, as initial spec).
"""
factories = self.meta_circuit_factories
if name == "cost":
return factories["cost_circuit"]
if name == "measurement":
if "meas_circuit" in factories:
return factories["meas_circuit"]
return factories["cost_circuit"]
raise KeyError(f"No initial spec registered for pipeline {name!r}.")
def _build_pipeline_env(self, **overrides) -> PipelineEnv:
"""Construct a PipelineEnv for the provided parameter sets."""
if "param_sets" not in overrides:
overrides["param_sets"] = self._initialize_param_sets()
if "rng" not in overrides:
overrides["rng"] = self._rng
return super()._build_pipeline_env(**overrides)
@property
def _loss_constant_consumed(self) -> bool:
"""Whether a cost-pipeline component already folds ``loss_constant`` in.
When ``True``, :meth:`_evaluate_cost_param_sets` skips its post-reduction
add to avoid double-counting. ``False`` for vanilla VQE/QAOA/CustomVQA;
data-binding subclasses override it.
"""
return False
def _build_cost_pipeline(
self,
spec_stage: Stage,
extra_stages: tuple[Stage, ...] = (),
) -> CircuitPipeline:
"""Build the cost-evaluation pipeline.
Default ordering: spec → [extra_stages] → QEM (→ PauliTwirl)
→ Measurement → ParamBinding.
ParameterBinding is placed last because it is a cheap string-template
operation, whereas QEM/PauliTwirl/Measurement are expensive structural
transforms that are independent of parameter values. Binding last
avoids redundantly repeating structural work for each parameter set.
When ``QuEPP(bind_before_mitigation=True)`` is set, binding moves
before QEM so that QuEPP receives concrete angles and can normalize
rotations — producing fewer Pauli paths at the cost of repeating
structural work per parameter set.
Args:
spec_stage: A SpecStage producing MetaCircuit(s) from the
cost Hamiltonian (e.g. TrotterSpecStage).
extra_stages: Subclass-supplied stages to insert *between* the
spec and the (optional) early-bind ParameterBindingStage.
Data-binding subclasses (QNN, CustomVQA) inject
:class:`~divi.pipeline.stages.DataBindingStage` here so the
data axis fans out before QEM/twirling sees it.
"""
bind_early = getattr(self._qem_protocol, "bind_before_mitigation", False)
stages: list[Stage] = [spec_stage, *extra_stages]
if bind_early:
stages.append(ParameterBindingStage())
stages.append(QEMStage(protocol=self._qem_protocol))
n_twirls = getattr(self._qem_protocol, "n_twirls", 0)
if n_twirls > 0:
stages.append(PauliTwirlStage(n_twirls=n_twirls))
stages.append(
MeasurementStage(
grouping_strategy=self._grouping_strategy,
shot_distribution=self._shot_distribution,
)
)
if not bind_early:
stages.append(ParameterBindingStage())
return CircuitPipeline(stages=stages)
def _build_measurement_pipeline(self) -> CircuitPipeline:
"""Build the measurement pipeline for solution extraction.
Stages: SingleCircuitSpec → Measurement → ParameterBinding.
Note: QEM is intentionally excluded — ZNE error mitigation applies
only to cost evaluation (expectation values), not probability extraction.
"""
return CircuitPipeline(
stages=[
CircuitSpecStage(),
MeasurementStage(),
ParameterBindingStage(),
]
)
# ------------------------------------------------------------------ #
# Execution
# ------------------------------------------------------------------ #
def _evaluate_cost_param_sets(
self, param_sets: npt.NDArray[np.float64], **kwargs
) -> dict[int, float]:
"""Evaluate the cost pipeline for the provided parameter sets.
Subclasses should prefer overriding the initial-spec hook over
replacing the full evaluator.
"""
normalized_param_sets = np.atleast_2d(param_sets)
env = self._build_pipeline_env(param_sets=normalized_param_sets)
result = self._cost_pipeline.run(
initial_spec=self._get_initial_spec("cost"),
env=env,
)
self._total_circuit_count += env.artifacts.get("circuit_count", 0)
self._total_run_time += env.artifacts.get("run_time", 0.0)
self._current_execution_result = env.artifacts.get("_current_execution_result")
constant = 0.0 if self._loss_constant_consumed else self.loss_constant
indexed = {
_extract_param_set_idx(key): float(value[0]) + constant
for key, value in result.items()
}
return dict(sorted(indexed.items()))
def _evaluate_gradient_at(
self, params: npt.NDArray[np.float64], **kwargs
) -> npt.NDArray[np.float64]:
"""Evaluate the parameter-shift gradient at a single parameter vector."""
shifted_param_sets = self._grad_shift_mask + params
exp_vals = self._evaluate_cost_param_sets(shifted_param_sets, **kwargs)
exp_vals_arr = np.asarray(
[value for value in exp_vals.values()],
dtype=np.float64,
)
pos_shifts = exp_vals_arr[::2]
neg_shifts = exp_vals_arr[1::2]
return 0.5 * (pos_shifts - neg_shifts)
[docs]
def run(
self,
initial_params: npt.NDArray[np.float64] | None = None,
perform_final_computation: bool = True,
checkpoint_config: CheckpointConfig | None = None,
**kwargs,
) -> "VariationalQuantumAlgorithm":
"""Run the variational quantum algorithm.
The outputs are stored in the algorithm object and can be accessed via
properties such as ``total_circuit_count``, ``total_run_time``,
``losses_history``, and ``best_params``.
Args:
initial_params (npt.NDArray[np.float64] | None): Optional initial parameter
sets for a fresh optimization run. Must have shape
``(n_param_sets, n_layers * n_params_per_layer)``. Cannot be
combined with a checkpoint-resumed optimizer state.
perform_final_computation (bool): Whether to perform final computation after optimization completes.
Typically, this step involves sampling with the best found parameters to extract
solution probability distributions. Set this to False in warm-starting or pre-training
routines where the final sampling step is not needed. Defaults to True.
checkpoint_config (CheckpointConfig | None): Checkpoint configuration.
If None, no checkpointing is performed.
**kwargs: Additional keyword arguments for subclasses.
Returns:
VariationalQuantumAlgorithm: Returns ``self`` for method chaining.
"""
# Initialize checkpointing
if checkpoint_config is None:
checkpoint_config = CheckpointConfig()
if checkpoint_config.checkpoint_dir:
logger.info(
f"Using checkpoint directory: {checkpoint_config.checkpoint_dir}"
)
# Extract max_iterations from kwargs if present (for compatibility with subclasses)
max_iterations = kwargs.pop("max_iterations", self.max_iterations)
if max_iterations != self.max_iterations:
self.max_iterations = max_iterations
if self.max_iterations <= self.current_iteration:
warn(
f"max_iterations ({self.max_iterations}) is less than or equal to "
f"current_iteration ({self.current_iteration}). The optimization will "
f"not run additional iterations since the maximum has already been "
f"reached.",
UserWarning,
)
def cost_fn(params):
self.reporter.info(
message="💸 Computing Cost 💸", iteration=self.current_iteration
)
losses = self._evaluate_cost_param_sets(np.atleast_2d(params), **kwargs)
losses = np.asarray(
[value for value in losses.values()],
dtype=np.float64,
)
if params.ndim > 1:
return losses
else:
return losses.item()
self._grad_shift_mask = _compute_parameter_shift_mask(
self.n_layers * self.n_params_per_layer
)
last_grad_norm: float | None = None
def grad_fn(params):
nonlocal last_grad_norm
self.reporter.info(
message="📈 Computing Gradients 📈", iteration=self.current_iteration
)
grads = self._evaluate_gradient_at(params, **kwargs)
last_grad_norm = float(np.linalg.norm(grads))
return grads
def _iteration_counter(intermediate_result: OptimizeResult):
self._losses_history.append(
dict(
zip(
[str(i) for i in range(len(intermediate_result.x))],
intermediate_result.fun,
)
)
)
self._param_history.append(
np.atleast_2d(
np.asarray(intermediate_result.x, dtype=np.float64)
).copy()
)
current_loss = np.min(intermediate_result.fun)
if current_loss < self._best_loss:
self._best_loss = current_loss
best_idx = np.argmin(intermediate_result.fun)
self._best_params = intermediate_result.x[best_idx].copy()
self.current_iteration += 1
self.reporter.update(
iteration=self.current_iteration, loss=float(current_loss)
)
# Checkpointing
if checkpoint_config._should_checkpoint(self.current_iteration):
self.save_state(checkpoint_config)
if self._cancellation_event and self._cancellation_event.is_set():
raise ExecutionCancelledError("Cancellation requested by batch.")
# --- Early stopping ---
if self._early_stopping is not None:
reason = self._early_stopping.check(
current_loss,
grad_norm=last_grad_norm,
)
if reason is not None:
self._stop_reason = reason
self.reporter.info(
message=f"Early stopping triggered: {reason.value}",
iteration=self.current_iteration,
)
raise StopIteration
# The scipy implementation of COBYLA interprets the `maxiter` option
# as the maximum number of function evaluations, not iterations.
# To provide a consistent user experience, we disable `scipy`'s
# `maxiter` and manually stop the optimization from the callback
# when the desired number of iterations is reached.
if (
isinstance(self.optimizer, ScipyOptimizer)
and self.optimizer.method == ScipyMethod.COBYLA
and intermediate_result.nit + 1 == self.max_iterations
):
raise StopIteration
self.reporter.info(message="Finished Setup")
resolved_initial_params = self._resolve_initial_param_sets(initial_params)
with self._install_cancellation_handler():
try:
self.optimize_result = self.optimizer.optimize(
cost_fn=cost_fn,
initial_params=resolved_initial_params,
callback_fn=_iteration_counter,
jac=grad_fn,
max_iterations=self.max_iterations,
rng=self._rng,
)
except StopIteration:
reason = self._stop_reason.value if self._stop_reason else "Stopped"
self.optimize_result = OptimizeResult(
x=np.atleast_2d(self._best_params),
fun=np.atleast_1d(self._best_loss),
nit=self.current_iteration,
success=False,
message=f"Early stopping: {reason}",
)
except ExecutionCancelledError as exc:
# ``KeyboardInterrupt`` is deliberately NOT caught here:
# the second Ctrl+C re-raises ``KeyboardInterrupt`` from
# the signal handler as the documented hard-abort path,
# and intercepting it would defeat that.
message = "Cancelled by user"
self.optimize_result = OptimizeResult(
x=np.atleast_2d(self._best_params),
fun=np.atleast_1d(self._best_loss),
nit=self.current_iteration,
success=False,
message=message,
)
# The pipeline already best-effort-cancelled the in-flight
# job when it raised; no redundant call needed here.
self.reporter.info(
message=message, final_status=TerminalStatus.CANCELLED
)
raise ExecutionCancelledError(message) from exc
else:
self.optimize_result.success = True
self.optimize_result.message = "Optimization converged."
# Set _best_params from final result (source of truth)
x = np.atleast_2d(self.optimize_result.x)
self._best_params = x[np.argmin(self.optimize_result.fun)].copy()
self._final_params = self.optimize_result.x
if perform_final_computation:
self.sample_solution(**kwargs)
self.reporter.info(
message="Finished successfully!", final_status=TerminalStatus.SUCCESS
)
return self
[docs]
def sample_solution(
self,
params: npt.NDArray[np.float64] | None = None,
**kwargs,
) -> "VariationalQuantumAlgorithm":
"""Run the final measurement and decode the solution.
Called by ``run()`` (with ``params=None``, falling back to
``self._best_params``) after optimization completes. It can also
be called directly with externally-provided ``params`` when you
already have trained parameters (e.g. from a prior ``run()``,
a checkpoint, or external training) and only need to sample the
circuit — skipping the EXPECTATION jobs that ``run()`` would
otherwise dispatch during optimization.
When called with explicit ``params``, this method does NOT mutate
``self._best_params`` or any optimizer state (``optimize_result``,
``_losses_history``, ``_param_history``, ``current_iteration``).
Only the measurement-side attributes are updated: ``_best_probs``,
``_total_circuit_count``, ``_total_run_time``, and subclass-specific
solution fields (e.g. ``solution_bitstring`` for QAOA, ``_eigenstate``
for VQE).
Args:
params: Optional parameter set to evaluate. Must have shape
``(n_layers * n_params_per_layer,)`` for a single set or
``(n_param_sets, n_layers * n_params_per_layer)`` for a
batch. When ``None`` (the default), uses ``self._best_params``.
**kwargs: Subclass-specific keyword arguments.
Returns:
VariationalQuantumAlgorithm: Returns ``self`` for method chaining.
Raises:
ValueError: If ``params`` does not have the expected number of
parameters per set.
RuntimeError: If ``params=None`` and ``self._best_params`` is
empty (i.e. ``run()`` has not been called yet).
Note:
Subclasses override this method to add their algorithm-specific
decoding step. They should call ``super().sample_solution(params)``
to perform parameter validation and the measurement-pipeline
dispatch, then read from ``self._best_probs`` to extract
algorithm-specific solution state.
"""
if self._measurement_pipeline is None:
# Algorithm has no measurement pipeline (e.g. CustomVQA) — there is
# nothing to sample. Subclasses that need a final step override
# this method.
return self
if params is None:
if len(self._best_params) == 0:
raise RuntimeError(
"sample_solution() was called without explicit `params` "
"but no trained parameters are available. Either pass "
"`params=...` or call run() first."
)
params_arr = self._best_params
else:
params_arr = np.asarray(params, dtype=np.float64)
expected = self.n_layers * self.n_params_per_layer
if params_arr.shape[-1] != expected:
raise ValueError(
f"params last-axis size ({params_arr.shape[-1]}) does not "
f"match n_layers * n_params_per_layer ({expected})."
)
self._run_solution_measurement_for(np.atleast_2d(params_arr))
return self
def _run_solution_measurement_for(
self, param_sets: npt.NDArray[np.float64]
) -> None:
"""Execute measurement circuits via the pipeline for the provided parameter sets."""
if (pipeline := self._measurement_pipeline) is None:
raise RuntimeError(
"This algorithm does not have a measurement pipeline; "
"_run_solution_measurement_for cannot be called."
)
env = self._build_pipeline_env(param_sets=np.atleast_2d(param_sets))
result = pipeline.run(
initial_spec=self._get_initial_spec("measurement"),
env=env,
)
self._total_circuit_count += env.artifacts.get("circuit_count", 0)
self._total_run_time += env.artifacts.get("run_time", 0.0)
self._current_execution_result = env.artifacts.get("_current_execution_result")
indexed = {_extract_param_set_idx(key): value for key, value in result.items()}
self._best_probs = dict(sorted(indexed.items()))