Source code for qibolab.instruments.qblox.cluster_qrm_rf

"""Qblox Cluster QRM-RF driver."""

import json
import time

import numpy as np
from qblox_instruments.native.generic_func import SequencerStates
from qblox_instruments.qcodes_drivers.cluster import Cluster
from qblox_instruments.qcodes_drivers.module import Module
from qibo.config import log

from qibolab.pulses import Pulse, PulseSequence, PulseType
from qibolab.sweeper import Parameter, Sweeper, SweeperType

from .acquisition import AveragedAcquisition, DemodulatedAcquisition
from .module import ClusterModule
from .q1asm import Block, Register, convert_phase, loop_block, wait_block
from .sequencer import Sequencer, WaveformsBuffer
from .sweeper import QbloxSweeper, QbloxSweeperType


class QrmRf(ClusterModule):
    """Qblox Cluster Qubit Readout Module RF driver.

    Qubit Readout Module RF (QRM-RF) is an instrument that integrates an arbitrary wave generator, a digitizer,
    a local oscillator and a mixer. It has one output and one input port. Each port has a path0 and path1 for the
    i(in-phase) and q(quadrature) components of the RF signal. The sampling rate of its ADC/DAC is 1 GSPS.
    https://www.qblox.com/cluster

    The class aims to simplify the configuration of the instrument, exposing only those parameters most frequently
    used and hiding other more complex settings.

    A reference to the underlying `qblox_instruments.qcodes_drivers.qcm_qrm.QRM_QCM` object is provided via the
    attribute `device`, allowing the advanced user to gain access to the features that are not exposed directly
    by the class.

    In order to accelerate the execution, the instrument settings are cached, so that the communication with the
    instrument only happens when the parameters change. This caching is done with the method
    `_set_device_parameter(target, *parameters, value)`.

    .. code-block:: text

        ports:
            o1:                                             # output port settings
                channel                     : L3-25a
                attenuation                 : 30            # (dB) 0 to 60, must be multiple of 2
                lo_enabled                  : true
                lo_frequency                : 6_000_000_000 # (Hz) from 2e9 to 18e9
                gain                        : 1             # for path0 and path1 -1.0<=v<=1.0
            i1:                                             # input port settings
                channel                     : L2-5a
                acquisition_hold_off        : 130           # minimum 4ns
                acquisition_duration        : 1800

        classification_parameters:
            0:                                              # qubit id
                rotation_angle              : 0             # in degrees 0.0<=v<=360.0
                threshold                   : 0             # in V
            1:
                rotation_angle              : 194.272
                threshold                   : 0.011197
            2:
                rotation_angle              : 104.002
                threshold                   : 0.012745

    Attributes:
        name (str): A unique name given to the instrument.
        address (str): IP_address:module_number; the IP address of the cluster and
            the module number.
        device (QcmQrm): A reference to the underlying `qblox_instruments.qcodes_drivers.qcm_qrm.QcmQrm` object.
            It can be used to access other features not directly exposed by this wrapper.
            https://qblox-qblox-instruments.readthedocs-hosted.com/en/master/api_reference/qcm_qrm.html

        ports = A dictionary giving access to the input and output ports objects.

            - ports['o1']: Output port
            - ports['i1']: Input port

            - ports['o1'].channel (int | str): the id of the refrigerator channel the output port o1 is connected to.
            - ports['o1'].attenuation (int): (mapped to qrm.out0_att) Controls the attenuation applied to
              the output port. It must be a multiple of 2.
            - ports['o1'].lo_enabled (bool): (mapped to qrm.out0_in0_lo_en) Enables or disables the
              local oscillator.
            - ports['o1'].lo_frequency (int): (mapped to qrm.out0_in0_lo_freq) Sets the frequency of the
              local oscillator.
            - ports['o1'].gain (float): (mapped to qrm.sequencers[0].gain_awg_path0 and
              qrm.sequencers[0].gain_awg_path1) Sets the gain on both paths of the output port.
            - ports['o1'].hardware_mod_en (bool): (mapped to qrm.sequencers[0].mod_en_awg) Enables pulse
              modulation in hardware. When set to False, pulse modulation is done in software, at the host
              computer, and the modulated pulse waveform is uploaded to the instrument. When set to True,
              the envelope of the pulse is uploaded to the instrument and it is modulated in real time by the
              FPGA of the instrument, using the sequencer nco (numerically controlled oscillator).
            - ports['o1'].nco_freq (int): (mapped to qrm.sequencers[0].nco_freq). Mapped, but not configurable
              from the runcard.
            - ports['o1'].nco_phase_offs = (mapped to qrm.sequencers[0].nco_phase_offs). Mapped, but not
              configurable from the runcard.

            - ports['i1'].channel (int | str): the id of the refrigerator channel the input port i1 is connected to.
            - ports['i1'].acquisition_hold_off (int): Delay between the moment the readout pulse starts to be
              played and the start of the acquisition, in ns. It must be > 0 and multiple of 4.
            - ports['i1'].acquisition_duration (int): (mapped to qrm.sequencers[0].integration_length_acq)
              Duration of the pulse acquisition, in ns. It must be > 0 and multiple of 4.
            - ports['i1'].hardware_demod_en (bool): (mapped to qrm.sequencers[0].demod_en_acq) Enables
              demodulation and integration of the acquired pulses in hardware. When set to False, the
              filtration, demodulation and integration of the acquired pulses is done at the host computer.
              When set to True, the demodulation, integration and discretization of the pulse is done in
              real time at the FPGA of the instrument.

            - Sequencer 0 is used always for acquisitions and it is the first sequencer used to synthesise pulses.
            - Sequencers 1 to 5 are used as needed to synthesise simultaneous pulses on the same channel
              (required in multiplexed readout) or when the memory of the default sequencers runs out.

        classification_parameters (dict): A dictionary containing the parameters needed to classify the state
            of each qubit from a single shot measurement:
                qubit_id (dict): the id of the qubit
                    rotation_angle (float): 0   # in degrees 0.0<=v<=360.0. The angle of the rotation applied at
                        the origin of the i q plane, that put the centroids of the state ``|0>`` and state ``|1>``
                        in a horizontal line. The rotation puts the centroid of state ``|1>`` to the right side
                        of centroid of state ``|0>``.
                    threshold (float): 0        # in V. The real component of the point along the horizontal
                        line connecting both state centroids (after being rotated), that maximises the fidelity
                        of the classification.

        channels (list): A list of the channels to which the instrument is connected.
    """

    DEFAULT_SEQUENCERS: dict = {"o1": 0, "i1": 0}
    FREQUENCY_LIMIT = 500e6  # 500 MHz

    def __init__(self, name: str, address: str):
        """Initialize a Qblox QRM-RF module.

        The constructor only stores configuration; the underlying device is
        attached later, when the Cluster object is passed to `connect()`.

        Parameters:
        - name: An arbitrary name to identify the module.
        - address: The network address of the instrument, specified as "cluster_IP:module_slot_idx".

        Example:
        To create a QrmRf instance named 'qrm_rf' for slot 2 of a Cluster at address '192.168.1.100':

        >>> cluster_instance = Cluster("cluster", "192.168.1.100", settings)
        >>> qrm_module = QrmRf(name="qrm_rf", address="192.168.1.100:2")
        >>> qrm_module.connect(cluster_instance)
        """
        super().__init__(name, address)
        # reference to the qblox module object; assigned in connect()
        self.device: Module = None
        self.classification_parameters: dict = {}
        # runcard settings cached by setup() and applied in connect()
        self.settings: dict = {}

        # when not empty, upload() dumps sequences and snapshots into this folder
        self._debug_folder: str = ""
        self._input_ports_keys = ["i1"]
        self._output_ports_keys = ["o1"]
        # sequencer wrappers in use, grouped by output port name
        self._sequencers: dict[str, list[Sequencer]] = {"o1": []}
        self.channel_map: dict = {}
        self._device_num_output_ports = 1
        # declared here, assigned in connect() once the device is known
        self._device_num_sequencers: int
        self._free_sequencers_numbers: list[int] = []
        self._used_sequencers_numbers: list[int] = []
        self._unused_sequencers_numbers: list[int] = []
        # estimated duration (s) of the last processed sequence
        self._execution_time: float = 0

    def _set_default_values(self):
        """Write the default configuration to the device.

        Disconnects all sequencers, zeroes the output offsets, configures the
        scope acquisition of the input port, applies
        `DEFAULT_SEQUENCERS_VALUES` to the default sequencer and connects it
        to the output and input ports.
        """
        # disable all sequencer connections
        self.device.disconnect_outputs()
        self.device.disconnect_inputs()

        # set I (path0) and Q (path1) offset to zero on output port 0. Default values after reboot = 7.625
        for path in range(2):
            self.device.set(f"out0_offset_path{path}", 0)

        # set input port parameters to default
        self.device.set("in0_att", 0)
        self.device.set("scope_acq_avg_mode_en_path0", True)
        self.device.set("scope_acq_avg_mode_en_path1", True)
        self.device.set("scope_acq_sequencer_select", self.DEFAULT_SEQUENCERS["i1"])
        self.device.set("scope_acq_trigger_level_path0", 0)
        self.device.set("scope_acq_trigger_level_path1", 0)
        self.device.set("scope_acq_trigger_mode_path0", "sequencer")
        self.device.set("scope_acq_trigger_mode_path1", "sequencer")

        # initialise the parameters of the default sequencer to the default values,
        # the rest of the sequencers are disconnected, but will be configured
        # with the same parameters as the default in process_pulse_sequence()
        target = self.device.sequencers[self.DEFAULT_SEQUENCERS["o1"]]
        for name, value in self.DEFAULT_SEQUENCERS_VALUES.items():
            target.set(name, value)

        # connect sequencer to out/in ports
        target.set("connect_out0", "IQ")
        target.set("connect_acq", "in0")
