Source code for qibosoq.server

"""Qibosoq server for qibolab-qick integration."""

import json
import logging
import os
import socket
import traceback
from socketserver import BaseRequestHandler, TCPServer
from typing import Dict, List

import numpy as np
from qick import QickSoc

import qibosoq.configuration as cfg
from qibosoq.components.base import Config, OperationCode, Parameter, Qubit, Sweeper
from qibosoq.components.pulses import Element, Measurement, Shape
from qibosoq.programs.pulse_sequence import ExecutePulseSequence
from qibosoq.programs.sweepers import ExecuteSweeps

# Logger for server-level events (startup information, failing commands).
logger = logging.getLogger(cfg.MAIN_LOGGER_NAME)
# Logger that receives the assembly listing of every program that is executed.
qick_logger = logging.getLogger(cfg.PROGRAM_LOGGER_NAME)


def load_elements(list_sequence: List[Dict]) -> List[Element]:
    """Build sequence elements from their dictionary descriptions.

    A dictionary containing the ``"amplitude"`` key is treated as a pulse: its
    ``"shape"`` entry (case-insensitive) selects the member of ``Shape`` whose
    value is the class to instantiate. Any other dictionary is turned into a
    ``Measurement``. In both cases the whole dictionary is passed as keyword
    arguments to the constructor.

    Args:
        list_sequence: elements of the sequence, in dict form.

    Returns:
        The converted objects, in the same order as the input.
    """

    def _build(spec: Dict) -> Element:
        # no amplitude -> the element is a measurement
        if "amplitude" not in spec:
            return Measurement(**spec)
        # otherwise the element is a pulse of the requested shape
        pulse_cls = Shape[spec["shape"].upper()].value
        return pulse_cls(**spec)

    return [_build(spec) for spec in list_sequence]
def load_sweeps(list_sweepers: List[Dict]) -> List[Sweeper]:
    """Build ``Sweeper`` objects from their dictionary descriptions.

    Each dictionary must provide the keys ``"expts"``, ``"parameters"``,
    ``"starts"``, ``"stops"`` and ``"indexes"``. Every entry of
    ``"parameters"`` is converted to a ``Parameter`` and the start/stop values
    are converted to numpy arrays; the other entries are forwarded unchanged.

    Args:
        list_sweepers: sweepers in dict form.

    Returns:
        The converted sweepers, in the same order as the input.
    """
    return [
        Sweeper(
            expts=spec["expts"],
            parameters=[Parameter(name) for name in spec["parameters"]],
            starts=np.array(spec["starts"]),
            stops=np.array(spec["stops"]),
            indexes=spec["indexes"],
        )
        for spec in list_sweepers
    ]
def execute_program(data: dict, qick_soc: QickSoc) -> dict:
    """Create and execute qick programs.

    The program class is selected from ``data["operation_code"]``:

    * ``EXECUTE_PULSE_SEQUENCE``: ``ExecutePulseSequence``
    * ``EXECUTE_PULSE_SEQUENCE_RAW``: ``ExecutePulseSequence``, after moving
      ``cfg["reps"]`` into ``cfg["soft_avgs"]`` and forcing ``cfg["reps"]``
      to 1 (``data["cfg"]`` is modified in place)
    * ``EXECUTE_SWEEPS``: ``ExecuteSweeps``, which additionally receives the
      sweepers loaded from ``data["sweepers"]``

    Before running, the assembly of the program is written to the program
    logger (after rolling over its first handler) and the number of
    instructions is checked against the memory size of the first tproc.

    Args:
        data: command received from the client. The keys read here are
            ``"operation_code"``, ``"cfg"``, ``"sequence"``, ``"qubits"`` and,
            for sweeps, ``"sweepers"``.
        qick_soc: the ``QickSoc`` object the program is built for and run on.

    Returns:
        (dict): dictionary with two keys (i, q) to lists of values

    Raises:
        NotImplementedError: if the operation code is not one of the above.
        MemoryError: if the program has more instructions than the tproc
            program memory can hold.
    """
    opcode = OperationCode(data["operation_code"])
    is_raw = opcode is OperationCode.EXECUTE_PULSE_SEQUENCE_RAW

    # positional arguments appended after the common ones (sweeps only)
    extra_args = []
    if opcode is OperationCode.EXECUTE_SWEEPS:
        programcls = ExecuteSweeps
        extra_args = load_sweeps(data["sweepers"])
    elif is_raw or opcode is OperationCode.EXECUTE_PULSE_SEQUENCE:
        programcls = ExecutePulseSequence
        if is_raw:
            raw_cfg = data["cfg"]
            raw_cfg["soft_avgs"] = raw_cfg["reps"]
            raw_cfg["reps"] = 1
    else:
        raise NotImplementedError(
            f"Operation code {data['operation_code']} not supported"
        )

    config = Config(**data["cfg"])
    sequence = load_elements(data["sequence"])
    qubits = [Qubit(**qubit) for qubit in data["qubits"]]
    program = programcls(qick_soc, config, sequence, qubits, *extra_args)

    # store the assembly of the program in a fresh log file
    assembly = program.asm()
    qick_logger.handlers[0].doRollover()  # type: ignore
    qick_logger.info(assembly)

    # refuse to run programs that do not fit in the tproc memory
    max_mem = qick_soc["tprocs"][0]["pmem_size"]
    num_instructions = len(program.prog_list)
    if num_instructions > max_mem:
        raise MemoryError(
            f"The tproc has a max memory size of {max_mem}, "
            f"but the program had {num_instructions} instructions"
        )

    if not is_raw:
        toti, totq = program.perform_experiment(
            qick_soc,
            average=data["cfg"]["average"],
        )
        return {"i": toti, "q": totq}

    results = program.acquire_decimated(
        qick_soc,
        load_pulses=True,
        progress=False,
        debug=False,
    )
    # only the first entry of the results is returned, as nested lists
    first_i, first_q = results[0][0], results[0][1]
    return {"i": [[first_i.tolist()]], "q": [[first_q.tolist()]]}
