Source code for qibolab.instruments.qblox.controller

import signal
from dataclasses import replace

import numpy as np
from qblox_instruments.qcodes_drivers.cluster import Cluster as QbloxCluster
from qibo.config import log, raise_error

from qibolab import AcquisitionType, AveragingMode, ExecutionParameters
from qibolab.instruments.abstract import Controller
from qibolab.instruments.qblox.cluster_qcm_bb import QcmBb
from qibolab.instruments.qblox.cluster_qcm_rf import QcmRf
from qibolab.instruments.qblox.cluster_qrm_rf import QrmRf
from qibolab.instruments.qblox.sequencer import SAMPLING_RATE
from qibolab.pulses import PulseSequence, PulseType
from qibolab.result import SampleResults
from qibolab.sweeper import Parameter, Sweeper, SweeperType
from qibolab.unrolling import Bounds

SEQUENCER_MEMORY = 2**17


[docs]class QbloxController(Controller): """A controller to manage qblox devices. Attributes: is_connected (bool): . modules (dict): A dictionay with the qblox modules connected to the experiment. """ def __init__( self, name, address: str, modules, internal_reference_clock: bool = True ): """Initialises the controller.""" super().__init__(name=name, address=address) self.is_connected = False self.cluster: QbloxCluster = None self.modules: dict = modules self._reference_clock = "internal" if internal_reference_clock else "external" self.bounds = Bounds( waveforms=int( 4e4 ), # Translate SEQUENCER_MEMORY = 2**17 into pulse duration readout=int(1e6), instructions=int(1e6), ) signal.signal(signal.SIGTERM, self._termination_handler) @property def sampling_rate(self): return SAMPLING_RATE
[docs] def connect(self): """Connects to the modules.""" if self.is_connected: return try: # Connect cluster QbloxCluster.close_all() self.cluster = QbloxCluster(self.name, self.address) self.cluster.reset() self.cluster.set("reference_source", self._reference_clock) # Connect modules for module in self.modules.values(): module.connect(self.cluster) self.is_connected = True log.info("QbloxController: all modules connected.") except Exception as exception: raise ConnectionError(f"Unable to connect:\n{str(exception)}\n")
# TODO: check for exception 'The module qrm_rf0 does not have parameters in0_att' and reboot the cluster
[docs] def disconnect(self): """Disconnects all modules.""" if self.is_connected: for module in self.modules.values(): module.disconnect() self.cluster.close() self.is_connected = False
def _termination_handler(self, signum, frame): """Calls all modules to stop if the program receives a termination signal.""" log.warning("Termination signal received, disconnecting modules.") if self.is_connected: for name in self.modules: self.modules[name].disconnect() log.warning("QbloxController: all modules are disconnected.") exit(0) def _set_module_channel_map(self, module: QrmRf, qubits: dict): """Retrieve all the channels connected to a specific Qblox module. This method updates the `channel_port_map` attribute of the specified Qblox module based on the information contained in the provided qubits dictionary (dict of `qubit` objects). Return the list of channels connected to module_name """ for qubit in qubits.values(): for channel in qubit.channels: if channel.port and channel.port.module.name == module.name: module.channel_map[channel.name] = channel return list(module.channel_map) def _execute_pulse_sequence( self, qubits: dict, sequence: PulseSequence, options: ExecutionParameters, sweepers: list() = [], # list(Sweeper) = [] **kwargs, # nshots=None, # navgs=None, # relaxation_time=None, ): """Executes a sequence of pulses or a sweep. Args: sequence (:class:`qibolab.pulses.PulseSequence`): The sequence of pulses to execute. options (:class:`qibolab.platforms.platform.ExecutionParameters`): Object holding the execution options. sweepers (list(Sweeper)): A list of Sweeper objects defining parameter sweeps. """ if not self.is_connected: raise_error( RuntimeError, "Execution failed because modules are not connected." ) if options.averaging_mode is AveragingMode.SINGLESHOT: nshots = options.nshots navgs = 1 else: navgs = options.nshots nshots = 1 relaxation_time = options.relaxation_time repetition_duration = sequence.finish + relaxation_time # shots results are stored in separate bins # calculate number of shots num_bins = nshots for sweeper in sweepers: num_bins *= len(sweeper.values) # DEBUG: Plot Pulse Sequence # sequence.plot('plot.png') # DEBUG: sync_en # from qblox_instruments.qcodes_drivers.cluster import Cluster # cluster:Cluster = self.modules['cluster'].device # for module in cluster.modules: # if module.get("present"): # for sequencer in module.sequencers: # if sequencer.get('sync_en'): # print(f"type: {module.module_type}, sequencer: {sequencer.name}, sync_en: True") # Process Pulse Sequence. Assign pulses to modules and generate waveforms & program module_pulses = {} data = {} for name, module in self.modules.items(): # from the pulse sequence, select those pulses to be synthesised by the module module_channels = self._set_module_channel_map(module, qubits) module_pulses[name] = sequence.get_channel_pulses(*module_channels) # ask each module to generate waveforms & program and upload them to the device module.process_pulse_sequence( qubits, module_pulses[name], navgs, nshots, repetition_duration, sweepers, ) # log.info(f"{self.modules[name]}: Uploading pulse sequence") module.upload() # play the sequence or sweep for module in self.modules.values(): if isinstance(module, (QrmRf, QcmRf, QcmBb)): module.play_sequence() # retrieve the results acquisition_results = {} for name, module in self.modules.items(): if isinstance(module, QrmRf) and not module_pulses[name].ro_pulses.is_empty: results = module.acquire() for key, value in results.items(): acquisition_results[key] = value # TODO: move to QRM_RF.acquire() shape = tuple(len(sweeper.values) for sweeper in reversed(sweepers)) shots_shape = (nshots,) + shape for ro_pulse in sequence.ro_pulses: if options.acquisition_type is AcquisitionType.DISCRIMINATION: _res = acquisition_results[ro_pulse.serial].classified _res = np.reshape(_res, shots_shape) if options.averaging_mode is not AveragingMode.SINGLESHOT: _res = np.mean(_res, axis=0) elif options.acquisition_type is AcquisitionType.RAW: i_raw = acquisition_results[ro_pulse.serial].raw_i q_raw = acquisition_results[ro_pulse.serial].raw_q _res = i_raw + 1j * q_raw elif options.acquisition_type is AcquisitionType.INTEGRATION: ires = acquisition_results[ro_pulse.serial].shots_i qres = acquisition_results[ro_pulse.serial].shots_q _res = ires + 1j * qres if options.averaging_mode is AveragingMode.SINGLESHOT: _res = np.reshape(_res, shots_shape) else: _res = np.reshape(_res, shape) acquisition = options.results_type(np.squeeze(_res)) data[ro_pulse.serial] = data[ro_pulse.qubit] = acquisition return data
[docs] def play(self, qubits, couplers, sequence, options): return self._execute_pulse_sequence(qubits, sequence, options)
[docs] def sweep( self, qubits: dict, couplers: dict, sequence: PulseSequence, options: ExecutionParameters, *sweepers, ): """Executes a sequence of pulses while sweeping one or more parameters. The parameters to be swept are defined in :class:`qibolab.sweeper.Sweeper` object. Args: sequence (:class:`qibolab.pulses.PulseSequence`): The sequence of pulses to execute. options (:class:`qibolab.platforms.platform.ExecutionParameters`): Object holding the execution options. sweepers (list(Sweeper)): A list of Sweeper objects defining parameter sweeps. """ id_results = {} map_id_serial = {} # during the sweep, pulse parameters need to be changed # to avoid affecting the user, make a copy of the pulse sequence # and the sweepers, as they contain references to pulses sequence_copy = sequence.copy() sweepers_copy = [] for sweeper in sweepers: if sweeper.pulses: ps = [ sequence_copy[sequence_copy.index(pulse)] for pulse in sweeper.pulses if pulse in sequence_copy ] else: ps = None sweepers_copy.append( Sweeper( parameter=sweeper.parameter, values=sweeper.values, pulses=ps, qubits=sweeper.qubits, type=sweeper.type, ) ) # reverse sweepers exept for res punchout att contains_attenuation_frequency = any( sweepers_copy[i].parameter == Parameter.attenuation and sweepers_copy[i + 1].parameter == Parameter.frequency for i in range(len(sweepers_copy) - 1) ) if not contains_attenuation_frequency: sweepers_copy.reverse() # create a map between the pulse id, which never changes, and the original serial for pulse in sequence_copy.ro_pulses: map_id_serial[pulse.id] = pulse.serial id_results[pulse.id] = None id_results[pulse.qubit] = None # execute the each sweeper recursively self._sweep_recursion( qubits, sequence_copy, options, *tuple(sweepers_copy), results=id_results, ) # return the results using the original serials serial_results = {} for pulse in sequence_copy.ro_pulses: serial_results[map_id_serial[pulse.id]] = id_results[pulse.id] serial_results[pulse.qubit] = id_results[pulse.id] return serial_results
def _sweep_recursion( self, qubits, sequence, options: ExecutionParameters, *sweepers, results, ): """Executes a sweep recursively. Args: sequence (:class:`qibolab.pulses.PulseSequence`): The sequence of pulses to execute. sweepers (list(Sweeper)): A list of Sweeper objects defining parameter sweeps. results (:class:`qibolab.results.ExecutionResults`): A results object to update with the reults of the execution. nshots (int): The number of times the sequence of pulses should be executed. average (bool): A flag to indicate if the results of the shots should be averaged. relaxation_time (int): The the time to wait between repetitions to allow the qubit relax to ground state. """ for_loop_sweepers = [Parameter.attenuation, Parameter.lo_frequency] sweeper: Sweeper = sweepers[0] # until sweeper contains the information to determine whether the sweep should be relative or # absolute: # elif sweeper.parameter is Parameter.relative_phase: # initial = {} # for pulse in sweeper.pulses: # initial[pulse.id] = pulse.relative_phase # elif sweeper.parameter is Parameter.frequency: # initial = {} # for pulse in sweeper.pulses: # initial[pulse.id] = pulse.frequency if sweeper.parameter in for_loop_sweepers: # perform sweep recursively for value in sweeper.values: if sweeper.parameter is Parameter.attenuation: initial = {} for qubit in sweeper.qubits: initial[qubit.name] = qubits[qubit.name].readout.attenuation if sweeper.type == SweeperType.ABSOLUTE: qubit.readout.attenuation = value elif sweeper.type == SweeperType.OFFSET: qubit.readout.attenuation = initial[qubit.name] + value elif sweeper.type == SweeperType.FACTOR: qubit.readout.attenuation = initial[qubit.name] * value elif sweeper.parameter is Parameter.lo_frequency: initial = {} for pulse in sweeper.pulses: if pulse.type == PulseType.READOUT: initial[pulse.id] = qubits[pulse.qubit].readout.lo_frequency if sweeper.type == SweeperType.ABSOLUTE: qubits[pulse.qubit].readout.lo_frequency = value elif sweeper.type == SweeperType.OFFSET: qubits[pulse.qubit].readout.lo_frequency = ( initial[pulse.id] + value ) elif sweeper.type == SweeperType.FACTOR: qubits[pulse.qubit].readout.lo_frequency = ( initial[pulse.id] * value ) elif pulse.type == PulseType.DRIVE: initial[pulse.id] = qubits[pulse.qubit].drive.lo_frequency if sweeper.type == SweeperType.ABSOLUTE: qubits[pulse.qubit].drive.lo_frequency = value elif sweeper.type == SweeperType.OFFSET: qubits[pulse.qubit].drive.lo_frequency = ( initial[pulse.id] + value ) elif sweeper.type == SweeperType.FACTOR: qubits[pulse.qubit].drive.lo_frequency = ( initial[pulse.id] * value ) if len(sweepers) > 1: self._sweep_recursion( qubits, sequence, options, *sweepers[1:], results=results, ) else: result = self._execute_pulse_sequence( qubits=qubits, sequence=sequence, options=options ) for pulse in sequence.ro_pulses: if results[pulse.id]: results[pulse.id] += result[pulse.serial] else: results[pulse.id] = result[pulse.serial] results[pulse.qubit] = results[pulse.id] else: # rt sweeps # relative phase sweeps that cross 0 need to be split in two separate sweeps split_relative_phase = False rt_sweepers = [ Parameter.frequency, Parameter.gain, Parameter.bias, Parameter.amplitude, Parameter.start, Parameter.duration, Parameter.relative_phase, ] if sweeper.parameter == Parameter.relative_phase: if sweeper.type != SweeperType.ABSOLUTE: raise_error( ValueError, "relative_phase sweeps other than ABSOLUTE are not supported by qblox yet", ) from qibolab.instruments.qblox.q1asm import convert_phase c_values = np.array([convert_phase(v) for v in sweeper.values]) if any(np.diff(c_values) < 0): split_relative_phase = True _from = 0 for idx in np.append( np.where(np.diff(c_values) < 0), len(c_values) - 1 ): _to = idx + 1 _values = sweeper.values[_from:_to] split_sweeper = Sweeper( parameter=sweeper.parameter, values=_values, pulses=sweeper.pulses, qubits=sweeper.qubits, ) self._sweep_recursion( qubits, sequence, options, *((split_sweeper,) + sweepers[1:]), results=results, ) _from = _to if not split_relative_phase: if any(s.parameter not in rt_sweepers for s in sweepers): # TODO: reorder the sequence of the sweepers and the results raise Exception( "cannot execute a for-loop sweeper nested inside of a rt sweeper" ) nshots = ( options.nshots if options.averaging_mode == AveragingMode.SINGLESHOT else 1 ) navgs = ( options.nshots if options.averaging_mode != AveragingMode.SINGLESHOT else 1 ) num_bins = nshots for sweeper in sweepers: num_bins *= len(sweeper.values) # split the sweep if the number of bins is larget than the memory of the sequencer (2**17) if num_bins < SEQUENCER_MEMORY: # for sweeper in sweepers: # if sweeper.parameter is Parameter.amplitude: # # qblox cannot sweep amplitude in real time, but sweeping gain is quivalent # for pulse in sweeper.pulses: # pulse.amplitude = 1 # elif sweeper.parameter is Parameter.gain: # for pulse in sweeper.pulses: # # qblox has an external and an internal gains # # when sweeping the internal, set the external to 1 # # TODO check if it needs to be restored after execution # if pulse.type == PulseType.READOUT: # qubits[pulse.qubit].readout.gain = 1 # elif pulse.type == PulseType.DRIVE: # qubits[pulse.qubit].drive.gain = 1 result = self._execute_pulse_sequence( qubits, sequence, options, sweepers ) self._add_to_results(sequence, results, result) else: sweepers_repetitions = 1 for sweeper in sweepers: sweepers_repetitions *= len(sweeper.values) if sweepers_repetitions > SEQUENCER_MEMORY: raise ValueError( f"Requested sweep has {sweepers_repetitions} total number of sweep points. " f"Maximum supported is {SEQUENCER_MEMORY}" ) max_rt_nshots = SEQUENCER_MEMORY // sweepers_repetitions num_full_sft_iterations = nshots // max_rt_nshots result_chunks = [] for sft_iteration in range(num_full_sft_iterations + 1): _nshots = min( max_rt_nshots, nshots - sft_iteration * max_rt_nshots ) res = self._execute_pulse_sequence( qubits, sequence, replace(options, nshots=_nshots), sweepers, ) result_chunks.append(res) result = self._combine_result_chunks(result_chunks) self._add_to_results(sequence, results, result) @staticmethod def _combine_result_chunks(chunks): some_chunk = next(iter(chunks)) some_result = next(iter(some_chunk.values())) attribute = "samples" if isinstance(some_result, SampleResults) else "voltage" return { key: some_result.__class__( np.concatenate( [getattr(chunk[key], attribute) for chunk in chunks], axis=0 ) ) for key in some_chunk.keys() } @staticmethod def _add_to_results(sequence, results, results_to_add): for pulse in sequence.ro_pulses: if results[pulse.id]: results[pulse.id] += results_to_add[pulse.serial] else: results[pulse.id] = results_to_add[pulse.serial] results[pulse.qubit] = results[pulse.id]