Source code for qiboml.models.decoding

from contextlib import contextmanager
from dataclasses import dataclass
from typing import Any, Dict, Optional, Union

from qibo import Circuit, gates, get_transpiler, transpiler
from qibo.backends import Backend, NumpyBackend, _check_backend
from qibo.config import log, raise_error
from qibo.hamiltonians import Hamiltonian, SymbolicHamiltonian, Z
from qibo.models.error_mitigation import error_sensitive_circuit
from qibo.noise import NoiseModel
from qibo.quantum_info.metrics import infidelity
from qibo.result import CircuitResult, MeasurementOutcomes, QuantumState
from qibo.transpiler import Passes

from qiboml import ndarray
from qiboml.models.calibrator import Calibrator
from qiboml.models.utils import Mitigator


@dataclass
class QuantumDecoding:
    """
    Abstract decoder class.

    A decoder owns a small internal circuit made of a single measurement gate
    on ``qubits``; calling the decoder appends that circuit to the input one
    and executes the result on ``backend``.

    Args:
        nqubits (int): total number of qubits.
        qubits (tuple[int], optional): set of qubits it acts on, by default
            ``range(nqubits)``.
        wire_names (tuple[int] | tuple[str], optional): names to be given to
            the wires, this has to have ``len`` equal to ``nqubits``.
            Additionally, this is mostly useful when executing on hardware to
            select which qubits to make use of. Namely, if the chip has qubits
            named:

            .. code-block:: python

                ("a", "b", "c", "d")

            and we wish to deploy a two qubits circuit on the first and last
            qubits you have to build it as:

            .. code-block:: python

                decoding = QuantumDecoding(nqubits=2, wire_names=("a", "d"))

        nshots (int, optional): number of shots used for circuit execution and
            sampling.
        backend (Backend, optional): backend used for computation, by default
            the globally-set backend is used.
        transpiler (Passes, optional): transpiler to run before circuit
            execution, by default no transpilation is performed on the circuit
            (``transpiler=None``).
        noise_model (NoiseModel): a ``NoiseModel`` of Qibo, which is applied to
            the given circuit to perform noisy simulations. In case a
            ``transpiler`` is passed, the noise model is applied to the
            transpiled circuit. Default is ``None`` and no noise is added.
        density_matrix (bool): if ``True``, density matrix simulation is
            performed instead of state-vector simulation.
    """

    nqubits: int
    qubits: Optional[tuple[int, ...]] = None
    wire_names: Optional[Union[tuple[int, ...], tuple[str, ...]]] = None
    nshots: Optional[int] = None
    backend: Optional[Backend] = None
    transpiler: Optional[Passes] = None
    noise_model: Optional[NoiseModel] = None
    density_matrix: Optional[bool] = False
    # Internal measurement circuit; always rebuilt in ``__post_init__``, so a
    # value passed at construction time is overwritten.
    _circuit: Optional[Circuit] = None

    def __post_init__(self):
        """Normalize the fields and build the internal measurement circuit."""
        if self.qubits is None:
            self.qubits = tuple(range(self.nqubits))
        else:
            self.qubits = tuple(self.qubits)

        circuit_wire_names = None
        if self.wire_names is not None:
            # self.wire_names has to be a tuple to make the decoder hashable
            # and thus usable in Jax differentiation
            self.wire_names = tuple(self.wire_names)
            # qibo does not accept a tuple, hence the conversion to list
            circuit_wire_names = list(self.wire_names)

        self._circuit = Circuit(
            self.nqubits,
            wire_names=circuit_wire_names,
            density_matrix=self.density_matrix,
        )
        self.backend = _check_backend(self.backend)
        self._circuit.add(gates.M(*self.qubits))

    def __call__(
        self, x: Circuit
    ) -> Union[CircuitResult, QuantumState, MeasurementOutcomes]:
        """Combine the input circuit with the internal one and execute them
        with the internal backend.

        Args:
            x (Circuit): input circuit.

        Returns:
            (CircuitResult | QuantumState | MeasurementOutcomes): the execution
            ``qibo.result`` object.
        """
        x = self.preprocessing(x)
        return self.backend.execute_circuit(x + self._circuit, nshots=self.nshots)

    def preprocessing(self, x: Circuit) -> Circuit:
        """Prepare the input circuit to run with the decoder settings.

        The circuit attributes are first aligned with the internal circuit
        (this modifies ``x`` in place), then the circuit is transpiled and
        finally the noise model is applied.

        Args:
            x (Circuit): input circuit.

        Returns:
            (Circuit): the circuit after transpilation and noise application.
        """
        self.align_circuits(x)
        x = self.transpile(x)
        x = self.apply_noise(x)
        return x

    def align_circuits(self, x: Circuit):
        """Align some attributes of the input circuit with the internal one,
        e.g. sets the density_matrix and wire_names.

        Args:
            x (Circuit): input circuit, modified in place.
        """
        self._align_density_matrix(x)
        self._align_wire_names(x)

    def transpile(self, x: Circuit) -> Circuit:
        """Transpile a given circuit ``x`` using the instructions provided by
        the ``transpiler`` attribute.

        Args:
            x (Circuit): circuit to transpile.

        Returns:
            (Circuit): the transpiled circuit, or ``x`` itself when no
            transpiler is set.
        """
        if self.transpiler is not None:
            # the transpiler returns a pair; only the first element is kept
            x, _ = self.transpiler(x)
        return x

    def apply_noise(self, x: Circuit) -> Circuit:
        """Apply the decoder ``noise_model`` to the target circuit.

        Args:
            x (Circuit): target circuit.

        Returns:
            (Circuit): the result of ``noise_model.apply(x)``, or ``x`` itself
            when no noise model is set.
        """
        if self.noise_model is not None:
            x = self.noise_model.apply(x)
        return x

    @property
    def circuit(self) -> Circuit:
        """A copy of the internal circuit.

        Returns:
            (Circuit): a copy of the internal circuit.
        """
        return self._circuit.copy()

    def set_backend(self, backend: Backend):
        """Set the internal backend.

        Args:
            backend (Backend): backend to be set.
        """
        self.backend = backend

    @property
    def output_shape(self):
        """The shape of the decoded outputs.

        Not implemented by the abstract decoder.
        """
        raise_error(NotImplementedError)

    @property
    def analytic(self) -> bool:
        """Whether the decoder is analytic, i.e. the gradient is analytically
        computable, or not (e.g. if sampling is involved).

        Returns:
            (bool): ``True`` if ``nshots`` is ``None``, ``False`` otherwise.
        """
        return self.nshots is None

    def _align_density_matrix(self, x: Circuit):
        """Share the density matrix attribute with the input circuit.

        Args:
            x (Circuit): input circuit, its ``init_kwargs`` are updated.
        """
        # Forcing the density matrix simulation if a noise model is given
        density_matrix = True if self.noise_model is not None else self.density_matrix
        # Aligning the density_matrix attribute of all the circuits
        self._circuit.init_kwargs["density_matrix"] = density_matrix
        x.init_kwargs["density_matrix"] = density_matrix

    def _align_wire_names(self, x: Circuit):
        """Share the wire names with the input circuit.

        Args:
            x (Circuit): input circuit, its ``wire_names`` attribute and
                ``init_kwargs`` entry are overwritten (with ``None`` when the
                decoder has no wire names).
        """
        wire_names = None if self.wire_names is None else list(self.wire_names)
        x.wire_names = wire_names
        x.init_kwargs["wire_names"] = wire_names

    @contextmanager
    def _temporary_nshots(self, nshots):
        """Context manager to execute the decoder with a custom number of
        shots; the previous value is restored on exit, even on error."""
        original = self.nshots
        self.nshots = nshots
        try:
            yield
        finally:
            self.nshots = original

    def __hash__(self) -> int:
        # Defined explicitly: a non-frozen dataclass with ``eq=True`` would
        # otherwise be unhashable.
        return hash((self.qubits, self.wire_names, self.nshots, self.backend))
