Source code for divi.backends._qiskit_simulator

# SPDX-FileCopyrightText: 2025-2026 Qoro Quantum Ltd <divi@qoroquantum.de>
#
# SPDX-License-Identifier: Apache-2.0

import bisect
import heapq
import logging
import os
import threading
from collections.abc import Mapping, Sequence
from functools import partial
from multiprocessing import Pool, current_process
from threading import Event
from typing import Any, Literal
from warnings import warn

from qiskit import QuantumCircuit, transpile
from qiskit.converters import circuit_to_dag
from qiskit.dagcircuit import DAGOpNode
from qiskit.providers import BackendV2
from qiskit.quantum_info import Pauli
from qiskit.transpiler.exceptions import TranspilerError
from qiskit_aer import AerSimulator
from qiskit_aer.library import SaveExpectationValue
from qiskit_aer.noise import NoiseModel

from divi.backends import CircuitRunner, ExecutionResult
from divi.backends._shot_allocation import (
    ShotRange,
    bucket_by_shots,
    from_wire,
    per_circuit,
    validate,
)
from divi.exceptions import ExecutionCancelledError

logger = logging.getLogger(__name__)

# Suppress stevedore extension loading errors (harmless Qiskit v2/provider issue)
_stevedore_logger = logging.getLogger("stevedore.extension")
_stevedore_logger.setLevel(logging.CRITICAL)

# Lazy-loaded fake backends dictionary
_FAKE_BACKENDS_CACHE: dict[int, list] | None = None


def _load_fake_backends() -> dict[int, list]:
    """Lazy load and return the FAKE_BACKENDS dictionary."""
    global _FAKE_BACKENDS_CACHE
    if _FAKE_BACKENDS_CACHE is None:
        # Import only when actually needed
        import qiskit_ibm_runtime.fake_provider as fk_prov

        _FAKE_BACKENDS_CACHE = {
            5: [
                fk_prov.FakeManilaV2,
                fk_prov.FakeBelemV2,
                fk_prov.FakeLimaV2,
                fk_prov.FakeQuitoV2,
            ],
            7: [
                fk_prov.FakeOslo,
                fk_prov.FakePerth,
                fk_prov.FakeLagosV2,
                fk_prov.FakeNairobiV2,
            ],
            15: [fk_prov.FakeMelbourneV2],
            16: [fk_prov.FakeGuadalupeV2],
            20: [
                fk_prov.FakeAlmadenV2,
                fk_prov.FakeJohannesburgV2,
                fk_prov.FakeSingaporeV2,
                fk_prov.FakeBoeblingenV2,
            ],
            27: [
                fk_prov.FakeGeneva,
                fk_prov.FakePeekskill,
                fk_prov.FakeAuckland,
                fk_prov.FakeCairoV2,
            ],
        }
    return _FAKE_BACKENDS_CACHE


def _find_best_fake_backend(circuit: QuantumCircuit) -> list[type] | None:
    """Find the best fake backend for a given circuit based on qubit count.

    Args:
        circuit: QuantumCircuit to find a backend for.

    Returns:
        List of fake backend classes that support the circuit's qubit count, or None.
    """
    fake_backends = _load_fake_backends()
    keys = sorted(fake_backends.keys())
    pos = bisect.bisect_left(keys, circuit.num_qubits)
    return fake_backends[keys[pos]] if pos < len(keys) else None


# Public API for backward compatibility with tests
def __getattr__(name: str):
    """Lazy load FAKE_BACKENDS when accessed."""
    if name == "FAKE_BACKENDS":
        return _load_fake_backends()
    raise AttributeError(f"module {__name__!r} has no attribute {name!r}")


def _default_n_processes() -> int:
    """Get a reasonable default number of processes based on CPU count.

    Uses most available CPU cores (all minus 1, or 3/4 if many cores), with a
    minimum of 2 and maximum of 16. This provides good parallelism while leaving
    one core free for system processes.

    If running in a different thread or process (not the main thread/process),
    limits to 2 cores to avoid resource contention.

    Returns:
        int: Default number of processes to use.
    """
    # Check if we're running in a worker thread or subprocess
    is_main_thread = threading.current_thread() is threading.main_thread()
    is_main_process = current_process().name == "MainProcess"

    if not (is_main_thread and is_main_process):
        # Running in a different thread/process - limit to 2 cores
        return 2

    cpu_count = os.cpu_count() or 4
    if cpu_count <= 4:
        # For small systems, use all but 1 core
        return max(2, cpu_count - 1)
    elif cpu_count <= 16:
        # For medium systems, use all but 1 core
        return cpu_count - 1
    else:
        # For large systems, use 3/4 of cores, capped at 16
        return min(16, int(cpu_count * 0.75))


