Source code for divi.hamiltonians._trotterization

# SPDX-FileCopyrightText: 2025-2026 Qoro Quantum Ltd <divi@qoroquantum.de>
#
# SPDX-License-Identifier: Apache-2.0

"""Trotterization strategies for Hamiltonian simulation.

Strategies consume and return :class:`qiskit.quantum_info.SparsePauliOp`.
"""

from collections import OrderedDict
from dataclasses import dataclass, field
from typing import Literal, Protocol
from warnings import warn

import numpy as np
from qiskit.quantum_info import SparsePauliOp

from divi.hamiltonians._term_ops import (
    _clean_hamiltonian_spo,
    _require_qiskit_num_qubits,
    _sort_hamiltonian_terms_spo,
    generate_empty_spo,
)

# Maximum number of distinct input SPOs cached per strategy instance.
_STRATEGY_CACHE_MAXSIZE = 8


class _LRUCache(OrderedDict):
    """``OrderedDict`` with a maxsize cap. Most-recent entry is at the end."""

    def __init__(self, maxsize: int = _STRATEGY_CACHE_MAXSIZE) -> None:
        super().__init__()
        self._maxsize = maxsize

    def __setitem__(self, key, value) -> None:  # type: ignore[override]
        if key in self:
            self.move_to_end(key)
        super().__setitem__(key, value)
        while len(self) > self._maxsize:
            self.popitem(last=False)

    def __getitem__(self, key):  # type: ignore[override]
        value = super().__getitem__(key)
        self.move_to_end(key)
        return value

    def get(self, key, default=None):  # type: ignore[override]
        if key in self:
            return self[key]
        return default


def _warn_truncation_no_op(
    keep_fraction: float | None, keep_top_n: int | None, n_terms: int
) -> bool:
    """Emit the truncation-is-a-no-op warning if applicable. Returns True when warned."""
    if keep_fraction is not None and keep_fraction == 1.0:
        warn(
            "keep_fraction is 1.0 (no truncation); returning the full Hamiltonian.",
            UserWarning,
        )
        return True
    if keep_top_n is not None and keep_top_n >= n_terms:
        warn(
            "keep_top_n is greater than or equal to the number of terms; "
            "returning the full Hamiltonian.",
            UserWarning,
        )
        return True
    return False


