Source code for divi.reporting._reporter

# SPDX-FileCopyrightText: 2025-2026 Qoro Quantum Ltd <divi@qoroquantum.de>
#
# SPDX-License-Identifier: Apache-2.0

import atexit
import logging
from abc import ABC, abstractmethod
from queue import Queue

from rich.console import Console

import divi.reporting._qlogger as _qlogger
from divi.reporting import TerminalStatus, progress_disabled
from divi.reporting._qlogger import _ensure_unbuffered_stdout

logger = logging.getLogger(__name__)


[docs] class ProgressReporter(ABC): """An abstract base class for reporting progress of a quantum program."""
[docs] @abstractmethod def update(self, **kwargs) -> None: """Provides a progress update."""
[docs] @abstractmethod def info( self, message: str, *, final_status: TerminalStatus | None = None, **kwargs, ) -> None: """Provides a simple informational message. Args: message: The message to display. final_status: :class:`TerminalStatus` member to tag the row as final. Defaults to ``None`` (non-terminal). **kwargs: Additional keyword arguments for subclasses. """
[docs] class QueueProgressReporter(ProgressReporter): """Reports progress by putting structured dictionaries onto a Queue.""" def __init__(self, job_id: str, progress_queue: Queue): self._job_id = job_id self._queue = progress_queue
[docs] def update(self, **kwargs): payload = {"job_id": self._job_id, "progress": 1} if "loss" in kwargs and kwargs["loss"] is not None: payload["loss"] = kwargs["loss"] self._queue.put(payload)
[docs] def info( self, message: str, *, final_status: TerminalStatus | None = None, **kwargs, ): payload = {"job_id": self._job_id, "progress": 0, "message": message} if final_status is not None: payload["final_status"] = final_status if "poll_attempt" in kwargs: # For polling, remove the message key so the last message persists. del payload["message"] payload["poll_attempt"] = kwargs["poll_attempt"] payload["max_retries"] = kwargs["max_retries"] payload["service_job_id"] = kwargs["service_job_id"] payload["job_status"] = kwargs["job_status"] else: # For any other message, explicitly reset the polling attempt counter. payload["poll_attempt"] = 0 if "pipeline_stage" in kwargs: payload["pipeline_stage"] = kwargs["pipeline_stage"] self._queue.put(payload)
[docs] class LoggingProgressReporter(ProgressReporter): """Reports progress by logging messages to the console. Set the ``DIVI_DISABLE_PROGRESS`` environment variable to a truthy value (``1``, ``true``, ``yes``, or ``on``) to suppress the Rich spinner and fall back to plain log messages. """ _atexit_registered = False def __init__(self): _ensure_unbuffered_stdout() # Use the same console instance that RichHandler uses to avoid interference self._console = Console(file=None) # file=None uses stdout, same as RichHandler self._status = None # Track active status for overwriting messages self._current_msg = None # Track current main message self._pipeline_msg = None # Track current classical-pipeline stage message self._polling_msg = None # Track current polling message def _ensure_atexit_hook(self): if ( self._should_disable_progress() or LoggingProgressReporter._atexit_registered ): return atexit.register(self._close_status) LoggingProgressReporter._atexit_registered = True @staticmethod def _should_disable_progress() -> bool: if _qlogger._logging_disabled: return True return progress_disabled() def _close_status(self): """Close any active status.""" if self._status: self._status.__exit__(None, None, None) self._status = None self._current_msg = None self._pipeline_msg = None self._polling_msg = None def _build_status_msg(self) -> str: """Build combined status message from current message, pipeline stage, and polling info.""" parts = [] if self._current_msg: parts.append(self._current_msg) if self._pipeline_msg: parts.append(self._pipeline_msg) if self._polling_msg: parts.append(self._polling_msg) return " - ".join(parts) if parts else "" def _update_or_create_status(self): """Update existing status or create a new one with combined message.""" if self._should_disable_progress(): return status_msg = self._build_status_msg() if not status_msg: return self._ensure_atexit_hook() if self._status: self._status.update(status_msg) else: self._status = self._console.status(status_msg, spinner="aesthetic") self._status.__enter__()
[docs] def update(self, **kwargs): # Close any active status before logging self._close_status() loss = kwargs.get("loss") if loss is not None: logger.info( f"Finished Iteration #{kwargs['iteration']} (loss={float(loss):.6f})" ) return logger.info(f"Finished Iteration #{kwargs['iteration']}")
[docs] def info( self, message: str, *, final_status: TerminalStatus | None = None, overwrite: bool = False, **kwargs, ): # final_status is part of the ProgressReporter contract but does # not influence this reporter's display path; the Rich Status / # logger formatting key off the message text. del final_status if self._should_disable_progress(): logger.info(message) return # A special check for iteration updates to use Rich's status for overwriting if "poll_attempt" in kwargs: # Execution has begun — classical-pipeline stages are done, # clear that slot so only the polling context remains. self._pipeline_msg = None self._polling_msg = ( f"Job [cyan]{kwargs['service_job_id'].split('-')[0]}[/cyan] is " f"{kwargs['job_status']}. Polling attempt {kwargs['poll_attempt']} / " f"{kwargs['max_retries']}" ) self._update_or_create_status() return # Classical-pipeline progress — runs before execution. Concatenated # with the current iteration message so the user sees both the # surrounding activity and the specific stage being generated. if "pipeline_stage" in kwargs: stage = kwargs["pipeline_stage"] self._pipeline_msg = f"Pipeline: {stage}" if stage else None if self._build_status_msg(): self._update_or_create_status() return # Use Rich's status for iteration messages to enable overwriting if "iteration" in kwargs: self._current_msg = f"Iteration #{kwargs['iteration'] + 1}: {message}" self._update_or_create_status() return # Use Rich's status for messages that should overwrite if overwrite: # Set current message, keep polling state so it can be concatenated self._current_msg = message self._update_or_create_status() return # Close status for normal messages self._close_status() logger.info(message)