[docs] class QiskitSimulator(CircuitRunner): def __init__( self, n_processes: int | None = None, shots: int = 5000, simulation_seed: int | None = None, qiskit_backend: BackendV2 | Literal["auto"] | None = None, noise_model: NoiseModel | None = None, track_depth: bool = False, force_sampling: bool = False, _deterministic_execution: bool = False, ): """ A parallel wrapper around Qiskit's AerSimulator using Qiskit's built-in parallelism. Args: n_processes (int | None, optional): Number of parallel processes to use for transpilation and simulation. If None, defaults to half the available CPU cores (min 2, max 8). Controls both transpilation parallelism and execution parallelism. The execution parallelism mode (circuit or shot) is automatically selected based on workload characteristics. shots (int, optional): Number of shots to perform. Defaults to 5000. simulation_seed (int, optional): Seed for the random number generator to ensure reproducibility. Defaults to None. qiskit_backend (BackendV2 | Literal["auto"] | None, optional): A Qiskit backend to initiate the simulator from. If ``"auto"`` is passed, the best-fit most recent fake backend will be chosen for the given circuit. Defaults to None, resulting in noiseless simulation. noise_model (NoiseModel, optional): Qiskit noise model to use in simulation. Defaults to None. track_depth (bool, optional): If True, record circuit depth for each submitted batch. Access via :attr:`~divi.backends.CircuitRunner.depth_history` after execution. Defaults to False. force_sampling (bool, optional): If True, always use shot-based sampling even for expectation value measurements. Defaults to False. """ super().__init__(shots=shots, track_depth=track_depth) # Expval mode (save_expval) is incompatible with custom backends / # noise models — automatically fall back to shot-based sampling. if qiskit_backend is not None or noise_model is not None: force_sampling = True self._force_sampling = force_sampling if qiskit_backend and noise_model: warn( "Both `qiskit_backend` and `noise_model` have been provided." " `noise_model` will be ignored and the model from the backend will be used instead." ) if n_processes is None: n_processes = _default_n_processes() elif n_processes < 1: raise ValueError(f"n_processes must be >= 1, got {n_processes}") self._n_processes = n_processes self.simulation_seed = simulation_seed self.qiskit_backend = qiskit_backend self.noise_model = noise_model self._deterministic_execution = _deterministic_execution
[docs] def set_seed(self, seed: int): """ Set the random seed for circuit simulation. Args: seed (int): Seed value for the random number generator used in simulation. """ self.simulation_seed = seed
@property def n_processes(self) -> int: """ Get the current number of parallel processes. Returns: int: Number of parallel processes configured. """ return self._n_processes @n_processes.setter def n_processes(self, value: int): """ Set the number of parallel processes (>= 1). Controls: - Transpilation parallelism - OpenMP thread limit - Circuit/Shot parallelism (auto-selected based on workload) """ if value < 1: raise ValueError(f"n_processes must be >= 1, got {value}") self._n_processes = value @property def supports_expval(self) -> bool: """ Whether the backend supports expectation value measurements. """ return not self._force_sampling @property def is_async(self) -> bool: """ Whether the backend executes circuits asynchronously. """ return False def _resolve_backend( self, circuit: QuantumCircuit | None = None ) -> BackendV2 | None: """Resolve the backend from qiskit_backend setting.""" if self.qiskit_backend == "auto": if circuit is None: raise ValueError( "Circuit must be provided when qiskit_backend is 'auto'" ) backend_list = _find_best_fake_backend(circuit) if backend_list is None: raise ValueError( f"No fake backend available for circuit with {circuit.num_qubits} qubits. " "Please provide an explicit backend or use a smaller circuit." ) return backend_list[-1]() return self.qiskit_backend def _create_simulator(self, resolved_backend: BackendV2 | None) -> AerSimulator: """Create an AerSimulator instance from a resolved backend or noise model.""" return ( AerSimulator.from_backend(resolved_backend) if resolved_backend is not None else AerSimulator(noise_model=self.noise_model) ) def _execute_circuits_deterministically( self, circuit_labels: list[str], transpiled_circuits: list[QuantumCircuit], resolved_backend: BackendV2 | None, per_circuit_shots: list[int] | None = None, ) -> list[dict[str, Any]]: """ Execute circuits individually for debugging purposes. This method ensures deterministic results by running each circuit with its own simulator instance and the same seed. Used internally for debugging non-deterministic behavior in batch execution. Args: circuit_labels: List of circuit labels transpiled_circuits: List of transpiled QuantumCircuit objects resolved_backend: Resolved backend for simulator creation per_circuit_shots: Optional per-circuit shot counts (e.g. from ``shot_groups``). When ``None``, every circuit uses ``self.shots``. Returns: List of result dictionaries """ results = [] for i, (label, transpiled_circuit) in enumerate( zip(circuit_labels, transpiled_circuits) ): # Create a new simulator instance for each circuit with the same seed circuit_simulator = self._create_simulator(resolved_backend) if self.simulation_seed is not None: circuit_simulator.set_options(seed_simulator=self.simulation_seed + i) # Run the single circuit shots = ( per_circuit_shots[i] if per_circuit_shots is not None else self.shots ) job = circuit_simulator.run(transpiled_circuit, shots=shots) circuit_result = job.result() counts = circuit_result.get_counts(0) results.append({"label": label, "results": dict(counts)}) return results def _configure_simulator_parallelism( self, aer_simulator: AerSimulator, num_circuits: int ): """Configure AerSimulator parallelism options based on workload.""" if self.simulation_seed is not None: aer_simulator.set_options(seed_simulator=self.simulation_seed) # Default to utilizing all allocated processes for threads options = {"max_parallel_threads": self.n_processes} if num_circuits > 1: # Batch mode: parallelize experiments options.update( { "max_parallel_experiments": min(num_circuits, self.n_processes), "max_parallel_shots": 1, } ) elif self.shots >= self.n_processes: # Single circuit, high shots: parallelize shots options.update( { "max_parallel_experiments": 1, "max_parallel_shots": self.n_processes, } ) else: # Single circuit, low shots: default behavior (usually serial shots) options.update( { "max_parallel_experiments": 1, "max_parallel_shots": 1, } ) aer_simulator.set_options(**options) @staticmethod def _get_ham_ops_for_circuit( circuit_index: int, ham_ops: str, circuit_ham_map: list[list[int]] | None, ) -> list[str]: """Resolve which Pauli operators apply to a given circuit. Args: circuit_index: Index of the circuit in the batch. ham_ops: Semicolon-separated Pauli string, optionally with ``|``-delimited groups when ``circuit_ham_map`` is provided. circuit_ham_map: Each entry is ``[start, end)`` mapping a ``|``-group to a contiguous slice of circuits. Returns: List of individual Pauli operator strings for this circuit. """ if circuit_ham_map is None: return ham_ops.replace("|", ";").split(";") groups = ham_ops.split("|") for group_index, (start, end) in enumerate(circuit_ham_map): if start <= circuit_index < end: return groups[group_index].split(";") return ham_ops.replace("|", ";").split(";") @staticmethod def _prepare_expval_circuit( circuit: QuantumCircuit, pauli_ops: list[str] ) -> QuantumCircuit: """Strip measurements and append ``save_expectation_value`` for each Pauli operator. Args: circuit: Qiskit circuit (may contain final measurements). pauli_ops: List of Pauli strings in divi convention (big-endian, q0 leftmost). Returns: New circuit with measurements removed and expectation-value save instructions. """ qc = circuit.copy() qc.remove_final_measurements(inplace=True) for pauli_str in pauli_ops: # Reverse: divi big-endian (q0 leftmost) → Qiskit little-endian (q0 rightmost) qc.append( SaveExpectationValue(Pauli(pauli_str[::-1]), label=pauli_str), qargs=range(qc.num_qubits), ) return qc def _execute_expval( self, circuit_labels: list[str], qiskit_circuits: list[QuantumCircuit], ham_ops: str, circuit_ham_map: list[list[int]] | None, ) -> list[dict]: """Execute circuits in expectation-value mode. Uses Qiskit Aer's ``save_expectation_value`` to compute exact expectation values at the statevector level. Returns: List of ``{"label": str, "results": {pauli: float}}`` dicts. """ prepared = [] per_circuit_ops: list[list[str]] = [] for i, qc in enumerate(qiskit_circuits): ops = self._get_ham_ops_for_circuit(i, ham_ops, circuit_ham_map) per_circuit_ops.append(ops) prepared.append(self._prepare_expval_circuit(qc, ops)) # Resolve backend + create simulator (same as sampling path) if self.qiskit_backend == "auto": max_qubits_circ = max(prepared, key=lambda x: x.num_qubits) resolved_backend = self._resolve_backend(max_qubits_circ) else: resolved_backend = self._resolve_backend() aer_simulator = self._create_simulator(resolved_backend) self._configure_simulator_parallelism(aer_simulator, len(prepared)) transpiled = transpile(prepared, aer_simulator, num_processes=self.n_processes) job = aer_simulator.run(transpiled) batch_result = job.result() results = [] for i, label in enumerate(circuit_labels): expvals = {op: float(batch_result.data(i)[op]) for op in per_circuit_ops[i]} results.append({"label": label, "results": expvals}) return results
[docs] def submit_circuits( self, circuits: Mapping[str, str], *, ham_ops: str | None = None, circuit_ham_map: list[list[int]] | None = None, shot_groups: list[list[int]] | None = None, cancellation_event: Event | None = None, **kwargs, ) -> ExecutionResult: """Submit multiple circuits for parallel simulation using Qiskit's built-in parallelism. Args: circuits: Dictionary mapping circuit labels to OpenQASM string representations. ham_ops: Semicolon-separated Pauli string for expectation value estimation, e.g. ``"ZI;IZ;XX"``. Multiple groups can be pipe-delimited when ``circuit_ham_map`` is provided. If None, runs in sampling mode. circuit_ham_map: Each entry is ``[start, end)`` mapping a ``|``-group in ``ham_ops`` to a contiguous slice of circuits. shot_groups: Per-circuit shot allocation as ``[start, end, shots]`` triples covering the iteration order of ``circuits``. When provided, overrides ``self.shots`` for each contiguous range and triggers one ``aer_simulator.run`` call per range. Sampling-mode only — ignored when ``ham_ops`` is provided. cancellation_event: When set before this call, aborts dispatch. Aer's ``.run().result()`` cannot be interrupted mid-batch. **kwargs: Additional parameters (unused, accepted for interface compatibility). Returns: ExecutionResult containing either counts (sampling) or expectation values. """ if cancellation_event is not None and cancellation_event.is_set(): raise ExecutionCancelledError("Qiskit batch cancelled before dispatch") logger.debug( f"Simulating {len(circuits)} circuits with {self.n_processes} processes" ) if ham_ops is not None and shot_groups is not None: raise ValueError( "shot_groups is incompatible with ham_ops: expectation-value " "mode is analytical and ignores shot counts. Pass exactly one." ) # 1. Parse Circuits circuit_labels = list(circuits.keys()) qiskit_circuits = [ QuantumCircuit.from_qasm_str(qasm) for qasm in circuits.values() ] if self.track_depth: self._depth_history.append([qc.depth() for qc in qiskit_circuits]) # Expectation value mode if ham_ops is not None: results = self._execute_expval( circuit_labels, qiskit_circuits, ham_ops, circuit_ham_map ) return ExecutionResult(results=results) # 2. Resolve Backend if self.qiskit_backend == "auto": max_qubits_circ = max(qiskit_circuits, key=lambda x: x.num_qubits) resolved_backend = self._resolve_backend(max_qubits_circ) else: resolved_backend = self._resolve_backend() # 3. Configure Simulator aer_simulator = self._create_simulator(resolved_backend) self._configure_simulator_parallelism(aer_simulator, len(qiskit_circuits)) # 4. Transpile transpiled_circuits = transpile( qiskit_circuits, aer_simulator, num_processes=self.n_processes ) # 5. Execute shot_ranges: list[ShotRange] | None = None if shot_groups is not None: shot_ranges = from_wire(shot_groups) validate(shot_ranges, len(transpiled_circuits)) if self._deterministic_execution: per_circuit_shots = ( per_circuit(shot_ranges, len(transpiled_circuits)) if shot_ranges is not None else None ) results = self._execute_circuits_deterministically( circuit_labels, transpiled_circuits, resolved_backend, per_circuit_shots=per_circuit_shots, ) return ExecutionResult(results=results) if shot_ranges is not None: results = self._execute_with_shot_groups( circuit_labels, transpiled_circuits, aer_simulator, shot_ranges ) return ExecutionResult(results=results) job = aer_simulator.run(transpiled_circuits, shots=self.shots) batch_result = job.result() # Check for non-determinism warnings metadata = batch_result.metadata if ( parallel_experiments := metadata.get("parallel_experiments", 1) ) > 1 and self.simulation_seed is not None: omp_nested = metadata.get("omp_nested", False) logger.warning( f"Parallel execution detected (parallel_experiments={parallel_experiments}, " f"omp_nested={omp_nested}). Results may not be deterministic across different " "grouping strategies. Consider enabling deterministic mode for " "deterministic results." ) # 6. Format Results results = [ {"label": label, "results": dict(batch_result.get_counts(i))} for i, label in enumerate(circuit_labels) ] return ExecutionResult(results=results)
def _execute_with_shot_groups( self, circuit_labels: list[str], transpiled_circuits: list, aer_simulator: AerSimulator, shot_ranges: list[ShotRange], ) -> list[dict[str, Any]]: """Execute one ``aer_simulator.run`` per distinct shot count. Aer's ``run(shots=...)`` applies a single shot count to all circuits in the call, so distinct shot levels require distinct calls. Ranges that share the same shot count — even if non-contiguous — are batched into one ``run`` to preserve Aer's internal parallelism. Results are re-ordered back to the original circuit positions. """ n_total = len(transpiled_circuits) results: list[dict[str, Any] | None] = [None] * n_total for shots, indices in bucket_by_shots(shot_ranges).items(): sub_circuits = [transpiled_circuits[i] for i in indices] job = aer_simulator.run(sub_circuits, shots=shots) batch_result = job.result() for offset, idx in enumerate(indices): results[idx] = { "label": circuit_labels[idx], "results": dict(batch_result.get_counts(offset)), } return results # type: ignore[return-value]
[docs] @staticmethod def estimate_run_time_single_circuit( circuit: str, qiskit_backend: BackendV2 | Literal["auto"], **transpilation_kwargs, ) -> float: """ Estimate the execution time of a quantum circuit on a given backend, accounting for parallel gate execution. Parameters: circuit: The quantum circuit to estimate execution time for as a QASM string. qiskit_backend: A Qiskit backend to use for gate time estimation. Returns: float: Estimated execution time in seconds. """ qiskit_circuit = QuantumCircuit.from_qasm_str(circuit) if qiskit_backend == "auto": if not (backend_list := _find_best_fake_backend(qiskit_circuit)): raise ValueError( f"No fake backend available for circuit with {qiskit_circuit.num_qubits} qubits. " "Please provide an explicit backend or use a smaller circuit." ) resolved_backend = backend_list[-1]() else: resolved_backend = qiskit_backend transpiled_circuit = transpile( qiskit_circuit, resolved_backend, **transpilation_kwargs ) total_run_time_s = 0.0 target = resolved_backend.target if target is None: raise RuntimeError( f"Backend {resolved_backend!r} has no transpiler target; " "cannot estimate run time." ) durations = target.durations() for node in circuit_to_dag(transpiled_circuit).longest_path(): if not isinstance(node, DAGOpNode) or not node.num_qubits: continue try: idx = tuple(q._index for q in node.qargs) duration = durations.get(node.name, idx, unit="s") total_run_time_s += duration except TranspilerError: if node.name != "barrier": warn(f"Instruction duration not found: {node.name}") return total_run_time_s
[docs] @staticmethod def estimate_run_time_batch( circuits: Sequence[str] | None = None, precomputed_durations: Sequence[float] | None = None, n_qpus: int = 5, **transpilation_kwargs, ) -> float: """ Estimate the execution time of a quantum circuit on a given backend, accounting for parallel gate execution. Parameters: circuits (list[str]): The quantum circuits to estimate execution time for, as QASM strings. precomputed_durations (list[float]): A list of precomputed durations to use. n_qpus (int): Number of QPU nodes in the pre-supposed cluster we are estimating runtime against. Returns: float: Estimated execution time in seconds. """ # Compute the run time estimates for each given circuit, in descending order if precomputed_durations is not None: estimated_run_times_sorted = sorted(precomputed_durations, reverse=True) elif circuits is not None: # Pin the worker count to ``_default_n_processes()`` so this # static helper inherits the same fork/thread-aware sizing the # instance uses, instead of defaulting to ``os.cpu_count()`` # workers regardless of context. with Pool(processes=_default_n_processes()) as p: estimated_run_times = p.map( partial( QiskitSimulator.estimate_run_time_single_circuit, qiskit_backend="auto", **transpilation_kwargs, ), circuits, ) estimated_run_times_sorted = sorted(estimated_run_times, reverse=True) else: raise ValueError( "estimate_run_time_batch requires either ``circuits`` or " "``precomputed_durations`` to be provided." ) # Optimization for trivial case if n_qpus >= len(estimated_run_times_sorted): return estimated_run_times_sorted[0] if estimated_run_times_sorted else 0.0 # LPT (Longest Processing Time) scheduling using a min-heap of processor finish times processor_finish_times = [0.0] * n_qpus for run_time in estimated_run_times_sorted: heapq.heappush( processor_finish_times, heapq.heappop(processor_finish_times) + run_time ) return max(processor_finish_times)