Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Group common Qblox modules functions #730

Closed
wants to merge 14 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
126 changes: 13 additions & 113 deletions src/qibolab/instruments/qblox/cluster_qcm_bb.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,6 @@

import json

from qblox_instruments.qcodes_drivers.cluster import Cluster as QbloxCluster
from qblox_instruments.qcodes_drivers.qcm_qrm import QcmQrm as QbloxQrmQcm
from qibo.config import log

from qibolab.instruments.qblox.module import ClusterModule
from qibolab.instruments.qblox.q1asm import (
Block,
Expand Down Expand Up @@ -97,7 +93,6 @@ class QcmBb(ClusterModule):
"""

DEFAULT_SEQUENCERS = {"o1": 0, "o2": 1, "o3": 2, "o4": 3}
FREQUENCY_LIMIT = 500e6
OUT_PORT_PATH = {0: "I", 1: "Q", 2: "I", 3: "Q"}

def __init__(self, name: str, address: str):
Expand All @@ -114,19 +109,6 @@ def __init__(self, name: str, address: str):
>>> qcm_module = QcmBb(name="qcm_bb", address="192.168.1.100:2", cluster=cluster_instance)
"""
super().__init__(name, address)
self._ports: dict = {}
self.device: QbloxQrmQcm = None

self._debug_folder: str = ""
self._sequencers: dict[Sequencer] = {}
self.channel_map: dict = {}
self._device_num_output_ports = 2
self._device_num_sequencers: int
self._free_sequencers_numbers: list[int] = (
[]
) # TODO: we can create only list and put three flags: free, used, unused
self._used_sequencers_numbers: list[int] = []
self._unused_sequencers_numbers: list[int] = []

def _set_default_values(self):
# disable all sequencer connections
Expand All @@ -149,40 +131,12 @@ def _set_default_values(self):
for port_num, value in self.OUT_PORT_PATH.items():
self.device.sequencers[port_num].set(f"connect_out{port_num}", value)

def connect(self, cluster: QbloxCluster = None):
"""Connects to the instrument using the instrument settings in the
runcard.

Once connected, it creates port classes with properties mapped
to various instrument parameters, and initialises the the
underlying device parameters. It uploads to the module the port
settings loaded from the runcard.
"""
if self.is_connected:
return

elif cluster is not None:
self.device = cluster.modules[int(self.address.split(":")[1]) - 1]
# test connection with module
if not self.device.present():
raise ConnectionError(
f"Module {self.device.name} not connected to cluster {cluster.name}"
)
# once connected, initialise the parameters of the device to the default values
self._device_num_sequencers = len(self.device.sequencers)
self._set_default_values()
# then set the value loaded from the runcard
try:
for port in self._ports:
self._sequencers[port] = []
self._ports[port].hardware_mod_en = True
self._ports[port].nco_freq = 0
self._ports[port].nco_phase_offs = 0
except Exception as error:
raise RuntimeError(
f"Unable to initialize port parameters on module {self.name}: {error}"
)
self.is_connected = True
def _setup_ports(self):
for port in self._ports:
self._sequencers[port] = []
self._ports[port].hardware_mod_en = True
self._ports[port].nco_freq = 0
self._ports[port].nco_phase_offs = 0

def setup(self, **settings):
"""Cache the settings of the runcard and instantiate the ports of the
Expand Down Expand Up @@ -213,23 +167,14 @@ def _get_next_sequencer(self, port, frequency, qubits: dict):
# select the qubit with flux line, if present, connected to the specific port
qubit = None
for _qubit in qubits.values():
if _qubit.flux is not None and _qubit.flux.port == self.ports(port):
if _qubit.flux is not None and _qubit.flux.port == self._ports[port]:
qubit = _qubit

# select a new sequencer and configure it as required
next_sequencer_number = self._free_sequencers_numbers.pop(0)
if next_sequencer_number != self.DEFAULT_SEQUENCERS[port]:
for parameter in self.device.sequencers[
self.DEFAULT_SEQUENCERS[port]
].parameters:
# exclude read-only parameter `sequence`
if parameter not in ["sequence"]:
value = self.device.sequencers[self.DEFAULT_SEQUENCERS[port]].get(
param_name=parameter
)
if value:
target = self.device.sequencers[next_sequencer_number]
target.set(parameter, value)
default_sequencer_number = self.DEFAULT_SEQUENCERS[port]
if next_sequencer_number != default_sequencer_number:
self.clone_sequencer_params(default_sequencer_number, next_sequencer_number)

# if hardware modulation is enabled configure nco_frequency
if self._ports[port].hardware_mod_en:
Expand All @@ -245,24 +190,10 @@ def _get_next_sequencer(self, port, frequency, qubits: dict):
sequencer.qubit = qubit.name if qubit else None
return sequencer

def get_if(self, pulse):
"""Returns the intermediate frequency needed to synthesise a pulse
based on the port lo frequency."""

_rf = pulse.frequency
_lo = 0 # QCMs do not have local oscillator
_if = _rf - _lo
if abs(_if) > self.FREQUENCY_LIMIT:
raise RuntimeError(
f"""
Pulse frequency {_rf:_} cannot be synthesised, it exceeds the maximum frequency of {self.FREQUENCY_LIMIT:_}"""
)
return _if

def process_pulse_sequence(
self,
qubits: dict,
instrument_pulses: PulseSequence,
sequence: PulseSequence,
navgs: int,
nshots: int,
repetition_duration: int,
Expand Down Expand Up @@ -308,16 +239,9 @@ def process_pulse_sequence(
self._free_sequencers_numbers = list(range(len(self._ports), 6))

# process the pulses for every port
for port in self._ports:
for port, port_obj in self._ports.items():
# split the collection of instruments pulses by ports
port_channel = [
chan.name
for chan in self.channel_map.values()
if chan.port.name == port
]
port_pulses: PulseSequence = instrument_pulses.get_channel_pulses(
*port_channel
)
port_pulses = self.filter_port_pulse(sequence, qubits, port_obj)

# initialise the list of sequencers required by the port
self._sequencers[port] = []
Expand Down Expand Up @@ -732,27 +656,3 @@ def upload(self):
filename = self._debug_folder + f"Z_{self.name}_snapshot.json"
with open(filename, "w", encoding="utf-8") as file:
print_readable_snapshot(self.device, file, update=True)

def play_sequence(self):
"""Executes the sequence of instructions."""

for sequencer_number in self._used_sequencers_numbers:
# Start used sequencers
self.device.start_sequencer(sequencer_number)

def disconnect(self):
"""Stops all sequencers, disconnect all the outputs from the AWG paths
of the sequencers."""
if not self.is_connected:
return
for sequencer_number in self._used_sequencers_numbers:
state = self.device.get_sequencer_state(sequencer_number)
if state.status != "STOPPED":
log.warning(
f"Device {self.device.sequencers[sequencer_number].name} did not stop normally\nstate: {state}"
)

self.device.stop_sequencer()
self.device.disconnect_outputs()
self.is_connected = False
self.device = None
Loading
Loading