[docs] def connect(self, cluster: Cluster = None): """Connects to the instrument using the instrument settings in the runcard. Once connected, it creates port classes with properties mapped to various instrument parameters, and initialises the the underlying device parameters. It uploads to the module the port settings loaded from the runcard. """ if self.is_connected: return elif cluster is not None: self.device = cluster.modules[int(self.address.split(":")[1]) - 1] # test connection with module if not self.device.present(): raise ConnectionError( f"Module {self.device.name} not connected to cluster {cluster.name}" ) # once connected, initialise the parameters of the device to the default values self._device_num_sequencers = len(self.device.sequencers) self._set_default_values() # then set the value loaded from the runcard try: if "o1" in self.settings: self._ports["o1"].attenuation = self.settings["o1"]["attenuation"] if self.settings["o1"]["lo_frequency"]: self._ports["o1"].lo_enabled = True self._ports["o1"].lo_frequency = self.settings["o1"][ "lo_frequency" ] self._ports["o1"].hardware_mod_en = True self._ports["o1"].nco_freq = 0 self._ports["o1"].nco_phase_offs = 0 if "i1" in self.settings: self._ports["i1"].hardware_demod_en = True self._ports["i1"].acquisition_hold_off = self.settings["i1"][ "acquisition_hold_off" ] self._ports["i1"].acquisition_duration = self.settings["i1"][ "acquisition_duration" ] except Exception as error: raise RuntimeError( f"Unable to initialize port parameters on module {self.name}: {error}" ) self.is_connected = True
[docs] def setup(self, **settings): """Cache the settings of the runcard and instantiate the ports of the module. Args: **settings: dict = A dictionary of settings loaded from the runcard: - settings['o1']['attenuation'] (int): [0 to 60 dBm, in multiples of 2] attenuation at the output. - settings['o1']['lo_enabled'] (bool): enable or disable local oscillator for up-conversion. - settings['o1']['lo_frequency'] (int): [2_000_000_000 to 18_000_000_000 Hz] local oscillator frequency. - settings['o1']['hardware_mod_en'] (bool): enables Hardware Modulation. In this mode, pulses are modulated to the intermediate frequency using the numerically controlled oscillator within the fpga. It only requires the upload of the pulse envelope waveform. At the moment this param is not loaded but is always set to True. - settings['i1']['hardware_demod_en'] (bool): enables Hardware Demodulation. In this mode, the sequencers of the fpga demodulate, integrate and classify the results for every shot. Once integrated, the i and q values and the result of the classification requires much less memory, so they can be stored for every shot in separate `bins` and retrieved later. Hardware Demodulation also allows making multiple readouts on the same qubit at different points in the circuit, which is not possible with Software Demodulation. At the moment this param is not loaded but is always set to True. - settings['i1']['acquisition_hold_off'] (int): [0 to 16834 ns, in multiples of 4] the time between the moment the start of the readout pulse begins to be played, and the start of the acquisition. This is used to account for the time of flight of the pulses from the output port to the input port. - settings['i1']['acquisition_duration'] (int): [0 to 8192 ns] the duration of the acquisition. It is limited by the amount of memory available in the fpga to store i q samples. """ self.settings = settings if settings else self.settings
def _get_next_sequencer(self, port: str, frequency: int, qubits: dict, qubit: None):
    """Take the next free sequencer, configure it and return its wrapper.

    A sequencer other than the default one of `port` receives a copy of
    every truthy parameter value of the default sequencer. The nco
    frequency and, when a threshold is known for the qubit, the
    classification parameters are then written to the selected sequencer.

    Args:
        port (str): Name of the port the sequencer will serve.
        frequency (int): Intermediate frequency written to `nco_freq`.
        qubits (dict): Mapping indexed with `qubit`; `threshold` and
            `iq_angle` of the selected entry are read.
        qubit (str|int): Key of the qubit associated with the sequencer.

    Returns:
        Sequencer: A wrapper for the selected sequencer, tagged with `qubit`.

    Raises:
        Exception = If attempting to set a parameter without a connection to the instrument.
    """
    # parameters that must not be copied: `sequence` is read-only and the
    # other two have wrong default values (qblox bug)
    skipped_parameters = [
        "sequence",
        "thresholded_acq_marker_address",
        "thresholded_acq_trigger_address",
    ]

    # select a new sequencer and configure it as required
    number = self._free_sequencers_numbers.pop(0)
    default_number = self.DEFAULT_SEQUENCERS[port]
    if number != default_number:
        for parameter_name in self.device.sequencers[default_number].parameters:
            if parameter_name in skipped_parameters:
                continue
            default_value = self.device.sequencers[default_number].get(
                param_name=parameter_name
            )
            if not default_value:
                continue
            self.device.sequencers[number].set(parameter_name, default_value)

    input_port = self._ports["i1"]

    # the nco frequency is needed as soon as either hardware modulation or
    # hardware demodulation is enabled
    if input_port.hardware_demod_en or self._ports["o1"].hardware_mod_en:
        self.device.sequencers[number].set("nco_freq", frequency)
        # It assumes all pulses in non_overlapping_pulses set have the same frequency.
        # Non-overlapping pulses of different frequencies on the same qubit channel, with hardware_demod_en
        # would lead to wrong results.
        # TODO: Throw error in that event or implement non_overlapping_same_frequency_pulses
        # Even better, set the frequency before each pulse is played (would work with hardware modulation only)

    # classification parameters are only written when a threshold is available
    if input_port.hardware_demod_en:
        qubit_parameters = qubits[qubit]
        if qubit_parameters.threshold is not None:
            # iq_angle is converted from radians to degrees and wrapped to [0, 360)
            self.device.sequencers[number].set(
                "thresholded_acq_rotation",
                (qubits[qubit].iq_angle * 360 / (2 * np.pi)) % 360,
            )
            # the threshold is scaled by the acquisition duration
            self.device.sequencers[number].set(
                "thresholded_acq_threshold",
                qubits[qubit].threshold * self._ports["i1"].acquisition_duration,
            )

    # create sequencer wrapper
    wrapper = Sequencer(number)
    wrapper.qubit = qubit
    return wrapper
