Source code for qibocal.auto.execute

"""Tasks execution."""

import importlib
import importlib.util
import os
import sys
from contextlib import contextmanager
from copy import deepcopy
from dataclasses import dataclass, fields
from pathlib import Path
from typing import Optional, Union

from qibo.backends import construct_backend

from qibocal import protocols
from qibocal.config import log

from ..calibration import CalibrationPlatform, create_calibration_platform
from .history import History
from .mode import AUTOCALIBRATION, ExecutionMode
from .operation import Routine
from .output import Metadata, Output
from .task import Action, Completed, Targets, Task

PLATFORM_DIR = "platform"
"""Folder where platform will be dumped."""


[docs] def _register(name: str, obj): """Register object as module. With a small abuse of the Python module system, the object is registered as a module, with the given `name`. `name` may contain dots, cf. :attr:`Executor.name` for clarifications about their meaning. .. note:: This is mainly used to register executors, such that the protocols can be bound to it through the `import` keyword, in order to construct an intuitive syntax, apparently purely functional, maintaining the context in a single `Executor` "global" object. """ # prevent overwriting existing modules if name in sys.modules: raise ValueError( f"Module '{name}' already present. " "Choose a different one to avoid overwriting it." ) # allow relative paths, where relative is intended respect to package root root = __name__.split(".")[0] qualified = importlib.util.resolve_name(name, root) # allow to nest module in arbitrary subpackage if "." in qualified: parent_name, _, child_name = qualified.rpartition(".") parent_module = importlib.import_module(parent_name) setattr(parent_module, child_name, obj) sys.modules[qualified] = obj obj.name = obj.__name__ = qualified obj.__spec__ = None
[docs] @dataclass class Executor: """Execute a tasks' graph and tracks its history.""" history: History """The execution history, with results and exit states.""" targets: Targets """Qubits/Qubit Pairs to be calibrated.""" platform: CalibrationPlatform """Qubits' platform.""" update: bool = True """Runcard update mechanism.""" name: Optional[str] = None """Symbol for the executor. This can be used generically to distinguish the executor, but its specific use is to register a module with this name in `sys.modules`. They can contain dots, `.`, that are interpreted as usual by the Python module system. .. note:: As a special case, mainly used for internal purposes, names starting with `.` are also allowed, and they are interpreted relative to this package (in the top scope). """ path: Optional[Path] = None meta: Optional[Metadata] = None def __post_init__(self): """Register as a module, if a name is specified.""" if self.name is not None: _register(self.name, self)
[docs] @classmethod def create(cls, name: str, platform: Union[CalibrationPlatform, str, None] = None): """Load list of protocols.""" platform = ( platform if isinstance(platform, CalibrationPlatform) else create_calibration_platform( platform if isinstance(platform, str) else "mock" ) ) return cls( name=name, history=History(), platform=platform, targets=list(platform.qubits), update=True, )
[docs] def run_protocol( self, protocol: Routine, parameters: Action, mode: ExecutionMode = AUTOCALIBRATION, ) -> Completed: """Run single protocol in ExecutionMode mode.""" task = Task(action=parameters, operation=protocol) log.info(f"Executing mode {mode} on {task.action.id}.") completed = task.run(platform=self.platform, targets=self.targets, mode=mode) self.history.push(completed) # TODO: drop, as the conditions won't be necessary any longer, and then it could # be performed as part of `task.run` https://github.com/qiboteam/qibocal/issues/910 if ExecutionMode.FIT in mode: if self.update and task.update: completed.update_platform(platform=self.platform) return completed
def __getattribute__(self, name: str): """Provide access to routines through the executor. This is done mainly to support the import mechanics: the routines retrieved through the object will have it pre-registered. """ modname = super().__getattribute__("name") if modname is None: # no module registration, immediately fall back return super().__getattribute__(name) try: # routines look up if name.startswith("_"): # internal attributes should never be routines raise AttributeError protocol = getattr(protocols, name) return self._wrapped_protocol(protocol, name) except AttributeError: # fall back on regular attributes return super().__getattribute__(name)
[docs] def _wrapped_protocol(self, protocol: Routine, operation: str): """Create a bound protocol. Returns a closure, already wrapping the current `Executor` instance, but specific to the `protocol` chosen. The parameters of this wrapper function maps to protocol's ones, in particular: - the keyword argument `mode` is used as the execution mode (defaults to `AUTOCALIBRATION`) - the keyword argument `id` is used as the `id` for the given operation (defaults to `protocol` identifier, the same used to import and invoke it) then the protocol specific are resolved, with the following priority: - explicit keyword arguments have the highest priorities - items in the dictionary passed with the keyword `parameters` - positional arguments, which are associated to protocols parameters in the same order in which they are defined (and documented) in their respective parameters classes .. attention:: Despite the priority being clear, it is advised to use only one of the former schemes to pass parameters, to avoid confusion due to unexpected overwritten arguments. E.g. for:: resonator_spectroscopy(1e7, 1e5, freq_width=1e8) the `freq_width` will be `1e8`, and `1e7` will be silently overwritten and ignored (as opposed to a regular Python function, where a `TypeError` would be raised). The priority defined above is strictly and silently respected, so just pay attention during invocations. """ def wrapper( *args, parameters: Optional[dict] = None, id: str = operation, mode: ExecutionMode = AUTOCALIBRATION, update: bool = True, targets: Optional[Targets] = None, **kwargs, ): positional = dict( zip((f.name for f in fields(protocol.parameters_type)), args) ) params = deepcopy(parameters) if parameters is not None else {} action = Action.cast( source={ "id": id, "operation": operation, "targets": targets, "update": update, "parameters": params | positional | kwargs, } ) return self.run_protocol(protocol, parameters=action, mode=mode) return wrapper
[docs] def unload(self): """Unlist the executor from available modules.""" if self.name is not None: del sys.modules[self.name]
def __del__(self): """Revert constructions side-effects. .. note:: This is to make sure that the executor is properly unregistered from `sys.modules`. However, it is not reliable to be called directly, since `del executor` is not guaranteed to immediately invoke this method, cf. the note for :meth:`object.__del__`. """ try: self.unload() except KeyError: # it has been explicitly unloaded, no need to do it again pass
[docs] def init( self, path: os.PathLike, force: bool = False, platform: Union[CalibrationPlatform, str, None] = None, update: Optional[bool] = None, targets: Optional[Targets] = None, ): """Initialize execution.""" if platform is None or isinstance(platform, CalibrationPlatform): platform = self.platform elif isinstance(platform, str): platform = self.platform = create_calibration_platform(platform) else: platform = self.platform = CalibrationPlatform.from_platform(platform) assert isinstance(platform, CalibrationPlatform) backend = construct_backend(backend="qibolab", platform=platform) if update is not None: self.update = update if targets is not None: self.targets = targets # generate output folder path = Output.mkdir(Path(path), force) # generate meta meta = Metadata.generate(path.name, backend) output = Output(History(), meta, platform) output.dump(path) # run meta.start() # connect and initialize platform platform.connect() self.path = path self.meta = meta
[docs] def close(self, path: Optional[os.PathLike] = None): """Close execution.""" assert self.meta is not None and self.path is not None path = self.path if path is None else Path(path) # stop and disconnect platform self.platform.disconnect() self.meta.end() # dump history, metadata, and updated platform output = Output(self.history, self.meta, self.platform) output.dump(path) # attempt unloading self.__del__()
[docs] @classmethod @contextmanager def open( cls, name: str, path: os.PathLike, force: bool = False, platform: Union[CalibrationPlatform, str, None] = None, update: Optional[bool] = None, targets: Optional[Targets] = None, ): """Enter the execution context.""" ex = cls.create(name, platform) ex.init(path, force, platform, update, targets) try: yield ex finally: ex.close()
def __enter__(self): """Reenter the execution context. This method its here to reuse an already existing (and initialized) executor, in a new context. It should not be used with new executors. In which case, cf. :meth:`__open__`. """ # connect and initialize platform self.platform.connect() return self def __exit__(self, exc_type, exc_value, traceback): """Exit execution context. This pairs with :meth:`__enter__`. """ self.close() return False