"""RFSoC FPGA driver."""
import re
from dataclasses import asdict, dataclass
from typing import Union
import numpy as np
import numpy.typing as npt
import qibosoq.components.base as rfsoc
from qibo.config import log
from qibosoq import client
from qibolab import AcquisitionType, AveragingMode, ExecutionParameters
from qibolab.couplers import Coupler
from qibolab.instruments.abstract import Controller
from qibolab.instruments.port import Port
from qibolab.pulses import PulseSequence, PulseType
from qibolab.qubits import Qubit
from qibolab.result import AveragedSampleResults, IntegratedResults, SampleResults
from qibolab.sweeper import BIAS, Sweeper
from .convert import convert, convert_units_sweeper
HZ_TO_MHZ = 1e-6
NS_TO_US = 1e-3
[docs]@dataclass
class RFSoCPort(Port):
"""Port object of the RFSoC."""
name: int
"""DAC number."""
offset: float = 0.0
"""Amplitude factor for biasing."""
[docs]class RFSoC(Controller):
"""Instrument object for controlling RFSoC FPGAs.
The two way of executing pulses are with ``play`` (for arbitrary
qibolab ``PulseSequence``) or with ``sweep`` that execute a
``PulseSequence`` object with one or more ``Sweeper``.
Attributes:
cfg (rfsoc.Config): Configuration dictionary required for pulse execution.
"""
PortType = RFSoCPort
def __init__(self, name: str, address: str, port: int, sampling_rate: float = 1.0):
"""Set server information and base configuration.
Args:
name (str): Name of the instrument instance.
address (str): IP and port of the server (ex. 192.168.0.10)
port (int): Port of the server (ex.6000)
"""
super().__init__(name, address=address)
self.host = address
self.port = port
self.cfg = rfsoc.Config()
self._sampling_rate = sampling_rate
@property
def sampling_rate(self):
return self._sampling_rate
[docs] def connect(self):
"""Empty method to comply with Instrument interface."""
[docs] def disconnect(self):
"""Empty method to comply with Instrument interface."""
@staticmethod
def _try_to_execute(server_commands, host, port):
try:
return client.connect(server_commands, host, port)
except RuntimeError as e:
if "exception in readout loop" in str(e):
log.warning(
"%s %s",
"Exception in readout loop. Attempting again",
"You may want to increase the relaxation time.",
)
return client.connect(server_commands, host, port)
buffer_overflow = r"buffer length must be \d+ samples or less"
if re.search(buffer_overflow, str(e)) is not None:
log.warning("Buffer full! Use shorter pulses.")
raise e
[docs] @staticmethod
def convert_and_discriminate_samples(discriminated_shots, execution_parameters):
if execution_parameters.averaging_mode is AveragingMode.CYCLIC:
_, counts = np.unique(discriminated_shots, return_counts=True, axis=0)
freqs = counts / discriminated_shots.shape[0]
result = execution_parameters.results_type(freqs, discriminated_shots)
else:
result = execution_parameters.results_type(discriminated_shots)
return result
[docs] @staticmethod
def merge_sweep_results(
dict_a: dict[str, Union[IntegratedResults, SampleResults]],
dict_b: dict[str, Union[IntegratedResults, SampleResults]],
) -> dict[str, Union[IntegratedResults, SampleResults]]:
"""Merge two dictionary mapping pulse serial to Results object.
If dict_b has a key (serial) that dict_a does not have, simply add it,
otherwise sum the two results
Args:
dict_a (dict): dict mapping ro pulses serial to qibolab res objects
dict_b (dict): dict mapping ro pulses serial to qibolab res objects
Returns:
A dict mapping the readout pulses serial to qibolab results objects
"""
for serial in dict_b:
if serial in dict_a:
data = lambda res: (
res.voltage if isinstance(res, IntegratedResults) else res.samples
)
dict_a[serial] = type(dict_a[serial])(
np.append(data(dict_a[serial]), data(dict_b[serial]))
)
else:
dict_a[serial] = dict_b[serial]
return dict_a
[docs] @staticmethod
def reshape_sweep_results(results, sweepers, execution_parameters):
shape = [len(sweeper.values) for sweeper in sweepers]
if execution_parameters.averaging_mode is not AveragingMode.CYCLIC:
shape.insert(0, execution_parameters.nshots)
def data(value):
if isinstance(value, IntegratedResults):
data = value.voltage
elif isinstance(value, AveragedSampleResults):
data = value.statistical_frequency
else:
data = value.samples
return type(value)(data.reshape(shape))
return {key: data(value) for key, value in results.items()}
def _execute_pulse_sequence(
self,
sequence: PulseSequence,
qubits: dict[int, Qubit],
opcode: rfsoc.OperationCode,
) -> tuple[list, list]:
"""Prepare the commands dictionary to send to the qibosoq server.
Args:
sequence (`qibolab.pulses.PulseSequence`): arbitrary PulseSequence object to execute
qubits: list of qubits (`qibolab.platforms.abstract.Qubit`) of the platform in the form of a dictionary
opcode: can be `rfsoc.OperationCode.EXECUTE_PULSE_SEQUENCE` or `rfsoc.OperationCode.EXECUTE_PULSE_SEQUENCE_RAW`
Returns:
Lists of I and Q value measured
"""
server_commands = {
"operation_code": opcode,
"cfg": asdict(self.cfg),
"sequence": convert(sequence, qubits, self.sampling_rate),
"qubits": [asdict(convert(qubits[idx])) for idx in qubits],
}
return self._try_to_execute(server_commands, self.host, self.port)
def _execute_sweeps(
self,
sequence: PulseSequence,
qubits: dict[int, Qubit],
sweepers: list[rfsoc.Sweeper],
) -> tuple[list, list]:
"""Prepare the commands dictionary to send to the qibosoq server.
Args:
sequence (`qibolab.pulses.PulseSequence`): arbitrary PulseSequence object to execute
qubits: list of qubits (`qibolab.platforms.abstract.Qubit`) of the platform in the form of a dictionary
sweepers: list of `qibosoq.abstract.Sweeper` objects
Returns:
Lists of I and Q value measured
"""
converted_sweepers = [
convert_units_sweeper(sweeper, sequence, qubits) for sweeper in sweepers
]
server_commands = {
"operation_code": rfsoc.OperationCode.EXECUTE_SWEEPS,
"cfg": asdict(self.cfg),
"sequence": convert(sequence, qubits, self.sampling_rate),
"qubits": [asdict(convert(qubits[idx])) for idx in qubits],
"sweepers": [sweeper.serialized for sweeper in converted_sweepers],
}
return self._try_to_execute(server_commands, self.host, self.port)
[docs] def play(
self,
qubits: dict[int, Qubit],
couplers: dict[int, Coupler],
sequence: PulseSequence,
execution_parameters: ExecutionParameters,
) -> dict[str, Union[IntegratedResults, SampleResults]]:
"""Execute the sequence of instructions and retrieves readout results.
Each readout pulse generates a separate acquisition.
The relaxation_time and the number of shots have default values.
Args:
qubits (dict): List of `qibolab.platforms.utils.Qubit` objects
passed from the platform.
execution_parameters (`qibolab.ExecutionParameters`): Parameters (nshots,
relaxation_time,
fast_reset,
acquisition_type,
averaging_mode)
sequence (`qibolab.pulses.PulseSequence`): Pulse sequence to play.
Returns:
A dictionary mapping the readout pulses serial and respective qubits to
qibolab results objects
"""
if couplers != {}:
raise NotImplementedError(
"The RFSoC driver currently does not support couplers."
)
self.validate_input_command(sequence, execution_parameters, sweep=False)
self.update_cfg(execution_parameters)
if execution_parameters.acquisition_type is AcquisitionType.DISCRIMINATION:
self.cfg.average = False
else:
self.cfg.average = (
execution_parameters.averaging_mode is AveragingMode.CYCLIC
)
if execution_parameters.acquisition_type is AcquisitionType.RAW:
opcode = rfsoc.OperationCode.EXECUTE_PULSE_SEQUENCE_RAW
else:
opcode = rfsoc.OperationCode.EXECUTE_PULSE_SEQUENCE
toti, totq = self._execute_pulse_sequence(sequence, qubits, opcode)
results = {}
probed_qubits = np.unique([p.qubit for p in sequence.ro_pulses])
for j, qubit in enumerate(probed_qubits):
for i, ro_pulse in enumerate(sequence.ro_pulses.get_qubit_pulses(qubit)):
i_pulse = np.array(toti[j][i])
q_pulse = np.array(totq[j][i])
if (
execution_parameters.acquisition_type
is AcquisitionType.DISCRIMINATION
):
discriminated_shots = self.classify_shots(
i_pulse, q_pulse, qubits[ro_pulse.qubit]
)
result = self.convert_and_discriminate_samples(
discriminated_shots, execution_parameters
)
else:
result = execution_parameters.results_type(i_pulse + 1j * q_pulse)
results[ro_pulse.qubit] = results[ro_pulse.serial] = result
return results
[docs] def update_cfg(self, execution_parameters: ExecutionParameters):
"""Update rfsoc.Config object with new parameters."""
if execution_parameters.nshots is not None:
self.cfg.reps = execution_parameters.nshots
if execution_parameters.relaxation_time is not None:
self.cfg.relaxation_time = execution_parameters.relaxation_time * NS_TO_US
[docs] def classify_shots(
self,
i_values: npt.NDArray[np.float64],
q_values: npt.NDArray[np.float64],
qubit: Qubit,
) -> npt.NDArray[np.float64]:
"""Classify IQ values using qubit threshold and rotation_angle if
available in runcard."""
if qubit.iq_angle is None or qubit.threshold is None:
raise ValueError("Classification parameters were not provided")
angle = qubit.iq_angle
threshold = qubit.threshold
rotated = np.cos(angle) * np.array(i_values) - np.sin(angle) * np.array(
q_values
)
shots = np.heaviside(np.array(rotated) - threshold, 0)
if isinstance(shots, float):
return np.array([shots])
return shots
[docs] def play_sequence_in_sweep_recursion(
self,
qubits: dict[int, Qubit],
couplers: dict[int, Coupler],
sequence: PulseSequence,
or_sequence: PulseSequence,
execution_parameters: ExecutionParameters,
) -> dict[str, Union[IntegratedResults, SampleResults]]:
"""Last recursion layer, if no sweeps are present.
After playing the sequence, the resulting dictionary keys need
to be converted to the correct values. Even indexes correspond
to qubit number and are not changed. Odd indexes correspond to
readout pulses serials and are convert to match the original
sequence (of the sweep) and not the one just executed.
"""
res = self.play(qubits, couplers, sequence, execution_parameters)
newres = {}
serials = [pulse.serial for pulse in or_sequence.ro_pulses]
for idx, key in enumerate(res):
if idx % 2 == 1:
newres[serials[idx // 2]] = res[key]
else:
newres[key] = res[key]
return newres
[docs] def recursive_python_sweep(
self,
qubits: dict[int, Qubit],
couplers: dict[int, Coupler],
sequence: PulseSequence,
or_sequence: PulseSequence,
*sweepers: rfsoc.Sweeper,
execution_parameters: ExecutionParameters,
) -> dict[str, Union[IntegratedResults, SampleResults]]:
"""Execute a sweep of an arbitrary number of Sweepers via recursion.
Args:
qubits (list): List of `qibolab.platforms.utils.Qubit` objects
passed from the platform.
sequence (`qibolab.pulses.PulseSequence`): Pulse sequence to play.
This object is a deep copy of the original
sequence and gets modified.
or_sequence (`qibolab.pulses.PulseSequence`): Reference to original
sequence to not modify.
*sweepers (`qibolab.Sweeper`): Sweeper objects.
execution_parameters (`qibolab.ExecutionParameters`): Parameters (nshots,
relaxation_time,
fast_reset,
acquisition_type,
averaging_mode)
Returns:
A dictionary mapping the readout pulses serial and respective qubits to
results objects
"""
# If there are no sweepers run ExecutePulseSequence acquisition.
# Last layer for recursion.
if len(sweepers) == 0:
return self.play_sequence_in_sweep_recursion(
qubits, couplers, sequence, or_sequence, execution_parameters
)
if not self.get_if_python_sweep(sequence, *sweepers):
toti, totq = self._execute_sweeps(sequence, qubits, sweepers)
res = self.convert_sweep_results(
or_sequence, qubits, toti, totq, execution_parameters
)
return res
sweeper = sweepers[0]
values = []
for idx, _ in enumerate(sweeper.indexes):
val = np.linspace(sweeper.starts[idx], sweeper.stops[idx], sweeper.expts)
if sweeper.parameters[idx] in rfsoc.Parameter.variants(
{"duration", "delay"}
):
val = val.astype(int)
values.append(val)
results: dict[str, Union[IntegratedResults, SampleResults]] = {}
for idx in range(sweeper.expts):
# update values
for jdx, kdx in enumerate(sweeper.indexes):
sweeper_parameter = sweeper.parameters[jdx]
if sweeper_parameter is rfsoc.Parameter.BIAS:
qubits[list(qubits)[kdx]].flux.offset = values[jdx][idx]
elif sweeper_parameter in rfsoc.Parameter.variants(
{
"amplitude",
"frequency",
"relative_phase",
"duration",
}
):
setattr(
sequence[kdx], sweeper_parameter.name.lower(), values[jdx][idx]
)
if sweeper_parameter is rfsoc.Parameter.DURATION:
for pulse_idx in range(
kdx + 1,
len(sequence.get_qubit_pulses(sequence[kdx].qubit)),
):
# TODO: this is a patch and works just for simple experiments
sequence[pulse_idx].start = sequence[pulse_idx - 1].finish
elif sweeper_parameter is rfsoc.Parameter.DELAY:
sequence[kdx].start_delay = values[jdx][idx]
res = self.recursive_python_sweep(
qubits,
couplers,
sequence,
or_sequence,
*sweepers[1:],
execution_parameters=execution_parameters,
)
results = self.merge_sweep_results(results, res)
return results
[docs] def get_if_python_sweep(
self, sequence: PulseSequence, *sweepers: rfsoc.Sweeper
) -> bool:
"""Check if a sweeper must be run with python loop or on hardware.
To be run on qick internal loop a sweep must:
* not be on the readout frequency
* not be a duration sweeper
* only one pulse per channel supported
* flux pulses are not compatible with sweepers
Args:
sequence (`qibolab.pulses.PulseSequence`). Pulse sequence to play.
*sweepers (`qibosoq.abstract.Sweeper`): Sweeper objects.
Returns:
A boolean value true if the sweeper must be executed by python
loop, false otherwise
"""
if any(pulse.type is PulseType.FLUX for pulse in sequence):
return True
for sweeper in sweepers:
if all(
parameter is rfsoc.Parameter.BIAS for parameter in sweeper.parameters
):
continue
if all(
parameter is rfsoc.Parameter.DELAY for parameter in sweeper.parameters
):
continue
if any(
parameter is rfsoc.Parameter.DURATION
for parameter in sweeper.parameters
):
return True
for sweep_idx, parameter in enumerate(sweeper.parameters):
is_freq = parameter is rfsoc.Parameter.FREQUENCY
is_ro = sequence[sweeper.indexes[sweep_idx]].type == PulseType.READOUT
# if it's a sweep on the readout freq do a python sweep
if is_freq and is_ro:
return True
for idx in sweeper.indexes:
sweep_pulse = sequence[idx]
channel = sweep_pulse.channel
ch_pulses = sequence.get_channel_pulses(channel)
if len(ch_pulses) > 1:
return True
# if all passed, do a firmware sweep
return False
[docs] def convert_sweep_results(
self,
original_ro: PulseSequence,
qubits: dict[int, Qubit],
toti: list[list[list[float]]],
totq: list[list[list[float]]],
execution_parameters: ExecutionParameters,
) -> dict[str, Union[IntegratedResults, SampleResults]]:
"""Convert sweep res to qibolab dict res.
Args:
original_ro (`qibolab.pulses.PulseSequence`): Original PulseSequence
qubits (list): List of `qibolab.platforms.utils.Qubit` objects
passed from the platform.
toti (list): i values
totq (list): q values
results_type: qibolab results object
execution_parameters (`qibolab.ExecutionParameters`): Parameters (nshots,
relaxation_time,
fast_reset,
acquisition_type,
averaging_mode)
Returns:
A dict mapping the readout pulses serial to qibolab results objects
"""
results = {}
adcs = np.unique([qubits[p.qubit].feedback.port.name for p in original_ro])
for k, k_val in enumerate(adcs):
adc_ro = [
pulse
for pulse in original_ro
if qubits[pulse.qubit].feedback.port.name == k_val
]
for i, ro_pulse in enumerate(adc_ro):
i_vals = np.array(toti[k][i])
q_vals = np.array(totq[k][i])
if not self.cfg.average:
i_vals = np.reshape(i_vals, (self.cfg.reps, *i_vals.shape[:-1]))
q_vals = np.reshape(q_vals, (self.cfg.reps, *q_vals.shape[:-1]))
if (
execution_parameters.acquisition_type
is AcquisitionType.DISCRIMINATION
):
qubit = qubits[ro_pulse.qubit]
discriminated_shots = self.classify_shots(i_vals, q_vals, qubit)
result = self.convert_and_discriminate_samples(
discriminated_shots, execution_parameters
)
else:
result = execution_parameters.results_type(i_vals + 1j * q_vals)
results[ro_pulse.qubit] = results[ro_pulse.serial] = result
return results
[docs] def sweep(
self,
qubits: dict[int, Qubit],
couplers: dict[int, Coupler],
sequence: PulseSequence,
execution_parameters: ExecutionParameters,
*sweepers: Sweeper,
) -> dict[str, Union[IntegratedResults, SampleResults]]:
"""Execute the sweep and retrieves the readout results.
Each readout pulse generates a separate acquisition.
The relaxation_time and the number of shots have default values.
Args:
qubits (list): List of `qibolab.platforms.utils.Qubit` objects
passed from the platform.
execution_parameters (`qibolab.ExecutionParameters`): Parameters (nshots,
relaxation_time,
fast_reset,
acquisition_type,
averaging_mode)
sequence (`qibolab.pulses.PulseSequence`). Pulse sequence to play.
*sweepers (`qibolab.Sweeper`): Sweeper objects.
Returns:
A dictionary mapping the readout pulses serial and respective qubits to
results objects
"""
if couplers != {}:
raise NotImplementedError(
"The RFSoC driver currently does not support couplers."
)
self.validate_input_command(sequence, execution_parameters, sweep=True)
self.update_cfg(execution_parameters)
if execution_parameters.acquisition_type is AcquisitionType.DISCRIMINATION:
self.cfg.average = False
else:
self.cfg.average = (
execution_parameters.averaging_mode is AveragingMode.CYCLIC
)
rfsoc_sweepers = [convert(sweep, sequence, qubits) for sweep in sweepers]
sweepsequence = sequence.copy()
bias_change = any(sweep.parameter is BIAS for sweep in sweepers)
if bias_change:
initial_biases = [
qubits[idx].flux.offset if qubits[idx].flux is not None else None
for idx in qubits
]
results = self.recursive_python_sweep(
qubits,
couplers,
sweepsequence,
sequence.ro_pulses,
*rfsoc_sweepers,
execution_parameters=execution_parameters,
)
if bias_change:
for idx, qubit in enumerate(qubits.values()):
if qubit.flux is not None:
qubit.flux.offset = initial_biases[idx]
return self.reshape_sweep_results(results, sweepers, execution_parameters)