[docs] def get_if(self, pulse: Pulse): """Returns the intermediate frequency needed to synthesise a pulse based on the port lo frequency.""" _rf = pulse.frequency _lo = self.channel_map[pulse.channel].lo_frequency _if = _rf - _lo if abs(_if) > self.FREQUENCY_LIMIT: raise Exception( f""" Pulse frequency {_rf:_} cannot be synthesised with current lo frequency {_lo:_}. The intermediate frequency {_if:_} would exceed the maximum frequency of {self.FREQUENCY_LIMIT:_} """ ) return _if
def process_pulse_sequence(
    self,
    qubits: dict,
    instrument_pulses: PulseSequence,
    navgs: int,
    nshots: int,
    repetition_duration: int,
    sweepers=None,
):
    """Processes a sequence of pulses and sweepers, generating the waveforms and program required by
    the instrument to synthesise them.

    The output of the process is a list of sequencers used for each port, configured with the information
    required to play the sequence.
    The following features are supported:

        - multiplexed readout of up to 6 qubits
        - overlapping pulses
        - hardware modulation, demodulation, and classification
        - software modulation, with support for arbitrary pulses
        - software demodulation
        - binned acquisition
        - real-time sweepers of

            - pulse frequency (requires hardware modulation)
            - pulse relative phase (requires hardware modulation)
            - pulse amplitude
            - pulse start
            - pulse duration
            - port gain
            - port offset

        - multiple readouts for the same qubit (sequence unrolling)
        - pulses of up to 8192 pairs of i, q samples
        - sequencer memory optimisation (waveforms cache)
        - extended waveform memory with the use of multiple sequencers
        - instrument parameters cache

    The method works in three stages: it distributes the pulses of port 'o1' over sequencers, it
    refreshes the lists of used and unused sequencer numbers, and for every sequencer it fills in the
    waveforms and acquisitions dictionaries and generates the program, which is finally stored as a
    string in `sequencer.program`.

    Args:
        qubits (dict): Forwarded to `_get_next_sequencer()` together with the qubit of the first pulse
            of each set of non overlapping pulses.
        instrument_pulses (PulseSequence): A collection of Pulse objects to be played by the instrument.
        navgs (int): The number of times the sequence of pulses should be executed averaging the results.
        nshots (int): The number of times the sequence of pulses should be executed without averaging.
        repetition_duration (int): The total duration of the pulse sequence execution plus the reset/relaxation time.
        sweepers (list(Sweeper)): A list of Sweeper objects to be implemented.

    Raises:
        Exception: If more sequencers are required than those available, or if two instructions would
            be closer than the minimum delay between instructions.
        NotImplementedError: If the waveforms of a single pulse do not fit in the memory of a sequencer.
        AssertionError: If `repetition_duration` leaves too little time after the sequence.
            NOTE(review): these checks are `assert` statements, so they are skipped under `python -O`.
    """
    if sweepers is None:
        sweepers = []
    sequencer: Sequencer
    sweeper: Sweeper

    # calculate the number of bins
    # (one bin per shot and per combination of sweeper values)
    num_bins = nshots
    for sweeper in sweepers:
        num_bins *= len(sweeper.values)

    # estimate the execution time
    # (in seconds; 1000 ns are added per sweeper to each repetition. acquire() uses it for its timeout)
    self._execution_time = (
        navgs * num_bins * ((repetition_duration + 1000 * len(sweepers)) * 1e-9)
    )

    port = "o1"
    # initialise the list of free sequencer numbers to include the default for each port {'o1': 0}
    self._free_sequencers_numbers = [self.DEFAULT_SEQUENCERS[port]] + [
        1,
        2,
        3,
        4,
        5,
    ]

    # split the collection of instruments pulses by ports
    # ro_channel = None
    # feed_channel = None
    port_channel = [
        chan.name for chan in self.channel_map.values() if chan.port.name == port
    ]
    port_pulses: PulseSequence = instrument_pulses.get_channel_pulses(*port_channel)

    # initialise the list of sequencers required by the port
    self._sequencers[port] = []

    if not port_pulses.is_empty:
        # split the collection of port pulses in non overlapping pulses
        non_overlapping_pulses: PulseSequence
        for non_overlapping_pulses in port_pulses.separate_overlapping_pulses():
            # each set of not overlapping pulses will be played by a separate sequencer
            # check sequencer availability
            if len(self._free_sequencers_numbers) == 0:
                raise Exception(
                    f"The number of sequencers requried to play the sequence exceeds the number available {self._device_num_sequencers}."
                )
            # get next sequencer
            # (frequency and qubit are taken from the first pulse of the set)
            sequencer = self._get_next_sequencer(
                port=port,
                frequency=self.get_if(non_overlapping_pulses[0]),
                qubits=qubits,
                qubit=non_overlapping_pulses[0].qubit,
            )
            # add the sequencer to the list of sequencers required by the port
            self._sequencers[port].append(sequencer)

            # make a temporary copy of the pulses to be processed
            pulses_to_be_processed = non_overlapping_pulses.shallow_copy()
            while not pulses_to_be_processed.is_empty:
                pulse: Pulse = pulses_to_be_processed[0]
                # attempt to save the waveforms to the sequencer waveforms buffer
                try:
                    sequencer.waveforms_buffer.add_waveforms(
                        pulse, self._ports[port].hardware_mod_en, sweepers
                    )
                    sequencer.pulses.add(pulse)
                    pulses_to_be_processed.remove(pulse)

                # if there is not enough memory in the current sequencer, use another one
                # (the pulse stays in pulses_to_be_processed, so it is retried with the new sequencer)
                except WaveformsBuffer.NotEnoughMemory:
                    if (
                        len(pulse.waveform_i) + len(pulse.waveform_q)
                        > WaveformsBuffer.SIZE
                    ):
                        raise NotImplementedError(
                            f"Pulses with waveforms longer than the memory of a sequencer ({WaveformsBuffer.SIZE // 2}) are not supported."
                        )
                    if len(self._free_sequencers_numbers) == 0:
                        raise Exception(
                            f"The number of sequencers requried to play the sequence exceeds the number available {self._device_num_sequencers}."
                        )
                    # get next sequencer
                    sequencer = self._get_next_sequencer(
                        port=port,
                        frequency=self.get_if(non_overlapping_pulses[0]),
                        qubits=qubits,
                        qubit=non_overlapping_pulses[0].qubit,
                    )
                    # add the sequencer to the list of sequencers required by the port
                    self._sequencers[port].append(sequencer)

    # update the lists of used and unused sequencers that will be needed later on
    self._used_sequencers_numbers = []
    for port in self._output_ports_keys:
        for sequencer in self._sequencers[port]:
            self._used_sequencers_numbers.append(sequencer.number)
    self._unused_sequencers_numbers = []
    for n in range(self._device_num_sequencers):
        if not n in self._used_sequencers_numbers:
            self._unused_sequencers_numbers.append(n)

    # generate and store the Waveforms dictionary, the Acquisitions dictionary, the Weights and the Program
    for port in self._output_ports_keys:
        for sequencer in self._sequencers[port]:
            pulses = sequencer.pulses
            program = sequencer.program

            ## pre-process sweepers ##
            # TODO: move qibolab sweepers preprocessing to qblox controller

            # attach a sweeper attribute to the pulse so that it is easily accesible by the code that generates
            # the pseudo-assembly program
            pulse = None
            for pulse in pulses:
                pulse.sweeper = None

            pulse_sweeper_parameters = [
                Parameter.frequency,
                Parameter.amplitude,
                Parameter.duration,
                Parameter.relative_phase,
                Parameter.start,
            ]

            for sweeper in sweepers:
                if sweeper.parameter in pulse_sweeper_parameters:
                    # check if this sequencer takes an active role in the sweep
                    if sweeper.pulses and set(sequencer.pulses) & set(
                        sweeper.pulses
                    ):
                        # plays an active role
                        reference_value = None
                        if sweeper.parameter == Parameter.frequency:
                            if sequencer.pulses:
                                reference_value = self.get_if(
                                    sequencer.pulses[0]
                                )  # uses the frequency of the first pulse (assuming all same freq)
                        if sweeper.parameter == Parameter.amplitude:
                            for pulse in pulses:
                                if pulse in sweeper.pulses:
                                    reference_value = (
                                        pulse.amplitude
                                    )  # uses the amplitude of the first pulse
                                    # NOTE(review): there is no `break`, so the value kept is the one of
                                    # the last matching pulse - confirm which one is intended

                        # NOTE(review): `pulse` is the variable left bound by the last
                        # `for pulse in pulses` loop, i.e. the last pulse of this sequencer
                        if (
                            sweeper.parameter == Parameter.duration
                            and pulse in sweeper.pulses
                        ):
                            # for duration sweepers bake waveforms
                            sweeper.qs = QbloxSweeper(
                                program=program,
                                type=QbloxSweeperType.duration,
                                rel_values=pulse.idx_range,
                            )
                        else:
                            # create QbloxSweepers and attach them to qibolab sweeper
                            # (a falsy reference_value, None or 0, falls through to the last branch)
                            if (
                                sweeper.type == SweeperType.OFFSET
                                and reference_value
                            ):
                                sweeper.qs = QbloxSweeper.from_sweeper(
                                    program=program,
                                    sweeper=sweeper,
                                    add_to=reference_value,
                                )
                            elif (
                                sweeper.type == SweeperType.FACTOR
                                and reference_value
                            ):
                                sweeper.qs = QbloxSweeper.from_sweeper(
                                    program=program,
                                    sweeper=sweeper,
                                    multiply_to=reference_value,
                                )
                            else:
                                sweeper.qs = QbloxSweeper.from_sweeper(
                                    program=program, sweeper=sweeper
                                )

                        # finally attach QbloxSweepers to the pulses being swept
                        # NOTE(review): only `pulse` (see the note above) receives the sweeper,
                        # whichever pulses are listed in sweeper.pulses - confirm this is intended
                        sweeper.qs.update_parameters = True
                        pulse.sweeper = sweeper.qs
                    else:
                        # does not play an active role
                        sweeper.qs = QbloxSweeper(
                            program=program,
                            type=QbloxSweeperType.number,
                            rel_values=range(len(sweeper.values)),
                            name=sweeper.parameter.name,
                        )

                # else:  # qubit_sweeper_parameters
                #     if sweeper.qubits and sequencer.qubit in [_.name for _ in sweeper.qubits]:
                #         # plays an active role
                #         if sweeper.parameter == Parameter.bias:
                #             reference_value = self._ports[port].offset
                #             # create QbloxSweepers and attach them to qibolab sweeper
                #             if sweeper.type == SweeperType.ABSOLUTE:
                #                 sweeper.qs = QbloxSweeper.from_sweeper(
                #                     program=program, sweeper=sweeper, add_to=-reference_value
                #                 )
                #             elif sweeper.type == SweeperType.OFFSET:
                #                 sweeper.qs = QbloxSweeper.from_sweeper(program=program, sweeper=sweeper)
                #             elif sweeper.type == SweeperType.FACTOR:
                #                 raise Exception("SweeperType.FACTOR for Parameter.bias not supported")
                #             sweeper.qs.update_parameters = True
                #     else:
                #         # does not play an active role
                #         sweeper.qs = QbloxSweeper(
                #             program=program, type=QbloxSweeperType.number, rel_values=range(len(sweeper.values)),
                #             name = sweeper.parameter.name
                #         )
                else:
                    # does not play an active role
                    sweeper.qs = QbloxSweeper(
                        program=program,
                        type=QbloxSweeperType.number,
                        rel_values=range(len(sweeper.values)),
                        name=sweeper.parameter.name,
                    )

                # # FIXME: for qubit sweepers (Parameter.bias, Parameter.attenuation, Parameter.gain), the qubit
                # # information alone is not enough to determine what instrument parameter is to be swept.
                # # For example port gain, both the drive and readout ports have gain parameters.
                # # Until this is resolved, and since bias is only implemented with QCMs offset, this instrument will
                # # never take an active role in those sweeps.

            # Waveforms
            # (each unique waveform is stored under its serial, indexed by its position in the buffer)
            for index, waveform in enumerate(
                sequencer.waveforms_buffer.unique_waveforms
            ):
                sequencer.waveforms[waveform.serial] = {
                    "data": waveform.data.tolist(),
                    "index": index,
                }

            # Acquisitions
            # (`pulse` is reset so that the check below can tell whether there was any readout pulse)
            pulse = None
            for acquisition_index, pulse in enumerate(sequencer.pulses.ro_pulses):
                sequencer.acquisitions[pulse.serial] = {
                    "num_bins": num_bins,
                    "index": acquisition_index,
                }

            # Add scope_acquisition to default sequencer
            if (
                sequencer.number == self.DEFAULT_SEQUENCERS[port]
                and pulse is not None
            ):
                sequencer.acquisitions["scope_acquisition"] = {
                    "num_bins": 1,
                    "index": acquisition_index + 1,
                }

            # Program
            minimum_delay_between_instructions = 4

            # Active reset is not fully tested yet
            # (with active_reset hard-coded to False, the active reset branches below never run)
            active_reset = False
            active_reset_address = 1
            active_reset_pulse_idx_I = 1
            active_reset_pulse_idx_Q = 1

            sequence_total_duration = (
                pulses.finish
            )  # the minimum delay between instructions is 4ns
            time_between_repetitions = repetition_duration - sequence_total_duration
            assert time_between_repetitions > minimum_delay_between_instructions
            # TODO: currently relaxation_time needs to be greater than acquisition_hold_off
            # so that the time_between_repetitions is equal to the sequence_total_duration + relaxation_time
            # to be compatible with the rest of the platforms, change it so that time_between_repetitions
            # is equal to pulsesequence duration + acquisition_hold_off if relaxation_time < acquisition_hold_off

            # create registers for key variables
            # nshots is used in the loop that iterates over the number of shots
            nshots_register = Register(program, "nshots")
            # during a sweep, each shot is saved in the bin bin_n
            bin_n = Register(program, "bin_n")
            # navgs is used in the loop of hardware averages
            navgs_register = Register(program, "navgs")

            header_block = Block("setup")
            if active_reset:
                header_block.append(
                    f"set_latch_en {active_reset_address}, 4",
                    f"monitor triggers on address {active_reset_address}",
                )

            body_block = Block()

            body_block.append(f"wait_sync {minimum_delay_between_instructions}")
            if (
                self._ports["i1"].hardware_demod_en
                or self._ports["o1"].hardware_mod_en
            ):
                body_block.append("reset_ph")
                body_block.append_spacer()

            pulses_block = Block("play_and_acquire")
            # Add an initial wait instruction for the first pulse of the sequence
            initial_wait_block = wait_block(
                wait_time=pulses[0].start,
                register=Register(program),
                force_multiples_of_four=True,
            )
            pulses_block += initial_wait_block

            for n in range(pulses.count):
                if (
                    pulses[n].sweeper
                    and pulses[n].sweeper.type == QbloxSweeperType.start
                ):
                    pulses_block.append(f"wait {pulses[n].sweeper.register}")

                if self._ports["o1"].hardware_mod_en:
                    # # Set frequency
                    # _if = self.get_if(pulses[n])
                    # pulses_block.append(f"set_freq {convert_frequency(_if)}", f"set intermediate frequency to {_if} Hz")

                    # Set phase
                    if (
                        pulses[n].sweeper
                        and pulses[n].sweeper.type == QbloxSweeperType.relative_phase
                    ):
                        pulses_block.append(f"set_ph {pulses[n].sweeper.register}")
                    else:
                        pulses_block.append(
                            f"set_ph {convert_phase(pulses[n].relative_phase)}",
                            comment=f"set relative phase {pulses[n].relative_phase} rads",
                        )

                if pulses[n].type == PulseType.READOUT:
                    # readout pulses: play, wait acquisition_hold_off, then acquire
                    delay_after_play = self._ports["i1"].acquisition_hold_off

                    if len(pulses) > n + 1:
                        # If there are more pulses to be played, the delay is the time between the pulse end and the next pulse start
                        delay_after_acquire = (
                            pulses[n + 1].start
                            - pulses[n].start
                            - self._ports["i1"].acquisition_hold_off
                        )
                    else:
                        delay_after_acquire = (
                            sequence_total_duration - pulses[n].start
                        )
                        # the hold off is not subtracted from delay_after_acquire here, so it is
                        # removed from the wait between repetitions instead
                        time_between_repetitions = (
                            repetition_duration
                            - sequence_total_duration
                            - self._ports["i1"].acquisition_hold_off
                        )
                        assert time_between_repetitions > 0

                    if delay_after_acquire < minimum_delay_between_instructions:
                        raise Exception(
                            f"The minimum delay after starting acquisition is {minimum_delay_between_instructions}ns."
                        )

                    if (
                        pulses[n].sweeper
                        and pulses[n].sweeper.type == QbloxSweeperType.duration
                    ):
                        RI = pulses[n].sweeper.register
                        if pulses[n].type == PulseType.FLUX:
                            RQ = pulses[n].sweeper.register
                        else:
                            RQ = pulses[n].sweeper.aux_register

                        pulses_block.append(
                            f"play {RI},{RQ},{delay_after_play}",  # FIXME delay_after_play won't work as the duration increases
                            comment=f"play pulse {pulses[n]} sweeping its duration",
                        )
                    else:
                        # Prepare play instruction: play wave_i_index, wave_q_index, delay_next_instruction
                        pulses_block.append(
                            f"play {sequencer.waveforms_buffer.unique_waveforms.index(pulses[n].waveform_i)},{sequencer.waveforms_buffer.unique_waveforms.index(pulses[n].waveform_q)},{delay_after_play}",
                            comment=f"play waveforms {pulses[n]}",
                        )

                    # Prepare acquire instruction: acquire acquisition_index, bin_index, delay_next_instruction
                    if active_reset:
                        pulses_block.append(
                            f"acquire {pulses.ro_pulses.index(pulses[n])},{bin_n},4"
                        )
                        pulses_block.append(
                            f"latch_rst {delay_after_acquire + 300 - 4}"
                        )
                    else:
                        pulses_block.append(
                            f"acquire {pulses.ro_pulses.index(pulses[n])},{bin_n},{delay_after_acquire}"
                        )

                else:
                    # Calculate the delay_after_play that is to be used as an argument to the play instruction
                    if len(pulses) > n + 1:
                        # If there are more pulses to be played, the delay is the time between the pulse end and the next pulse start
                        delay_after_play = pulses[n + 1].start - pulses[n].start
                    else:
                        delay_after_play = sequence_total_duration - pulses[n].start

                    if delay_after_play < minimum_delay_between_instructions:
                        raise Exception(
                            f"The minimum delay between the start of two pulses in the same channel is {minimum_delay_between_instructions}ns."
                        )

                    if (
                        pulses[n].sweeper
                        and pulses[n].sweeper.type == QbloxSweeperType.duration
                    ):
                        RI = pulses[n].sweeper.register
                        if pulses[n].type == PulseType.FLUX:
                            RQ = pulses[n].sweeper.register
                        else:
                            RQ = pulses[n].sweeper.aux_register

                        pulses_block.append(
                            f"play {RI},{RQ},{delay_after_play}",  # FIXME delay_after_play won't work as the duration increases
                            comment=f"play pulse {pulses[n]} sweeping its duration",
                        )
                    else:
                        # Prepare play instruction: play wave_i_index, wave_q_index, delay_next_instruction
                        pulses_block.append(
                            f"play {sequencer.waveforms_buffer.unique_waveforms.index(pulses[n].waveform_i)},{sequencer.waveforms_buffer.unique_waveforms.index(pulses[n].waveform_q)},{delay_after_play}",
                            comment=f"play waveforms {pulses[n]}",
                        )

            body_block += pulses_block
            body_block.append_spacer()

            if active_reset:
                final_reset_block = Block()
                final_reset_block.append(
                    f"set_cond 1, {active_reset_address}, 0, 4",
                    comment="active reset",
                )
                final_reset_block.append(
                    f"play {active_reset_pulse_idx_I}, {active_reset_pulse_idx_Q}, 4",
                    level=1,
                )
                final_reset_block.append(
                    f"set_cond 0, {active_reset_address}, 0, 4"
                )
            else:
                final_reset_block = wait_block(
                    wait_time=time_between_repetitions,
                    register=Register(program),
                    force_multiples_of_four=False,
                )
            final_reset_block.append_spacer()
            final_reset_block.append(
                f"add {bin_n}, 1, {bin_n}", "increase bin counter"
            )

            body_block += final_reset_block

            footer_block = Block("cleanup")
            footer_block.append(f"stop")

            # wrap pulses block in sweepers loop blocks
            # (each sweeper wraps the block built so far, so the first sweeper is the innermost loop)
            for sweeper in sweepers:
                body_block = sweeper.qs.block(inner_block=body_block)

            nshots_block: Block = loop_block(
                start=0,
                stop=nshots,
                step=1,
                register=nshots_register,
                block=body_block,
            )
            nshots_block.prepend(f"move 0, {bin_n}", "reset bin counter")
            nshots_block.append_spacer()

            navgs_block = loop_block(
                start=0,
                stop=navgs,
                step=1,
                register=navgs_register,
                block=nshots_block,
            )
            program.add_blocks(header_block, navgs_block, footer_block)

            # from here on sequencer.program holds the text of the program
            sequencer.program = repr(program)
