Source code for divi.reporting._reporter
# SPDX-FileCopyrightText: 2025-2026 Qoro Quantum Ltd <divi@qoroquantum.de>
#
# SPDX-License-Identifier: Apache-2.0
import atexit
import logging
from abc import ABC, abstractmethod
from queue import Queue
from rich.console import Console
import divi.reporting._qlogger as _qlogger
from divi.reporting import TerminalStatus, progress_disabled
from divi.reporting._qlogger import _ensure_unbuffered_stdout
# Module-level logger named after this module; LoggingProgressReporter emits
# its plain (non-spinner) messages through it.
logger = logging.getLogger(__name__)
[docs]
class ProgressReporter(ABC):
    """Abstract interface for reporting the progress of a quantum program.

    Concrete reporters must implement both :meth:`update` and :meth:`info`.
    """

    @abstractmethod
    def update(self, **kwargs) -> None:
        """Report a progress update.

        Args:
            **kwargs: Reporter-specific details describing the update.
        """

    @abstractmethod
    def info(
        self,
        message: str,
        *,
        final_status: TerminalStatus | None = None,
        **kwargs,
    ) -> None:
        """Report a simple informational message.

        Args:
            message: The text to display.
            final_status: :class:`TerminalStatus` member marking the row as
                final. ``None`` (the default) means the message is
                non-terminal.
            **kwargs: Extra keyword arguments interpreted by subclasses.
        """
[docs]
class QueueProgressReporter(ProgressReporter):
    """Reports progress by putting structured dictionaries onto a Queue.

    Every payload carries the ``job_id`` given at construction time so the
    consumer of the queue can tell which job a message belongs to.
    """

    def __init__(self, job_id: str, progress_queue: Queue):
        self._queue = progress_queue
        self._job_id = job_id

    def update(self, **kwargs):
        """Enqueue a progress tick (``progress`` = 1).

        Args:
            **kwargs: Only ``loss`` is read; it is forwarded in the payload
                when present and not ``None``.
        """
        tick = {"job_id": self._job_id, "progress": 1}
        loss_value = kwargs.get("loss")
        if loss_value is not None:
            tick["loss"] = loss_value
        self._queue.put(tick)

    def info(
        self,
        message: str,
        *,
        final_status: TerminalStatus | None = None,
        **kwargs,
    ):
        """Enqueue an informational payload (``progress`` = 0).

        Args:
            message: Text to forward. Omitted from the payload for polling
                updates so the previously shown message persists.
            final_status: Forwarded under ``final_status`` when not ``None``.
            **kwargs: ``poll_attempt`` marks a polling update and then
                requires ``max_retries``, ``service_job_id`` and
                ``job_status`` as well; ``pipeline_stage`` is forwarded
                whenever it is supplied.

        Raises:
            KeyError: If ``poll_attempt`` is given without one of the other
                polling fields.
        """
        polling = "poll_attempt" in kwargs

        entry = {"job_id": self._job_id, "progress": 0}
        if not polling:
            entry["message"] = message
        if final_status is not None:
            entry["final_status"] = final_status

        if polling:
            # Copy the polling fields through unchanged.
            for field in (
                "poll_attempt",
                "max_retries",
                "service_job_id",
                "job_status",
            ):
                entry[field] = kwargs[field]
        else:
            # Any non-polling message explicitly resets the attempt counter.
            entry["poll_attempt"] = 0

        if "pipeline_stage" in kwargs:
            entry["pipeline_stage"] = kwargs["pipeline_stage"]

        self._queue.put(entry)
