Source code for qlass.utils.utils

from collections.abc import Callable
from typing import Any

import exqalibur
import numpy as np
import perceval as pcvl
import qiskit

H_matrix = (1 / np.sqrt(2)) * pcvl.Matrix([[1.0, 1.0], [1.0, -1.0]])
M_matrix = (1 / np.sqrt(2)) * pcvl.Matrix([[1.0, 1.0], [1.0j, -1.0j]])
mzi = (
    pcvl.BS()
    // (0, pcvl.PS(pcvl.Parameter("phi1")))
    // pcvl.BS()
    // (0, pcvl.PS(pcvl.Parameter("phi2")))
)
H_circ = pcvl.Circuit.decomposition(H_matrix, mzi, shape=pcvl.InterferometerShape.TRIANGLE)
M_circ = pcvl.Circuit.decomposition(M_matrix, mzi, shape=pcvl.InterferometerShape.TRIANGLE)


[docs] def is_qubit_state(state: exqalibur.FockState) -> tuple[int, ...] | bool: """ Check if a given Fock state is a valid qubit state. Args: state (exqalibur.FockState): The Fock state to check Returns: Union[Tuple[int, ...], bool]: The corresponding qubit state if valid, False otherwise """ q_state = [] for i in range(state.m // 2): q = state[2 * i : 2 * i + 2] if q[0] == 0 and q[1] == 1: q_state.append(1) elif q[0] == 1 and q[1] == 0: q_state.append(0) else: return False return tuple(q_state)
[docs] def qubit_state_marginal( prob_dist: dict[exqalibur.FockState | tuple[int, ...], float], ) -> dict[tuple[int, ...], float]: """ Calculate the frequencies of measured qubit states from a probability distribution. This function now handles both Fock states and bitstring inputs: - If input contains Fock states, converts them to qubit states using is_qubit_state - If input already contains bitstrings (tuples), passes them through directly Args: prob_dist (Dict[Union[exqalibur.FockState, Tuple[int, ...]], float]): Probability distribution of either Fock states or bitstrings Returns: Dict[Tuple[int, ...], float]: Frequencies of measured qubit states """ q_state_frequency: dict[tuple[int, ...] | Any, float] = {} total_prob_mass: float = 0 # Check if we're dealing with Fock states or bitstrings if not prob_dist: return q_state_frequency first_key = next(iter(prob_dist.keys())) if isinstance(first_key, tuple): # Input is already bitstrings, normalize probabilities and return total_prob_mass = sum(float(p) for p in prob_dist.values()) for bitstring, prob in prob_dist.items(): q_state_frequency[bitstring] = prob / total_prob_mass else: # Input is Fock states, convert to qubit states for state in prob_dist: q_state = is_qubit_state(state) if q_state is not False: total_prob_mass += float(prob_dist[state]) if q_state in q_state_frequency: q_state_frequency[q_state] += float(prob_dist[state]) else: q_state_frequency[q_state] = float(prob_dist[state]) # Normalize probabilities if total_prob_mass > 0: for key in q_state_frequency: q_state_frequency[key] /= total_prob_mass return q_state_frequency
[docs] def get_probabilities( samples: list[exqalibur.FockState | tuple[int, ...] | str], ) -> dict[exqalibur.FockState | tuple[int, ...], float]: """ Get the probabilities of sampled states. This function now handles: - Fock states: List[exqalibur.FockState] -> Dict[exqalibur.FockState, float] - Bitstring tuples: List[Tuple[int, ...]] -> Dict[Tuple[int, ...], float] - Bitstring strings (from Qiskit): List[str] -> Dict[Tuple[int, ...], float] Args: samples (List[Union[exqalibur.FockState, Tuple[int, ...], str]]): Sampled states (Fock states, bitstring tuples, or bitstring strings) Returns: Dict[Union[exqalibur.FockState, Tuple[int, ...]], float]: Probabilities of sampled states """ if not samples: return {} prob_dist: dict[exqalibur.FockState | tuple[int, ...], float] = {} for state in samples: # Convert string bitstrings (from Qiskit) to tuples if isinstance(state, str): state = tuple(int(bit) for bit in state) if state in prob_dist: prob_dist[state] += 1 else: prob_dist[state] = 1 total_samples = len(samples) for key in prob_dist: prob_dist[key] /= total_samples return prob_dist
[docs] def compute_energy(pauli_bin: tuple[int, ...], res: dict[tuple[int, ...], float]) -> float: """ Compute the expectation value for a given Pauli string and measurement results. Args: pauli_bin (Tuple[int, ...]): A tuple of 0's and 1's (0's are identities, 1's are non-identities (X or Z)) res (Dict[Tuple[int, ...], float]): Frequencies of measured qubit bitstrings Returns: float: The corresponding expectation value """ if not res: return 0.0 # Create a copy to avoid modifying the original dictionary res_copy = res.copy() for key in res_copy: inner = np.dot(key, pauli_bin) sign = (-1) ** inner res_copy[key] *= sign energy = float(np.sum([v for v in res_copy.values() if np.isfinite(v)])) return energy
def pauli_string_bin(pauli_string: str) -> tuple[int, ...]: """ Convert a Pauli string to a binary representation. Args: pauli_string (str): A string representation of Pauli operators (e.g., "IXZI") Returns: Tuple[int, ...]: Binary representation of the Pauli string """ return tuple(0 if c == "I" else 1 for c in pauli_string)
[docs] def rotate_qubits( pauli_string: str, vqe_circuit: pcvl.Circuit | qiskit.QuantumCircuit ) -> pcvl.Circuit: """ Apply the correct rotations on corresponding qubits for expectation value computation. Args: pauli_string (str): A string representation of Pauli operators vqe_circuit (pcvl.Circuit): The VQE circuit to modify Returns: pcvl.Circuit: The modified VQE circuit with applied rotations """ if isinstance(vqe_circuit, qiskit.QuantumCircuit): for i, pauli in enumerate(pauli_string): if pauli == "X": vqe_circuit.h(i) elif pauli == "Y": vqe_circuit.ry(np.pi / 2, i) else: for i, pauli in enumerate(pauli_string): qubit = (2 * i, 2 * i + 1) if pauli == "X": vqe_circuit.add(qubit, H_circ) elif pauli == "Y": vqe_circuit.add(qubit, M_circ) return vqe_circuit
[docs] def normalize_samples(samples: Any) -> list[exqalibur.FockState | tuple[int, ...]]: """ Normalize samples from different executor formats to a consistent format. Handles: - Qiskit bitstring format: ['00', '01', '11'] -> [(0,0), (0,1), (1,1)] - Already normalized tuples: [(0,0), (0,1)] -> [(0,0), (0,1)] - ExQalibur FockStates: [FockState, ...] -> [FockState, ...] Args: samples: Raw samples in various formats Returns: List[Union[exqalibur.FockState, Tuple[int, ...]]]: Normalized samples """ if not samples: return [] normalized = [] for sample in samples: if isinstance(sample, str): # Convert Qiskit bitstring format '00' -> (0,0) normalized.append(tuple(int(bit) for bit in sample)) elif isinstance(sample, (list, tuple)) and all( isinstance(x, (int, np.integer)) for x in sample ): # Convert list/tuple of ints to tuple normalized.append(tuple(sample)) else: # Assume it's already in correct format (e.g., exqalibur.FockState) normalized.append(sample) return normalized
[docs] def loss_function( lp: np.ndarray, H: dict[str, float], executor: Any, mitigator: Any | None = None, ) -> float: """ Compute the loss function for the VQE algorithm with automatic Pauli grouping. This function automatically groups commuting Pauli terms to reduce the number of measurements required. The grouping is applied transparently without changing the function interface, providing automatic optimization for VQE algorithms. The function works with executors that return either: 1. Fock states (from linear optical circuits) - exqalibur.FockState objects 2. Bitstring tuples (from regular qubit-based circuits) 3. Bitstring strings (from Qiskit Sampler) The executor should return samples in one of these formats: - Dict with 'results' key: {'results': [samples]} - Direct list of samples: [samples] - Qiskit-style format with bitstrings or counts Args: lp (np.ndarray): Array of parameter values H (Dict[str, float]): Hamiltonian dictionary executor: A callable function that executes the quantum circuit. mitigator: Optional mitigator object (e.g., M3Mitigator) to correct measurement errors. Returns: float: The computed loss value """ # Import here to avoid circular imports try: from qlass.quantum_chemistry.hamiltonians import group_commuting_pauli_terms use_grouping = True except ImportError: # Fallback to individual processing if grouping not available use_grouping = False loss = 0.0 if use_grouping: # Use automatic grouping for optimized measurements grouped_hamiltonians = group_commuting_pauli_terms(H) for group in grouped_hamiltonians: # Each group contains mutually commuting terms # In the future, this could be optimized to measure entire groups simultaneously # For now, we process each term individually but with the grouping organization for pauli_string, coefficient in group.items(): samples = executor(lp, pauli_string) sample_list = _extract_samples_from_executor_result(samples) normalized_samples = normalize_samples(sample_list) # --- MITIGATION BLOCK --- if mitigator is not None: # Convert samples to counts counts: dict[exqalibur.FockState | tuple[int, ...], int] = {} for s in normalized_samples: counts[s] = counts.get(s, 0) + 1 # Mitigate to get probability distribution directly prob_dist = mitigator.mitigate(counts) else: prob_dist = get_probabilities(normalized_samples) # ------------------------ pauli_bin = pauli_string_bin(pauli_string) qubit_state_marg = qubit_state_marginal(prob_dist) expectation = compute_energy(pauli_bin, qubit_state_marg) loss += coefficient * expectation else: # Fallback to original implementation without grouping for pauli_string, coefficient in H.items(): samples = executor(lp, pauli_string) sample_list = _extract_samples_from_executor_result(samples) normalized_samples = normalize_samples(sample_list) # --- MITIGATION BLOCK --- if mitigator is not None: # Convert samples to counts counts = {} for s in normalized_samples: counts[s] = counts.get(s, 0) + 1 # Mitigate prob_dist = mitigator.mitigate(counts) else: prob_dist = get_probabilities(normalized_samples) # ------------------------ pauli_bin = pauli_string_bin(pauli_string) qubit_state_marg = qubit_state_marginal(prob_dist) expectation = compute_energy(pauli_bin, qubit_state_marg) loss += coefficient * expectation return loss.real
def _extract_samples_from_executor_result(samples: Any) -> list[Any]: """ Helper function to extract sample list from different executor return formats. Args: samples: Raw samples from executor in various formats Returns: List: Extracted sample list Raises: ValueError: If samples format is not recognized """ sample_list: list[Any] | None = None if isinstance(samples, dict): if "results" in samples: sample_list = samples["results"] elif "counts" in samples: # Handle Qiskit counts format: {'00': 500, '11': 500} sample_list = [] for bitstring, count in samples["counts"].items(): sample_list.extend([bitstring] * count) else: # Try to find any list-like values in the dict for _key, value in samples.items(): if isinstance(value, (list, tuple)): sample_list = list(value) break elif isinstance(samples, (list, tuple)): # Direct list of samples sample_list = list(samples) else: raise ValueError( f"Executor returned unexpected format: {type(samples)}. " "Expected dict with 'results' key, dict with 'counts' key, or list of samples." ) if sample_list is None: raise ValueError("Could not extract sample list from executor return value.") return sample_list
[docs] def linear_circuit_to_unitary(circuit: pcvl.Circuit) -> np.ndarray: """ Convert a linear optical circuit to a unitary matrix. Args: circuit (pcvl.Circuit): Linear optical circuit Returns: np.ndarray: Unitary matrix representation of the circuit """ unitary_matrix = np.array(circuit.compute_unitary()) return unitary_matrix
[docs] def compute_expectation_value_from_unitary( unitary: np.ndarray, pauli_matrix: np.ndarray, initial_state: np.ndarray | None = None ) -> float: """ Compute expectation value <ψ|H|ψ> where |ψ> = U|0>. Args: unitary (np.ndarray): Unitary matrix representing the circuit pauli_matrix (np.ndarray): Matrix representation of Pauli operator initial_state (np.ndarray): Initial state vector (default: |0...0>) Returns: float: Expectation value """ n_qubits = int(np.log2(unitary.shape[0])) if initial_state is None: # Default to |0...0> state initial_state = np.zeros(2**n_qubits, dtype=complex) initial_state[0] = 1.0 # Compute |ψ> = U|0> state = unitary @ initial_state # Compute <ψ|H|ψ> expectation = np.real(state.conj() @ pauli_matrix @ state) return float(expectation.real)
[docs] def loss_function_matrix( params: np.ndarray, H: dict[str, float], unitary_executor: Callable ) -> float: """ Compute loss function using unitary matrices directly. Args: params (np.ndarray): Variational parameters H (Dict[str, float]): Hamiltonian dictionary unitary_executor: Function that returns unitary matrix given params Returns: float: Energy expectation value """ from qlass.quantum_chemistry import pauli_string_to_matrix # Get the unitary from the executor unitary = unitary_executor(params) loss = 0.0 for pauli_string, coefficient in H.items(): # Convert Pauli string to matrix pauli_matrix = pauli_string_to_matrix(pauli_string) # Compute expectation value expectation = compute_expectation_value_from_unitary(unitary, pauli_matrix) loss += coefficient * expectation return float(np.real(loss))
[docs] def permanent(matrix: np.ndarray) -> complex: """ Calculate the permanent of a matrix. The permanent is like a determinant but without alternating signs: Perm(A) = Σ_{σ∈Sₘ} Π_{i=1 to m} A_{i,σ(i)} Parameters: ----------- matrix : np.ndarray Square matrix to calculate permanent of Returns: -------- complex The permanent of the matrix """ n = matrix.shape[0] if matrix.shape[0] != matrix.shape[1]: raise ValueError("Permanent is only defined for square matrices.") if n == 0: return 1 # Ryser's formula for the permanent rows = np.arange(n) result = 0 for subset in range(1, 1 << n): S = [(subset >> j) & 1 for j in range(n)] parity = (-1) ** (n - sum(S)) prod = 1 for i in rows: s = 0 for j in range(n): if S[j]: s += matrix[i, j] prod *= s result += parity * prod return result
[docs] def logical_state_to_modes(logical_state: int, m: int) -> list[int]: """ Convert a logical qubit state to the set of occupied photon modes. Parameters: ----------- logical_state : int Integer representing the logical state (0 to 2^m - 1) m : int Number of qubits Returns: -------- List[int] List of occupied mode indices (0-indexed) """ # Convert integer to bit list bits = [(logical_state >> (m - 1 - k)) & 1 for k in range(m)] # For qubit k (0-indexed), the modes are 2k and 2k+1 # |0⟩ₖ → mode 2k, |1⟩ₖ → mode 2k+1 modes = [] for k in range(m): mode = 2 * k + bits[k] modes.append(mode) return modes
[docs] def photon_to_qubit_unitary(U_photon: np.ndarray) -> np.ndarray: """ Convert a photon unitary to the effective qubit unitary via post-selection. Parameters: ----------- U_photon : np.ndarray The 2m × 2m unitary matrix acting on photon modes Returns: -------- np.ndarray The 2^m × 2^m effective qubit unitary matrix """ # Determine number of qubits modes_2m = U_photon.shape[0] if modes_2m % 2 != 0: raise ValueError("Photon unitary must have even dimension (2m × 2m)") m = modes_2m // 2 num_logical_states = 2**m # Initialize the qubit unitary U_qubit = np.zeros((num_logical_states, num_logical_states), dtype=complex) # For each pair of input and output logical states for r in range(num_logical_states): for s in range(num_logical_states): # Get the mode sets for input state |r⟩ and output state |s⟩ R = logical_state_to_modes(r, m) S = logical_state_to_modes(s, m) # Extract the submatrix U(S,R) # Rows indexed by S, columns indexed by R submatrix = U_photon[np.ix_(S, R)] # The matrix element is the permanent of this submatrix U_qubit[s, r] = permanent(submatrix) return U_qubit
[docs] def loss_function_photonic_unitary( params: np.ndarray, H: dict[str, float], photonic_unitary_executor: Callable, initial_state: np.ndarray | None = None, ancillary_modes: list[int] | None = None, ) -> float: """ Computes the loss function for a photonic VQE using the efficient, matrix-free state vector approach for post-selection. This version accepts a full state vector for the initial state, allowing for superposition states, and allows for post-selection on ancillary modes. Args: params: Variational parameters for the ansatz. H: Hamiltonian dictionary. photonic_unitary_executor: A function that takes `params` and returns the M x M photonic unitary `U`, where M is the total number of modes (logical + ancillary). initial_state: The initial qubit state as a 2^m dimensional numpy vector. If None, defaults to the |00...0> state. ancillary_modes: A list of mode indices to be treated as ancillary. Post-selection is performed by assuming these modes are 0 (vacuum) for both input and output. Returns: The computed energy expectation value. """ from qlass.quantum_chemistry import pauli_string_to_matrix # Step 1: Get the physical unitary from the executor U_photon = photonic_unitary_executor(params) total_modes = U_photon.shape[0] # --- Logic for Ancillary Modes --- if ancillary_modes is None: ancillary_modes = [] anc_modes_set = set(ancillary_modes) all_modes_set = set(range(total_modes)) # Validate ancillary modes if not anc_modes_set.issubset(all_modes_set): raise ValueError( f"Ancillary modes {ancillary_modes} contain indices outside " f"the total mode range [0, {total_modes - 1}]" ) # Determine logical modes (those not ancillary) logical_modes_list = sorted(all_modes_set - anc_modes_set) if len(logical_modes_list) % 2 != 0: raise ValueError( f"The number of non-ancillary (logical) modes " f"({len(logical_modes_list)}) must be even for dual-rail encoding." ) # m is the number of logical qubits m = len(logical_modes_list) // 2 dim_logical = 2**m # --- End Ancilla Logic --- # Handle the default initial state if None is provided if initial_state is None: initial_state = np.zeros(dim_logical, dtype=complex) initial_state[0] = 1.0 # Default to |0...0> elif initial_state.shape[0] != dim_logical: # Validate initial state dimension against the *new* logical dimension raise ValueError( f"Initial state dimension ({initial_state.shape[0]}) does not " f"match logical dimension ({dim_logical}) derived from " f"total modes ({total_modes}) minus ancillary modes " f"({len(ancillary_modes)})." ) # Helper function to map logical states to the correct physical modes def _get_physical_modes( logical_state: int, num_qubits: int, logical_mode_map: list[int] ) -> list[int]: """Maps a logical state to physical modes given a specific mode layout.""" # Convert integer to bit list for m qubits bits = [(logical_state >> (num_qubits - 1 - k)) & 1 for k in range(num_qubits)] modes = [] for k in range(num_qubits): # Qubit k uses modes logical_mode_map[2*k] and logical_mode_map[2*k+1] # |0⟩_k -> logical_mode_map[2*k] # |1⟩_k -> logical_mode_map[2*k+1] mode_index = 2 * k + bits[k] modes.append(logical_mode_map[mode_index]) return modes # Step 2: Compute the unnormalized post-selected state vector U'|ψ_in> psi_out_unnormalized = np.zeros(dim_logical, dtype=complex) # Iterate through each basis state |r> in the initial superposition for r_idx, c_r in enumerate(initial_state): if abs(c_r) < 1e-15: continue # Get the physical input modes for logical state |r> # These modes are drawn *only* from the logical_modes_list R_modes = _get_physical_modes(r_idx, m, logical_modes_list) # Calculate the contribution vector v_r = U'|r> for s_idx in range(dim_logical): # Get the physical output modes for logical state |s> S_modes = _get_physical_modes(s_idx, m, logical_modes_list) # The transition amplitude <s|U'|r> is the permanent of the # submatrix U(S, R). This calculation implicitly post-selects # on all other modes (including ancillaries) being vacuum. submatrix = U_photon[np.ix_(S_modes, R_modes)] permanent_val = permanent(submatrix) # Add the weighted contribution to the final state vector psi_out_unnormalized[s_idx] += c_r * permanent_val # Step 3: Calculate success probability success_prob = np.vdot(psi_out_unnormalized, psi_out_unnormalized).real if success_prob < 1e-15: return 1e6 # Return penalty # Step 4: Compute the energy expectation <v|H|v> / <v|v> numerator_energy = 0.0 for pauli_string, coeff in H.items(): if abs(coeff) < 1e-15: continue pauli_matrix = pauli_string_to_matrix(pauli_string) term_expectation = np.vdot(psi_out_unnormalized, pauli_matrix @ psi_out_unnormalized) numerator_energy += coeff * term_expectation final_energy = numerator_energy / success_prob return float(np.real(final_energy))
[docs] def e_vqe_loss_function( lp: np.ndarray, H: dict[str, float], executor: Any, energy_collector: Any, weight_option: str = "weighted", ) -> float: """ Compute the loss function for the ensemble Variational Quantum Eigensolver (VQE) with automatic Pauli grouping for measurement optimization. This function automatically groups commuting Pauli terms to reduce the number of measurements required. The grouping is applied transparently without changing the function interface, providing automatic optimization for ensemble VQE algorithms. The function works with executors that return either: 1. Fock states (from linear optical circuits) - exqalibur.FockState objects 2. Bitstring tuples (from regular qubit-based circuits) 3. Bitstring strings (from Qiskit Sampler) The executor should return samples in one of these formats: - Dict with 'results' key: {'results': [samples]} - Direct list of samples: [samples] - Qiskit-style format with bitstrings or counts Parameters ---------- lp : np.ndarray Array of variational circuit parameters. Typically optimized to minimize the expectation value of the Hamiltonian. H : dict of {str: float} Hamiltonian represented as a dictionary mapping Pauli strings (e.g., ``'X0 Z1'``) to their coefficients. executor : callable Function or callable object that executes the quantum circuit and returns measurement samples. Must accept arguments ``(lp, pauli_string)`` and return samples in one of the accepted formats. energy_collector : object Object responsible for tracking or logging the energy convergence history. Must implement a method ``energies_convergence(energies, n_ensembles, total_loss)``. weight_option : {'weighted', 'equi', 'ground_state_only'} Scheme for assigning ensemble weights: - ``'weighted'`` (default): Linearly decreasing weights with index, i.e., w_i < w_j for i > j. - ``'equi'`` : Equal weights for all occupied orbitals, w_i = w_j. - ``'ground_state_only'`` : Only the ground state contributes, w_0 = 1, others 0. Returns ------- loss : float The computed ensemble VQE loss value, equal to the weighted sum of ensemble energies. """ # Import here to avoid circular imports from qlass.quantum_chemistry.hamiltonians import group_commuting_pauli_terms loss = 0.0 lst_energies: list[float] | None = None # Use automatic grouping for optimized measurements grouped_hamiltonians = group_commuting_pauli_terms(H) for group in grouped_hamiltonians: # Each group contains mutually commuting terms # In the future, this could be optimized to measure entire groups simultaneously # For now, we process each term individually but with the grouping organization for pauli_string, coefficient in group.items(): samples = executor(lp, pauli_string) # Handle different executor return formats sample_lists = [_extract_samples_from_executor_result(s) for s in samples] # Normalize samples to consistent format normalized_samples = [normalize_samples(sample_list) for sample_list in sample_lists] prob_dist = [ get_probabilities(normalized_sample) for normalized_sample in normalized_samples ] pauli_bin = pauli_string_bin(pauli_string) qubit_state_marg = [qubit_state_marginal(pd) for pd in prob_dist] expectation = [compute_energy(pauli_bin, qsm) for qsm in qubit_state_marg] energies = [coefficient * expect for expect in expectation] # Initialize accumulator on first iteration if lst_energies is None: lst_energies = [0.0] * len(energies) # Accumulate energies dynamically for i, energy in enumerate(energies): lst_energies[i] += energy if lst_energies is None: raise ValueError("No energies computed") weights = ensemble_weights(weight_option, len(lst_energies)) for i in range(len(lst_energies)): loss += lst_energies[i] * weights[i] energy_collector.energies_convergence(lst_energies, len(lst_energies), loss) return loss
def ensemble_weights(weights_choice: str, n_occ: int) -> list[float]: """ Generate ensemble weights for the ensemble Variational Quantum Eigensolver (VQE). Ensemble VQE uses weighted contributions from multiple eigenstates when computing the loss function. This function provides a choice of weighting schemes for the occupied orbitals. Parameters ---------- weights_choice : {'weighted', 'equi', 'ground_state_only'} Scheme for assigning ensemble weights: - ``'weighted'`` : Linearly decreasing weights with index, i.e., w_i < w_j for i > j. - ``'equi'`` : Equal weights for all occupied orbitals, w_i = w_j. - ``'ground_state_only'`` : Only the ground state contributes, w_0 = 1, others 0. n_occ : int Number of occupied orbitals in the system. Determines the length of the weight vector. Returns ------- weights : np.ndarray Array of length ``n_occ`` containing the ensemble weights corresponding to the chosen weighting scheme. Notes ----- - See the original article for the derivation of ensemble weights: `arXiv:2509.17982 <https://doi.org/10.48550/arXiv.2509.17982>`_. Examples -------- >>> ensemble_weights("equi", 4) array([0.25, 0.25, 0.25, 0.25]) >>> ensemble_weights("weighted", 4) array([0.375, 0.25 , 0.125, 0. ]) >>> ensemble_weights("ground_state_only", 4) array([1., 0., 0., 0.]) """ if weights_choice == "equi": weights = [1.0 / n_occ for i in range(n_occ)] # should be n_occ of them elif weights_choice == "weighted": weights = [] for i in range(n_occ): weights.append(1 / (n_occ**2) * (2 * n_occ - 1 - 2 * i)) elif weights_choice == "ground_state_only": weights = [1.0] + [0.0] * (n_occ - 1) else: raise ValueError( "Invalid weights_choice. Must be one of 'equi', 'weighted', or 'ground_state_only'." ) return weights class DataCollector: """ Collects and stores energies and loss values during ensemble VQE optimization. This class is intended to track the evolution of the loss function and corresponding energies each time the loss function is evaluated. Unlike the default ``loss_history`` from SciPy's ``minimize``, the number of iterations recorded here may differ, because it logs every call to the loss function, not just accepted steps in the optimizer. Attributes ---------- energy_data : dict of {int: list of float} Dictionary mapping the index of an orbital/eigenstate to a list of its observed energies over successive loss function evaluations. loss_data : list of float List of loss values recorded at each evaluation of the loss function. """ def __init__(self) -> None: """ Initialize the DataCollector with empty data structures. """ self.energy_data: dict[int, list[float]] = {} self.loss_data: list[float] = [] def energies_convergence( self, energy_values: list[float], eign_index: int, loss_values: float ) -> None: """ Record energies and loss values for the current evaluation. Each entry in ``energy_values`` is appended to the corresponding orbital index in ``energy_data``. The total loss for this evaluation is appended to ``loss_data``. Parameters ---------- energy_values : list of float List of energies for each occupied orbital or eigenstate at the current loss function evaluation. eign_index : int Number of energies in ``energy_values`` to record (typically equal to the number of occupied orbitals in the ensemble). loss_values : float Scalar value of the loss function at the current evaluation. """ for i in range(eign_index): if i not in self.energy_data: self.energy_data[i] = [] # initialize a list if key is new self.energy_data[i].append(energy_values[i]) # append instead of replacing self.loss_data.append(loss_values)