def upload(self):
    """Configure, load and arm the sequencers selected for the sequence.

    It has to be called after `process_pulse_sequence()`, which decides
    which sequencers are used and prepares their waveforms, acquisitions
    and programs. The sequencers in use are synchronised, the remaining
    ones are switched off, every sequence is uploaded to the device and
    finally the acquisition memory of the used sequencers is cleared and
    they are armed. When `_debug_folder` is not empty, the uploaded
    sequences and a snapshot of the device are written to that folder.
    """
    # Setup
    for used_number in self._used_sequencers_numbers:
        used = self.device.sequencers[used_number]
        used.set("sync_en", True)
        used.set("marker_ovr_en", True)  # Default after reboot = False
        used.set("marker_ovr_value", 15)  # Default after reboot = 0

    for unused_number in self._unused_sequencers_numbers:
        unused = self.device.sequencers[unused_number]
        unused.set("sync_en", False)
        unused.set("marker_ovr_en", False)  # Default after reboot = False
        unused.set("marker_ovr_value", 0)  # Default after reboot = 0
        if unused_number < 1:
            # Never disconnect default sequencers
            continue
        unused.set("connect_out0", "off")
        unused.set("connect_acq", "in0")

    # Upload waveforms and program
    sequencer: Sequencer
    for port in self._output_ports_keys:
        for sequencer in self._sequencers[port]:
            # Gather everything the device needs in a single dictionary
            sequence = {
                "waveforms": sequencer.waveforms,
                "weights": sequencer.weights,
                "acquisitions": sequencer.acquisitions,
                "program": sequencer.program,
            }

            # Upload dictionary to the device sequencers
            self.device.sequencers[sequencer.number].sequence(sequence)

            # DEBUG: QRM RF Save sequence to file
            if self._debug_folder != "":
                filename = (
                    self._debug_folder
                    + f"Z_{self.name}_sequencer{sequencer.number}_sequence.json"
                )
                with open(filename, "w", encoding="utf-8") as file:
                    json.dump(sequence, file, indent=4)
                    # the program text is appended after the json document
                    file.write(sequencer.program)

    # Clear acquisition memory and arm sequencers
    for used_number in self._used_sequencers_numbers:
        self.device.sequencers[used_number].delete_acquisition_data(all=True)
        self.device.arm_sequencer(used_number)

    # DEBUG: QRM RF Print Readable Snapshot
    # print(self.name)
    # self.device.print_readable_snapshot(update=True)

    # DEBUG: QRM RF Save Readable Snapshot
    from qibolab.instruments.qblox.debug import print_readable_snapshot

    if self._debug_folder != "":
        filename = self._debug_folder + f"Z_{self.name}_snapshot.json"
        with open(filename, "w", encoding="utf-8") as file:
            print_readable_snapshot(self.device, file, update=True)
