# SPDX-FileCopyrightText: 2025-2026 Qoro Quantum Ltd <divi@qoroquantum.de>
#
# SPDX-License-Identifier: Apache-2.0
import base64
import json
import pickle
import time
import warnings
from abc import ABC, abstractmethod
from collections.abc import Callable
from enum import Enum
from pathlib import Path
from typing import Any, cast
import cma
import dill
import numpy as np
import numpy.typing as npt
from pydantic import BaseModel
from pymoo.algorithms.soo.nonconvex.de import DE # type: ignore
from pymoo.core.evaluator import Evaluator
from pymoo.core.individual import Individual
from pymoo.core.population import Population
from pymoo.core.problem import Problem
from pymoo.core.termination import NoTermination
from pymoo.problems.static import StaticProblem
from scipy.optimize import OptimizeResult, minimize
from divi.qprog.checkpointing import (
OPTIMIZER_STATE_FILE,
_atomic_write,
_load_and_validate_pydantic_model,
)
__all__ = [
"GridSearchOptimizer",
"MonteCarloOptimizer",
"MonteCarloState",
"Optimizer",
"PymooMethod",
"PymooOptimizer",
"PymooState",
"ScipyMethod",
"ScipyOptimizer",
"copy_optimizer",
]
[docs]
class MonteCarloState(BaseModel):
"""Pydantic model for Monte Carlo optimizer state."""
population_size: int
n_best_sets: int
keep_best_params: bool
curr_iteration: int
# Store arrays as lists for JSON serialization
# Population arrays are always 2D: (population_size, n_params)
population: list[list[float]]
evaluated_population: list[list[float]]
losses: list[float]
# RNG state is a dict/tuple complex structure, simplified storage as dict or bytes
# Stored as base64 encoded string for JSON compatibility
rng_state_b64: str
[docs]
class PymooState(BaseModel):
"""Pydantic model for Pymoo optimizer state."""
method_value: str
population_size: int
algorithm_kwargs: dict[str, Any]
# We store the pickled algorithm object as base64 encoded string
algorithm_obj_b64: str
[docs]
class Optimizer(ABC):
"""
Abstract base class for all optimizers.
.. warning::
**Thread Safety**: Optimizer instances are **not thread-safe**. They maintain
internal state (e.g., current population, iteration count, RNG state) that changes
during optimization.
Do **not** share a single `Optimizer` instance across multiple `QuantumProgram`
instances or threads running in parallel. Doing so will lead to race conditions,
corrupted state, and potential crashes.
If you need to use the same optimizer configuration for multiple programs,
create a separate instance for each program. You can use the helper function
:func:`copy_optimizer` to create a fresh copy with the same configuration.
"""
@property
@abstractmethod
def n_param_sets(self) -> int:
"""
Returns the number of parameter sets the optimizer can handle per optimization run.
Returns:
int: Number of parameter sets.
"""
raise NotImplementedError("This method should be implemented by subclasses.")
[docs]
@abstractmethod
def optimize(
self,
cost_fn: Callable[[npt.NDArray[np.float64]], float | npt.NDArray[np.float64]],
initial_params: npt.NDArray[np.float64] | None = None,
callback_fn: Callable[[OptimizeResult], Any] | None = None,
**kwargs,
) -> OptimizeResult:
"""Optimize the given cost function starting from initial parameters.
Parameters:
cost_fn: The cost function to minimize.
initial_params: Initial parameters for the optimization.
callback_fn: Function called after each iteration with an OptimizeResult object.
**kwargs: Additional keyword arguments for the optimizer:
- max_iterations (int, optional): Total desired number of iterations.
When resuming from a checkpoint, this represents the total iterations
desired across all runs. The optimizer will automatically calculate
and run only the remaining iterations needed.
Defaults vary by optimizer (e.g., 5 for population-based optimizers,
None for some scipy methods).
- rng (np.random.Generator, optional): Random number generator for
stochastic optimizers (PymooOptimizer, MonteCarloOptimizer).
Defaults to a new generator if not provided.
- jac (Callable, optional): Gradient/Jacobian function for
gradient-based optimizers (only used by ScipyOptimizer with
L_BFGS_B method). Defaults to None.
Returns:
Optimized parameters.
"""
raise NotImplementedError("This method should be implemented by subclasses.")
[docs]
@abstractmethod
def get_config(self) -> dict[str, Any]:
"""Get optimizer configuration for checkpoint reconstruction.
Returns:
dict[str, Any]: Dictionary containing optimizer type and configuration parameters.
Raises:
NotImplementedError: If the optimizer does not support checkpointing.
"""
raise NotImplementedError("This method should be implemented by subclasses.")
[docs]
@abstractmethod
def save_state(self, checkpoint_dir: Path | str) -> None:
"""Save the optimizer's internal state to a checkpoint directory.
Args:
checkpoint_dir: Directory path where the optimizer state will be saved.
"""
raise NotImplementedError("This method should be implemented by subclasses.")
[docs]
@classmethod
@abstractmethod
def load_state(cls, checkpoint_dir: Path | str) -> "Optimizer":
"""Load the optimizer's internal state from a checkpoint directory.
Creates a new optimizer instance with the state restored from the checkpoint.
Args:
checkpoint_dir: Directory path where the optimizer state is saved.
Returns:
Optimizer: A new optimizer instance with restored state.
"""
raise NotImplementedError("This method should be implemented by subclasses.")
[docs]
@abstractmethod
def reset(self) -> None:
"""Reset the optimizer's internal state to allow fresh optimization runs.
Clears any state accumulated during previous optimization runs, allowing
the optimizer to be reused for new optimization problems without creating
a new instance.
"""
raise NotImplementedError("This method should be implemented by subclasses.")
[docs]
class PymooMethod(Enum):
"""Supported optimization methods from the pymoo library."""
CMAES = "CMAES"
DE = "DE"
[docs]
class PymooOptimizer(Optimizer):
"""
Optimizer wrapper for pymoo optimization algorithms and CMA-ES.
Supports population-based optimization methods from the pymoo library (DE)
and the cma library (CMAES).
"""
def __init__(self, method: PymooMethod, population_size: int = 50, **kwargs):
"""
Initialize a pymoo-based optimizer.
Args:
method (PymooMethod): The optimization algorithm to use (CMAES or DE).
population_size (int, optional): Size of the population for the algorithm.
Defaults to 50.
**kwargs: Additional algorithm-specific parameters passed to pymoo/cma.
"""
super().__init__()
self.method = method
self.population_size = population_size
self.algorithm_kwargs = kwargs
# Optimization state (updated during optimize(), used for checkpointing)
self._curr_algorithm_obj: Any | None = None
@property
def n_param_sets(self) -> int:
"""
Get the number of parameter sets (population size) used by this optimizer.
Returns:
int: Population size for the optimization algorithm.
"""
# Determine population size from stored parameters
if self.method.value == "DE":
return self.population_size
elif self.method.value == "CMAES":
# CMAES uses 'popsize' in options dict
return self.algorithm_kwargs.get("popsize", self.population_size)
return self.population_size
[docs]
def get_config(self) -> dict[str, Any]:
"""Get optimizer configuration for checkpoint reconstruction.
Returns:
dict[str, Any]: Dictionary containing optimizer type and configuration parameters.
"""
return {
"type": "PymooOptimizer",
"method": self.method.value,
"population_size": self.population_size,
**self.algorithm_kwargs,
}
def _initialize_cmaes(
self,
initial_params: npt.NDArray[np.float64],
rng: np.random.Generator,
) -> Any:
"""Initialize CMA-ES strategy."""
# Initialize CMA-ES using cma library
# cma expects a single initial solution (mean) and initial sigma
x0 = initial_params[0] # Use first parameter set as mean
# Handle sigma/sigma0
sigma0 = self.algorithm_kwargs.get(
"sigma0", self.algorithm_kwargs.get("sigma", 0.1)
)
# Filter kwargs for CMAEvolutionStrategy
cma_kwargs = {
k: v
for k, v in self.algorithm_kwargs.items()
if k not in ["sigma0", "sigma", "popsize"]
}
cma_kwargs["popsize"] = self.population_size
cma_kwargs["seed"] = rng.integers(0, 2**32)
cma_kwargs.setdefault("verbose", -9)
es = cma.CMAEvolutionStrategy(x0, sigma0, cma_kwargs)
return es
def _initialize_pymoo(
self,
initial_params: npt.NDArray[np.float64],
rng: np.random.Generator,
) -> Any:
"""Initialize Pymoo strategy (DE)."""
# Initialize DE using pymoo
optimizer_obj = globals()[self.method.value](
pop_size=self.population_size,
parallelize=False,
**self.algorithm_kwargs,
)
# numpy's stub types seed_seq as ISeedSequence which lacks `spawn`;
# at runtime it's always the concrete SeedSequence.
seed_seq = cast(np.random.SeedSequence, rng.bit_generator.seed_seq)
seed = seed_seq.spawn(1)[0].generate_state(1)[0]
n_var = initial_params.shape[-1]
xl = np.zeros(n_var)
xu = np.ones(n_var) * 2 * np.pi
problem = Problem(n_var=n_var, n_obj=1, xl=xl, xu=xu)
optimizer_obj.setup(
problem,
termination=NoTermination(),
seed=int(seed),
verbose=False,
)
optimizer_obj.start_time = time.time()
init_pop = Population.create(
*[Individual(X=initial_params[i]) for i in range(self.n_param_sets)]
)
optimizer_obj.pop = init_pop
return optimizer_obj
def _initialize_optimizer(
self,
initial_params: npt.NDArray[np.float64],
rng: np.random.Generator,
) -> Any:
"""Initialize a fresh optimizer instance.
Args:
initial_params: Initial parameter values.
rng: Random number generator.
Returns:
Optimizer object (cma.CMAEvolutionStrategy or pymoo.DE).
"""
if self.method == PymooMethod.CMAES:
return self._initialize_cmaes(initial_params, rng)
else:
return self._initialize_pymoo(initial_params, rng)
def _optimize_cmaes(
self,
cost_fn: Callable[[npt.NDArray[np.float64]], float | npt.NDArray[np.float64]],
iterations_to_run: int,
callback_fn: Callable | None,
) -> OptimizeResult:
"""Run CMA-ES optimization loop."""
if self._curr_algorithm_obj is None:
raise RuntimeError(
"_curr_algorithm_obj is not initialized; call optimize() first "
"so _initialize_optimizer runs."
)
es = self._curr_algorithm_obj
for _ in range(iterations_to_run):
# Ask
X = es.ask()
evaluated_X = np.array(X)
# Evaluate
curr_losses = cost_fn(evaluated_X)
# Tell
es.tell(X, curr_losses)
if callback_fn:
callback_fn(OptimizeResult(x=evaluated_X, fun=curr_losses))
# Return result
return OptimizeResult(
x=es.result.xbest,
fun=es.result.fbest,
nit=es.countiter,
)
def _optimize_pymoo(
self,
cost_fn: Callable[[npt.NDArray[np.float64]], float | npt.NDArray[np.float64]],
iterations_to_run: int,
callback_fn: Callable | None,
) -> OptimizeResult:
"""Run Pymoo (DE) optimization loop."""
if self._curr_algorithm_obj is None:
raise RuntimeError(
"_curr_algorithm_obj is not initialized; call optimize() first "
"so _initialize_optimizer runs."
)
algo = self._curr_algorithm_obj
problem = algo.problem
for _ in range(iterations_to_run):
pop = algo.pop
evaluated_X = pop.get("X")
curr_losses = cost_fn(evaluated_X)
Evaluator().eval(StaticProblem(problem, F=curr_losses), pop)
algo.tell(infills=pop)
# Ask for next population to evaluate
algo.pop = algo.ask()
if callback_fn:
callback_fn(OptimizeResult(x=evaluated_X, fun=curr_losses))
result = algo.result()
# nit should represent total iterations completed (n_gen is 1-indexed)
return OptimizeResult(
x=result.X,
fun=result.F,
nit=algo.n_gen - 1,
)
[docs]
def optimize(
self,
cost_fn: Callable[[npt.NDArray[np.float64]], float | npt.NDArray[np.float64]],
initial_params: npt.NDArray[np.float64] | None = None,
callback_fn: Callable | None = None,
**kwargs,
):
"""
Run the optimization algorithm.
Args:
cost_fn (Callable): Function to minimize. Should accept a 2D array of
parameter sets and return an array of cost values.
initial_params (npt.NDArray[np.float64], optional): Initial parameter values as a 2D array
of shape (n_param_sets, n_params). Should be None when resuming from a checkpoint.
callback_fn (Callable, optional): Function called after each iteration
with an OptimizeResult object. Defaults to None.
**kwargs: Additional keyword arguments:
- max_iterations (int): Total desired number of iterations.
When resuming from a checkpoint, this represents the total iterations
desired across all runs. The optimizer will automatically calculate
and run only the remaining iterations needed. Defaults to 5.
- rng (np.random.Generator): Random number generator.
Returns:
OptimizeResult: Optimization result with final parameters and cost value.
"""
max_iterations = kwargs.pop("max_iterations", 5)
# Resume from checkpoint or initialize fresh
if self._curr_algorithm_obj is not None:
if self.method == PymooMethod.CMAES:
es = self._curr_algorithm_obj
# cma uses counteigen as generation counter roughly
# strictly speaking es.countiter is the iteration counter
iterations_completed = es.countiter
else:
# Pymoo DE
# n_gen is 1-indexed (includes initialization), so actual iterations = n_gen - 1
iterations_completed = self._curr_algorithm_obj.n_gen - 1
iterations_remaining = max_iterations - iterations_completed
iterations_to_run = max(0, iterations_remaining)
else:
if initial_params is None:
raise ValueError(
"initial_params is required for a fresh PymooOptimizer run."
)
rng = kwargs.pop("rng", np.random.default_rng())
self._curr_algorithm_obj = self._initialize_optimizer(initial_params, rng)
iterations_to_run = max_iterations
if self.method == PymooMethod.CMAES:
return self._optimize_cmaes(cost_fn, iterations_to_run, callback_fn)
else:
return self._optimize_pymoo(cost_fn, iterations_to_run, callback_fn)
[docs]
def save_state(self, checkpoint_dir: Path | str) -> None:
"""Save the optimizer's internal state to a checkpoint directory.
Args:
checkpoint_dir (Path | str): Directory path where the optimizer state will be saved.
Raises:
RuntimeError: If optimization has not been run (no state to save).
"""
if self._curr_algorithm_obj is None:
raise RuntimeError(
"Cannot save checkpoint: optimization has not been run. "
"At least one iteration must complete before saving optimizer state."
)
checkpoint_path = Path(checkpoint_dir)
checkpoint_path.mkdir(parents=True, exist_ok=True)
state_file = checkpoint_path / OPTIMIZER_STATE_FILE
# Serialize algorithm object using dill, then base64 encode
# For CMAES (cma lib), algorithm object is picklable.
# For DE (pymoo), algorithm object is picklable and includes pop and problem.
algorithm_obj_bytes = dill.dumps(self._curr_algorithm_obj)
algorithm_obj_b64 = base64.b64encode(algorithm_obj_bytes).decode("ascii")
state = PymooState(
method_value=self.method.value,
population_size=self.population_size,
algorithm_kwargs=self.algorithm_kwargs,
algorithm_obj_b64=algorithm_obj_b64,
)
_atomic_write(state_file, state.model_dump_json(indent=2))
[docs]
@classmethod
def load_state(cls, checkpoint_dir: Path | str) -> "PymooOptimizer":
"""Load the optimizer's internal state from a checkpoint directory.
Creates a new PymooOptimizer instance with the state restored from the checkpoint.
Args:
checkpoint_dir (Path | str): Directory path where the optimizer state is saved.
Returns:
PymooOptimizer: A new optimizer instance with restored state.
Raises:
FileNotFoundError: If the checkpoint file does not exist.
"""
checkpoint_path = Path(checkpoint_dir)
state_file = checkpoint_path / OPTIMIZER_STATE_FILE
state = _load_and_validate_pydantic_model(
state_file,
PymooState,
required_fields=["method_value", "algorithm_obj_b64"],
error_context="Pymoo optimizer",
)
# Create new instance with saved configuration
optimizer = cls(
method=PymooMethod(state.method_value),
population_size=state.population_size,
**state.algorithm_kwargs,
)
# Restore algorithm object from base64 string
# For DE, this includes the population and problem
optimizer._curr_algorithm_obj = dill.loads(
base64.b64decode(state.algorithm_obj_b64)
)
return optimizer
[docs]
def reset(self) -> None:
"""Reset the optimizer's internal state.
Clears the current algorithm object, allowing the optimizer
to be reused for fresh optimization runs.
"""
self._curr_algorithm_obj = None
[docs]
class ScipyMethod(Enum):
"""Supported optimization methods from scipy.optimize."""
NELDER_MEAD = "Nelder-Mead"
COBYLA = "COBYLA"
L_BFGS_B = "L-BFGS-B"
[docs]
class ScipyOptimizer(Optimizer):
"""
Optimizer wrapper for scipy.optimize methods.
Supports gradient-free and gradient-based optimization algorithms from scipy,
including Nelder-Mead simplex, COBYLA, and L-BFGS-B.
"""
def __init__(self, method: ScipyMethod):
"""
Initialize a scipy-based optimizer.
Args:
method (ScipyMethod): The optimization algorithm to use.
"""
super().__init__()
self.method = method
@property
def n_param_sets(self) -> int:
"""
Get the number of parameter sets used by this optimizer.
Returns:
int: Always returns 1, as scipy optimizers use single-point optimization.
"""
return 1
# pyrefly: ignore[bad-override]
[docs]
def optimize(
self,
cost_fn: Callable[[npt.NDArray[np.float64]], float],
initial_params: npt.NDArray[np.float64] | None = None,
callback_fn: Callable[[OptimizeResult], Any] | None = None,
**kwargs,
) -> OptimizeResult:
"""
Run the scipy optimization algorithm.
Args:
cost_fn (Callable): Function to minimize. Should accept a 1D array of
parameters and return a scalar cost value.
initial_params (npt.NDArray[np.float64]): Initial parameter values as a 1D or 2D array.
If 2D with shape (1, n_params), it will be squeezed to 1D.
callback_fn (Callable, optional): Function called after each iteration
with an `OptimizeResult` object. Defaults to None.
**kwargs: Additional keyword arguments:
- max_iterations (int, optional): Total desired number of iterations.
Defaults to None (no limit for some methods).
- jac (Callable): Gradient function (only used for L-BFGS-B).
Returns:
OptimizeResult: Optimization result with final parameters and cost value.
"""
max_iterations = kwargs.pop("max_iterations", None)
# If a callback is provided, we wrap the cost function and callback
# to ensure the data passed to the callback has a consistent shape.
if callback_fn:
def callback_wrapper(intermediate_result: OptimizeResult):
# Create a dictionary from the intermediate result to preserve all of its keys.
result_dict = dict(intermediate_result)
# Overwrite 'x' and 'fun' to ensure they have consistent dimensions.
result_dict["x"] = np.atleast_2d(intermediate_result.x)
result_dict["fun"] = np.atleast_1d(intermediate_result.fun)
# Create a new OptimizeResult and pass it to the user's callback.
return callback_fn(OptimizeResult(**result_dict))
else:
callback_wrapper = None
if max_iterations is None or self.method == ScipyMethod.COBYLA:
# COBYLA perceive maxiter as maxfev so we need
# to use the callback fn for counting instead.
maxiter = None
else:
# Need to add one more iteration for Nelder-Mead's simplex initialization step
maxiter = (
max_iterations + 1
if self.method == ScipyMethod.NELDER_MEAD
else max_iterations
)
if initial_params is None:
raise ValueError("ScipyOptimizer requires initial_params.")
return minimize(
cost_fn,
initial_params.squeeze(),
method=self.method.value,
jac=(
kwargs.pop("jac", None) if self.method == ScipyMethod.L_BFGS_B else None
),
callback=callback_wrapper,
options={"maxiter": maxiter},
)
[docs]
def save_state(self, checkpoint_dir: Path | str) -> None:
"""Save the optimizer's internal state to a checkpoint directory.
Scipy optimizers do not support saving state mid-minimization as scipy.optimize
does not provide access to the internal optimizer state.
Args:
checkpoint_dir: Directory path where the optimizer state would be saved.
Raises:
NotImplementedError: Always raised, as scipy optimizers cannot save state.
"""
raise NotImplementedError(
"ScipyOptimizer does not support state saving. Scipy's optimization methods "
"do not provide access to internal optimizer state during minimization. "
"Please use MonteCarloOptimizer or PymooOptimizer for checkpointing support."
)
[docs]
@classmethod
def load_state(cls, checkpoint_dir: Path | str) -> "ScipyOptimizer":
"""Load the optimizer's internal state from a checkpoint directory.
Scipy optimizers do not support loading state as they cannot save state.
Args:
checkpoint_dir: Directory path where the optimizer state would be loaded from.
Raises:
NotImplementedError: Always raised, as scipy optimizers cannot load state.
"""
raise NotImplementedError(
"ScipyOptimizer does not support state loading. Scipy's optimization methods "
"do not provide access to internal optimizer state during minimization. "
"Please use MonteCarloOptimizer or PymooOptimizer for checkpointing support."
)
[docs]
def reset(self) -> None:
"""Reset the optimizer's internal state.
ScipyOptimizer does not maintain internal state between optimization runs,
so this method is a no-op.
"""
[docs]
def get_config(self) -> dict[str, Any]:
"""Get optimizer configuration for checkpoint reconstruction.
Raises:
NotImplementedError: ScipyOptimizer does not support checkpointing.
"""
raise NotImplementedError(
"ScipyOptimizer does not support checkpointing. Please use "
"MonteCarloOptimizer or PymooOptimizer for checkpointing support."
)
[docs]
class MonteCarloOptimizer(Optimizer):
"""
Monte Carlo-based parameter search optimizer.
This optimizer samples parameter space randomly, selects the best-performing
samples, and uses them as centers for the next generation of samples with
decreasing variance. This implements a simple but effective evolutionary strategy.
"""
def __init__(
self,
population_size: int = 10,
n_best_sets: int = 3,
keep_best_params: bool = False,
):
"""
Initialize a Monte Carlo optimizer.
Args:
population_size (int, optional): Size of the population for the algorithm.
Defaults to 10.
n_best_sets (int, optional): Number of top-performing parameter sets to
use as seeds for the next generation. Defaults to 3.
keep_best_params (bool, optional): If True, includes the best parameter sets
directly in the new population. If False, generates all new parameters
by sampling around the best ones. Defaults to False.
Raises:
ValueError: If n_best_sets is greater than population_size.
ValueError: If keep_best_params is True and n_best_sets equals population_size.
"""
super().__init__()
if n_best_sets > population_size:
raise ValueError(
"n_best_sets must be less than or equal to population_size."
)
if keep_best_params and n_best_sets == population_size:
raise ValueError(
"If keep_best_params is True, n_best_sets must be less than population_size."
)
self._population_size = population_size
self._n_best_sets = n_best_sets
self._keep_best_params = keep_best_params
# Optimization state (updated during optimize(), used for checkpointing)
self._curr_population: npt.NDArray[np.float64] | None = None
self._curr_evaluated_population: npt.NDArray[np.float64] | None = None
self._curr_losses: npt.NDArray[np.float64] | None = None
self._curr_iteration: int | None = None
self._curr_rng_state: dict | None = None
@property
def _has_checkpoint(self) -> bool:
return self._curr_population is not None and self._curr_iteration is not None
@property
def population_size(self) -> int:
"""
Get the size of the population.
Returns:
int: Size of the population.
"""
return self._population_size
@property
def n_param_sets(self) -> int:
"""Number of parameter sets (population size), per the Optimizer interface.
Returns:
int: The population size.
"""
return self._population_size
@property
def n_best_sets(self) -> int:
"""
Get the number of best parameter sets used for seeding the next generation.
Returns:
int: Number of best-performing sets kept.
"""
return self._n_best_sets
@property
def keep_best_params(self) -> bool:
"""
Get whether the best parameters are kept in the new population.
Returns:
bool: True if best parameters are included in new population, False otherwise.
"""
return self._keep_best_params
[docs]
def get_config(self) -> dict[str, Any]:
"""Get optimizer configuration for checkpoint reconstruction.
Returns:
dict[str, Any]: Dictionary containing optimizer type and configuration parameters.
"""
return {
"type": "MonteCarloOptimizer",
"population_size": self._population_size,
"n_best_sets": self._n_best_sets,
"keep_best_params": self._keep_best_params,
}
def _compute_new_parameters(
self,
params: npt.NDArray[np.float64],
curr_iteration: int,
best_indices: npt.NDArray[np.intp],
rng: np.random.Generator,
) -> npt.NDArray[np.float64]:
"""
Generates a new population of parameters based on the best-performing ones.
"""
# 1. Select the best parameter sets from the current population
best_params = params[best_indices]
# 2. Determine how many new samples to generate and calculate repeat counts
if self._keep_best_params:
n_new_samples = self._population_size - self._n_best_sets
# Calculate repeat counts for new samples only
samples_per_best = n_new_samples // self._n_best_sets
remainder = n_new_samples % self._n_best_sets
else:
# Calculate repeat counts for the entire population
samples_per_best = self._population_size // self._n_best_sets
remainder = self._population_size % self._n_best_sets
repeat_counts = np.full(self._n_best_sets, samples_per_best)
repeat_counts[:remainder] += 1
# 3. Prepare the means for sampling by repeating each best parameter set
new_means = np.repeat(best_params, repeat_counts, axis=0)
# 4. Define the standard deviation (scale), which shrinks over iterations
scale = 1.0 / (2.0 * (curr_iteration + 1.0))
# 5. Generate new parameters by sampling around the best ones
new_params = rng.normal(loc=new_means, scale=scale)
# 6. Apply periodic boundary conditions
new_params = new_params % (2 * np.pi)
# 7. Conditionally combine with best params if keeping them
if self._keep_best_params:
return np.vstack([best_params, new_params])
else:
return new_params
[docs]
def optimize(
self,
cost_fn: Callable[[npt.NDArray[np.float64]], float | npt.NDArray[np.float64]],
initial_params: npt.NDArray[np.float64] | None = None,
callback_fn: Callable[[OptimizeResult], Any] | None = None,
**kwargs,
) -> OptimizeResult:
"""Perform Monte Carlo optimization on the cost function.
Parameters:
cost_fn: The cost function to minimize.
initial_params: Initial parameters for the optimization.
callback_fn: Optional callback function to monitor progress.
**kwargs: Additional keyword arguments:
- max_iterations (int, optional): Total desired number of iterations.
When resuming from a checkpoint, this represents the total iterations
desired across all runs. The optimizer will automatically calculate
and run only the remaining iterations needed. Defaults to 5.
- rng (np.random.Generator, optional): Random number generator for
parameter sampling. Defaults to a new generator if not provided.
Returns:
Optimized parameters.
"""
rng = kwargs.pop("rng", np.random.default_rng())
max_iterations = kwargs.pop("max_iterations", 5)
# Resume from checkpoint or initialize fresh
if self._curr_population is not None and self._curr_iteration is not None:
start_iter = self._curr_iteration + 1
rng.bit_generator.state = self._curr_rng_state
# Calculate remaining iterations to reach total desired
iterations_completed = self._curr_iteration + 1
iterations_remaining = max_iterations - iterations_completed
end_iter = start_iter + max(0, iterations_remaining)
else:
if initial_params is None:
raise ValueError(
"initial_params is required for a fresh MonteCarloOptimizer run."
)
self._curr_population = np.copy(initial_params)
start_iter = 0
end_iter = max_iterations
for curr_iter in range(start_iter, end_iter):
# Evaluate the entire population once
self._curr_losses = np.atleast_1d(cost_fn(self._curr_population)).astype(
np.float64
)
self._curr_evaluated_population = np.copy(self._curr_population)
# Find the indices of the best-performing parameter sets
best_indices = np.argpartition(self._curr_losses, self.n_best_sets - 1)[
: self.n_best_sets
]
# Generate the next generation of parameters (uses RNG, so capture state after)
self._curr_population = self._compute_new_parameters(
self._curr_evaluated_population, curr_iter, best_indices, rng
)
self._curr_iteration = curr_iter
self._curr_rng_state = rng.bit_generator.state
if callback_fn:
callback_fn(
OptimizeResult(
x=self._curr_evaluated_population, fun=self._curr_losses
)
)
# Note: 'losses' here are from the last successfully evaluated population
# (either from the loop above, or from checkpoint state if loop didn't run)
if self._curr_losses is None or self._curr_evaluated_population is None:
raise RuntimeError(
"MonteCarloOptimizer.optimize produced no evaluated population; "
"nothing to return."
)
best_idx = np.argmin(self._curr_losses)
# Return the best results from the LAST EVALUATED population
# nit should be the total number of iterations completed
total_iterations_completed = (
self._curr_iteration + 1 if self._curr_iteration is not None else 0
)
return OptimizeResult(
x=self._curr_evaluated_population[best_idx],
fun=self._curr_losses[best_idx],
nit=total_iterations_completed,
)
[docs]
def save_state(self, checkpoint_dir: Path | str) -> None:
"""Save the optimizer's internal state to a checkpoint directory.
Args:
checkpoint_dir (Path | str): Directory path where the optimizer state will be saved.
Raises:
RuntimeError: If optimization has not been run (no state to save).
"""
if (
self._curr_population is None
or self._curr_evaluated_population is None
or self._curr_losses is None
or self._curr_iteration is None
):
raise RuntimeError(
"Cannot save checkpoint: optimization has not been run. "
"At least one iteration must complete before saving optimizer state."
)
checkpoint_path = Path(checkpoint_dir)
checkpoint_path.mkdir(parents=True, exist_ok=True)
state_file = checkpoint_path / OPTIMIZER_STATE_FILE
# RNG state is a dict/tuple structure, pickle it for bytes storage
# Then encode to base64 string for JSON serialization
rng_state_bytes = pickle.dumps(self._curr_rng_state)
rng_state_b64 = base64.b64encode(rng_state_bytes).decode("ascii")
state = MonteCarloState(
population_size=self._population_size,
n_best_sets=self._n_best_sets,
keep_best_params=self._keep_best_params,
curr_iteration=self._curr_iteration,
population=self._curr_population.tolist(),
evaluated_population=self._curr_evaluated_population.tolist(),
losses=self._curr_losses.tolist(),
rng_state_b64=rng_state_b64,
)
_atomic_write(state_file, state.model_dump_json(indent=2))
[docs]
@classmethod
def load_state(cls, checkpoint_dir: Path | str) -> "MonteCarloOptimizer":
"""Load the optimizer's internal state from a checkpoint directory.
Creates a new MonteCarloOptimizer instance with the state restored from the checkpoint.
Args:
checkpoint_dir (Path | str): Directory path where the optimizer state is saved.
Returns:
MonteCarloOptimizer: A new optimizer instance with restored state.
Raises:
FileNotFoundError: If the checkpoint file does not exist.
"""
checkpoint_path = Path(checkpoint_dir)
state_file = checkpoint_path / OPTIMIZER_STATE_FILE
state = _load_and_validate_pydantic_model(
state_file,
MonteCarloState,
required_fields=["population_size", "curr_iteration", "rng_state_b64"],
error_context="Monte Carlo optimizer",
)
# Create new instance with saved configuration
optimizer = cls(
population_size=state.population_size,
n_best_sets=state.n_best_sets,
keep_best_params=state.keep_best_params,
)
# Restore state
optimizer._curr_population = (
np.array(state.population) if state.population else None
)
optimizer._curr_evaluated_population = (
np.array(state.evaluated_population) if state.evaluated_population else None
)
optimizer._curr_losses = np.array(state.losses) if state.losses else None
optimizer._curr_iteration = (
state.curr_iteration if state.curr_iteration != -1 else None
)
# Restore RNG state from base64 string -> bytes -> pickle
rng_state_bytes = base64.b64decode(state.rng_state_b64)
optimizer._curr_rng_state = pickle.loads(rng_state_bytes)
return optimizer
[docs]
def reset(self) -> None:
"""Reset the optimizer's internal state.
Clears all current optimization state (population, losses, iteration, RNG state),
allowing the optimizer to be reused for fresh optimization runs.
"""
self._curr_population = None
self._curr_evaluated_population = None
self._curr_losses = None
self._curr_iteration = None
self._curr_rng_state = None
[docs]
def copy_optimizer(optimizer: Optimizer) -> Optimizer:
"""Create a new optimizer instance with the same configuration as the given optimizer.
This function creates a fresh copy of an optimizer with identical configuration
parameters but with reset internal state. This is useful when multiple programs
need their own optimizer instances to avoid state contamination.
.. tip::
Use this function when preparing a batch of programs that will run in parallel.
Pass a fresh copy of the optimizer to each program instance to ensure thread safety.
Args:
optimizer: The optimizer to copy.
Returns:
A new optimizer instance with the same configuration but fresh state.
Raises:
ValueError: If the optimizer type is not recognized.
"""
if isinstance(optimizer, MonteCarloOptimizer):
return MonteCarloOptimizer(
population_size=optimizer.population_size,
n_best_sets=optimizer.n_best_sets,
keep_best_params=optimizer.keep_best_params,
)
elif isinstance(optimizer, PymooOptimizer):
return PymooOptimizer(
method=optimizer.method,
population_size=optimizer.population_size,
**optimizer.algorithm_kwargs,
)
elif isinstance(optimizer, ScipyOptimizer):
return ScipyOptimizer(method=optimizer.method)
elif isinstance(optimizer, GridSearchOptimizer):
return GridSearchOptimizer(param_grid=optimizer._param_grid.copy())
else:
raise ValueError(f"Unknown optimizer type: {type(optimizer)}")
[docs]
class GridSearchOptimizer(Optimizer):
"""Exhaustive grid search optimizer.
Evaluates all parameter combinations on a user-supplied grid in a
single iteration and returns the best-performing parameters. Designed
for low-dimensional parameter spaces like CE-QAOA (gamma, beta).
"""
def __init__(
self,
param_grid: npt.NDArray[np.float64] | None = None,
*,
param_ranges: list[tuple[float, float]] | None = None,
grid_points: int = 20,
):
"""Initialize a grid search optimizer.
Provide either *param_grid* directly or *param_ranges* + *grid_points*
to auto-generate the grid.
Args:
param_grid: Explicit 2D array of shape ``(n_points, n_params)``
where each row is a parameter combination to evaluate.
param_ranges: List of ``(low, high)`` tuples, one per parameter.
Used with *grid_points* to generate a Cartesian-product grid.
grid_points: Number of grid points per parameter dimension.
Only used with *param_ranges*. Defaults to 20.
Raises:
ValueError: If neither *param_grid* nor *param_ranges* is provided,
or if *param_grid* is not 2D.
"""
super().__init__()
if param_grid is not None:
param_grid = np.asarray(param_grid, dtype=np.float64)
if param_grid.ndim != 2:
raise ValueError(f"param_grid must be 2D, got {param_grid.ndim}D.")
self._param_grid = param_grid
elif param_ranges is not None:
axes = [np.linspace(lo, hi, grid_points) for lo, hi in param_ranges]
mesh = np.meshgrid(*axes, indexing="ij")
self._param_grid = np.column_stack([m.ravel() for m in mesh]).astype(
np.float64
)
else:
raise ValueError("Provide either param_grid or param_ranges.")
self._best_params: npt.NDArray[np.float64] | None = None
self._best_loss: float | None = None
self._all_losses: npt.NDArray[np.float64] | None = None
@property
def n_param_sets(self) -> int:
"""Number of parameter sets (grid size)."""
return len(self._param_grid)
[docs]
def optimize(
self,
cost_fn: Callable[[npt.NDArray[np.float64]], float | npt.NDArray[np.float64]],
initial_params: npt.NDArray[np.float64] | None = None,
callback_fn: Callable[[OptimizeResult], Any] | None = None,
**kwargs,
) -> OptimizeResult:
"""Evaluate all grid points and return the best parameters.
The *initial_params* argument is ignored in favor of the grid.
The optimizer always runs for exactly 1 iteration regardless of
*max_iterations*.
Args:
cost_fn: Cost function accepting a 2D array of parameter sets and
returning an array of losses.
initial_params: Ignored (grid is used instead).
callback_fn: Optional callback with intermediate results.
**kwargs: Accepts but ignores ``max_iterations`` and ``rng``.
Returns:
OptimizeResult with the best parameters.
"""
max_iterations = kwargs.pop("max_iterations", 1)
if max_iterations > 1:
warnings.warn(
f"GridSearchOptimizer evaluates all grid points in a single pass. "
f"max_iterations={max_iterations} will be ignored.",
UserWarning,
stacklevel=2,
)
kwargs.pop("rng", None)
self._all_losses = np.atleast_1d(cost_fn(self._param_grid)).astype(np.float64)
best_idx = np.argmin(self._all_losses)
self._best_params = self._param_grid[best_idx].copy()
self._best_loss = float(self._all_losses[best_idx])
if callback_fn:
callback_fn(OptimizeResult(x=self._param_grid, fun=self._all_losses))
return OptimizeResult(
x=self._best_params,
fun=self._best_loss,
nit=1,
)
[docs]
def get_config(self) -> dict[str, Any]:
"""Get optimizer configuration."""
return {
"type": "GridSearchOptimizer",
"grid_size": len(self._param_grid),
}
[docs]
def save_state(self, checkpoint_dir: Path | str) -> None:
"""Save grid search state (grid + results)."""
checkpoint_path = Path(checkpoint_dir)
checkpoint_path.mkdir(parents=True, exist_ok=True)
state = {
"param_grid": self._param_grid.tolist(),
"all_losses": (
self._all_losses.tolist() if self._all_losses is not None else None
),
"best_params": (
self._best_params.tolist() if self._best_params is not None else None
),
"best_loss": self._best_loss,
}
state_file = checkpoint_path / OPTIMIZER_STATE_FILE
_atomic_write(state_file, json.dumps(state, indent=2))
[docs]
@classmethod
def load_state(cls, checkpoint_dir: Path | str) -> "GridSearchOptimizer":
"""Load grid search state from checkpoint."""
state_file = Path(checkpoint_dir) / OPTIMIZER_STATE_FILE
with open(state_file) as f:
state = json.load(f)
opt = cls(param_grid=np.array(state["param_grid"]))
if state["all_losses"] is not None:
opt._all_losses = np.array(state["all_losses"])
if state["best_params"] is not None:
opt._best_params = np.array(state["best_params"])
opt._best_loss = state["best_loss"]
return opt
[docs]
def reset(self) -> None:
"""Reset optimizer state."""
self._best_params = None
self._best_loss = None
self._all_losses = None