# SPDX-FileCopyrightText: 2025-2026 Qoro Quantum Ltd <divi@qoroquantum.de>
#
# SPDX-License-Identifier: Apache-2.0
from collections.abc import Sequence
from typing import Any
import numpy as np
from qiskit import transpile
from qiskit.circuit import Parameter, QuantumCircuit
from qiskit.circuit.library import PauliEvolutionGate
from qiskit.converters import circuit_to_dag
from qiskit.quantum_info import SparsePauliOp
from qiskit.synthesis import LieTrotter, SuzukiTrotter
from divi.circuits import MetaCircuit
from divi.circuits._conversions import _QISKIT_TO_QASM2
from divi.circuits.qem import _NoMitigation
from divi.hamiltonians import (
ExactTrotterization,
TrotterizationStrategy,
)
from divi.hamiltonians._term_ops import (
_clean_hamiltonian_spo,
_require_qiskit_num_qubits,
_spo_to_qiskit_basis_gates,
to_spo,
)
from divi.pipeline import CircuitPipeline
from divi.pipeline.stages import (
MeasurementStage,
ParameterBindingStage,
PauliTwirlStage,
QEMStage,
TrotterSpecStage,
)
from divi.qprog import ObservableMeasuringMixin
from divi.qprog.algorithms import InitialState, ZerosState
from divi.qprog.quantum_program import QuantumProgram
from divi.reporting import TerminalStatus
[docs]
class TimeEvolution(ObservableMeasuringMixin, QuantumProgram):
"""Quantum program for Hamiltonian time evolution.
Simulates the evolution of a quantum state under a Hamiltonian using
Trotter-Suzuki decomposition. Uses Divi's TrotterizationStrategy
(``ExactTrotterization``, ``QDrift``) for term selection and approximation.
"""
def __init__(
self,
hamiltonian: SparsePauliOp,
trotterization_strategy: TrotterizationStrategy | None = None,
time: float = 1.0,
n_steps: int = 1,
order: int = 1,
initial_state: InitialState | None = None,
observable: SparsePauliOp | Sequence[SparsePauliOp] | None = None,
_template_meta: MetaCircuit | None = None,
_template_param: Parameter | None = None,
**kwargs,
):
"""Initialize TimeEvolution.
Args:
hamiltonian: Hamiltonian to evolve under. Accepts anything
:func:`~divi.hamiltonians.to_spo` consumes (``SparsePauliOp``,
PennyLane operator, or a divi-convention Pauli-string dict).
trotterization_strategy: Strategy for term selection (``ExactTrotterization``, ``QDrift``).
Defaults to ExactTrotterization().
time: Evolution time t (e^(-iHt)).
n_steps: Number of Trotter steps.
order: Suzuki-Trotter order (1 or even).
initial_state: Initial state preparation. Pass an :class:`~divi.qprog.algorithms.InitialState`
instance (e.g. ``ZerosState()``, ``SuperpositionState()``).
Defaults to ``ZerosState()`` if None.
observable: One of:
* ``None`` — measure computational-basis probabilities over
all qubits.
* Single observable accepted by
:func:`~divi.hamiltonians.to_spo` — one expectation-value
measurement; ``self.results`` is a ``float``.
* Sequence of such observables — multiple expectation-value
measurements from the same circuit; ``self.results`` is a
``list[float]`` (one mitigated value per observable).
Commuting observables are measured from a shared shot
batch via
:class:`~divi.pipeline.stages.MeasurementStage`'s QWC
grouping; QuEPP shares the target circuit and dedupes
path DAGs across observables.
**kwargs: Passed to QuantumProgram (backend, seed, progress_queue, etc.).
Accepts ``qem_protocol`` for quantum error mitigation (requires
``observable`` to be set, since QEM operates on expectation values).
"""
super().__init__(**kwargs)
if not isinstance(n_steps, int) or n_steps < 1:
raise ValueError(f"n_steps must be a positive integer, got {n_steps!r}.")
if order != 1 and (not isinstance(order, int) or order < 2 or order % 2 != 0):
raise ValueError(f"order must be 1 or an even integer >= 2, got {order!r}.")
if trotterization_strategy is None:
trotterization_strategy = ExactTrotterization()
hamiltonian_spo = to_spo(hamiltonian)
hamiltonian_clean, _ = _clean_hamiltonian_spo(hamiltonian_spo)
if hamiltonian_clean.size == 0:
raise ValueError("Hamiltonian contains only constant terms.")
self._hamiltonian = hamiltonian_clean
self.trotterization_strategy = trotterization_strategy
self.time = time
self.n_steps = n_steps
self.order = order
if isinstance(observable, Sequence) and not isinstance(observable, tuple):
observable = tuple(observable)
self.n_qubits = _require_qiskit_num_qubits(hamiltonian_clean.num_qubits)
self._circuit_wires = tuple(range(self.n_qubits))
# Normalise observables to SparsePauliOp at the input boundary, aligning
# qubit count with the cost circuit (a 1-qubit ``Z`` on qubit 0 against
# a multi-qubit evolution must lift to ``Z ⊗ I ⊗ … ⊗ I``).
self.observable = self._normalise_observable(observable)
if initial_state is None:
initial_state = ZerosState()
if not isinstance(initial_state, InitialState):
raise TypeError(
f"initial_state must be an InitialState instance, got {type(initial_state).__name__}"
)
self.initial_state = initial_state
if (_template_meta is None) != (_template_param is None):
missing = (
"_template_param" if _template_meta is not None else "_template_meta"
)
raise ValueError(
f"_template_meta and _template_param must be provided together; "
f"got {missing}=None."
)
self._template_meta = _template_meta
self._template_param = _template_param
self._results: dict[str, float] | float | list[float] | None = None
self._pipelines = self._build_pipelines()
[docs]
def has_results(self) -> bool:
return self._results is not None
@property
def results(self) -> dict[str, float] | float | list[float]:
"""Get the final results.
Returns one of:
* ``dict[str, float]`` — probability distribution when no
``observable`` was provided.
* ``float`` — expectation value for a single ``observable``.
* ``list[float]`` — per-observable expectation values when
``observable`` is a list/tuple.
Raises:
RuntimeError: If ``.run()`` has not yet been called.
"""
if self._results is None:
raise RuntimeError(
"TimeEvolution.results is not available. Call .run() first."
)
return self._results
[docs]
def probabilities(self) -> dict[str, float]:
"""Return probability-mode results.
Raises:
RuntimeError: If ``.run()`` has not yet been called, or if this
instance was constructed with an ``observable`` (expectation
value mode). Use :meth:`expval` instead.
"""
results = self.results
if not isinstance(results, dict):
raise RuntimeError(
"TimeEvolution was run in expectation-value mode; use "
".expval() instead of .probabilities()."
)
return results
[docs]
def expval(self) -> float | list[float]:
"""Return expectation-value-mode results.
Returns a ``float`` when ``observable`` was a single operator, or a
``list[float]`` (one entry per observable, in input order) when
``observable`` was a list/tuple.
Raises:
RuntimeError: If ``.run()`` has not yet been called, or if this
instance was constructed without an ``observable`` (probability
mode). Use :meth:`probabilities` instead.
"""
results = self.results
if isinstance(results, dict):
raise RuntimeError(
"TimeEvolution was run in probability mode; use "
".probabilities() instead of .expval()."
)
return results
def _build_pipelines(self) -> dict:
trotter = TrotterSpecStage(
trotterization_strategy=self.trotterization_strategy,
meta_circuit_factory=self._meta_circuit_factory,
)
stages: list = [trotter]
if not isinstance(self._qem_protocol, _NoMitigation):
stages.append(QEMStage(protocol=self._qem_protocol))
n_twirls = getattr(self._qem_protocol, "n_twirls", 0)
if n_twirls > 0:
stages.append(PauliTwirlStage(n_twirls=n_twirls))
stages.append(
MeasurementStage(
grouping_strategy=self._grouping_strategy,
shot_distribution=self._shot_distribution,
)
)
if self._template_meta is not None:
# ParameterBindingStage binds the trajectory template's
# ``t`` parameter to ``self.time`` via ``env.param_sets``.
stages.append(ParameterBindingStage())
return {"evolution": CircuitPipeline(stages=stages)}
def _build_pipeline_env(self, **overrides):
if self._template_meta is not None and "param_sets" not in overrides:
overrides["param_sets"] = np.array([[float(self.time)]])
return super()._build_pipeline_env(**overrides)
@property
def _pipeline(self) -> CircuitPipeline:
"""The evolution pipeline (thin accessor over ``self._pipelines``)."""
return self._pipelines["evolution"]
def _get_initial_spec(self, name: str) -> Any:
if name == "evolution":
return self._hamiltonian
raise KeyError(f"No initial spec registered for pipeline {name!r}.")
def _meta_circuit_factory(
self, processed_spo: SparsePauliOp, ham_id: int
) -> MetaCircuit:
"""Factory for TrotterSpecStage: build a MetaCircuit for one Hamiltonian sample (SPO)."""
if self._template_meta is not None:
return self._template_meta
qc = self._build_qiskit_circuit(processed_spo)
dag = circuit_to_dag(qc)
if self.observable is None:
return MetaCircuit(
circuit_bodies=(((), dag),),
measured_wires=tuple(range(self.n_qubits)),
precision=self._precision,
)
if isinstance(self.observable, tuple):
return MetaCircuit(
circuit_bodies=(((), dag),),
observable=self.observable,
precision=self._precision,
_was_multi_obs=True,
)
return MetaCircuit(
circuit_bodies=(((), dag),),
observable=(self.observable,),
precision=self._precision,
)
[docs]
def run(self, **kwargs) -> "TimeEvolution":
"""Execute time evolution.
Returns:
TimeEvolution: Returns ``self`` for method chaining.
"""
env = self._build_pipeline_env()
result = self._pipeline.run(initial_spec=self._hamiltonian, env=env)
self._total_circuit_count += env.artifacts.get("circuit_count", 0)
self._total_run_time += env.artifacts.get("run_time", 0.0)
self._current_execution_result = env.artifacts.get("_current_execution_result")
if len(result) != 1:
raise RuntimeError(
f"Expected exactly 1 pipeline result, got {len(result)}."
)
(raw,) = result.values()
if self.observable is None:
self._results = raw
elif isinstance(self.observable, tuple):
self._results = [float(v) for v in raw]
else:
(single,) = raw
self._results = float(single)
self.reporter.info(
message="Finished successfully!", final_status=TerminalStatus.SUCCESS
)
return self
def _normalise_observable(self, observable):
"""Convert observables to ``SparsePauliOp`` on the cost-circuit wires."""
if observable is None:
return None
if isinstance(observable, tuple):
return tuple(self._lift_observable(o) for o in observable)
return self._lift_observable(observable)
def _lift_observable(self, op) -> SparsePauliOp:
if isinstance(op, SparsePauliOp):
if op.num_qubits != self.n_qubits:
raise ValueError(
f"Observable has {op.num_qubits} qubits but the cost circuit "
f"has {self.n_qubits}."
)
return op
# Non-SPO input (PL operator, Pauli-string dict): lift onto the
# full wire register so a narrow observable matches the cost circuit.
return to_spo(op, wires=self._circuit_wires)
def _build_qiskit_circuit(self, processed_spo: SparsePauliOp) -> QuantumCircuit:
"""Build initial-state preparation + Trotter evolution as a ``QuantumCircuit``.
Adjoint evolution is realized via negative time. Single-term
Hamiltonians use positive time to preserve the standard
``exp(-i t H)`` sign convention even when ``H`` carries its own
coefficient sign.
QDrift sampled-term evolution uses
:func:`_spo_to_qiskit_basis_gates` directly so each sampled term
keeps its multiplicity. Standard Trotter uses
:class:`PauliEvolutionGate` with :class:`LieTrotter` /
:class:`SuzukiTrotter` synthesis; the circuit is then transpiled
down to the basis-gate set the QASM body emitter understands.
"""
qc = QuantumCircuit(self.n_qubits)
qc.compose(self.initial_state.build(self._circuit_wires), inplace=True)
qubits = list(range(self.n_qubits))
sampled_spo = self.trotterization_strategy.last_sampled_spo
if sampled_spo is not None:
# Faithful QDrift: one evolution gate per sampled term (preserving
# sampling-with-replacement multiplicities), repeated ``n_steps``
# times with ``time / n_steps`` per step. Adjoint via negative time.
step_time = -self.time / self.n_steps
for _ in range(self.n_steps):
_spo_to_qiskit_basis_gates(qc, sampled_spo, step_time, qubits)
return qc
# Standard Trotter-Suzuki for ExactTrotterization.
if processed_spo.size >= 2:
evolution_time = -self.time
synthesis = (
LieTrotter(reps=self.n_steps, preserve_order=True)
if self.order == 1
else SuzukiTrotter(
order=self.order, reps=self.n_steps, preserve_order=True
)
)
qc.append(
PauliEvolutionGate(
processed_spo, time=evolution_time, synthesis=synthesis
),
qubits,
)
# Lower to the gate set ``dag_to_qasm_body`` accepts. Qiskit's
# Trotter synthesis can emit ``rxx``/``ryy``/``rzz``-style compound
# rotations that the QASM2 emitter raises on; ``optimization_level=0``
# keeps it to a cheap gate-by-gate substitution.
try:
qc = transpile(
qc,
basis_gates=list(_QISKIT_TO_QASM2.keys()),
optimization_level=0,
)
except Exception as exc:
raise RuntimeError(
"TimeEvolution failed to lower the Trotter-synthesised "
"circuit to Divi's supported basis-gate set. This usually "
"means PauliEvolutionGate synthesis emitted a gate Divi's "
"QASM2 emitter does not handle. Supported gates: "
f"{sorted(_QISKIT_TO_QASM2.keys())}."
) from exc
else:
# Single-term Hamiltonian — positive-time convention.
_spo_to_qiskit_basis_gates(qc, processed_spo, self.time, qubits)
return qc