def play_sequence(self):
    """Play the sequence of pulses.

    Asks the device to start each sequencer listed in
    ``self._used_sequencers_numbers``, one call per sequencer, in list order.
    """
    # Only the sequencers in use are started.
    used_numbers = self._used_sequencers_numbers
    for number in used_numbers:
        self.device.start_sequencer(number)
def acquire(self):
    """Wait for the used sequencers to stop and retrieve the readout results.

    The results returned vary depending on whether demodulation is performed
    in software or hardware. See
    :class:`qibolab.instruments.qblox.acquisition.AveragedAcquisition` and
    :class:`qibolab.instruments.qblox.acquisition.DemodulatedAcquisition`
    for more details.

    Each used sequencer is polled every 0.5 s until it reports the STOPPED
    state. If the overall wait exceeds ``int(self._execution_time) + 60``
    seconds, a warning is logged and the sequencer is stopped explicitly.

    Returns:
        dict: acquisition objects, each stored under both the readout
        pulse's ``qubit`` and its ``serial``.

    Raises:
        RuntimeError: if hardware demodulation is disabled and a sequencer
            does not hold exactly one readout pulse.
    """
    # Wait until all sequencers stop. The elapsed time is measured with a
    # monotonic clock so that system clock adjustments can neither shorten
    # nor extend the timeout.
    time_out = int(self._execution_time) + 60
    start = time.monotonic()
    for sequencer_number in self._used_sequencers_numbers:
        while True:
            status = self.device.get_sequencer_status(sequencer_number)
            if status.state is SequencerStates.STOPPED:
                # TODO: check flags for errors
                break
            if time.monotonic() - start > time_out:
                # The sequencer is being stopped forcibly, so this is
                # reported as a warning rather than as plain information.
                log.warning(
                    f"Timeout - {self.device.sequencers[sequencer_number].name} status: {status}"
                )
                self.device.stop_sequencer(sequencer_number)
                break
            time.sleep(0.5)

    # Qblox qrm modules only have one memory for scope acquisition.
    # Only one sequencer can save data to that memory.
    # Several acquisitions at different points in the circuit will result in
    # the undesired averaging of the results.
    # Scope Acquisition should only be used with one acquisition per module.
    # Several readout pulses are supported for as long as they take place
    # simultaneously.
    # Scope Acquisition data should be ignored with more than one acquisition
    # or with Hardware Demodulation.
    # Software Demodulation requires the data from Scope Acquisition,
    # therefore Software Demodulation only works with one acquisition per
    # module.
    # The data is retrieved by storing it first in one of the acquisitions of
    # one of the sequencers. Any could be used, but we always use the
    # 'scope_acquisition' acquisition of the default sequencer to store it.

    acquisitions = {}
    duration = self._ports["i1"].acquisition_duration
    hardware_demod_enabled = self._ports["i1"].hardware_demod_en
    for port in self._output_ports_keys:
        for sequencer in self._sequencers[port]:
            # Store scope acquisition data on the 'scope_acquisition'
            # acquisition of the default sequencer.
            # TODO: Maybe this store_scope can be done only if needed to optimize the process!
            # NOTE(review): `scope` is only assigned when the default
            # sequencer is visited; this presumably relies on the default
            # sequencer coming first in each port's list - confirm.
            if sequencer.number == self.DEFAULT_SEQUENCERS[port]:
                self.device.store_scope_acquisition(
                    sequencer.number, "scope_acquisition"
                )
                scope = self.device.get_acquisitions(sequencer.number)[
                    "scope_acquisition"
                ]

            if not hardware_demod_enabled:  # Software Demodulation
                if len(sequencer.pulses.ro_pulses) != 1:
                    raise RuntimeError(
                        "Software Demodulation only supports one acquisition per channel. "
                        "Multiple readout pulses are supported as long as they are symultaneous (requiring one acquisition)."
                    )
                pulse = sequencer.pulses.ro_pulses[0]
                frequency = self.get_if(pulse)
                acquisitions[pulse.qubit] = acquisitions[pulse.serial] = (
                    AveragedAcquisition(scope, duration, frequency)
                )
            else:  # Hardware Demodulation
                results = self.device.get_acquisitions(sequencer.number)
                for pulse in sequencer.pulses.ro_pulses:
                    bins = results[pulse.serial]["acquisition"]["bins"]
                    acquisitions[pulse.qubit] = acquisitions[pulse.serial] = (
                        DemodulatedAcquisition(scope, bins, duration)
                    )

    # TODO: to be updated once the functionality of ExecutionResults is extended
    return acquisitions
def disconnect(self):
    """Stop the sequencers and detach the module's inputs and outputs.

    Does nothing when the module is not connected. Otherwise, a warning is
    logged for every used sequencer that is not in the STOPPED state, all
    sequencers are stopped, the outputs and inputs of the device are
    disconnected, and the module is marked as disconnected with its
    ``device`` reference cleared.
    """
    if not self.is_connected:
        return

    # Report the sequencers that have not reached the STOPPED state on their own.
    for number in self._used_sequencers_numbers:
        status = self.device.get_sequencer_status(number)
        stopped = status.state is SequencerStates.STOPPED
        if not stopped:
            log.warning(
                f"Device {self.device.sequencers[number].name} did not stop normally\nstatus: {status}"
            )

    device = self.device
    device.stop_sequencer()
    device.disconnect_outputs()
    device.disconnect_inputs()

    self.is_connected = False
    self.device = None