Source code for qibolab.platform

"""A platform for executing quantum algorithms."""

from collections import defaultdict
from dataclasses import dataclass, field, replace
from typing import Dict, List, Optional, Tuple

import networkx as nx
from qibo.config import log, raise_error

from .couplers import Coupler
from .execution_parameters import ExecutionParameters
from .instruments.abstract import Controller, Instrument, InstrumentId
from .pulses import Drag, FluxPulse, PulseSequence, ReadoutPulse
from .qubits import Qubit, QubitId, QubitPair, QubitPairId
from .sweeper import Sweeper
from .unrolling import batch

InstrumentMap = Dict[InstrumentId, Instrument]
QubitMap = Dict[QubitId, Qubit]
CouplerMap = Dict[QubitId, Coupler]
QubitPairMap = Dict[QubitPairId, QubitPair]

NS_TO_SEC = 1e-9


[docs]def unroll_sequences( sequences: List[PulseSequence], relaxation_time: int ) -> Tuple[PulseSequence, Dict[str, str]]: """Unrolls a list of pulse sequences to a single pulse sequence with multiple measurements. Args: sequences (list): List of pulse sequences to unroll. relaxation_time (int): Time in ns to wait for the qubit to relax between playing different sequences. Returns: total_sequence (:class:`qibolab.pulses.PulseSequence`): Unrolled pulse sequence containing multiple measurements. readout_map (dict): Map from original readout pulse serials to the unrolled readout pulse serials. Required to construct the results dictionary that is returned after execution. """ total_sequence = PulseSequence() readout_map = defaultdict(list) start = 0 for sequence in sequences: for pulse in sequence: new_pulse = pulse.copy() new_pulse.start += start total_sequence.add(new_pulse) if isinstance(pulse, ReadoutPulse): readout_map[pulse.serial].append(new_pulse.serial) start = total_sequence.finish + relaxation_time return total_sequence, readout_map
[docs]@dataclass class Settings: """Default execution settings read from the runcard.""" nshots: int = 1024 """Default number of repetitions when executing a pulse sequence.""" relaxation_time: int = int(1e5) """Time in ns to wait for the qubit to relax to its ground state between shots."""
[docs] def fill(self, options: ExecutionParameters): """Use default values for missing execution options.""" if options.nshots is None: options = replace(options, nshots=self.nshots) if options.relaxation_time is None: options = replace(options, relaxation_time=self.relaxation_time) return options
[docs]@dataclass class Platform: """Platform for controlling quantum devices.""" name: str """Name of the platform.""" qubits: QubitMap """Dictionary mapping qubit names to :class:`qibolab.qubits.Qubit` objects.""" pairs: QubitPairMap """Dictionary mapping tuples of qubit names to :class:`qibolab.qubits.QubitPair` objects.""" instruments: InstrumentMap """Dictionary mapping instrument names to :class:`qibolab.instruments.abstract.Instrument` objects.""" settings: Settings = field(default_factory=Settings) """Container with default execution settings.""" resonator_type: Optional[str] = None """Type of resonator (2D or 3D) in the used QPU. Default is 3D for single-qubit chips and 2D for multi-qubit. """ couplers: CouplerMap = field(default_factory=dict) """Dictionary mapping coupler names to :class:`qibolab.couplers.Coupler` objects.""" is_connected: bool = False """Flag for whether we are connected to the physical instruments.""" topology: nx.Graph = field(default_factory=nx.Graph) """Graph representing the qubit connectivity in the quantum chip.""" def __post_init__(self): log.info("Loading platform %s", self.name) if self.resonator_type is None: self.resonator_type = "3D" if self.nqubits == 1 else "2D" self.topology.add_nodes_from(self.qubits.keys()) self.topology.add_edges_from( [(pair.qubit1.name, pair.qubit2.name) for pair in self.pairs.values()] ) def __str__(self): return self.name @property def nqubits(self) -> int: """Total number of usable qubits in the QPU.""" return len(self.qubits) @property def ordered_pairs(self): """List of qubit pairs that are connected in the QPU.""" return sorted({tuple(sorted(pair)) for pair in self.pairs}) @property def sampling_rate(self): """Sampling rate of control electronics in giga samples per second (GSps).""" for instrument in self.instruments.values(): if isinstance(instrument, Controller): return instrument.sampling_rate
[docs] def connect(self): """Connect to all instruments.""" if not self.is_connected: for instrument in self.instruments.values(): try: log.info(f"Connecting to instrument {instrument}.") instrument.connect() except Exception as exception: raise_error( RuntimeError, f"Cannot establish connection to {instrument} instruments. Error captured: '{exception}'", ) self.is_connected = True
[docs] def disconnect(self): """Disconnects from instruments.""" if self.is_connected: for instrument in self.instruments.values(): instrument.disconnect() self.is_connected = False
def _execute(self, sequence, options, **kwargs): """Executes sequence on the controllers.""" result = {} for instrument in self.instruments.values(): if isinstance(instrument, Controller): new_result = instrument.play( self.qubits, self.couplers, sequence, options ) if isinstance(new_result, dict): result.update(new_result) return result
[docs] def execute_pulse_sequence( self, sequence: PulseSequence, options: ExecutionParameters, **kwargs ): """ Args: sequence (:class:`qibolab.pulses.PulseSequence`): Pulse sequences to execute. options (:class:`qibolab.platforms.platform.ExecutionParameters`): Object holding the execution options. **kwargs: May need them for something Returns: Readout results acquired by after execution. """ options = self.settings.fill(options) time = ( (sequence.duration + options.relaxation_time) * options.nshots * NS_TO_SEC ) log.info(f"Minimal execution time (sequence): {time}") return self._execute(sequence, options, **kwargs)
@property def _controller(self): """Controller instrument used for splitting the unrolled sequences to batches. Used only by :meth:`qibolab.platform.Platform.execute_pulse_sequences` (unrolling). This method does not support platforms with more than one controller instruments. """ controllers = [ instr for instr in self.instruments.values() if isinstance(instr, Controller) ] assert len(controllers) == 1 return controllers[0]
[docs] def execute_pulse_sequences( self, sequences: List[PulseSequence], options: ExecutionParameters, **kwargs ): """ Args: sequence (List[:class:`qibolab.pulses.PulseSequence`]): Pulse sequences to execute. options (:class:`qibolab.platforms.platform.ExecutionParameters`): Object holding the execution options. **kwargs: May need them for something Returns: Readout results acquired by after execution. """ options = self.settings.fill(options) duration = sum(seq.duration for seq in sequences) time = ( (duration + len(sequences) * options.relaxation_time) * options.nshots * NS_TO_SEC ) log.info(f"Minimal execution time (unrolling): {time}") # find readout pulses ro_pulses = { pulse.serial: pulse.qubit for sequence in sequences for pulse in sequence.ro_pulses } results = defaultdict(list) bounds = kwargs.get("bounds", self._controller.bounds) for b in batch(sequences, bounds): sequence, readouts = unroll_sequences(b, options.relaxation_time) result = self._execute(sequence, options, **kwargs) for serial, new_serials in readouts.items(): results[serial].extend(result[ser] for ser in new_serials) for serial, qubit in ro_pulses.items(): results[qubit] = results[serial] return results
[docs] def sweep( self, sequence: PulseSequence, options: ExecutionParameters, *sweepers: Sweeper ): """Executes a pulse sequence for different values of sweeped parameters. Useful for performing chip characterization. Example: .. testcode:: import numpy as np from qibolab.dummy import create_dummy from qibolab.sweeper import Sweeper, Parameter from qibolab.pulses import PulseSequence from qibolab.execution_parameters import ExecutionParameters platform = create_dummy() sequence = PulseSequence() parameter = Parameter.frequency pulse = platform.create_qubit_readout_pulse(qubit=0, start=0) sequence.add(pulse) parameter_range = np.random.randint(10, size=10) sweeper = Sweeper(parameter, parameter_range, [pulse]) platform.sweep(sequence, ExecutionParameters(), sweeper) Returns: Readout results acquired by after execution. """ if options.nshots is None: options = replace(options, nshots=self.settings.nshots) if options.relaxation_time is None: options = replace(options, relaxation_time=self.settings.relaxation_time) time = ( (sequence.duration + options.relaxation_time) * options.nshots * NS_TO_SEC ) for sweep in sweepers: time *= len(sweep.values) log.info(f"Minimal execution time (sweep): {time}") result = {} for instrument in self.instruments.values(): if isinstance(instrument, Controller): new_result = instrument.sweep( self.qubits, self.couplers, sequence, options, *sweepers ) if isinstance(new_result, dict): result.update(new_result) return result
def __call__(self, sequence, options): return self.execute_pulse_sequence(sequence, options)
[docs] def get_qubit(self, qubit): """Return the name of the physical qubit corresponding to a logical qubit. Temporary fix for the compiler to work for platforms where the qubits are not named as 0, 1, 2, ... """ try: return self.qubits[qubit].name except KeyError: return list(self.qubits.keys())[qubit]
[docs] def get_coupler(self, coupler): """Return the name of the physical coupler corresponding to a logical coupler. Temporary fix for the compiler to work for platforms where the couplers are not named as 0, 1, 2, ... """ try: return self.couplers[coupler].name except KeyError: return list(self.couplers.keys())[coupler]
[docs] def create_RX90_pulse(self, qubit, start=0, relative_phase=0): qubit = self.get_qubit(qubit) return self.qubits[qubit].native_gates.RX90.pulse(start, relative_phase)
[docs] def create_RX_pulse(self, qubit, start=0, relative_phase=0): qubit = self.get_qubit(qubit) return self.qubits[qubit].native_gates.RX.pulse(start, relative_phase)
[docs] def create_RX12_pulse(self, qubit, start=0, relative_phase=0): qubit = self.get_qubit(qubit) return self.qubits[qubit].native_gates.RX12.pulse(start, relative_phase)
[docs] def create_CZ_pulse_sequence(self, qubits, start=0): pair = tuple(self.get_qubit(q) for q in qubits) if pair not in self.pairs or self.pairs[pair].native_gates.CZ is None: raise_error( ValueError, f"Calibration for CZ gate between qubits {qubits[0]} and {qubits[1]} not found.", ) return self.pairs[pair].native_gates.CZ.sequence(start)
[docs] def create_iSWAP_pulse_sequence(self, qubits, start=0): pair = tuple(self.get_qubit(q) for q in qubits) if pair not in self.pairs or self.pairs[pair].native_gates.iSWAP is None: raise_error( ValueError, f"Calibration for iSWAP gate between qubits {qubits[0]} and {qubits[1]} not found.", ) return self.pairs[pair].native_gates.iSWAP.sequence(start)
[docs] def create_CNOT_pulse_sequence(self, qubits, start=0): pair = tuple(self.get_qubit(q) for q in qubits) if pair not in self.pairs or self.pairs[pair].native_gates.CNOT is None: raise_error( ValueError, f"Calibration for CNOT gate between qubits {qubits[0]} and {qubits[1]} not found.", ) return self.pairs[pair].native_gates.CNOT.sequence(start)
[docs] def create_MZ_pulse(self, qubit, start): qubit = self.get_qubit(qubit) return self.qubits[qubit].native_gates.MZ.pulse(start)
[docs] def create_qubit_drive_pulse(self, qubit, start, duration, relative_phase=0): qubit = self.get_qubit(qubit) pulse = self.qubits[qubit].native_gates.RX.pulse(start, relative_phase) pulse.duration = duration return pulse
[docs] def create_qubit_readout_pulse(self, qubit, start): qubit = self.get_qubit(qubit) return self.create_MZ_pulse(qubit, start)
[docs] def create_qubit_flux_pulse(self, qubit, start, duration, amplitude=1): qubit = self.get_qubit(qubit) pulse = FluxPulse( start=start, duration=duration, amplitude=amplitude, shape="Rectangular", channel=self.qubits[qubit].flux.name, qubit=qubit, ) pulse.duration = duration return pulse
[docs] def create_coupler_pulse(self, coupler, start, duration=None, amplitude=None): coupler = self.get_coupler(coupler) pulse = self.couplers[coupler].native_pulse.CP.pulse(start) if duration is not None: pulse.duration = duration if amplitude is not None: pulse.amplitude = amplitude return pulse
# TODO Remove RX90_drag_pulse and RX_drag_pulse, replace them with create_qubit_drive_pulse # TODO Add RY90 and RY pulses
[docs] def create_RX90_drag_pulse(self, qubit, start, beta, relative_phase=0): """Create native RX90 pulse with Drag shape.""" qubit = self.get_qubit(qubit) pulse = self.qubits[qubit].native_gates.RX90.pulse(start, relative_phase) pulse.shape = Drag(rel_sigma=pulse.shape.rel_sigma, beta=beta) pulse.shape.pulse = pulse return pulse
[docs] def create_RX_drag_pulse(self, qubit, start, beta, relative_phase=0): """Create native RX pulse with Drag shape.""" qubit = self.get_qubit(qubit) pulse = self.qubits[qubit].native_gates.RX.pulse(start, relative_phase) pulse.shape = Drag(rel_sigma=pulse.shape.rel_sigma, beta=beta) pulse.shape.pulse = pulse return pulse