class Probabilities(QuantumDecoding):
    """The probabilities decoder."""

    # TODO: collapse on ExpectationDecoding if not analytic

    def __call__(self, x: Circuit) -> ndarray:
        """Computes the final state probabilities.

        Args:
            x (Circuit): input circuit.

        Returns:
            (ndarray): the probabilities over the decoder ``qubits``, reshaped
            to ``output_shape``.
        """
        result = super().__call__(x)
        probabilities = result.probabilities(self.qubits)
        return self.backend.np.reshape(probabilities, self.output_shape)

    @property
    def output_shape(self) -> tuple[int, int]:
        """Shape of the output probabilities.

        Returns:
            (tuple[int, int]): a ``(1, 2**len(qubits))`` shape.
        """
        return (1, 2 ** len(self.qubits))

    @property
    def analytic(self) -> bool:
        """Always ``True`` for this decoder, regardless of ``nshots``."""
        return True
@dataclass
class Expectation(QuantumDecoding):
    r"""The expectation value decoder.

    Args:
        observable (Hamiltonian | ndarray): the observable to calculate the
            expectation value of, by default :math:`Z_0 + Z_1 + ... + Z_n` is
            used.
        mitigation_config (dict): configuration of the real-time quantum error
            mitigation method in case it is desired.
            The real-time quantum error mitigation algorithm is proposed in
            https://arxiv.org/abs/2311.05680 and consists in performing a
            real-time check of the reliability of a learned mitigation map.
            This is done by constructing a reference error-sensitive Clifford
            circuit, which preserves the size of the original, target one.
            When the decoder is called, the reliability of the mitigation map
            is checked by computing a simple metric
            :math:`D = |E_{\rm noisy} - E_{\rm mitigated}|`. If the metric is
            found exceeding an arbitrary threshold value :math:`\delta`, then
            a chosen data-driven error mitigation technique is executed to
            retrieve the mitigation map. To successfully check the reliability
            of the mitigation map or computing the map itself, it is
            recommended to use a number of shots which leads to a statistical
            noise (due to measurements) :math:`\varepsilon \ll \delta`.
            For this reason, the real-time error mitigation algorithm can be
            customized by passing also a ``min_iterations`` argument, which
            will define the minimum number of decoding calls which have to
            happen before the mitigation map check is performed.

            An example of real-time error mitigation configuration is:

            .. code-block:: python

                mitigation_config = {
                    "threshold": 2e-1,
                    "min_iterations": 500,
                    "method": "CDR",
                    "method_kwargs": {"n_training_samples": 100, "nshots": 10000},
                }

            The given example is performing real-time error mitigation with
            the request of computing the mitigation map via Clifford Data
            Regression whenever the reference expectation value differs from
            the mitigated one of :math:`\delta > 0.2`. This check is performed
            every 500 iterations and, in case it is required, the mitigation
            map is computed executing circuits with ``nshots=10000``.
        calibrator (Calibrator, optional): if given, it is called with no
            arguments at the end of every decoder call.
    """

    observable: Union[ndarray, Hamiltonian] = None
    mitigation_config: Optional[Dict[str, Any]] = None
    calibrator: Optional[Calibrator] = None

    def __post_init__(self):
        """Set the default observable and, if requested, the mitigator."""
        super().__post_init__()

        if self.observable is None:
            self.observable = Z(len(self.qubits), dense=False, backend=self.backend)

        # Nothing else to prepare when mitigation is not requested
        if self.mitigation_config is None:
            return

        self.mitigator = Mitigator(
            mitigation_config=self.mitigation_config,
            backend=self.backend,
        )

    def __call__(self, x: Circuit) -> ndarray:
        """Execute the input circuit and calculate the expectation value of
        the internal observable on the final state.

        Args:
            x (Circuit): input Circuit.

        Returns:
            (ndarray): the calculated expectation value, with shape ``(1, 1)``.
        """
        if self.mitigation_config is not None:
            # Alignment and transpilation are needed before the parent call
            # because the mitigation check works on the prepared circuit.
            # NOTE(review): the parent ``__call__``/``preprocessing`` below
            # transpiles again - confirm this is intended.
            self.align_circuits(x)
            x = self.transpile(x)
            _real_time_mitigation_check(self, x)

        if self.analytic:
            # exact expectation value on the final state
            final_state = super().__call__(x).state()
            expval = self.observable.expectation(final_state)
        elif isinstance(self.observable, SymbolicHamiltonian):
            # shot-based estimate handled by the observable itself
            x = self.preprocessing(x)
            expval = self.observable.expectation_from_circuit(
                x,
                nshots=self.nshots,
            )
        else:
            # shot-based estimate from the measured frequencies
            frequencies = super().__call__(x).frequencies()
            expval = self.observable.expectation_from_samples(
                frequencies,
                qubit_map=self.qubits,
            )

        if self.mitigation_config is not None:
            mitigated = self.mitigator(expval)
            expval = self.backend.cast(mitigated, dtype=self.backend.np.float64)

        if self.calibrator is not None:
            self.calibrator()

        return self.backend.np.reshape(expval, (1, 1))

    @property
    def output_shape(self) -> tuple[int, int]:
        """Shape of the output expectation value.

        Returns:
            (tuple[int, int]): a ``(1, 1)`` shape.
        """
        return (1, 1)

    def set_backend(self, backend: Backend):
        """Set the internal and observable's backends.

        Args:
            backend (Backend): backend to be set.
        """
        if not isinstance(self.observable, Hamiltonian):
            super().set_backend(backend)
            self.observable.backend = backend
            return

        # Export the matrix with the *current* backend before switching, then
        # rebuild the observable on the new one.
        matrix = self.backend.to_numpy(self.observable.matrix)
        super().set_backend(backend)
        self.observable = Hamiltonian(
            nqubits=self.nqubits,
            matrix=self.backend.cast(matrix),
            backend=self.backend,
        )

    def __hash__(self) -> int:
        # Explicit hash: the dataclass decorator would otherwise make this
        # subclass unhashable.
        return hash((self.qubits, self.nshots, self.backend, self.observable))