[docs] class TrotterizationStrategy(Protocol): """Trotterization strategy protocol.""" @property def stateful(self) -> bool: """True if the strategy retains state across ``process_hamiltonian`` calls. This should be true for strategies that might re-process the Hamiltonian during execution. """ ... @property def last_sampled_spo(self) -> SparsePauliOp | None: """SPO from the most recent sampling pass, or ``None`` when the strategy does not sample (e.g. :class:`ExactTrotterization`). Sampling strategies (e.g. :class:`QDrift`) populate this on every ``process_hamiltonian`` call with a faithful concatenation of the drawn terms — duplicates preserved so callers can emit one evolution gate per draw without recomputing multiplicities. """ ...
[docs] def process_hamiltonian(self, hamiltonian: SparsePauliOp) -> SparsePauliOp: """Trotterize the Hamiltonian (SPO in, SPO out).""" ...
[docs] @dataclass(frozen=True) class ExactTrotterization(TrotterizationStrategy): """Exact Trotterization strategy.""" keep_fraction: float | None = None """Fraction of terms to keep by coefficient magnitude (largest first). Must be in (0, 1]. If None, keep all terms.""" keep_top_n: int | None = None """Number of top terms to keep by coefficient magnitude. Must be >= 1. If None, keep all terms. Mutually exclusive with keep_fraction.""" # Bounded LRU cache of ``process_hamiltonian`` results keyed on ``id(spo)``. _cache: _LRUCache = field(default_factory=_LRUCache, compare=False, hash=False) def __post_init__(self): if self.keep_fraction is not None and self.keep_top_n is not None: raise ValueError( "At most one of keep_fraction or keep_top_n may be provided." ) if self.keep_fraction is not None and ( self.keep_fraction <= 0 or self.keep_fraction > 1 ): raise ValueError( f"keep_fraction must be in (0, 1], got {self.keep_fraction}" ) if self.keep_top_n is not None and ( not isinstance(self.keep_top_n, int) or self.keep_top_n <= 0 ): raise ValueError( f"keep_top_n must be a positive integer (>= 1), got {self.keep_top_n}" ) @property def stateful(self) -> bool: # Despite having a _cache, this strategy is stateless because it only # uses the cache as memoization, not as state. return False @property def last_sampled_spo(self) -> SparsePauliOp | None: # ExactTrotterization never samples; the strategy contract returns # ``None`` so callers can branch on the presence of sampled terms # without duck-typing. return None
[docs] def process_hamiltonian(self, hamiltonian: SparsePauliOp) -> SparsePauliOp: """Truncate the Hamiltonian to its top-magnitude terms.""" if self.keep_fraction is None and self.keep_top_n is None: return hamiltonian.simplify() # Cache keyed on ``id(spo)``. The stored tuple holds a strong # reference to the SPO so its id cannot be reused while cached. cache_key = id(hamiltonian) cached = self._cache.get(cache_key) if cached is not None: return cached[1] if _warn_truncation_no_op( self.keep_fraction, self.keep_top_n, hamiltonian.size ): return hamiltonian.simplify() result_spo = self._truncate_spo(hamiltonian) self._cache[cache_key] = (hamiltonian, result_spo) return result_spo
def _truncate_spo(self, spo: SparsePauliOp) -> SparsePauliOp: """Sort by |coeff|, slice to the kept tail, and re-attach the constant.""" non_id_spo, constant = _clean_hamiltonian_spo(spo, raise_on_constant=True) sorted_spo = _sort_hamiltonian_terms_spo(non_id_spo, order="magnitude") if self.keep_fraction is not None: absolute_coeffs = np.abs(sorted_spo.coeffs.real) target = absolute_coeffs.sum() * self.keep_fraction cumsum_from_end = np.cumsum(absolute_coeffs[::-1]) n_keep = np.searchsorted(cumsum_from_end, target, side="left") + 1 slice_idx = -min(n_keep, len(absolute_coeffs)) elif self.keep_top_n is not None: slice_idx = -self.keep_top_n else: raise RuntimeError( "keep_fraction and keep_top_n are both None; at least one must be set." ) kept_spo = SparsePauliOp( sorted_spo.paulis[slice_idx:], sorted_spo.coeffs[slice_idx:] ) if constant != 0: const_spo = SparsePauliOp.from_sparse_list( [("", [], constant)], num_qubits=_require_qiskit_num_qubits(spo.num_qubits), ) return (kept_spo + const_spo).simplify() return kept_spo.simplify()
[docs] @dataclass(frozen=True) class QDrift(TrotterizationStrategy): """``QDrift`` Trotterization strategy.""" keep_fraction: float | None = None """Fraction of terms to keep deterministically by coefficient magnitude (largest first). Must be in (0, 1]. If None, all terms go to the sampling pool. Mutually exclusive with keep_top_n.""" keep_top_n: int | None = None """Number of top terms to keep deterministically by coefficient magnitude. Must be >= 1. If None, all terms go to the sampling pool. Mutually exclusive with keep_fraction.""" sampling_budget: int | None = None """Number of terms to sample from the remaining Hamiltonian per cost evaluation. If None, only kept terms are applied (equivalent to ``ExactTrotterization``).""" sampling_strategy: Literal["uniform", "weighted"] = "uniform" """How to sample terms — ``"uniform"`` (equal probability) or ``"weighted"`` (by coefficient magnitude).""" seed: int | None = None """Random seed for reproducible sampling. If None, sampling is non-deterministic.""" n_hamiltonians_per_iteration: int = 10 """Number of Hamiltonian samples per cost evaluation; losses are averaged over them.""" # Caches the ``(keep_spo, to_sample_spo)`` split keyed on ``id(spo)``. # ``to_sample_spo is None`` means all terms were kept (no sampling). _cache: _LRUCache = field(default_factory=_LRUCache, compare=False, hash=False) _rng: np.random.Generator = field(init=False, compare=False, hash=False) # Sampled SPO from the most recent call, preserving duplicates from # sampling-with-replacement; concatenated with deterministically-kept terms. last_sampled_spo: SparsePauliOp | None = field( default=None, init=False, compare=False, hash=False ) def __post_init__(self): if ( self.keep_fraction is None and self.keep_top_n is None and self.sampling_budget is None ): warn( "Neither keep_fraction, keep_top_n, nor sampling_budget is set; " "the Hamiltonian will be returned unchanged.", UserWarning, ) elif self.sampling_budget is None: warn( "sampling_budget is not set; only the kept terms will be applied, " "equivalent to ExactTrotterization.", UserWarning, ) if self.sampling_strategy not in {"uniform", "weighted"}: raise ValueError( f"Invalid sampling_strategy: {self.sampling_strategy}. Must be 'uniform' or 'weighted'." ) if self.seed is not None and not isinstance(self.seed, int): raise ValueError(f"seed must be an integer, got {self.seed}") if self.n_hamiltonians_per_iteration < 1: raise ValueError( f"n_hamiltonians_per_iteration must be >= 1, got {self.n_hamiltonians_per_iteration}" ) object.__setattr__(self, "_rng", np.random.default_rng(self.seed)) @property def stateful(self) -> bool: return True
[docs] def process_hamiltonian(self, hamiltonian: SparsePauliOp) -> SparsePauliOp: r"""Apply the ``QDrift`` randomized channel to a Hamiltonian. Implements the ``QDrift`` protocol (Campbell 2019): for H = Σ c_i P_i, randomly sample L terms and rescale their coefficients so that E[H_sampled] = H. Rescaling rules (L = sampling_budget, λ = Σ\|c_i\|, N = #terms): - Weighted: term_i → (λ / (L · \|c_i\|)) · c_i · P_i - Uniform: term_i → (N / L) · c_i · P_i """ if ( self.keep_fraction is None and self.keep_top_n is None and self.sampling_budget is None ): return hamiltonian.simplify() if hamiltonian.size == 0: warn( "No terms to sample; returning the kept Hamiltonian.", UserWarning, ) return generate_empty_spo( _require_qiskit_num_qubits(hamiltonian.num_qubits) ) triggered_exact_trotterization = ( self.keep_fraction is not None or self.keep_top_n is not None ) # Cache keyed on ``id(spo)``; the cached tuple's first slot holds a # strong reference to the SPO so its id cannot be reused while cached. cache_key = id(hamiltonian) cached = self._cache.get(cache_key) if cached is not None: _, keep_spo, to_sample_spo = cached else: keep_spo = None if triggered_exact_trotterization: all_kept = ( self.keep_fraction is not None and self.keep_fraction == 1.0 ) or ( self.keep_top_n is not None and self.keep_top_n >= hamiltonian.size ) if all_kept: # Two warnings: the truncation no-op + "nothing left to sample". _warn_truncation_no_op( self.keep_fraction, self.keep_top_n, hamiltonian.size ) warn( "All terms were kept; there are no terms left to sample. " "Returning the full Hamiltonian.", UserWarning, ) self._cache[cache_key] = (hamiltonian, None, None) return hamiltonian keep_spo = ExactTrotterization( keep_fraction=self.keep_fraction, keep_top_n=self.keep_top_n )._truncate_spo(hamiltonian) # ``atol=0`` to keep small but genuine residual terms in the # sampling pool; default ``atol`` would silently drop them # when the input has wide coefficient magnitudes. to_sample_spo = (hamiltonian - keep_spo).simplify(atol=0) else: to_sample_spo = hamiltonian.simplify() self._cache[cache_key] = (hamiltonian, keep_spo, to_sample_spo) # All-kept branch (re-entered via cache). if to_sample_spo is None: return hamiltonian if self.sampling_budget is None: return hamiltonian.simplify() if keep_spo is None else keep_spo if to_sample_spo.size == 0: warn( "No terms to sample; returning the kept Hamiltonian.", UserWarning, ) if keep_spo is None: return generate_empty_spo( _require_qiskit_num_qubits(hamiltonian.num_qubits) ) return keep_spo if to_sample_spo.size == 1: sampled_spo = to_sample_spo else: absolute_coeffs = np.abs(to_sample_spo.coeffs) coeff_sum = absolute_coeffs.sum() if coeff_sum == 0: warn( "All term coefficients are zero; returning the kept Hamiltonian.", UserWarning, ) if keep_spo is None: return generate_empty_spo( _require_qiskit_num_qubits(hamiltonian.num_qubits) ) return keep_spo if self.sampling_strategy == "weighted": probs = absolute_coeffs / coeff_sum # Guard against ``probs.sum() == 1 + ε`` for very large term # counts; ``np.random.choice`` rejects probabilities that # don't sum to exactly 1. probs /= probs.sum() else: probs = None indices = self._rng.choice( to_sample_spo.size, size=self.sampling_budget, replace=True, p=probs, ) # Rescale so that E[H_sampled] = H. if self.sampling_strategy == "weighted": # p_i = |c_i|/λ → scale by λ / (L · |c_i|). scale = coeff_sum / (self.sampling_budget * absolute_coeffs[indices]) else: # p_i = 1/N → scale by N/L. scale = np.full( self.sampling_budget, to_sample_spo.size / self.sampling_budget, ) sampled_coeffs = scale * to_sample_spo.coeffs[indices] sampled_spo = SparsePauliOp(to_sample_spo.paulis[indices], sampled_coeffs) if keep_spo is not None: faithful_spo = sampled_spo + keep_spo else: faithful_spo = sampled_spo object.__setattr__(self, "last_sampled_spo", faithful_spo) if keep_spo is not None: return (sampled_spo + keep_spo).simplify() return sampled_spo.simplify()