"""Emulator controller."""
from collections import defaultdict
from collections.abc import Iterable
from functools import reduce
from operator import or_
from typing import Callable, Optional, cast
import numpy as np
from numpy.typing import NDArray
from qibolab._core.components import Config
from qibolab._core.components.configs import AcquisitionConfig
from qibolab._core.execution_parameters import ExecutionParameters
from qibolab._core.identifier import Result
from qibolab._core.instruments.abstract import Controller
from qibolab._core.pulses import (
Acquisition,
Delay,
Pulse,
PulseLike,
Readout,
VirtualZ,
)
from qibolab._core.sequence import PulseSequence
from qibolab._core.sweeper import ParallelSweepers
from .engine import Operator, OperatorEvolution, QutipEngine, SimulationEngine
from .hamiltonians import (
HamiltonianConfig,
Modulated,
waveform,
)
from .results import acquisitions, index, results, select_acquisitions
__all__ = ["EmulatorController"]
[docs]
class EmulatorController(Controller):
"""Emulator controller."""
engine: SimulationEngine = QutipEngine()
"""SimulationEngine. Default is QutipEngine."""
bounds: str = "emulator/bounds"
[docs]
def connect(self):
"""Dummy connect method."""
[docs]
def disconnect(self):
"""Dummy disconnect method."""
@property
def sampling_rate(self):
"""Sampling rate of emulator."""
return 1
[docs]
def play(
self,
configs: dict[str, Config],
sequences: list[PulseSequence],
options: ExecutionParameters,
sweepers: list[ParallelSweepers],
) -> dict[int, Result]:
# convert align to delays
sequences_ = (seq.align_to_delays() for seq in sequences)
# just merge the results of multiple executions in a single dictionary
return reduce(
or_,
(
results(
# states in computational basis
self._sweep(sequence, configs, sweepers),
sequence,
cast(HamiltonianConfig, configs["hamiltonian"]),
options,
)
for sequence in sequences_
),
)
def _sweep(
self,
sequence: PulseSequence,
configs: dict[str, Config],
sweepers: list[ParallelSweepers],
updates: Optional[dict] = None,
) -> NDArray:
"""Sweep over sequence.
This function invokes itself recursively, adding an array
dimension at each call as the outermost one. The extra dimension
corresponds to the values in the first nested sweep (with the
lowest index, interpreted as the outermost as well).
"""
# use a default dictionary, merging existing values
updates = defaultdict(dict) | ({} if updates is None else updates)
if len(sweepers) == 0:
return self._play_sequence(sequence, configs, updates)
parsweep = sweepers[0]
# collect slices of results, corresponding to the current iteration
results = []
# execute once for each parallel value
for values in zip(*(s.values for s in parsweep)):
# update all parallel sweepers with the respective values
for sweeper, value in zip(parsweep, values):
if sweeper.pulses is not None:
for pulse in sweeper.pulses:
updates[pulse.id].update({sweeper.parameter.name: value})
if sweeper.channels is not None:
for channel in sweeper.channels:
updates[channel].update({sweeper.parameter.name: value})
# append new slice for the current parallel value
results.append(self._sweep(sequence, configs, sweepers[1:], updates))
# stack all slices in a single array, along the current outermost dimension
return np.stack(results)
def _play_sequence(
self, sequence: PulseSequence, configs: dict[str, Config], updates: dict
) -> NDArray:
"""Play single sequence on emulator.
The array returned by this function has a single dimension, over
the various measurements included in the sequence.
"""
sequence_ = update_sequence(sequence, updates)
tlist_ = tlist(sequence_, self.sampling_rate)
configs_ = update_configs(configs, updates)
config = cast(HamiltonianConfig, configs_["hamiltonian"])
hamiltonian = config.hamiltonian(config=configs_, engine=self.engine)
time_hamiltonian = self._pulse_hamiltonian(sequence_, configs_)
results = self.engine.evolve(
hamiltonian=hamiltonian,
initial_state=config.initial_state(self.engine),
time=tlist_,
collapse_operators=config.dissipation(self.engine),
time_hamiltonian=time_hamiltonian,
)
return select_acquisitions(
results.states,
acquisitions(sequence_).values(),
tlist_,
)
def _pulse_hamiltonian(
self, sequence: PulseSequence, configs: dict[str, Config]
) -> Optional[OperatorEvolution]:
"""Construct Hamiltonian time dependent term for qutip simulation."""
channels = [
[operator, channel_time(waveforms)]
for operator, waveforms in hamiltonians(sequence, configs, self.engine)
]
return OperatorEvolution(channels) if len(channels) > 0 else None
def update_sequence(sequence: PulseSequence, updates: dict) -> PulseSequence:
"""Apply sweep updates to base sequence."""
return PulseSequence(
[(ch, e.model_copy(update=updates.get(e.id, {}))) for ch, e in sequence]
)
def update_configs(configs: dict[str, Config], updates: dict) -> dict[str, Config]:
"""Apply sweep updates to base configs."""
return {k: c.model_copy(update=updates.get(k, {})) for k, c in configs.items()}
def tlist(
sequence: PulseSequence, sampling_rate: float, per_sample: float = 2
) -> NDArray:
"""Compute times for evolution.
The frequency of times is double the sampling rate, to make sure
that all pulses features are resolved by the evolution.
This can be customized using the `per_sample` rate, e.g. to retrieve times at the
sampling rate itself, for pulses evaluation.
.. note::
As an optimization, if an acquisition is executed as the last
sequence operation, that's not taken into account, since it is not
simulated by the present emulator.
For long experiments, it is a mild optimization. But it critically speeds up
short experiments, given the usual relative duration of acquisitions and control
pulses.
"""
seq = (
sequence[:-1]
if isinstance(sequence[-1][1], (Acquisition, Readout))
else sequence
)
end = max(seq.duration, 1)
rate = sampling_rate * per_sample
return np.arange(0, end, 1 / rate)
def hamiltonian(
pulses: Iterable[PulseLike],
config: Config,
hamiltonian: HamiltonianConfig,
hilbert_space_index: int,
engine: SimulationEngine,
) -> tuple[Operator, list[Modulated]]:
n = hamiltonian.transmon_levels
op = engine.expand(
config.operator(n=n, engine=engine), hamiltonian.dims, hilbert_space_index
)
waveforms = (
waveform(pulse, config, hamiltonian.qubits[hilbert_space_index])
for pulse in pulses
if isinstance(pulse, (Pulse, Delay, VirtualZ))
)
return (op, [w for w in waveforms if w is not None])
def hamiltonians(
sequence: PulseSequence, configs: dict[str, Config], engine: SimulationEngine
) -> Iterable[tuple[Operator, list[Modulated]]]:
hconfig = cast(HamiltonianConfig, configs["hamiltonian"])
return (
hamiltonian(
sequence.channel(ch),
configs[ch],
hconfig,
index(ch, hconfig),
engine,
)
for ch in sequence.channels
# TODO: drop the following, and treat acquisitions just as empty channels
if not isinstance(configs[ch], AcquisitionConfig)
)
def channel_time(waveforms: Iterable[Modulated]) -> Callable[[float], float]:
"""Wrap time function for specific channel.
Used to avoid late binding issues.
"""
def time(t: float) -> float:
cumulative_time = 0
cumulative_phase = 0
for pulse in waveforms:
pulse_duration = pulse.duration # TODO: pass sampling rate
pulse_phase = pulse.phase
if cumulative_time <= t < cumulative_time + pulse_duration:
relative_time = t - cumulative_time
index = int(relative_time) # TODO: pass sampling rate
return pulse(t, index, cumulative_phase)
cumulative_time += pulse_duration
cumulative_phase += pulse_phase
return 0
return time