class State(QuantumDecoding):
    """The state decoder."""

    def __call__(self, x: Circuit) -> ndarray:
        """Compute the final state of the input circuit and separate it in its
        real and imaginary parts stacked on top of each other.

        Args:
            x (Circuit): input Circuit.

        Returns:
            (ndarray): the final state, reshaped to ``output_shape``.
        """
        final_state = super().__call__(x).state()
        real_part = self.backend.np.real(final_state)  # pylint: disable=no-member
        imag_part = self.backend.np.imag(final_state)  # pylint: disable=no-member
        stacked = self.backend.np.vstack(  # pylint: disable=no-member
            (real_part, imag_part)
        )
        return stacked.reshape(self.output_shape)

    @property
    def output_shape(self) -> tuple[int, int, int]:
        """Shape of the output state.

        Returns:
            (tuple[int, int, int]): ``(2, n, n)`` when ``density_matrix`` is
            set, ``(2, 1, n)`` otherwise, with ``n = 2**len(qubits)``.
        """
        # NOTE(review): ``n`` is derived from ``len(self.qubits)`` while
        # ``__call__`` reshapes the whole state returned by the execution -
        # confirm the two agree when ``qubits`` is a strict subset of the wires.
        dim = 2 ** len(self.qubits)
        return (2, dim, dim) if self.density_matrix else (2, 1, dim)

    @property
    def analytic(self) -> bool:
        """Always ``True`` for this decoder, regardless of ``nshots``."""
        return True
