import json
import requests
from qcodes_contrib_drivers.drivers.ERAInstruments import ERASynthPlusPlus
from qibo.config import log
from qibolab.instruments.oscillator import LocalOscillator
RECONNECTION_ATTEMPTS = 10
"""Number of times to attempt sending requests to the web server in case of
failure."""
TIMEOUT = 10
"""Timeout time for HTTP requests in seconds."""
[docs]class ERASynthEthernet:
"""ERA ethernet driver that follows the QCoDeS interface.
Controls the instrument via HTTP requests to the instrument's web
server.
"""
def __init__(self, name, address):
self.name = name
self.address = address
self.post("readAll", 1)
self.post("readDiagnostic", 0)
self.post("rfoutput", 0)
@property
def url(self):
return f"http://{self.address}/"
[docs] def post(self, name, value):
"""Post a value to the instrument's web server.
Try to post multiple times, waiting for 0.1 seconds between each attempt.
Args:
name: str = The name of the value to post.
value: str = The value to post.
"""
value = str(value)
for _ in range(RECONNECTION_ATTEMPTS):
try:
response = requests.post(self.url, data={name: value}, timeout=TIMEOUT)
if response.status_code == 200:
return True
break
except (ConnectionError, TimeoutError, requests.exceptions.ReadTimeout):
log.info("ERAsynth connection timed out, retrying...")
raise ConnectionError(f"Unable to post {name}={value} to {self.name}")
[docs] def get(self, name):
"""Get a value from the instrument's web server.
Try to get multiple times, waiting for 0.1 seconds between each attempt.
Args:
name: str = The name of the value to get.
"""
if name == "ref_osc_source":
value = self.get("reference_int_ext")
if value == 1:
return "EXT"
else:
return "INT"
for _ in range(RECONNECTION_ATTEMPTS):
try:
response = requests.post(
self.url, params={"readAll": 1}, timeout=TIMEOUT
)
if response.status_code == 200:
# reponse.text is a dictonary in string format, convert it to a dictonary
return json.loads(response.text)[name]
break
except (ConnectionError, TimeoutError, requests.exceptions.ReadTimeout):
log.info("ERAsynth connection timed out, retrying...")
raise ConnectionError(f"Unable to get {name} from {self.name}")
[docs] def set(self, name, value):
"""Set a value to the instrument's web server.
Args:
name (str): Name of the paramater that we are updating.
In qibolab this can be ``frequency``, ``power`` or ``ref_osc_source``,
however the instrument's web server may support more values.
value: New value to set to the given parameter.
The type of value depends on the parameter being updated.
"""
if name == "ref_osc_source":
if value.lower() in ("int", "internal"):
self.post("reference_int_ext", 0)
elif value.lower() in ("ext", "external"):
self.post("reference_int_ext", 1)
else:
raise ValueError(f"Invalid reference clock source {value}")
elif name == "frequency":
self.post(name, int(value))
elif name == "power":
self.post(name, float(value))
else:
self.post(name, value)
[docs] def on(self):
self.post("rfoutput", 1)
[docs] def off(self):
self.post("rfoutput", 0)
[docs] def close(self):
self.off()
[docs]class ERA(LocalOscillator):
"""Driver to control the ERAsynth++ local oscillator.
This driver is using:
https://qcodes.github.io/Qcodes_contrib_drivers/api/generated/qcodes_contrib_drivers.drivers.ERAInstruments.html#qcodes_contrib_drivers.drivers.ERAInstruments.erasynth.ERASynthPlusPlus
or the custom :class:`qibolab.instruments.erasynth.ERASynthEthernet` object
if we are connected via ethernet.
"""
def __init__(self, name, address, ethernet=True, ref_osc_source=None):
super().__init__(name, address, ref_osc_source)
self.ethernet = ethernet
[docs] def create(self):
if self.ethernet:
return ERASynthEthernet(self.name, self.address)
else:
return ERASynthPlusPlus(f"{self.name}", f"TCPIP::{self.address}::INSTR")