Source code for qibolab.instruments.zhinst.executor

"""Executing pulse sequences on a Zurich Instruments devices."""

import re
from collections import defaultdict
from dataclasses import dataclass, replace
from typing import Any, Optional

import laboneq.simple as lo
import numpy as np
from qibo.config import log

from qibolab import AcquisitionType, AveragingMode, ExecutionParameters
from qibolab.couplers import Coupler
from qibolab.instruments.abstract import Controller
from qibolab.instruments.port import Port
from qibolab.pulses import FluxPulse, PulseSequence, PulseType
from qibolab.qubits import Qubit
from qibolab.sweeper import Parameter, Sweeper
from qibolab.unrolling import Bounds

from .pulse import ZhPulse
from .sweep import ProcessedSweeps, classify_sweepers
from .util import (
    NANO_TO_SECONDS,
    SAMPLING_RATE,
    acquire_channel_name,
    measure_channel_name,
)

COMPILER_SETTINGS = {
    "SHFSG_MIN_PLAYWAVE_HINT": 32,
    "SHFSG_MIN_PLAYZERO_HINT": 32,
    "HDAWG_MIN_PLAYWAVE_HINT": 64,
    "HDAWG_MIN_PLAYZERO_HINT": 64,
}
"""Translating to Zurich ExecutionParameters."""
ACQUISITION_TYPE = {
    AcquisitionType.INTEGRATION: lo.AcquisitionType.INTEGRATION,
    AcquisitionType.RAW: lo.AcquisitionType.RAW,
    AcquisitionType.DISCRIMINATION: lo.AcquisitionType.DISCRIMINATION,
}

AVERAGING_MODE = {
    AveragingMode.CYCLIC: lo.AveragingMode.CYCLIC,
    AveragingMode.SINGLESHOT: lo.AveragingMode.SINGLE_SHOT,
}