class Samples(QuantumDecoding):
    """The samples decoder."""

    def __post_init__(self):
        """Run the parent post initialization unchanged."""
        super().__post_init__()

    def __call__(self, x: Circuit) -> ndarray:
        """Sample the final state of the circuit.

        Args:
            x (Circuit): input Circuit.

        Returns:
            ndarray: Generated samples, cast to ``float64``.
        """
        outcome = super().__call__(x)
        samples = outcome.samples()
        return self.backend.cast(samples, self.backend.np.float64)

    @property
    def output_shape(self) -> tuple[int, int]:
        """Shape of the output samples.

        Returns:
            (tuple[int, int]): a ``(nshots, len(qubits))`` shape.
        """
        n_measured = len(self.qubits)
        return (self.nshots, n_measured)

    @property
    def analytic(self) -> bool:  # pragma: no cover
        """Always ``False``: this decoder returns sampled outcomes."""
        return False
@dataclass(kw_only=True)
class VariationalQuantumLinearSolver(QuantumDecoding):
    r"""Decoder for the Variational Quantum Linear Solver (VQLS).

    Args:
        target_state (ndarray): Target solution vector :math:`\ket{b}`.
        A (ndarray): The matrix ``A`` in the linear system
            :math:`A \, \ket{x} = \ket{b}`.

    Reference:
        C. Bravo-Prieto, R. LaRose, M. Cerezo, Y. Subasi, L. Cincio, and
        P. J. Coles, *Variational quantum linear solver*,
        `Quantum 7, 1188 (2023) <https://doi.org/10.22331/q-2023-11-22-1188>`_.
    """

    target_state: ndarray
    A: ndarray

    def __post_init__(self):
        """Cast ``target_state`` and ``A`` to ``complex128`` on the backend."""
        super().__post_init__()
        self.target_state = self.backend.cast(
            self.target_state, dtype=self.backend.np.complex128
        )
        self.A = self.backend.cast(self.A, dtype=self.backend.np.complex128)

    def __call__(self, circuit: Circuit):
        """Execute the circuit and evaluate the VQLS cost.

        The final state of ``circuit`` is multiplied by ``A``, normalized, and
        compared with ``target_state`` through the infidelity.

        Args:
            circuit (Circuit): input circuit.

        Returns:
            the real part of the infidelity, cast to ``float64`` on the
            backend.
        """
        result = super().__call__(circuit)
        state = result.state()
        final_state = self.A @ state
        normalized = final_state / self.backend.calculate_vector_norm(final_state)
        cost = infidelity(normalized, self.target_state, backend=self.backend)
        return self.backend.cast(
            self.backend.np.real(cost), dtype=self.backend.np.float64
        )

    @property
    def output_shape(self) -> tuple[int, int]:
        """Declared shape of the decoder output, ``(1, 1)``."""
        return (1, 1)

    @property
    def analytic(self) -> bool:
        """Always ``True`` for this decoder, regardless of ``nshots``."""
        return True

    def __hash__(self) -> int:
        # ``@dataclass`` with the default ``eq=True`` sets ``__hash__`` to
        # ``None`` unless the class defines it itself, which would make this
        # decoder unhashable, unlike the other decoders. Reuse the parent hash:
        # the array fields are not hashable and are deliberately left out.
        return super().__hash__()