class ConnectionHandler(BaseRequestHandler):
    """Handle requests to the server."""

    def _recv_exact(self, size: int) -> bytes:
        """Read exactly ``size`` bytes from the client socket.

        Args:
            size: number of bytes to read.

        Returns:
            The bytes received (empty if ``size`` is 0).

        Raises:
            ConnectionError: if the client closes the connection before
                ``size`` bytes have been received.
        """
        chunks = []
        remaining = size
        while remaining > 0:
            # MSG_WAITALL normally returns the full amount, but a shorter read
            # is still possible (e.g. on disconnect), hence the loop
            chunk = self.request.recv(remaining, socket.MSG_WAITALL)
            if not chunk:
                raise ConnectionError(
                    f"Connection closed with {remaining} of {size} "
                    "bytes still to be received"
                )
            chunks.append(chunk)
            remaining -= len(chunk)
        return b"".join(chunks)

    def receive_command(self) -> dict:
        """Receive commands from qibolab client.

        The communication protocol is:
        * first the server receives a 4 bytes big-endian integer with the
          length of the message to actually receive
        * waits for the whole message and decodes it as JSON
        * returns the decoded dictionary

        Returns:
            The command sent by the client.

        Raises:
            ConnectionError: if the client disconnects before the header or
                the announced message has been completely received.
            json.JSONDecodeError: if the message is not valid JSON.
        """
        count = int.from_bytes(self._recv_exact(4), "big")
        received = self._recv_exact(count)
        return json.loads(received)

    def handle(self):
        """Handle a connection to the server.

        * Receives command from client
        * Executes qick program
        * Return results

        If receiving or executing the command fails, the error is logged, the
        generators are reset and the formatted traceback is sent to the
        client in place of the results.
        """
        # set the listening socket of the server in non-blocking mode
        self.server.socket.setblocking(False)

        # bound before the try block so that the error path can always log
        # it, even when the failure happens while receiving the command
        data = None
        try:
            data = self.receive_command()
            results = execute_program(data, self.server.qick_soc)
        except Exception:  # pylint: disable=W0612,W0718
            logger.exception("")
            logger.error("Faling command: %s", data)
            results = traceback.format_exc()
            self.server.qick_soc.reset_gens()

        self.request.sendall(bytes(json.dumps(results), "utf-8"))
def log_initial_info():
    """Write the startup information to the main logger.

    Two messages are logged: the PID of the server process, and the location
    of the loaded firmware together with whether it is configured as
    multiplexed.
    """
    logger.info("Server listening, PID %d", os.getpid())

    if cfg.IS_MULTIPLEXED:
        firmware_kind = "Multiplexed"
    else:
        firmware_kind = "Not multiplexed"
    logger.info(
        "%s firmware loaded from %s", firmware_kind, cfg.QICKSOC_LOCATION
    )
def serve(host, port):
    """Start the TCP server and serve connections until interrupted.

    Args:
        host: address the server binds to.
        port: port the server listens on.
    """
    # set on the class, before the server is created
    TCPServer.allow_reuse_address = True

    address = (host, port)
    with TCPServer(address, ConnectionHandler) as tcp_server:
        # initialize QickSoc object (firmware and clocks)
        tcp_server.qick_soc = QickSoc(bitfile=cfg.QICKSOC_LOCATION)
        log_initial_info()
        tcp_server.serve_forever()