[docs]@dataclass class ZhPort(Port): name: tuple[str, str] offset: float = 0.0 power_range: int = 0
[docs]@dataclass class SubSequence: """A subsequence is a slice (in time) of a sequence that contains at most one measurement per qubit. When the driver is asked to execute a sequence, it will first split it into sub-sequences. This is needed so that we can create a separate laboneq section for each measurement (multiple measurements per section are not allowed). When splitting a sequence, it is assumed that 1. a measurement operation can be parallel (in time) to another measurement operation (i.e. measuring multiple qubits simultaneously), but other channels (e.g. drive) do not contain any pulses parallel to measurements, 2. ith measurement on some channel is in the same subsequence as the ith measurement (if any) on another measurement channel, 3. all measurements in one subsequence happen at the same time. """ measurements: list[tuple[str, ZhPulse]] control_sequence: dict[str, list[ZhPulse]]
[docs]class Zurich(Controller): """Driver for a collection of ZI instruments that are automatically synchronized via ZSync protocol.""" PortType = ZhPort def __init__(self, name, device_setup, time_of_flight=0.0, smearing=0.0): super().__init__(name, None) self.signal_map = {} "Signals to lines mapping" self.calibration = lo.Calibration() "Zurich calibration object)" self.device_setup = device_setup self.session = None "Zurich device parameters for connection" self.time_of_flight = time_of_flight self.smearing = smearing "Parameters read from the runcard not part of ExecutionParameters" self.experiment = None self.results = None "Zurich experiment definitions" self.bounds = Bounds( waveforms=int(4e4), readout=250, instructions=int(1e6), ) self.acquisition_type = None "To store if the AcquisitionType.SPECTROSCOPY needs to be enabled by parsing the sequence" self.sequence = defaultdict(list) "Zurich pulse sequence" self.sub_sequences: list[SubSequence] = [] "Sub sequences between each measurement" self.unsplit_channels: set[str] = set() "Names of channels that were not split into sub-sequences" self.processed_sweeps: Optional[ProcessedSweeps] = None self.nt_sweeps: list[Sweeper] = [] self.rt_sweeps: list[Sweeper] = [] @property def sampling_rate(self): return SAMPLING_RATE
[docs] def connect(self): if self.is_connected is False: # To fully remove logging #configure_logging=False # I strongly advise to set it to 20 to have time estimates of the experiment duration! self.session = lo.Session(self.device_setup, log_level=20) _ = self.session.connect() self.is_connected = True
[docs] def disconnect(self): if self.is_connected: _ = self.session.disconnect() self.is_connected = False
[docs] def calibration_step(self, qubits, couplers, options): """Zurich general pre experiment calibration definitions. Change to get frequencies from sequence """ for coupler in couplers.values(): self.register_couplerflux_line(coupler) for qubit in qubits.values(): if qubit.flux is not None: self.register_flux_line(qubit) if len(self.sequence[qubit.drive.name]) != 0: self.register_drive_line( qubit=qubit, intermediate_frequency=qubit.drive_frequency - qubit.drive.local_oscillator.frequency, ) if len(self.sequence[measure_channel_name(qubit)]) != 0: self.register_readout_line( qubit=qubit, intermediate_frequency=qubit.readout_frequency - qubit.readout.local_oscillator.frequency, options=options, ) self.device_setup.set_calibration(self.calibration)
[docs] def register_readout_line(self, qubit, intermediate_frequency, options): """Registers qubit measure and acquire lines to calibration and signal map. Note ---- To allow debugging with and oscilloscope, just set the following:: self.calibration[f"/logical_signal_groups/q{q}/measure_line"] = lo.SignalCalibration( ..., local_oscillator=lo.Oscillator( ... frequency=0.0, ), ..., port_mode=lo.PortMode.LF, ..., ) """ q = qubit.name # pylint: disable=C0103 self.signal_map[measure_channel_name(qubit)] = ( self.device_setup.logical_signal_groups[f"q{q}"].logical_signals[ "measure_line" ] ) self.calibration[f"/logical_signal_groups/q{q}/measure_line"] = ( lo.SignalCalibration( oscillator=lo.Oscillator( frequency=intermediate_frequency, modulation_type=lo.ModulationType.SOFTWARE, ), local_oscillator=lo.Oscillator( frequency=int(qubit.readout.local_oscillator.frequency), ), range=qubit.readout.power_range, port_delay=None, delay_signal=0, ) ) self.signal_map[acquire_channel_name(qubit)] = ( self.device_setup.logical_signal_groups[f"q{q}"].logical_signals[ "acquire_line" ] ) oscillator = lo.Oscillator( frequency=intermediate_frequency, modulation_type=lo.ModulationType.SOFTWARE, ) threshold = None if options.acquisition_type == AcquisitionType.DISCRIMINATION: if qubit.kernel is not None: # Kernels don't work with the software modulation on the acquire signal oscillator = None else: # To keep compatibility with angle and threshold discrimination (Remove when possible) threshold = qubit.threshold self.calibration[f"/logical_signal_groups/q{q}/acquire_line"] = ( lo.SignalCalibration( oscillator=oscillator, range=qubit.feedback.power_range, port_delay=self.time_of_flight * NANO_TO_SECONDS, threshold=threshold, ) )
[docs] def register_drive_line(self, qubit, intermediate_frequency): """Registers qubit drive line to calibration and signal map.""" q = qubit.name # pylint: disable=C0103 self.signal_map[qubit.drive.name] = self.device_setup.logical_signal_groups[ f"q{q}" ].logical_signals["drive_line"] self.calibration[f"/logical_signal_groups/q{q}/drive_line"] = ( lo.SignalCalibration( oscillator=lo.Oscillator( frequency=intermediate_frequency, modulation_type=lo.ModulationType.HARDWARE, ), local_oscillator=lo.Oscillator( frequency=int(qubit.drive.local_oscillator.frequency), ), range=qubit.drive.power_range, port_delay=None, delay_signal=0, ) )
[docs] def register_flux_line(self, qubit): """Registers qubit flux line to calibration and signal map.""" q = qubit.name # pylint: disable=C0103 self.signal_map[qubit.flux.name] = self.device_setup.logical_signal_groups[ f"q{q}" ].logical_signals["flux_line"] self.calibration[f"/logical_signal_groups/q{q}/flux_line"] = ( lo.SignalCalibration( range=qubit.flux.power_range, port_delay=None, delay_signal=0, voltage_offset=qubit.flux.offset, ) )
[docs] def register_couplerflux_line(self, coupler): """Registers qubit flux line to calibration and signal map.""" c = coupler.name # pylint: disable=C0103 self.signal_map[coupler.flux.name] = self.device_setup.logical_signal_groups[ f"qc{c}" ].logical_signals["flux_line"] self.calibration[f"/logical_signal_groups/qc{c}/flux_line"] = ( lo.SignalCalibration( range=coupler.flux.power_range, port_delay=None, delay_signal=0, voltage_offset=coupler.flux.offset, ) )
[docs] def run_exp(self): """ Compilation settings, compilation step, execution step and data retrival - Save a experiment Python object: self.experiment.save("saved_exp") - Save a experiment compiled experiment (): self.exp.save("saved_exp") # saving compiled experiment """ compiled_experiment = self.session.compile( self.experiment, compiler_settings=COMPILER_SETTINGS ) self.results = self.session.run(compiled_experiment)
[docs] @staticmethod def frequency_from_pulses(qubits, sequence): """Gets the frequencies from the pulses to the qubits.""" for pulse in sequence: qubit = qubits[pulse.qubit] if pulse.type is PulseType.READOUT: qubit.readout_frequency = pulse.frequency if pulse.type is PulseType.DRIVE: qubit.drive_frequency = pulse.frequency
[docs] def create_sub_sequences( self, qubits: list[Qubit] ) -> tuple[list[SubSequence], set[str]]: """Create subsequences based on locations of measurements. Returns list of subsequences and a set of channel names that were not split """ measure_channels = {measure_channel_name(qb) for qb in qubits} other_channels = set(self.sequence.keys()) - measure_channels measurement_groups = defaultdict(list) for ch in measure_channels: for i, pulse in enumerate(self.sequence[ch]): measurement_groups[i].append((ch, pulse)) measurement_start_end = {} for i, group in measurement_groups.items(): starts = np.array([meas.pulse.start for _, meas in group]) ends = np.array([meas.pulse.finish for _, meas in group]) measurement_start_end[i] = ( max(starts), max(ends), ) # max is intended for float arithmetic errors only # FIXME: this is a hotfix specifically made for any flux experiments in flux pulse mode, where the flux # pulses extend through the entire duration of the experiment. This should be removed once the sub-sequence # splitting logic is removed from the driver. channels_overlapping_measurement = set() if len(measurement_groups) == 1: for ch in other_channels: for pulse in self.sequence[ch]: if not isinstance(pulse.pulse, FluxPulse): break start, end = measurement_start_end[0] if pulse.pulse.start < end and pulse.pulse.finish > start: channels_overlapping_measurement.add(ch) break # split non-measurement channels according to the locations of the measurements sub_sequences = defaultdict(lambda: defaultdict(list)) for ch in other_channels - channels_overlapping_measurement: measurement_index = 0 for pulse in self.sequence[ch]: start, _ = measurement_start_end[measurement_index] if pulse.pulse.finish > start: measurement_index += 1 sub_sequences[measurement_index][ch].append(pulse) if len(sub_sequences) > len(measurement_groups): log.warning("There are control pulses after the last measurement start.") return [ SubSequence(measurement_groups[i], sub_sequences[i]) for i in range(len(measurement_groups)) ], channels_overlapping_measurement
[docs] def experiment_flow( self, qubits: dict[str, Qubit], couplers: dict[str, Coupler], sequence: PulseSequence, options: ExecutionParameters, ): """Create the experiment object for the devices, following the steps separated one on each method: Translation, Calibration, Experiment Definition. Args: qubits (dict[str, Qubit]): qubits for the platform. couplers (dict[str, Coupler]): couplers for the platform. sequence (PulseSequence): sequence of pulses to be played in the experiment. """ self.sequence = self.sequence_zh(sequence, qubits) self.sub_sequences, self.unsplit_channels = self.create_sub_sequences( list(qubits.values()) ) self.calibration_step(qubits, couplers, options) self.create_exp(qubits, options)
# pylint: disable=W0221
[docs] def play(self, qubits, couplers, sequence, options): """Play pulse sequence.""" return self.sweep(qubits, couplers, sequence, options)
[docs] def sequence_zh( self, sequence: PulseSequence, qubits: dict[str, Qubit] ) -> dict[str, list[ZhPulse]]: """Convert Qibo sequence to a sequence where all pulses are replaced with ZhPulse instances. The resulting object is a dictionary mapping from channel name to corresponding sequence of ZhPulse instances """ # Define and assign the sequence zhsequence = defaultdict(list) # Fill the sequences with pulses according to their lines in temporal order for pulse in sequence: if pulse.type == PulseType.READOUT: ch = measure_channel_name(qubits[pulse.qubit]) else: ch = pulse.channel zhsequence[ch].append(ZhPulse(pulse)) if self.processed_sweeps: for ch, zhpulses in zhsequence.items(): for zhpulse in zhpulses: for param, sweep in self.processed_sweeps.sweeps_for_pulse( zhpulse.pulse ): zhpulse.add_sweeper(param, sweep) return zhsequence
[docs] def create_exp(self, qubits, options): """Zurich experiment initialization using their Experiment class.""" if self.acquisition_type: acquisition_type = self.acquisition_type else: acquisition_type = ACQUISITION_TYPE[options.acquisition_type] averaging_mode = AVERAGING_MODE[options.averaging_mode] exp_options = replace( options, acquisition_type=acquisition_type, averaging_mode=averaging_mode ) signals = [lo.ExperimentSignal(name) for name in self.signal_map.keys()] exp = lo.Experiment( uid="Sequence", signals=signals, ) contexts = self._contexts(exp, exp_options) self._populate_exp(qubits, exp, exp_options, contexts) self.set_calibration_for_rt_sweep(exp) exp.set_signal_map(self.signal_map) self.experiment = exp
def _contexts( self, exp: lo.Experiment, exp_options: ExecutionParameters ) -> list[tuple[Optional[Sweeper], Any]]: """To construct a laboneq experiment, we need to first define a certain sequence of nested contexts. This method returns the corresponding sequence of context managers. """ sweep_contexts = [] for i, sweeper in enumerate(self.nt_sweeps): ctx = exp.sweep( uid=f"nt_sweep_{sweeper.parameter.name.lower()}_{i}", parameter=[ sweep_param for sweep_param in self.processed_sweeps.sweeps_for_sweeper(sweeper) ], ) sweep_contexts.append((sweeper, ctx)) shots_ctx = exp.acquire_loop_rt( uid="shots", count=exp_options.nshots, acquisition_type=exp_options.acquisition_type, averaging_mode=exp_options.averaging_mode, ) sweep_contexts.append((None, shots_ctx)) for i, sweeper in enumerate(self.rt_sweeps): ctx = exp.sweep( uid=f"rt_sweep_{sweeper.parameter.name.lower()}_{i}", parameter=[ sweep_param for sweep_param in self.processed_sweeps.sweeps_for_sweeper(sweeper) ], reset_oscillator_phase=True, ) sweep_contexts.append((sweeper, ctx)) return sweep_contexts def _populate_exp( self, qubits: dict[str, Qubit], exp: lo.Experiment, exp_options: ExecutionParameters, contexts, ): """Recursively activate the nested contexts, then define the main experiment body inside the innermost context.""" if len(contexts) == 0: self.select_exp(exp, qubits, exp_options) return sweeper, ctx = contexts[0] with ctx: if sweeper in self.nt_sweeps: self.set_instrument_nodes_for_nt_sweep(exp, sweeper) self._populate_exp(qubits, exp, exp_options, contexts[1:])
[docs] def set_calibration_for_rt_sweep(self, exp: lo.Experiment) -> None: """Set laboneq calibration of parameters that are to be swept in real- time.""" if self.processed_sweeps: calib = lo.Calibration() for ch in ( set(self.sequence.keys()) | self.processed_sweeps.channels_with_sweeps() ): for param, sweep_param in self.processed_sweeps.sweeps_for_channel(ch): if param is Parameter.frequency: calib[ch] = lo.SignalCalibration( oscillator=lo.Oscillator( frequency=sweep_param, modulation_type=lo.ModulationType.HARDWARE, ) ) exp.set_calibration(calib)
[docs] def set_instrument_nodes_for_nt_sweep( self, exp: lo.Experiment, sweeper: Sweeper ) -> None: """In some cases there is no straightforward way to sweep a parameter. In these cases we achieve sweeping by directly manipulating the instrument nodes """ for ch, param, sweep_param in self.processed_sweeps.channel_sweeps_for_sweeper( sweeper ): channel_node_path = self.get_channel_node_path(ch) if param is Parameter.bias: offset_node_path = f"{channel_node_path}/offset" exp.set_node(path=offset_node_path, value=sweep_param) # This is supposed to happen only for measurement, but we do not validate it here. if param is Parameter.amplitude: a, b = re.match(r"(.*)/(\d)/.*", channel_node_path).groups() gain_node_path = f"{a}/{b}/oscs/{b}/gain" exp.set_node(path=gain_node_path, value=sweep_param)
[docs] def get_channel_node_path(self, channel_name: str) -> str: """Return the path of the instrument node corresponding to the given channel.""" logical_signal = self.signal_map[channel_name] for instrument in self.device_setup.instruments: for conn in instrument.connections: if conn.remote_path == logical_signal.path: return f"{instrument.address}/{conn.local_port}" raise RuntimeError( f"Could not find instrument node corresponding to channel {channel_name}" )
[docs] def select_exp(self, exp, qubits, exp_options): """Build Zurich Experiment selecting the relevant sections.""" # channels that were not split are just applied in parallel to the rest of the experiment with exp.section(uid="unsplit_channels"): for ch in self.unsplit_channels: for pulse in self.sequence[ch]: exp.delay(signal=ch, time=pulse.pulse.start) self.play_sweep(exp, ch, pulse) weights = {} previous_section = None for i, seq in enumerate(self.sub_sequences): section_uid = f"control_{i}" with exp.section(uid=section_uid, play_after=previous_section): for ch, pulses in seq.control_sequence.items(): time = 0 for pulse in pulses: if pulse.delay_sweeper: exp.delay(signal=ch, time=pulse.delay_sweeper) exp.delay( signal=ch, time=round(pulse.pulse.start * NANO_TO_SECONDS, 9) - time, ) time = round(pulse.pulse.duration * NANO_TO_SECONDS, 9) + round( pulse.pulse.start * NANO_TO_SECONDS, 9 ) if pulse.zhsweepers: self.play_sweep(exp, ch, pulse) else: exp.play( signal=ch, pulse=pulse.zhpulse, phase=pulse.pulse.relative_phase, ) previous_section = section_uid if any(m.delay_sweeper is not None for _, m in seq.measurements): section_uid = f"measurement_delay_{i}" with exp.section(uid=section_uid, play_after=previous_section): for ch, m in seq.measurements: if m.delay_sweeper: exp.delay(signal=ch, time=m.delay_sweeper) previous_section = section_uid section_uid = f"measure_{i}" with exp.section(uid=section_uid, play_after=previous_section): for ch, pulse in seq.measurements: qubit = qubits[pulse.pulse.qubit] q = qubit.name exp.delay( signal=acquire_channel_name(qubit), time=self.smearing * NANO_TO_SECONDS, ) if ( qubit.kernel is not None and exp_options.acquisition_type == lo.AcquisitionType.DISCRIMINATION ): weight = lo.pulse_library.sampled_pulse_complex( samples=qubit.kernel * np.exp(1j * qubit.iq_angle), ) else: if i == 0: if ( exp_options.acquisition_type == lo.AcquisitionType.DISCRIMINATION ): weight = lo.pulse_library.sampled_pulse_complex( samples=np.ones( [ int( pulse.pulse.duration * 2 - 3 * self.smearing * NANO_TO_SECONDS ) ] ) * np.exp(1j * qubit.iq_angle), ) weights[q] = weight else: weight = lo.pulse_library.const( length=round( pulse.pulse.duration * NANO_TO_SECONDS, 9 ) - 1.5 * self.smearing * NANO_TO_SECONDS, amplitude=1, ) weights[q] = weight elif i != 0: weight = weights[q] measure_pulse_parameters = {"phase": 0} if i == len(self.sequence[measure_channel_name(qubit)]) - 1: reset_delay = exp_options.relaxation_time * NANO_TO_SECONDS else: reset_delay = 0 exp.measure( acquire_signal=acquire_channel_name(qubit), handle=f"sequence{q}_{i}", integration_kernel=weight, integration_kernel_parameters=None, integration_length=None, measure_signal=measure_channel_name(qubit), measure_pulse=pulse.zhpulse, measure_pulse_length=round( pulse.pulse.duration * NANO_TO_SECONDS, 9 ), measure_pulse_parameters=measure_pulse_parameters, measure_pulse_amplitude=None, acquire_delay=self.time_of_flight * NANO_TO_SECONDS, reset_delay=reset_delay, ) previous_section = section_uid
[docs] @staticmethod def play_sweep(exp, channel_name, pulse): """Play Zurich pulse when a single sweeper is involved.""" play_parameters = {} for p, zhs in pulse.zhsweepers: if p is Parameter.amplitude: max_value = max(np.abs(zhs.values)) pulse.zhpulse.amplitude *= max_value zhs.values /= max_value play_parameters["amplitude"] = zhs if p is Parameter.duration: play_parameters["length"] = zhs if p is Parameter.relative_phase: play_parameters["phase"] = zhs if "phase" not in play_parameters: play_parameters["phase"] = pulse.pulse.relative_phase exp.play(signal=channel_name, pulse=pulse.zhpulse, **play_parameters)
[docs] def sweep(self, qubits, couplers, sequence: PulseSequence, options, *sweepers): """Play pulse and sweepers sequence.""" self.signal_map = {} self.processed_sweeps = ProcessedSweeps(sweepers, qubits) self.nt_sweeps, self.rt_sweeps = classify_sweepers(sweepers) self.frequency_from_pulses(qubits, sequence) self.acquisition_type = None for sweeper in sweepers: if sweeper.parameter in {Parameter.frequency, Parameter.amplitude}: for pulse in sweeper.pulses: if pulse.type is PulseType.READOUT: self.acquisition_type = lo.AcquisitionType.SPECTROSCOPY self.experiment_flow(qubits, couplers, sequence, options) self.run_exp() # Get the results back results = {} for qubit in qubits.values(): q = qubit.name # pylint: disable=C0103 for i, ropulse in enumerate(self.sequence[measure_channel_name(qubit)]): data = self.results.get_data(f"sequence{q}_{i}") if options.acquisition_type is AcquisitionType.DISCRIMINATION: data = ( np.ones(data.shape) - data.real ) # Probability inversion patch serial = ropulse.pulse.serial qubit = ropulse.pulse.qubit results[serial] = results[qubit] = options.results_type(data) return results