def _real_time_mitigation_check(decoder: Expectation, x: Circuit):
    """
    Helper function to execute the real time mitigation check and, if
    necessary, to compute the reference circuit expectation value.

    Args:
        decoder (Expectation): decoder owning the mitigator.
        x (Circuit): circuit the mitigation refers to.
    """
    mitigator = decoder.mitigator

    # First call: the (exact) reference value is still missing, so compute it
    # and immediately trigger the map check.
    if mitigator._reference_value is None:
        mitigator.calculate_reference_expval(
            observable=decoder.observable,
            circuit=x,
        )
        _check_or_recompute_map(decoder, x)

    # Count the calls and re-check the map once the configured number of
    # iterations has elapsed.
    if mitigator._iteration_counter != mitigator._min_iterations:
        mitigator._iteration_counter += 1
        return

    log.info("Checking map since max iterations reached.")
    _check_or_recompute_map(decoder, x)
    mitigator._iteration_counter = 0
def _check_or_recompute_map(decoder: Expectation, x: Circuit):
    """Helper function to recompute the mitigation map.

    Args:
        decoder (Expectation): decoder owning the mitigator.
        x (Circuit): circuit the mitigation refers to; the decoder internal
            circuit is appended to it before it is handed to the mitigator.
    """
    mitigator = decoder.mitigator

    # Run the reference circuit with the mitigator's own number of shots,
    # through the base decoder call so that no mitigation logic is re-entered.
    with decoder._temporary_nshots(mitigator._nshots):
        outcome = super(Expectation, decoder).__call__(mitigator._reference_circuit)
        freqs = outcome.frequencies()
        reference_expval = decoder.observable.expectation_from_samples(
            freqs, qubit_map=decoder.qubits
        )

    # Check or update noise map
    mitigator.check_or_update_map(
        noisy_reference_value=reference_expval,
        circuit=x + decoder._circuit,
        observable=decoder.observable,
        noise_model=decoder.noise_model,
    )