Source code for divi.qprog.problems._routing

# SPDX-FileCopyrightText: 2025-2026 Qoro Quantum Ltd <divi@qoroquantum.de>
#
# SPDX-License-Identifier: Apache-2.0

"""Routing problem classes and utilities for QAOA (TSP, CVRP)."""

import math
import re
from collections.abc import Callable
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any, Literal

import numpy as np
import numpy.typing as npt
from qiskit.quantum_info import SparsePauliOp
from scipy.optimize import linear_sum_assignment

from divi.hamiltonians import qubo_to_ising, x_mixer, xy_mixer
from divi.qprog.algorithms import InitialState, SuperpositionState, WState
from divi.qprog.algorithms._initial_state import build_block_xy_mixer_graph
from divi.qprog.problems import QAOAProblem

# --- TSP utilities ---


[docs] def create_tsp_qubo( cost_matrix: npt.NDArray[np.floating], start_city: int = 0, constraint_penalty: float = 4.0, objective_weight: float = 1.0, reduced: bool = True, ) -> npt.NDArray[np.floating]: """Generate a QUBO for TSP with a fixed start city. Uses the standard node-ordering (assignment) encoding where binary variable ``x_{i,t}`` indicates that city *i* is visited at time step *t*. The start city is fixed, reducing the problem from *n²* to *(n-1)²* qubits. By default the **reduced phase operator** from CE-QAOA (`arXiv:2511.14296 <https://arxiv.org/abs/2511.14296>`_, Eq. 13) is used: row constraints (each time step has exactly one city) are **omitted** because they are enforced structurally by the W-state initialization and XY mixer. Only column constraints and the objective remain. Args: cost_matrix: Symmetric (n × n) distance/cost matrix. start_city: Index of the fixed start/end city. Defaults to 0. constraint_penalty: Penalty strength for constraint violations. objective_weight: Weight for the objective function. reduced: If ``True`` (default), omit row penalties — the correct choice when using CE-QAOA (W-state + XY mixer) since row one-hot constraints are enforced by the ansatz. Set to ``False`` only if you are using this QUBO with a **standard QAOA** ansatz (Hadamard init + X mixer) or any other method that does **not** structurally enforce row constraints. In that case both row and column penalties are included so the Hamiltonian alone encodes feasibility. Returns: Upper-triangular QUBO matrix of shape ``(m², m²)`` where ``m = n - 1``, compatible with ``QUBOProblemTypes``. Raises: ValueError: If cost_matrix is not square or start_city is out of range. """ n = cost_matrix.shape[0] if cost_matrix.shape != (n, n): raise ValueError(f"cost_matrix must be square, got shape {cost_matrix.shape}.") if not (0 <= start_city < n): raise ValueError(f"start_city {start_city} out of range [0, {n}).") m = n - 1 # reduced dimension (cities excluding start) cities = [c for c in range(n) if c != start_city] # Variable ordering: x_{i,t} -> qubit index i*m + t I_m = np.eye(m) J_m = np.ones((m, m)) if reduced: # Row penalties OMITTED — enforced by W-state + XY mixer (CE-QAOA). Q = np.zeros((m * m, m * m)) else: # Row penalties: A * sum_t (1 - sum_i x_{i,t})^2 Q = constraint_penalty * np.kron(J_m - I_m, I_m) # --- Column penalties: A * sum_i (1 - sum_t x_{i,t})^2 --- Q += constraint_penalty * np.kron(I_m, J_m - I_m) # Linear term: each variable appears in column penalty (and row if not reduced) n_penalties = 1 if reduced else 2 np.fill_diagonal(Q, np.diag(Q) - n_penalties * constraint_penalty) # --- Objective: consecutive timeslot costs --- W_reduced = cost_matrix[np.ix_(cities, cities)] S = np.zeros((m, m)) for t in range(m - 1): S[t, t + 1] = 1.0 Q += objective_weight * np.kron(W_reduced, S) # --- Depot edges (linear terms) --- depot_to_first = cost_matrix[start_city, cities] for j in range(m): Q[j * m, j * m] += objective_weight * depot_to_first[j] last_to_depot = cost_matrix[cities, start_city] for i in range(m): Q[i * m + (m - 1), i * m + (m - 1)] += objective_weight * last_to_depot[i] # Return upper-triangular matrix return np.triu(Q + Q.T - np.diag(np.diag(Q)))
[docs] def is_valid_tsp_tour(bitstring: str, n_cities: int) -> bool: """Check if a bitstring represents a valid TSP tour. A valid tour has the assignment matrix (reshaped from the bitstring) as a permutation matrix: every row and column sums to exactly 1. Args: bitstring: Binary string of length ``(n_cities - 1)²``. n_cities: Total number of cities (including the fixed start city). Returns: True if the bitstring encodes a valid permutation. """ m = n_cities - 1 if len(bitstring) != m * m: return False mat = np.array([int(b) for b in bitstring]).reshape(m, m) return bool(np.all(mat.sum(axis=0) == 1) and np.all(mat.sum(axis=1) == 1))
def decode_tsp_solution( bitstring: str, n_cities: int, start_city: int = 0, ) -> list[int] | None: """Decode a bitstring into a city tour. Args: bitstring: Binary string of length ``(n_cities - 1)²``. n_cities: Total number of cities. start_city: Index of the fixed start/end city. Returns: Ordered list of city indices forming the tour (starting and ending at *start_city*), or None if infeasible. """ if not is_valid_tsp_tour(bitstring, n_cities): return None m = n_cities - 1 cities = [c for c in range(n_cities) if c != start_city] mat = np.array([int(b) for b in bitstring]).reshape(m, m) tour = [start_city] for t in range(m): city_idx = int(np.argmax(mat[:, t])) tour.append(cities[city_idx]) tour.append(start_city) return tour
[docs] def tour_cost( tour: list[int], cost_matrix: npt.NDArray[np.floating], ) -> float: """Compute the total travel cost of a tour. Args: tour: Ordered list of city indices (first and last should match for a round trip). cost_matrix: Distance/cost matrix. Returns: Total tour cost. """ total = 0.0 for i in range(len(tour) - 1): total += cost_matrix[tour[i], tour[i + 1]] return float(total)
def repair_tsp_solution( bitstring: str, n_cities: int, start_city: int, cost_matrix: npt.NDArray[np.floating], ) -> tuple[str, list[int], float]: """Repair an infeasible bitstring to the nearest valid permutation. Uses the Hungarian algorithm (``scipy.optimize.linear_sum_assignment``) to find the permutation matrix closest in Hamming distance to the (possibly infeasible) assignment matrix. If the bitstring is already feasible, it is returned unchanged. Args: bitstring: Binary string of length ``(n_cities - 1)²``. n_cities: Total number of cities. start_city: Index of the fixed start/end city. cost_matrix: Distance/cost matrix (used for cost evaluation). Returns: Tuple of (repaired_bitstring, tour, cost). """ m = n_cities - 1 cities = [c for c in range(n_cities) if c != start_city] # Parse the bitstring into a (soft) assignment matrix mat = np.array([int(b) for b in bitstring], dtype=np.float64).reshape(m, m) # Hungarian algorithm minimises cost; we want the assignment closest # to the soft matrix, so we use (1 - mat) as the cost matrix # (maximise overlap with the measured matrix). row_ind, col_ind = linear_sum_assignment(1.0 - mat) # Build the repaired permutation matrix repaired = np.zeros((m, m), dtype=int) repaired[row_ind, col_ind] = 1 repaired_bitstring = "".join(str(x) for x in repaired.flatten()) # Decode tour tour = [start_city] for t in range(m): city_idx = int(np.argmax(repaired[:, t])) tour.append(cities[city_idx]) tour.append(start_city) cost = tour_cost(tour, cost_matrix) return repaired_bitstring, tour, cost # --- CVRP utilities (one-hot) --- def create_cvrp_qubo( cost_matrix: npt.NDArray[np.floating], demands: npt.NDArray[np.floating], capacity: float, n_vehicles: int, depot: int = 0, constraint_penalty: float = 4.0, objective_weight: float = 1.0, capacity_penalty: float = 4.0, ) -> npt.NDArray[np.floating]: """Generate a QUBO for CVRP with a fixed depot. Uses one-hot block encoding where each vehicle *v* has a block of ``n_customers × max_steps`` qubits. The depot is fixed and not encoded. Constraints enforced via init + mixer (row one-hot per vehicle block): - Each vehicle visits at most one customer per time step. Constraints in the phase operator (penalties): - **Column one-hot** (constraint_penalty): each customer visited exactly once across all vehicles. - **Capacity** (capacity_penalty): each vehicle route does not exceed capacity. - **Objective** (objective_weight): minimise total travel distance. Variable ordering: ``x_{v,i,t}`` → qubit ``v * K * T + t * K + i`` where K = n_customers, T = max_steps = K. The QUBO matrix is built via Kronecker products and vectorised numpy operations. Args: cost_matrix: Distance matrix of shape ``(n_nodes, n_nodes)``. demands: Customer demands, shape ``(n_nodes,)``. Depot demand should be 0. capacity: Vehicle capacity. n_vehicles: Number of vehicles. depot: Index of the depot node. Defaults to 0. constraint_penalty: Penalty for customer-visit constraints. objective_weight: Weight for the objective function. capacity_penalty: Penalty for capacity constraint violations. Returns: Upper-triangular QUBO matrix compatible with ``QUBOProblemTypes``. Raises: ValueError: If inputs are inconsistent. """ n_nodes = cost_matrix.shape[0] if cost_matrix.shape != (n_nodes, n_nodes): raise ValueError(f"cost_matrix must be square, got shape {cost_matrix.shape}.") if len(demands) != n_nodes: raise ValueError( f"demands length ({len(demands)}) must match " f"cost_matrix size ({n_nodes})." ) customers = [c for c in range(n_nodes) if c != depot] K = len(customers) # n_customers V = n_vehicles T = K # max_steps = n_customers (worst case) N = V * K * T # total qubits # Variable ordering: x_{v,i,t} -> qubit v*K*T + t*K + i # Reshape views use (V, T, K) -> flatten to N I_K = np.eye(K) J_T = np.ones((T, T)) I_V = np.eye(V) J_V = np.ones((V, V)) Q = np.zeros((N, N)) # --- Customer visit constraints --- # For each customer i: (sum_{v,t} x_{v,i,t} - 1)^2 # The sum runs over all (v,t) pairs for a fixed customer i. # Off-diagonal: 2A between all pairs of qubits sharing the same customer i # across different (v,t) slots. # In Kronecker form with ordering (V, T, K): # same-customer coupling = I_K, all (v,t) pairs = J_V ⊗ J_T # off-diagonal part: (J_V ⊗ J_T - I_{VT}) ⊗ I_K (exclude self-pairs) visit_offdiag = np.kron(np.kron(J_V, J_T) - np.eye(V * T), I_K) Q += constraint_penalty * visit_offdiag # Linear: -A per variable (from -2*x + x^2 = -x per constraint) # Each variable participates in exactly one customer constraint np.fill_diagonal(Q, np.diag(Q) - constraint_penalty) # --- Capacity constraints --- # For each vehicle v: (sum_{i,t} d_i * x_{v,i,t} - C)^2 # = sum_{i,t} sum_{j,s} d_i*d_j * x_{v,i,t} * x_{v,j,s} # - 2C * sum_{i,t} d_i * x_{v,i,t} + C^2 # # The quadratic part for a single vehicle is: # d ⊗ d^T ⊗ J_T (all timestep pairs, weighted by demand products) # where d is the customer demand vector. d = demands[customers] dd = np.outer(d, d) # K×K demand product matrix # Per-vehicle capacity quadratic: (J_T ⊗ dd) for each vehicle # Across vehicles: block-diagonal (I_V) cap_quad_per_vehicle = np.kron(J_T, dd) # (KT × KT) Q += capacity_penalty * np.kron(I_V, cap_quad_per_vehicle) # Capacity linear: -2C * d_i per variable (+ d_i^2 from diagonal of quad) # The d_i^2 is already in the diagonal of cap_quad. # We need: -2C * d_i additionally on the diagonal. # d_i for each qubit: tile d across all (V, T) slots d_per_qubit = np.tile(d, V * T) np.fill_diagonal(Q, np.diag(Q) - 2.0 * capacity_penalty * capacity * d_per_qubit) # --- Objective: travel cost --- W = cost_matrix[np.ix_(customers, customers)] # K×K reduced cost matrix # Consecutive steps: x_{v,i,t} * x_{v,j,t+1} with weight W[i,j] # Shift matrix S[t, t+1] = 1 S = np.zeros((T, T)) for t in range(T - 1): S[t, t + 1] = 1.0 # Per-vehicle: W ⊗ S (customer-to-customer across consecutive timesteps) # Across vehicles: block-diagonal (I_V) obj_consecutive = np.kron(S, W) # (KT × KT) Q += objective_weight * np.kron(I_V, obj_consecutive) # Depot edges (linear terms) depot_to_first = cost_matrix[depot, customers] # K vector last_to_depot = cost_matrix[customers, depot] # K vector for v in range(V): base = v * K * T # depot -> first timestep (t=0): weight on x_{v,i,0} for i in range(K): Q[base + i, base + i] += objective_weight * depot_to_first[i] # last timestep (t=T-1) -> depot: weight on x_{v,i,T-1} for i in range(K): q = base + (T - 1) * K + i Q[q, q] += objective_weight * last_to_depot[i] # Return upper-triangular matrix return np.triu(Q + Q.T - np.diag(np.diag(Q)))
[docs] def cvrp_block_structure( n_customers: int, n_vehicles: int, ) -> tuple[int, int]: """Return the block structure for CVRP CE-QAOA. Args: n_customers: Number of customers (excluding depot). n_vehicles: Number of vehicles. Returns: tuple[int, int]: Pair ``(block_size, n_blocks)`` where ``block_size = n_customers`` (one qubit per customer per time step) and ``n_blocks = n_vehicles * n_customers`` (one block per vehicle × time step). """ max_steps = n_customers return n_customers, n_vehicles * max_steps
def is_valid_cvrp_solution( bitstring: str, n_customers: int, n_vehicles: int, demands: npt.NDArray[np.floating], capacity: float, depot: int = 0, ) -> bool: """Check if a bitstring represents a valid CVRP solution. Validates: - Each customer is visited exactly once across all vehicles. - Each vehicle's total demand does not exceed capacity. Args: bitstring: Binary string. n_customers: Number of customers (excluding depot). n_vehicles: Number of vehicles. demands: Customer demands (length = total nodes including depot). capacity: Vehicle capacity. depot: Depot index. Returns: True if the solution is feasible. """ max_steps = n_customers expected_len = n_vehicles * n_customers * max_steps if len(bitstring) != expected_len: return False customers = [c for c in range(len(demands)) if c != depot] bits = np.array([int(b) for b in bitstring]).reshape( n_vehicles, max_steps, n_customers ) # Check each customer visited exactly once total_visits = bits.sum(axis=(0, 1)) if not np.all(total_visits == 1): return False # Check capacity per vehicle for v in range(n_vehicles): vehicle_demand = 0.0 for t in range(max_steps): for i in range(n_customers): if bits[v, t, i] == 1: vehicle_demand += demands[customers[i]] if vehicle_demand > capacity + 1e-9: return False return True def decode_cvrp_solution( bitstring: str, n_customers: int, n_vehicles: int, depot: int = 0, n_nodes: int | None = None, ) -> list[list[int]] | None: """Decode a bitstring into vehicle routes. Args: bitstring: Binary string. n_customers: Number of customers (excluding depot). n_vehicles: Number of vehicles. depot: Depot index. n_nodes: Total number of nodes (if None, inferred as n_customers + 1). Returns: List of routes, where each route is ``[depot, c1, c2, ..., depot]``, or None if the bitstring cannot be decoded. """ if n_nodes is None: n_nodes = n_customers + 1 max_steps = n_customers expected_len = n_vehicles * n_customers * max_steps if len(bitstring) != expected_len: return None customers = [c for c in range(n_nodes) if c != depot] bits = np.array([int(b) for b in bitstring]).reshape( n_vehicles, max_steps, n_customers ) routes = [] for v in range(n_vehicles): route = [depot] for t in range(max_steps): assigned = np.where(bits[v, t] == 1)[0] if len(assigned) == 1: route.append(customers[assigned[0]]) route.append(depot) routes.append(route) return routes def repair_cvrp_solution( bitstring: str, n_customers: int, n_vehicles: int, cost_matrix: npt.NDArray[np.floating], demands: npt.NDArray[np.floating], capacity: float, depot: int = 0, ) -> tuple[str, list[list[int]], float]: """Repair an infeasible CVRP bitstring. Uses a greedy assignment strategy: 1. Parse the soft assignment matrix. 2. Use the Hungarian algorithm on a cost matrix derived from the soft assignments to find the best customer-to-(vehicle, timestep) mapping. 3. Verify capacity constraints; reassign overflows greedily. Args: bitstring: Binary string. n_customers: Number of customers (excluding depot). n_vehicles: Number of vehicles. cost_matrix: Distance matrix. demands: Customer demands. capacity: Vehicle capacity. depot: Depot index. Returns: Tuple of (repaired_bitstring, routes, total_cost). """ max_steps = n_customers n_nodes = len(demands) customers = [c for c in range(n_nodes) if c != depot] bits = np.array([int(b) for b in bitstring], dtype=np.float64).reshape( n_vehicles * max_steps, n_customers ) # Use Hungarian algorithm to find best assignment of customers # to (vehicle, timestep) slots # Cost matrix: 1 - soft_assignment (to maximise overlap) row_ind, col_ind = linear_sum_assignment(1.0 - bits) # Build repaired assignment repaired = np.zeros((n_vehicles, max_steps, n_customers), dtype=int) slot_to_customer = {} for slot, cust in zip(row_ind, col_ind): if slot < n_vehicles * max_steps and cust < n_customers: v = slot // max_steps t = slot % max_steps repaired[v, t, cust] = 1 slot_to_customer[(v, t)] = cust # Check capacity and greedily reassign overflows for v in range(n_vehicles): vehicle_demand = 0.0 for t in range(max_steps): assigned = np.where(repaired[v, t] == 1)[0] for cust_idx in assigned: cust_demand = demands[customers[cust_idx]] if vehicle_demand + cust_demand > capacity + 1e-9: # Try to move to another vehicle with remaining capacity repaired[v, t, cust_idx] = 0 placed = False for v2 in range(n_vehicles): if v2 == v: continue v2_demand = sum( demands[customers[i]] for t2 in range(max_steps) for i in range(n_customers) if repaired[v2, t2, i] == 1 ) if v2_demand + cust_demand <= capacity + 1e-9: # Find empty slot for t2 in range(max_steps): if repaired[v2, t2].sum() == 0: repaired[v2, t2, cust_idx] = 1 placed = True break if placed: break if not placed: # Last resort: put it back repaired[v, t, cust_idx] = 1 vehicle_demand += cust_demand else: vehicle_demand += cust_demand repaired_bitstring = "".join(str(x) for x in repaired.flatten()) routes = decode_cvrp_solution( repaired_bitstring, n_customers, n_vehicles, depot, n_nodes ) # Compute total cost total_cost = 0.0 if routes is not None: for route in routes: for i in range(len(route) - 1): total_cost += cost_matrix[route[i], route[i + 1]] return repaired_bitstring, routes or [], float(total_cost) # --- CVRP utilities (binary encoding) ---
[docs] @dataclass(frozen=True) class BinaryBlockConfig: """Configuration for binary-encoded CE-QAOA blocks. Attributes: n_customers: Number of customers (excluding depot). n_vehicles: Number of vehicles. max_steps: Maximum route length per vehicle. bits_per_slot: Qubits per slot (ceil(log₂(n_customers + 1))). n_slots: Total number of (vehicle, timestep) slots. n_qubits: Total qubit count. """ n_customers: int n_vehicles: int max_steps: int bits_per_slot: int n_slots: int n_qubits: int
[docs] def binary_block_config( n_customers: int, n_vehicles: int, max_steps: int | None = None, ) -> BinaryBlockConfig: """Compute the binary encoding layout. Args: n_customers: Number of customers (excluding depot). n_vehicles: Number of vehicles. max_steps: Maximum route length per vehicle. Defaults to n_customers (worst case: all customers on one vehicle). Returns: :class:`~divi.qprog.problems.BinaryBlockConfig` with the encoding parameters. """ if max_steps is None: max_steps = n_customers # Need to encode values 0..n_customers (0 = empty slot) bits_per_slot = math.ceil(math.log2(n_customers + 1)) if n_customers > 0 else 1 n_slots = n_vehicles * max_steps n_qubits = n_slots * bits_per_slot return BinaryBlockConfig( n_customers=n_customers, n_vehicles=n_vehicles, max_steps=max_steps, bits_per_slot=bits_per_slot, n_slots=n_slots, n_qubits=n_qubits, )
def create_cvrp_hubo_binary( cost_matrix: npt.NDArray[np.floating], demands: npt.NDArray[np.floating], capacity: float, n_vehicles: int, depot: int = 0, constraint_penalty: float = 4.0, objective_weight: float = 1.0, capacity_penalty: float = 4.0, max_steps: int | None = None, ) -> tuple[dict[tuple[int, ...], float], BinaryBlockConfig]: """Generate a HUBO (Higher-Order Binary Optimization) for CVRP with binary encoding. Each (vehicle, timestep) slot is encoded as a ``bits_per_slot``-bit integer representing which customer (1..N) is assigned, or 0 for an empty slot. The variable ``x_{s,b}`` denotes bit *b* of slot *s*. The integer value of slot *s* is ``sum_b x_{s,b} * 2^b``. Constraints: - **Customer visit**: each customer j in 1..N appears exactly once across all slots. - **Capacity**: each vehicle's route total demand ≤ capacity. - **Objective**: minimise total travel distance. Note: This produces a HUBO (terms up to degree ``bits_per_slot``) which requires quadratization before use in standard QAOA. Args: cost_matrix: Distance matrix, shape ``(n_nodes, n_nodes)``. demands: Node demands, shape ``(n_nodes,)``. Depot demand = 0. capacity: Vehicle capacity. n_vehicles: Number of vehicles. depot: Depot node index. constraint_penalty: Customer-visit constraint penalty. objective_weight: Objective weight. capacity_penalty: Capacity penalty. max_steps: Max route steps per vehicle (default: n_customers). Returns: Tuple of (hubo_dict, config) where hubo_dict maps tuples of variable indices to coefficients. """ n_nodes = cost_matrix.shape[0] customers = [c for c in range(n_nodes) if c != depot] n_cust = len(customers) config = binary_block_config(n_cust, n_vehicles, max_steps) B = config.bits_per_slot def slot_bits(slot_idx: int) -> list[int]: """Return qubit indices for a given slot.""" start = slot_idx * B return list(range(start, start + B)) def vehicle_slot(v: int, t: int) -> int: """Map (vehicle, timestep) to slot index.""" return v * config.max_steps + t hubo: dict[tuple[int, ...], float] = {} def _add(key: tuple[int, ...], val: float) -> None: key = tuple(sorted(key)) hubo[key] = hubo.get(key, 0.0) + val # Helper: build the "indicator" polynomial for slot s == customer j # In binary: slot value = sum_b x_{s,b} * 2^b # Indicator I(s = j) = product over bits b of: # x_{s,b} if bit b of j is 1 # (1 - x_{s,b}) if bit b of j is 0 # # This is a multilinear polynomial of degree B in the slot's bits. # We expand it into a dict of {frozen_set_of_variables: coefficient}. def indicator_terms( slot_idx: int, customer_val: int ) -> dict[frozenset[int], float]: """Expand the indicator I(slot == customer_val) into polynomial terms. Returns dict mapping frozensets of variable indices to coefficients. """ bits = slot_bits(slot_idx) # Start with constant 1 terms: dict[frozenset[int], float] = {frozenset(): 1.0} for b in range(B): bit_is_set = (customer_val >> b) & 1 new_terms: dict[frozenset[int], float] = {} for vars_set, coeff in terms.items(): if bit_is_set: # Multiply by x_{bits[b]} new_vars = vars_set | {bits[b]} new_terms[new_vars] = new_terms.get(new_vars, 0.0) + coeff else: # Multiply by (1 - x_{bits[b]}) # = coeff * 1 - coeff * x_{bits[b]} new_terms[vars_set] = new_terms.get(vars_set, 0.0) + coeff new_vars = vars_set | {bits[b]} new_terms[new_vars] = new_terms.get(new_vars, 0.0) - coeff terms = new_terms # Remove near-zero terms return {k: v for k, v in terms.items() if abs(v) > 1e-15} # --- Customer visit constraints --- # For each customer j: (sum_s I(s == j) - 1)^2 # = sum_s I(s==j) * sum_s' I(s'==j) - 2 * sum_s I(s==j) + 1 for cust_idx in range(n_cust): cust_val = cust_idx + 1 # binary value (0 is "empty") slot_indicators = [] for v in range(n_vehicles): for t in range(config.max_steps): s = vehicle_slot(v, t) ind = indicator_terms(s, cust_val) slot_indicators.append(ind) # -2 * sum_s I(s==j) (linear part of (sum-1)^2) for ind in slot_indicators: for vars_set, coeff in ind.items(): _add( tuple(sorted(vars_set)), -2.0 * constraint_penalty * coeff, ) # sum_s sum_{s'} I(s==j) * I(s'==j) (quadratic part) # = sum_s I(s==j)^2 + 2 * sum_{s<s'} I(s==j)*I(s'==j) # Since I^2 = I (indicator is 0 or 1), the diagonal is just sum_s I(s==j) for ind in slot_indicators: for vars_set, coeff in ind.items(): _add(tuple(sorted(vars_set)), constraint_penalty * coeff) # Cross terms: 2 * sum_{s<s'} I(s==j) * I(s'==j) for i in range(len(slot_indicators)): for j_idx in range(i + 1, len(slot_indicators)): for vars_i, coeff_i in slot_indicators[i].items(): for vars_j, coeff_j in slot_indicators[j_idx].items(): combined = vars_i | vars_j _add( tuple(sorted(combined)), 2.0 * constraint_penalty * coeff_i * coeff_j, ) # --- Objective: travel distance --- # For each vehicle v: # depot -> slot(v,0) + slot(v,t) -> slot(v,t+1) + slot(v,last) -> depot for v in range(n_vehicles): ms = config.max_steps for t in range(ms): s = vehicle_slot(v, t) for cust_idx in range(n_cust): cust_val = cust_idx + 1 cust_node = customers[cust_idx] ind = indicator_terms(s, cust_val) if t == 0: # Depot to first customer weight = cost_matrix[depot, cust_node] for vars_set, coeff in ind.items(): _add( tuple(sorted(vars_set)), objective_weight * weight * coeff, ) if t == ms - 1: # Last customer to depot weight = cost_matrix[cust_node, depot] for vars_set, coeff in ind.items(): _add( tuple(sorted(vars_set)), objective_weight * weight * coeff, ) # Consecutive: slot(v,t) -> slot(v,t+1) if t < ms - 1: s_next = vehicle_slot(v, t + 1) for ci in range(n_cust): ci_val = ci + 1 ci_node = customers[ci] ind_i = indicator_terms(s, ci_val) for cj in range(n_cust): cj_val = cj + 1 cj_node = customers[cj] weight = cost_matrix[ci_node, cj_node] if abs(weight) < 1e-15: continue ind_j = indicator_terms(s_next, cj_val) for vars_i, coeff_i in ind_i.items(): for vars_j, coeff_j in ind_j.items(): combined = vars_i | vars_j _add( tuple(sorted(combined)), objective_weight * weight * coeff_i * coeff_j, ) # --- Capacity constraints --- # For each vehicle v: (sum_{t,j} demand_j * I(slot(v,t)==j) - C)^2 # Simplified: capacity_penalty * (sum - C)^2 # We only add the quadratic expansion terms (linear + cross) for v in range(n_vehicles): # Collect all demand-weighted indicator terms for this vehicle demand_terms: list[tuple[dict[frozenset[int], float], float]] = [] for t in range(config.max_steps): s = vehicle_slot(v, t) for ci in range(n_cust): ci_val = ci + 1 d = demands[customers[ci]] if abs(d) < 1e-15: continue ind = indicator_terms(s, ci_val) demand_terms.append((ind, d)) # -2C * sum of demand_j * I(slot==j) for ind, d in demand_terms: for vars_set, coeff in ind.items(): _add( tuple(sorted(vars_set)), capacity_penalty * d * (-2.0 * capacity) * coeff, ) # (sum demand_j * I(slot==j))^2 # = sum_i d_i^2 * I_i + 2 * sum_{i<j} d_i * d_j * I_i * I_j for i, (ind_i, d_i) in enumerate(demand_terms): # Diagonal: d_i^2 * I_i (since I_i^2 = I_i) for vars_set, coeff in ind_i.items(): _add( tuple(sorted(vars_set)), capacity_penalty * d_i * d_i * coeff, ) # Cross terms for j_idx in range(i + 1, len(demand_terms)): ind_j, d_j = demand_terms[j_idx] for vars_i, coeff_i in ind_i.items(): for vars_j, coeff_j in ind_j.items(): combined = vars_i | vars_j _add( tuple(sorted(combined)), 2.0 * capacity_penalty * d_i * d_j * coeff_i * coeff_j, ) return hubo, config def create_tsp_hubo_binary( cost_matrix: npt.NDArray[np.floating], start_city: int = 0, constraint_penalty: float = 4.0, objective_weight: float = 1.0, ) -> tuple[dict[tuple[int, ...], float], BinaryBlockConfig]: """Generate a binary-encoded HUBO for TSP. TSP is the K=1 special case of CVRP with no demands and ``max_steps`` locked to ``n_customers``. The customer-visit penalty alone already rejects any configuration with fewer than ``n_customers`` customer slots, but when ``n_customers + 1`` is not a power of 2 some bit patterns decode to values outside ``[0, n_customers]``. We add an explicit penalty on those out-of-range patterns so the Hamiltonian landscape directly reflects the feasibility constraint, instead of relying on a counting argument. Args: cost_matrix: Symmetric distance matrix, shape ``(n, n)``. start_city: Index of the fixed start/end city. constraint_penalty: Customer-visit + slot-validity penalty strength. objective_weight: Tour-length weight. Returns: Tuple of (hubo_dict, config) — same shape as :func:`create_cvrp_hubo_binary` but with ``n_vehicles=1`` and ``max_steps=n_customers``. """ n_nodes = cost_matrix.shape[0] n_cust = n_nodes - 1 hubo, config = create_cvrp_hubo_binary( cost_matrix=cost_matrix, demands=np.zeros(n_nodes, dtype=np.float64), capacity=1.0, # any value; demands=0 makes capacity terms vacuous. n_vehicles=1, depot=start_city, constraint_penalty=constraint_penalty, objective_weight=objective_weight, capacity_penalty=0.0, max_steps=n_cust, ) # Slot-validity penalty: for each slot s and each value v in # ``(n_cust + 1, ..., 2^B - 1)``, add ``constraint_penalty * I(s == v)`` # to the HUBO. Skips when 2^B == n_cust + 1 (no out-of-range values). B = config.bits_per_slot invalid_values = range(n_cust + 1, 1 << B) if invalid_values: for slot in range(config.n_slots): slot_bits = list(range(slot * B, slot * B + B)) for v in invalid_values: # Indicator polynomial I(slot == v) expanded over bits. terms: dict[frozenset[int], float] = {frozenset(): 1.0} for b in range(B): bit_set = (v >> b) & 1 new_terms: dict[frozenset[int], float] = {} for vars_set, coeff in terms.items(): if bit_set: key = vars_set | {slot_bits[b]} new_terms[key] = new_terms.get(key, 0.0) + coeff else: new_terms[vars_set] = new_terms.get(vars_set, 0.0) + coeff key = vars_set | {slot_bits[b]} new_terms[key] = new_terms.get(key, 0.0) - coeff terms = new_terms for vars_set, coeff in terms.items(): if abs(coeff) < 1e-15: continue key = tuple(sorted(vars_set)) hubo[key] = hubo.get(key, 0.0) + constraint_penalty * coeff return hubo, config def decode_binary_tsp_solution( bitstring: str, config: BinaryBlockConfig, n_nodes: int, start_city: int = 0, ) -> list[int] | None: """Decode a binary-encoded TSP bitstring into a tour. Returns ``[start_city, c1, ..., c_{n-1}, start_city]`` if the bitstring decodes to a valid TSP tour (all customers, each exactly once, no empty slot), else ``None``. """ routes = decode_binary_cvrp_solution(bitstring, config, start_city, n_nodes) if routes is None or len(routes) != 1: return None tour = routes[0] # decode_binary_cvrp_solution prepends/appends depot; expect depot + n_cust customers + depot. if len(tour) != n_nodes + 1: return None if len(set(tour[1:-1])) != n_nodes - 1: return None return tour def is_valid_binary_tsp( bitstring: str, config: BinaryBlockConfig, n_nodes: int, start_city: int = 0 ) -> bool: return ( decode_binary_tsp_solution(bitstring, config, n_nodes, start_city) is not None ) def decode_binary_cvrp_solution( bitstring: str, config: BinaryBlockConfig, depot: int = 0, n_nodes: int | None = None, ) -> list[list[int]] | None: """Decode a binary-encoded CVRP bitstring into vehicle routes. Args: bitstring: Binary string of length ``config.n_qubits``. config: Binary block configuration. depot: Depot node index. n_nodes: Total number of nodes. Returns: List of routes or None if decoding fails. """ if n_nodes is None: n_nodes = config.n_customers + 1 if len(bitstring) != config.n_qubits: return None B = config.bits_per_slot customers = [c for c in range(n_nodes) if c != depot] routes = [] for v in range(config.n_vehicles): route = [depot] for t in range(config.max_steps): slot_idx = v * config.max_steps + t start = slot_idx * B slot_bits = bitstring[start : start + B] # Convert binary to integer (little-endian: bit 0 is LSB) val = sum(int(slot_bits[b]) * (2**b) for b in range(B)) if 1 <= val <= config.n_customers: route.append(customers[val - 1]) route.append(depot) routes.append(route) return routes def is_valid_binary_cvrp( bitstring: str, config: BinaryBlockConfig, demands: npt.NDArray[np.floating], capacity: float, depot: int = 0, ) -> bool: """Check if a binary-encoded CVRP solution is feasible. Validates: - Each customer appears exactly once across all slots. - No slot has an out-of-range value (> n_customers). - Each vehicle's route demand ≤ capacity. Args: bitstring: Binary string. config: Binary block configuration. demands: Node demands. capacity: Vehicle capacity. depot: Depot index. Returns: True if feasible. """ if len(bitstring) != config.n_qubits: return False B = config.bits_per_slot n_nodes = len(demands) customers = [c for c in range(n_nodes) if c != depot] # Track which customers are visited visited = set() vehicle_demands = [0.0] * config.n_vehicles for v in range(config.n_vehicles): for t in range(config.max_steps): slot_idx = v * config.max_steps + t start = slot_idx * B slot_bits = bitstring[start : start + B] val = sum(int(slot_bits[b]) * (2**b) for b in range(B)) if val == 0: continue # empty slot if val > config.n_customers: return False # out of range cust_node = customers[val - 1] if val in visited: return False # duplicate visit visited.add(val) vehicle_demands[v] += demands[cust_node] # All customers must be visited if len(visited) != config.n_customers: return False # Capacity check for v in range(config.n_vehicles): if vehicle_demands[v] > capacity + 1e-9: return False return True # --- VRP file parser ---
[docs] @dataclass class RoutingInstance: """Parsed VRP/TSP instance data. Attributes: name: Instance name from the file header. comment: Instance comment (may contain optimal cost). problem_type: ``"CVRP"`` or ``"TSP"``. dimension: Total number of nodes (including depot). capacity: Vehicle capacity (CVRP only, 0 for TSP). n_vehicles: Number of vehicles (extracted from name ``-kN`` pattern or from comment, 1 for TSP). coords: Node coordinates, shape ``(dimension, 2)``. demands: Node demands, shape ``(dimension,)``. Depot demand is 0. depot: Depot node index (0-based). cost_matrix: Euclidean distance matrix, shape ``(dimension, dimension)``. optimal_cost: Optimal cost from comment, if available. """ name: str = "" comment: str = "" problem_type: str = "CVRP" dimension: int = 0 capacity: int = 0 n_vehicles: int = 1 coords: npt.NDArray[np.float64] = field(default_factory=lambda: np.array([])) demands: npt.NDArray[np.float64] = field(default_factory=lambda: np.array([])) depot: int = 0 cost_matrix: npt.NDArray[np.float64] = field(default_factory=lambda: np.array([])) optimal_cost: float | None = None @property def n_customers(self) -> int: """Number of customers (excluding depot).""" return self.dimension - 1
_SUPPORTED_EWT = {"EUC_2D", "GEO", "EXPLICIT"} _SUPPORTED_EXPLICIT_FORMATS = { "LOWER_DIAG_ROW", "LOWER_ROW", "UPPER_ROW", "UPPER_DIAG_ROW", "FULL_MATRIX", } # TSPLIB's C reference uses PI = 3.141592 rather than math.pi. Mismatching # the constant shifts integer-truncated GEO distances by ±1 against published # benchmarks, so we mirror the reference exactly. _TSPLIB_PI = 3.141592 def _nint(x: float) -> int: """TSPLIB's nint: round half-away-from-zero (NOT Python's banker's round).""" return int(x + 0.5) if x >= 0 else -int(-x + 0.5) def _euc2d_matrix(coords: list[tuple[float, float]]) -> npt.NDArray[np.float64]: n = len(coords) cost = np.zeros((n, n), dtype=np.float64) for i in range(n): for j in range(i + 1, n): dx = coords[i][0] - coords[j][0] dy = coords[i][1] - coords[j][1] d = _nint(math.sqrt(dx * dx + dy * dy)) cost[i, j] = cost[j, i] = d return cost def _geo_matrix(coords: list[tuple[float, float]]) -> npt.NDArray[np.float64]: """TSPLIB GEO: spherical distance using the radius from the reference spec. Mirrors the C reference exactly: ``PI = 3.141592``, the (1±q1)/(1∓q1) cosine identity, integer-truncated `(int)(R*acos(...) + 1.0)`. The ``acos`` argument is clamped to ``[-1, 1]`` to absorb 1-ulp drift from the trig identity; without it, near-identical points can raise. """ # TSPLIB encodes coords as DDD.MM where MM is minutes — convert to radians. def to_rad(x: float) -> float: deg = int(x) minutes = x - deg return _TSPLIB_PI * (deg + 5.0 * minutes / 3.0) / 180.0 rad = [(to_rad(lat), to_rad(lon)) for lat, lon in coords] n = len(coords) R = 6378.388 cost = np.zeros((n, n), dtype=np.float64) for i in range(n): for j in range(i + 1, n): q1 = math.cos(rad[i][1] - rad[j][1]) q2 = math.cos(rad[i][0] - rad[j][0]) q3 = math.cos(rad[i][0] + rad[j][0]) arg = max(-1.0, min(1.0, 0.5 * ((1 + q1) * q2 - (1 - q1) * q3))) d = int(R * math.acos(arg) + 1.0) cost[i, j] = cost[j, i] = d return cost def _unpack_explicit(values: list[float], n: int, fmt: str) -> npt.NDArray[np.float64]: cost = np.zeros((n, n), dtype=np.float64) it = iter(values) if fmt == "LOWER_DIAG_ROW": for i in range(n): for j in range(i + 1): cost[i, j] = cost[j, i] = next(it) elif fmt == "LOWER_ROW": for i in range(1, n): for j in range(i): cost[i, j] = cost[j, i] = next(it) elif fmt == "UPPER_ROW": for i in range(n): for j in range(i + 1, n): cost[i, j] = cost[j, i] = next(it) elif fmt == "UPPER_DIAG_ROW": for i in range(n): for j in range(i, n): cost[i, j] = cost[j, i] = next(it) elif fmt == "FULL_MATRIX": for i in range(n): for j in range(n): cost[i, j] = next(it) else: raise ValueError(f"unsupported EDGE_WEIGHT_FORMAT: {fmt}") return cost
[docs] def parse_tsplib_file(path: str | Path) -> RoutingInstance: """Parse a TSPLIB/CVRPLIB format `.vrp` or `.tsp` file. Supports: - ``TYPE: CVRP`` and ``TYPE: TSP`` - ``EDGE_WEIGHT_TYPE``: ``EUC_2D``, ``GEO``, ``EXPLICIT`` - ``EDGE_WEIGHT_FORMAT`` (for EXPLICIT): ``LOWER_DIAG_ROW``, ``UPPER_ROW``, ``FULL_MATRIX`` - ``NODE_COORD_SECTION``, ``EDGE_WEIGHT_SECTION``, ``DEMAND_SECTION``, ``DEPOT_SECTION`` Args: path: Path to the `.vrp` or `.tsp` file. Returns: Parsed :class:`~divi.qprog.problems.RoutingInstance`. Raises: ValueError: If the file format is unsupported or malformed. """ path = Path(path) lines = path.read_text().splitlines() inst = RoutingInstance() section = None coords_list: list[tuple[float, float]] = [] demands_list: list[float] = [] depot_nodes: list[int] = [] edge_weights: list[float] = [] edge_weight_type = "EUC_2D" edge_weight_format: str | None = None for raw_line in lines: line = raw_line.strip() if not line or line.startswith("#"): continue # Header key-value pairs if ":" in line and section is None: key, _, value = line.partition(":") key = key.strip().upper() value = value.strip().strip('"') if key == "NAME": inst.name = value # Try to extract n_vehicles from name like "XSH-n20-k4-01" for part in value.split("-"): if part.startswith("k") and part[1:].isdigit(): inst.n_vehicles = int(part[1:]) elif key == "COMMENT": inst.comment = value # Try to extract optimal cost lower = value.lower() for prefix in [ "optimal cost:", "optimal cost :", "optimal value:", "optimal value :", "optimal:", ]: if prefix in lower: tail = lower.split(prefix, 1)[1] # Accept ints, decimals (with or without leading zero), # and scientific notation. Anchored to the start of # ``tail`` so we don't pick up unrelated numbers. match = re.search( r"-?(?:\d+\.\d+|\.\d+|\d+)(?:[eE][+-]?\d+)?", tail, ) if match: inst.optimal_cost = float(match.group(0)) elif key == "TYPE": inst.problem_type = value.upper() elif key == "DIMENSION": inst.dimension = int(value) elif key == "CAPACITY": inst.capacity = int(value) elif key == "EDGE_WEIGHT_TYPE": edge_weight_type = value.upper() if edge_weight_type not in _SUPPORTED_EWT: raise ValueError( f"Unsupported EDGE_WEIGHT_TYPE: {value}. " f"Supported: {sorted(_SUPPORTED_EWT)}." ) elif key == "EDGE_WEIGHT_FORMAT": edge_weight_format = value.upper() continue # Section headers upper = line.upper() if upper == "NODE_COORD_SECTION": section = "COORDS" continue elif upper == "DEMAND_SECTION": section = "DEMANDS" continue elif upper == "DEPOT_SECTION": section = "DEPOT" continue elif upper == "EDGE_WEIGHT_SECTION": section = "EDGE_WEIGHTS" continue elif upper == "EOF": break elif upper in {"DISPLAY_DATA_SECTION", "FIXED_EDGES_SECTION"}: # Skip auxiliary sections we don't consume. section = "SKIP" continue # Section data if section == "COORDS": parts = line.split() if len(parts) >= 3: coords_list.append((float(parts[1]), float(parts[2]))) elif section == "DEMANDS": parts = line.split() if len(parts) >= 2: demands_list.append(float(parts[1])) elif section == "DEPOT": val = int(line) if val == -1: section = None else: depot_nodes.append(val) elif section == "EDGE_WEIGHTS": edge_weights.extend(float(tok) for tok in line.split()) # Build arrays if coords_list: inst.coords = np.array(coords_list, dtype=np.float64) if demands_list: inst.demands = np.array(demands_list, dtype=np.float64) if depot_nodes: # TSPLIB uses 1-based indexing inst.depot = depot_nodes[0] - 1 # Compute distance matrix if edge_weight_type == "EXPLICIT": if edge_weight_format is None: raise ValueError("EXPLICIT requires EDGE_WEIGHT_FORMAT.") if edge_weight_format not in _SUPPORTED_EXPLICIT_FORMATS: raise ValueError( f"unsupported EDGE_WEIGHT_FORMAT: {edge_weight_format}. " f"Supported: {sorted(_SUPPORTED_EXPLICIT_FORMATS)}." ) inst.cost_matrix = _unpack_explicit( edge_weights, inst.dimension, edge_weight_format ) elif coords_list: if edge_weight_type == "GEO": inst.cost_matrix = _geo_matrix(coords_list) else: # EUC_2D inst.cost_matrix = _euc2d_matrix(coords_list) # Default demands for TSP if inst.problem_type == "TSP" and len(demands_list) == 0: inst.demands = np.zeros(inst.dimension, dtype=np.float64) return inst
[docs] def parse_vrp_solution( path: str | Path, ) -> tuple[list[list[int]], float]: """Parse a CVRPLIB solution file. Expects format:: Route #1: 15 2 18 1 8 Route #2: 10 11 7 9 6 ... Cost 646 Customer IDs are 1-based (TSPLIB convention). The returned routes use 0-based indexing with the depot prepended and appended. Args: path: Path to the `.opt.sol` or `.bst.sol` file. Returns: Tuple of (routes, cost) where routes is a list of routes and each route is ``[depot, c1, c2, ..., depot]`` with 0-based indices. The depot index is assumed to be 0. """ path = Path(path) lines = path.read_text().splitlines() routes: list[list[int]] = [] cost = 0.0 for line in lines: line = line.strip() if line.startswith("Route"): # "Route #1: 15 2 18 1 8" -> customers [15, 2, 18, 1, 8] (1-based) _, _, customer_str = line.partition(":") customers_1based = [int(x) for x in customer_str.split()] # Convert to 0-based route = [0] + [c - 1 for c in customers_1based] + [0] routes.append(route) elif line.startswith("Cost"): cost = float(line.split()[1]) return routes, cost
# --- Routing problem base --- class _RoutingProblemBase(QAOAProblem): """Shared base for routing problems. Stores the :class:`IsingResult` and mixer after subclass constructors call :meth:`_init_ising`. """ def _init_ising( self, qubo, *, block_size: int, n_blocks: int, hamiltonian_builder: Literal["native", "quadratized"] = "native", use_xy_mixer: bool = True, ) -> None: """Shared Ising conversion + mixer setup.""" self._block_size = block_size self._n_blocks = n_blocks self._ising = qubo_to_ising(qubo, hamiltonian_builder=hamiltonian_builder) if use_xy_mixer: graph = build_block_xy_mixer_graph( block_size, n_blocks, range(self._ising.n_qubits) ) self._mixer_hamiltonian = xy_mixer(graph, n_qubits=self._ising.n_qubits) else: self._mixer_hamiltonian = x_mixer(self._ising.n_qubits) @property def cost_hamiltonian(self) -> SparsePauliOp: return self._ising.cost_hamiltonian @property def mixer_hamiltonian(self) -> SparsePauliOp: return self._mixer_hamiltonian @property def loss_constant(self) -> float: return self._ising.loss_constant # --- TSPProblem ---
[docs] class TSPProblem(_RoutingProblemBase): """Traveling Salesman Problem for QAOA. Supports two encodings: * ``"one_hot"`` — assignment-matrix encoding with W-state initialisation and an XY mixer that preserves the one-hot constraint per time slot. Qubit count: ``(n-1)²``. * ``"binary"`` — log-encoded slot bits (CE-QAOA binary) with Hadamard initialisation and a transverse-field mixer. Qubit count: ``(n-1) · ⌈log₂(n)⌉`` before quadratization. Args: cost_matrix: Symmetric distance/cost matrix of shape ``(n, n)``. start_city: Index of the fixed start city. Defaults to 0. encoding: ``"one_hot"`` or ``"binary"``. Defaults to ``"one_hot"``. constraint_penalty: Constraint penalty strength. Defaults to 4.0. objective_weight: Objective weight. Defaults to 1.0. """ def __init__( self, cost_matrix: npt.NDArray[np.floating], *, start_city: int = 0, encoding: Literal["one_hot", "binary"] = "one_hot", constraint_penalty: float = 4.0, objective_weight: float = 1.0, ): self._cost_matrix = np.asarray(cost_matrix, dtype=np.float64) self._start_city = start_city self._n_cities = self._cost_matrix.shape[0] self._encoding = encoding self._binary_config: BinaryBlockConfig | None = None m = self._n_cities - 1 if encoding == "binary": hubo, self._binary_config = create_tsp_hubo_binary( self._cost_matrix, start_city=start_city, constraint_penalty=constraint_penalty, objective_weight=objective_weight, ) self._init_ising( hubo, block_size=self._binary_config.bits_per_slot, n_blocks=self._binary_config.n_slots, hamiltonian_builder="native", use_xy_mixer=False, ) else: qubo = create_tsp_qubo( self._cost_matrix, start_city, constraint_penalty, objective_weight ) self._init_ising(qubo, block_size=m, n_blocks=m) @property def encoding(self) -> str: return self._encoding @property def binary_config(self) -> BinaryBlockConfig | None: return self._binary_config @property def recommended_initial_state(self) -> InitialState: if self._encoding == "binary": return SuperpositionState() return WState(self._block_size, self._n_blocks) @property def decode_fn(self) -> Callable[[str], Any]: n_cities = self._n_cities start = self._start_city if self._encoding == "binary" and self._binary_config is not None: cfg = self._binary_config return lambda bs: decode_binary_tsp_solution(bs, cfg, n_cities, start) return lambda bs: decode_tsp_solution(bs, n_cities, start) @property def feasible_dimension(self) -> int: """Size of the feasible subspace, equal to ``(n-1)!``.""" m = self._n_cities - 1 result = 1 for i in range(1, m + 1): result *= i return result
[docs] def is_feasible(self, bitstring: str) -> bool: if self._encoding == "binary" and self._binary_config is not None: return is_valid_binary_tsp( bitstring, self._binary_config, self._n_cities, self._start_city ) return is_valid_tsp_tour(bitstring, self._n_cities)
[docs] def repair_infeasible_bitstring(self, bitstring: str) -> tuple[str, Any, float]: if self._encoding == "binary": # Binary repair would require its own greedy. Defer until needed. raise NotImplementedError( "repair_infeasible_bitstring is not implemented for binary TSP." ) return repair_tsp_solution( bitstring, self._n_cities, self._start_city, self._cost_matrix )
[docs] def compute_energy(self, bitstring: str) -> float | None: if self._encoding == "binary" and self._binary_config is not None: tour = decode_binary_tsp_solution( bitstring, self._binary_config, self._n_cities, self._start_city ) else: tour = decode_tsp_solution(bitstring, self._n_cities, self._start_city) return tour_cost(tour, self._cost_matrix) if tour is not None else None
# --- CVRPProblem ---
[docs] class CVRPProblem(_RoutingProblemBase): """Capacitated Vehicle Routing Problem for QAOA. Supports two encodings: * ``"one_hot"`` — block one-hot with W-state + XY mixer. * ``"binary"`` — compact binary (HUBO) with Hadamard + X mixer, reducing qubit count from O(N) to O(log N) per slot. Args: cost_matrix: Symmetric distance/cost matrix of shape ``(n, n)``. demands: Customer demands (length = n_nodes, depot demand = 0). capacity: Vehicle capacity. n_vehicles: Number of vehicles. depot: Index of the depot node. Defaults to 0. encoding: ``"one_hot"`` or ``"binary"``. Defaults to ``"one_hot"``. constraint_penalty: Constraint penalty strength. Defaults to 4.0. objective_weight: Objective weight. Defaults to 1.0. capacity_penalty: Capacity penalty strength. Defaults to 4.0. max_steps: Maximum route length per vehicle (``"binary"`` encoding only). ``None`` (default) uses ``n_customers``. Qubit count and HUBO term count both scale with ``n_vehicles * max_steps``; lowering ``max_steps`` below a vehicle's actual stop count renders the instance infeasible. """ def __init__( self, cost_matrix: npt.NDArray[np.floating], *, demands: npt.NDArray[np.floating], capacity: float, n_vehicles: int, depot: int = 0, encoding: Literal["one_hot", "binary"] = "one_hot", constraint_penalty: float = 4.0, objective_weight: float = 1.0, capacity_penalty: float = 4.0, max_steps: int | None = None, ): self._cost_matrix = np.asarray(cost_matrix, dtype=np.float64) self._demands = demands self._capacity = capacity self._n_vehicles = n_vehicles self._depot = depot self._encoding = encoding self._n_cities = self._cost_matrix.shape[0] self._binary_config: BinaryBlockConfig | None = None n_cust = self._n_cities - 1 if encoding == "binary": hubo, self._binary_config = create_cvrp_hubo_binary( self._cost_matrix, demands=demands, capacity=capacity, n_vehicles=n_vehicles, depot=depot, constraint_penalty=constraint_penalty, objective_weight=objective_weight, capacity_penalty=capacity_penalty, max_steps=max_steps, ) self._init_ising( hubo, block_size=self._binary_config.bits_per_slot, n_blocks=self._binary_config.n_slots, hamiltonian_builder="native", use_xy_mixer=False, ) else: if max_steps is not None: raise ValueError("max_steps is only supported for encoding='binary'.") qubo = create_cvrp_qubo( self._cost_matrix, demands=demands, capacity=capacity, n_vehicles=n_vehicles, depot=depot, constraint_penalty=constraint_penalty, objective_weight=objective_weight, capacity_penalty=capacity_penalty, ) bs, nb = cvrp_block_structure(n_cust, n_vehicles) self._init_ising(qubo, block_size=bs, n_blocks=nb) @property def recommended_initial_state(self) -> InitialState: if self._encoding == "binary": return SuperpositionState() return WState(self._block_size, self._n_blocks) @property def decode_fn(self) -> Callable[[str], Any]: if self._encoding == "binary" and self._binary_config is not None: cfg = self._binary_config dep, nn = self._depot, self._n_cities return lambda bs: decode_binary_cvrp_solution(bs, cfg, dep, nn) nc, nv, dep, nn = ( self._n_cities - 1, self._n_vehicles, self._depot, self._n_cities, ) return lambda bs: decode_cvrp_solution(bs, nc, nv, dep, nn) @property def encoding(self) -> str: return self._encoding @property def binary_config(self) -> BinaryBlockConfig | None: return self._binary_config
[docs] def is_feasible(self, bitstring: str) -> bool: if self._encoding == "binary" and self._binary_config is not None: return is_valid_binary_cvrp( bitstring, self._binary_config, self._demands, self._capacity, self._depot, ) return is_valid_cvrp_solution( bitstring, n_customers=self._n_cities - 1, n_vehicles=self._n_vehicles, demands=self._demands, capacity=self._capacity, depot=self._depot, )
[docs] def repair_infeasible_bitstring( self, bitstring: str ) -> tuple[str, Any, float | None]: if self._encoding == "binary": # Binary repair not yet implemented; return unchanged. return bitstring, None, None return repair_cvrp_solution( bitstring, n_customers=self._n_cities - 1, n_vehicles=self._n_vehicles, cost_matrix=self._cost_matrix, demands=self._demands, capacity=self._capacity, depot=self._depot, )
[docs] def compute_energy(self, bitstring: str) -> float | None: if self._encoding == "binary" and self._binary_config is not None: routes = decode_binary_cvrp_solution( bitstring, self._binary_config, self._depot, self._n_cities ) else: routes = decode_cvrp_solution( bitstring, self._n_cities - 1, self._n_vehicles, self._depot, self._n_cities, ) if routes is None: return None return sum(tour_cost(route, self._cost_matrix) for route in routes)