[docs]
class LoggingProgressReporter(ProgressReporter):
    """Reports progress by logging messages to the console.

    Set the ``DIVI_DISABLE_PROGRESS`` environment variable to a truthy value
    (``1``, ``true``, ``yes``, or ``on``) to suppress the Rich spinner and
    fall back to plain log messages.

    Iteration, pipeline-stage, polling and ``overwrite`` messages are shown
    on a single Rich status line that is updated in place; every other
    message closes that status line and goes through the module logger.
    """

    # Set once the process-wide atexit hook has been installed.
    _atexit_registered = False
    # Reporters that currently own an open Rich status. Shared by all
    # instances so the single atexit hook can close every open status, not
    # just the one belonging to whichever instance registered the hook.
    # A reporter is removed again as soon as its status is closed, so this
    # set does not keep idle reporters alive.
    _active_reporters = set()

    def __init__(self):
        _ensure_unbuffered_stdout()
        # file=None makes the console write to stdout, same as RichHandler,
        # to avoid interference between the two.
        self._console = Console(file=None)
        self._status = None  # Active Rich status, if any
        self._current_msg = None  # Current main message
        self._pipeline_msg = None  # Current classical-pipeline stage message
        self._polling_msg = None  # Current polling message

    def _ensure_atexit_hook(self):
        """Install the process-wide exit hook once, unless progress is disabled."""
        if (
            self._should_disable_progress()
            or LoggingProgressReporter._atexit_registered
        ):
            return
        atexit.register(LoggingProgressReporter._close_all_statuses)
        LoggingProgressReporter._atexit_registered = True

    @classmethod
    def _close_all_statuses(cls):
        """Close the status of every reporter that still has one open."""
        # Iterate over a snapshot: _close_status() removes entries from the set.
        for reporter in tuple(LoggingProgressReporter._active_reporters):
            reporter._close_status()

    @staticmethod
    def _should_disable_progress() -> bool:
        """Return True when the Rich status line must not be used."""
        if _qlogger._logging_disabled:
            return True
        return progress_disabled()

    def _close_status(self):
        """Close any active status and forget all tracked messages."""
        try:
            if self._status:
                self._status.__exit__(None, None, None)
        finally:
            # Reset even if Rich fails while stopping, so a broken status
            # object is never reused by a later message.
            self._status = None
            LoggingProgressReporter._active_reporters.discard(self)
            self._current_msg = None
            self._pipeline_msg = None
            self._polling_msg = None

    def _build_status_msg(self) -> str:
        """Build combined status message from current message, pipeline stage, and polling info.

        Returns:
            The non-empty parts joined with ``" - "``, or ``""`` when there
            is nothing to show.
        """
        parts = (self._current_msg, self._pipeline_msg, self._polling_msg)
        return " - ".join(part for part in parts if part)

    def _update_or_create_status(self):
        """Update existing status or create a new one with combined message."""
        if self._should_disable_progress():
            return
        status_msg = self._build_status_msg()
        if not status_msg:
            return
        self._ensure_atexit_hook()
        if self._status:
            self._status.update(status_msg)
        else:
            self._status = self._console.status(status_msg, spinner="aesthetic")
            # Track this reporter while its status is open so the exit hook
            # can find and close it.
            LoggingProgressReporter._active_reporters.add(self)
            self._status.__enter__()

    def update(self, **kwargs):
        """Log that an iteration finished.

        Args:
            **kwargs: ``iteration`` is required; ``loss`` is optional and,
                when not ``None``, is included in the log line.

        Raises:
            KeyError: If ``iteration`` is not supplied.
        """
        # Close any active status before logging
        self._close_status()
        loss = kwargs.get("loss")
        if loss is not None:
            logger.info(
                f"Finished Iteration #{kwargs['iteration']} (loss={float(loss):.6f})"
            )
            return
        logger.info(f"Finished Iteration #{kwargs['iteration']}")

    def info(
        self,
        message: str,
        *,
        final_status: TerminalStatus | None = None,
        overwrite: bool = False,
        **kwargs,
    ):
        """Display an informational message.

        The first matching rule below decides how the message is shown.

        Args:
            message: The text to display.
            final_status: Accepted for interface compatibility; ignored here.
            overwrite: When true, show ``message`` on the status line
                instead of logging it.
            **kwargs: Recognised keys, checked in this order:

                * ``poll_attempt`` (with ``service_job_id``, ``job_status``
                  and ``max_retries``): show polling progress on the status
                  line and clear the pipeline-stage slot.
                * ``pipeline_stage``: show or clear the pipeline-stage slot.
                * ``iteration``: show ``message`` prefixed with the
                  iteration number plus one.
        """
        # final_status is part of the ProgressReporter contract but does
        # not influence this reporter's display path; the Rich Status /
        # logger formatting key off the message text.
        del final_status
        if self._should_disable_progress():
            logger.info(message)
            return
        # Polling updates go to the status line so successive attempts
        # overwrite each other instead of filling the log.
        if "poll_attempt" in kwargs:
            # Execution has begun — classical-pipeline stages are done,
            # clear that slot so only the polling context remains.
            self._pipeline_msg = None
            self._polling_msg = (
                f"Job [cyan]{kwargs['service_job_id'].split('-')[0]}[/cyan] is "
                f"{kwargs['job_status']}. Polling attempt {kwargs['poll_attempt']} / "
                f"{kwargs['max_retries']}"
            )
            self._update_or_create_status()
            return
        # Classical-pipeline progress — runs before execution. Concatenated
        # with the current iteration message so the user sees both the
        # surrounding activity and the specific stage being generated.
        if "pipeline_stage" in kwargs:
            stage = kwargs["pipeline_stage"]
            self._pipeline_msg = f"Pipeline: {stage}" if stage else None
            # NOTE(review): when the stage is cleared and no other message
            # is set, an already-open status keeps its previous text —
            # confirm this is intended.
            if self._build_status_msg():
                self._update_or_create_status()
            return
        # Use Rich's status for iteration messages to enable overwriting
        if "iteration" in kwargs:
            self._current_msg = f"Iteration #{kwargs['iteration'] + 1}: {message}"
            self._update_or_create_status()
            return
        # Use Rich's status for messages that should overwrite
        if overwrite:
            # Set current message, keep polling state so it can be concatenated
            self._current_msg = message
            self._update_or_create_status()
            return
        # Close status for normal messages
        self._close_status()
        logger.info(message)