From 7907031b5c907b7c1cc7a017f3f676396c0cbbc6 Mon Sep 17 00:00:00 2001 From: Bastian Leykauf Date: Mon, 27 Mar 2023 17:19:35 +0200 Subject: [PATCH 01/14] move approacher --- .../linien_server/{ => optimization}/approach_line.py | 0 linien-server/linien_server/optimization/optimization.py | 7 ++++--- tests/test_approacher.py | 2 +- 3 files changed, 5 insertions(+), 4 deletions(-) rename linien-server/linien_server/{ => optimization}/approach_line.py (100%) diff --git a/linien-server/linien_server/approach_line.py b/linien-server/linien_server/optimization/approach_line.py similarity index 100% rename from linien-server/linien_server/approach_line.py rename to linien-server/linien_server/optimization/approach_line.py diff --git a/linien-server/linien_server/optimization/optimization.py b/linien-server/linien_server/optimization/optimization.py index a2912fac..2938201d 100644 --- a/linien-server/linien_server/optimization/optimization.py +++ b/linien-server/linien_server/optimization/optimization.py @@ -21,9 +21,10 @@ import numpy as np from linien_common.common import determine_shift_by_correlation, get_lock_point -from linien_server.approach_line import Approacher -from linien_server.optimization.engine import OptimizerEngine -from linien_server.optimization.utils import FINAL_ZOOM_FACTOR + +from .approach_line import Approacher +from .engine import OptimizerEngine +from .utils import FINAL_ZOOM_FACTOR class OptimizeSpectroscopy: diff --git a/tests/test_approacher.py b/tests/test_approacher.py index ede173c8..2e506f49 100644 --- a/tests/test_approacher.py +++ b/tests/test_approacher.py @@ -18,7 +18,7 @@ import numpy as np from linien_common.common import get_lock_point -from linien_server.approach_line import Approacher +from linien_server.optimization.approach_line import Approacher from linien_server.parameters import Parameters Y_SHIFT = 4000 From ada0ad3b0fafcb37b42ceb202dda0f4aea94a128 Mon Sep 17 00:00:00 2001 From: Bastian Leykauf Date: Mon, 27 Mar 2023 17:20:02 +0200 Subject: [PATCH 02/14] move all parameter related things to one file --- .../linien_server/parameter_store.py | 84 -------- linien-server/linien_server/parameters.py | 201 +++++++++++++++++- .../linien_server/parameters_base.py | 161 -------------- linien-server/linien_server/server.py | 3 +- 4 files changed, 194 insertions(+), 255 deletions(-) delete mode 100644 linien-server/linien_server/parameter_store.py delete mode 100644 linien-server/linien_server/parameters_base.py diff --git a/linien-server/linien_server/parameter_store.py b/linien-server/linien_server/parameter_store.py deleted file mode 100644 index 21012164..00000000 --- a/linien-server/linien_server/parameter_store.py +++ /dev/null @@ -1,84 +0,0 @@ -# Copyright 2018-2022 Benjamin Wiegand -# Copyright 2021-2022 Bastian Leykauf -# -# This file is part of Linien and based on redpid. -# -# Linien is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# Linien is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with Linien. If not, see . - -import atexit -import pickle -from time import time - -import linien_server - -PARAMETER_STORE_FN = "/linien_parameters.pickle" - - -class ParameterStore: - """This class installs an `atexit` listener that persists parameters to disk - when the server shuts down. Once it restarts the parameters are restored.""" - - def __init__(self, parameters): - self.parameters = parameters - self.restore_parameters() - self.setup_listener() - - def setup_listener(self): - """Listen for shutdown""" - atexit.register(self.save_parameters) - - def restore_parameters(self): - """When the server starts, this method restores previously saved - parameters (if any).""" - try: - with open(PARAMETER_STORE_FN, "rb") as f: - data = pickle.load(f) - except (FileNotFoundError, pickle.UnpicklingError, EOFError): - return - - print("restore parameters") - - for param_name, value in data["parameters"].items(): - try: - getattr(self.parameters, param_name).value = value - except AttributeError: - # ignore parameters that don't exist (anymore) - continue - - def save_parameters(self): - """Gather all parameters and store them on disk.""" - print("save parameters") - parameters = {} - - for param_name in self.parameters._restorable_parameters: - param = getattr(self.parameters, param_name) - parameters[param_name] = param.value - - try: - with open(PARAMETER_STORE_FN, "wb") as f: - pickle.dump( - { - "parameters": parameters, - "time": time(), - "version": linien_server.__version__, - }, - f, - ) - except PermissionError: - # this may happen if the server doesn't run on RedPitaya but on the - # developer's machine. As it is not a critical problem, just print - # the exception and ignore it - from traceback import print_exc - - print_exc() diff --git a/linien-server/linien_server/parameters.py b/linien-server/linien_server/parameters.py index 89449f91..1e940d09 100644 --- a/linien-server/linien_server/parameters.py +++ b/linien-server/linien_server/parameters.py @@ -17,18 +17,81 @@ # You should have received a copy of the GNU General Public License # along with Linien. If not, see . +import atexit +import pickle +from time import time + +import linien_server from linien_common.common import ( AUTO_DETECT_AUTOLOCK_MODE, FAST_AUTOLOCK, PSD_ALGORITHM_LPSD, MHz, Vpp, + pack, ) from linien_common.config import DEFAULT_COLORS, N_COLORS -from linien_server.parameters_base import BaseParameters, Parameter -class Parameters(BaseParameters): +class Parameter: + """Represents a single parameter and is used by `Parameters`.""" + + def __init__( + self, + min_=None, + max_=None, + start=None, + wrap=False, + sync=True, + collapsed_sync=True, + ): + self.min = min_ + self.max = max_ + self.wrap = wrap + self._value = start + self._start = start + self._listeners = set() + self._collapsed_sync = collapsed_sync + self.exposed_can_be_cached = sync + + @property + def value(self): + return self._value + + @value.setter + def value(self, value): + # check bounds + if self.min is not None and value < self.min: + value = self.min if not self.wrap else self.max + if self.max is not None and value > self.max: + value = self.max if not self.wrap else self.min + + self._value = value + + # we copy it because a listener could remove a listener --> this would + # cause an error in this loop + for listener in self._listeners.copy(): + listener(value) + + def on_change(self, function, call_listener_with_first_value=True): + self._listeners.add(function) + + if call_listener_with_first_value: + if self._value is not None: + function(self._value) + + def remove_listener(self, function): + if function in self._listeners: + self._listeners.remove(function) + + def exposed_reset(self): + self.value = self._start + + def register_remote_listener(self, remote_uuid): + pass + + +class Parameters: """ This class defines the parameters of the Linien server. They represent the public interface and can be used to control the behavior of the server. @@ -52,7 +115,8 @@ class Parameters(BaseParameters): """ def __init__(self): - super().__init__() + self._remote_listener_queue = {} + self._remote_listener_callbacks = {} # parameters whose values are saved by the client and restored if the client # connects to the RedPitaya with no server running: @@ -165,7 +229,7 @@ def __init__(self): self.control_channel = Parameter(start=1, min_=0, max_=1) """ - Configures the output of the lock signal. A value of 0 means FAST OUT 1 and a + Configures the output of the lock signal. A value of 0 means FAST OUT 1 and a value of 1 corresponds to FAST OUT 2 """ @@ -174,7 +238,7 @@ def __init__(self): Configures the output of the slow PID control: 0 --> FAST OUT 1 1 --> FAST OUT 2 - 2 --> ANALOG OUT 0 (slow channel) + 2 --> ANALOG OUT 0 (slow channel) """ self.gpio_p_out = Parameter(start=0, min_=0, max_=0b11111111) @@ -328,7 +392,7 @@ def __init__(self): corresponds to equal ratio -128 only channel A being active 128 only channel B being active - Integer values [-128, ..., 128] are allowed. + Integer values [-128, ..., 128] are allowed. """ # The following parameters exist twice, i.e. once per channel @@ -370,7 +434,7 @@ def __init__(self): automatically determine suitable filter for a given modulation frequency or whether the user may configure the filters himself. If automatic mode is enabled, two low pass filters are installed with a frequency of half the - modulation frequency. + modulation frequency. """ for filter_i in (1, 2): @@ -406,7 +470,7 @@ def __init__(self): """ After combining channels A and B and before passing the result to the PID, `combined_offset` is added. It uses the same units as the channel offsets, i.e. - a value of -8191 shifts the data down by 1V, a value of +8191 moves it up. + a value of -8191 shifts the data down by 1V, a value of +8191 moves it up. """ self.p = Parameter(start=50, max_=8191) @@ -515,3 +579,124 @@ def __init__(self): "plot_color_%d" % color_idx, Parameter(start=DEFAULT_COLORS[color_idx]), ) + + def __iter__(self): + for name, param in self.get_all_parameters(): + yield name, param.value + + def get_all_parameters(self): + for name, element in self.__dict__.items(): + if isinstance(element, Parameter): + yield name, element + + def init_parameter_sync(self, uuid): + """To be called by a remote client: Yields all parameters as well + as their values and if the parameters are suited to be cached registers + a listener that pushes changes of these parameters to the client.""" + for name, element in self.get_all_parameters(): + yield name, element, element.value, element.exposed_can_be_cached + if element.exposed_can_be_cached: + self.register_remote_listener(uuid, name) + + def register_remote_listener(self, uuid, param_name): + self._remote_listener_queue.setdefault(uuid, []) + self._remote_listener_callbacks.setdefault(uuid, []) + + def on_change(value, uuid=uuid, param_name=param_name): + if uuid in self._remote_listener_queue: + self._remote_listener_queue[uuid].append((param_name, value)) + + param = getattr(self, param_name) + param.on_change(on_change) + + self._remote_listener_callbacks[uuid].append((param, on_change)) + + def unregister_remote_listeners(self, uuid): + for param, callback in self._remote_listener_callbacks[uuid]: + param.remove_listener(callback) + + del self._remote_listener_queue[uuid] + del self._remote_listener_callbacks[uuid] + + def get_listener_queue(self, uuid): + queue = self._remote_listener_queue.get(uuid, []) + self._remote_listener_queue[uuid] = [] + + # filter out multiple values for collapsible parameters + already_has_value = [] + for idx in reversed(range(len(queue))): + param_name, value = queue[idx] + if self._get_param(param_name)._collapsed_sync: + if param_name in already_has_value: + del queue[idx] + else: + already_has_value.append(param_name) + + return pack(queue) + + def _get_param(self, param_name): + param = getattr(self, param_name) + assert isinstance(param, Parameter) + return param + + +PARAMETER_STORE_FN = "/linien_parameters.pickle" + + +class ParameterStore: + """This class installs an `atexit` listener that persists parameters to disk + when the server shuts down. Once it restarts the parameters are restored.""" + + def __init__(self, parameters): + self.parameters = parameters + self.restore_parameters() + self.setup_listener() + + def setup_listener(self): + """Listen for shutdown""" + atexit.register(self.save_parameters) + + def restore_parameters(self): + """When the server starts, this method restores previously saved + parameters (if any).""" + try: + with open(PARAMETER_STORE_FN, "rb") as f: + data = pickle.load(f) + except (FileNotFoundError, pickle.UnpicklingError, EOFError): + return + + print("restore parameters") + + for param_name, value in data["parameters"].items(): + try: + getattr(self.parameters, param_name).value = value + except AttributeError: + # ignore parameters that don't exist (anymore) + continue + + def save_parameters(self): + """Gather all parameters and store them on disk.""" + print("save parameters") + parameters = {} + + for param_name in self.parameters._restorable_parameters: + param = getattr(self.parameters, param_name) + parameters[param_name] = param.value + + try: + with open(PARAMETER_STORE_FN, "wb") as f: + pickle.dump( + { + "parameters": parameters, + "time": time(), + "version": linien_server.__version__, + }, + f, + ) + except PermissionError: + # this may happen if the server doesn't run on RedPitaya but on the + # developer's machine. As it is not a critical problem, just print + # the exception and ignore it + from traceback import print_exc + + print_exc() diff --git a/linien-server/linien_server/parameters_base.py b/linien-server/linien_server/parameters_base.py deleted file mode 100644 index a7bef3e8..00000000 --- a/linien-server/linien_server/parameters_base.py +++ /dev/null @@ -1,161 +0,0 @@ -# Copyright 2018-2022 Benjamin Wiegand -# -# This file is part of Linien and based on redpid. -# -# Linien is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# Linien is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with Linien. If not, see . - -from linien_common.common import pack - - -class Parameter: - """Represents a single parameter and is used by `Parameters`.""" - - def __init__( - self, - min_=None, - max_=None, - start=None, - wrap=False, - sync=True, - collapsed_sync=True, - ): - self.min = min_ - self.max = max_ - self.wrap = wrap - self._value = start - self._start = start - self._listeners = set() - self._collapsed_sync = collapsed_sync - self.exposed_can_be_cached = sync - - @property - def value(self): - return self._value - - @value.setter - def value(self, value): - # check bounds - if self.min is not None and value < self.min: - value = self.min if not self.wrap else self.max - if self.max is not None and value > self.max: - value = self.max if not self.wrap else self.min - - self._value = value - - # we copy it because a listener could remove a listener --> this would - # cause an error in this loop - for listener in self._listeners.copy(): - listener(value) - - def on_change(self, function, call_listener_with_first_value=True): - self._listeners.add(function) - - if call_listener_with_first_value: - if self._value is not None: - function(self._value) - - def remove_listener(self, function): - if function in self._listeners: - self._listeners.remove(function) - - def exposed_reset(self): - self.value = self._start - - def register_remote_listener(self, remote_uuid): - pass - - -class BaseParameters: - """Represents a set of parameters. In an actual program, it should be - sub-classed like this: - - class MyParameters(BaseParameters): - def __init__(self): - self.param1 = Parameter(min_=12, max_=24) - - Parameters can be changed like this: - - p = MyParameters(...) - p.param1.value = 123 - - You can register callback functions like this: - - def on_change(value): - # do something - - p.param1.on_change(on_change) - """ - - def __init__(self): - self._remote_listener_queue = {} - self._remote_listener_callbacks = {} - - def get_all_parameters(self): - for name, element in self.__dict__.items(): - if isinstance(element, Parameter): - yield name, element - - def init_parameter_sync(self, uuid): - """To be called by a remote client: Yields all parameters as well - as their values and if the parameters are suited to be cached registers - a listener that pushes changes of these parameters to the client.""" - for name, element in self.get_all_parameters(): - yield name, element, element.value, element.exposed_can_be_cached - if element.exposed_can_be_cached: - self.register_remote_listener(uuid, name) - - def register_remote_listener(self, uuid, param_name): - self._remote_listener_queue.setdefault(uuid, []) - self._remote_listener_callbacks.setdefault(uuid, []) - - def on_change(value, uuid=uuid, param_name=param_name): - if uuid in self._remote_listener_queue: - self._remote_listener_queue[uuid].append((param_name, value)) - - param = getattr(self, param_name) - param.on_change(on_change) - - self._remote_listener_callbacks[uuid].append((param, on_change)) - - def unregister_remote_listeners(self, uuid): - for param, callback in self._remote_listener_callbacks[uuid]: - param.remove_listener(callback) - - del self._remote_listener_queue[uuid] - del self._remote_listener_callbacks[uuid] - - def get_listener_queue(self, uuid): - queue = self._remote_listener_queue.get(uuid, []) - self._remote_listener_queue[uuid] = [] - - # filter out multiple values for collapsible parameters - already_has_value = [] - for idx in reversed(range(len(queue))): - param_name, value = queue[idx] - if self._get_param(param_name)._collapsed_sync: - if param_name in already_has_value: - del queue[idx] - else: - already_has_value.append(param_name) - - return pack(queue) - - def __iter__(self): - for name, param in self.get_all_parameters(): - yield name, param.value - - def _get_param(self, param_name): - param = getattr(self, param_name) - assert isinstance(param, Parameter) - return param diff --git a/linien-server/linien_server/server.py b/linien-server/linien_server/server.py index 0868e8eb..b398d99f 100644 --- a/linien-server/linien_server/server.py +++ b/linien-server/linien_server/server.py @@ -38,8 +38,7 @@ from linien_server import __version__ from linien_server.autolock.autolock import Autolock from linien_server.optimization.optimization import OptimizeSpectroscopy -from linien_server.parameter_store import ParameterStore -from linien_server.parameters import Parameters +from linien_server.parameters import Parameters, ParameterStore from linien_server.pid_optimization.pid_optimization import ( PIDOptimization, PSDAcquisition, From 3bdc57728a622478253e7a7e9e8a60bb0d2dafe6 Mon Sep 17 00:00:00 2001 From: Bastian Leykauf Date: Mon, 27 Mar 2023 17:26:04 +0200 Subject: [PATCH 03/14] remove server utils --- linien-server/linien_server/acquisition.py | 18 ++++++- .../linien_server/autolock/robust.py | 10 +++- linien-server/linien_server/registers.py | 9 +++- linien-server/linien_server/utils.py | 54 ------------------- 4 files changed, 34 insertions(+), 57 deletions(-) delete mode 100644 linien-server/linien_server/utils.py diff --git a/linien-server/linien_server/acquisition.py b/linien-server/linien_server/acquisition.py index b42c6edc..c6e6bca6 100644 --- a/linien-server/linien_server/acquisition.py +++ b/linien-server/linien_server/acquisition.py @@ -17,14 +17,16 @@ # along with Linien. If not, see . import atexit +import shutil +import subprocess import threading from enum import Enum from multiprocessing import Pipe, Process +from pathlib import Path from time import sleep import rpyc from linien_common.config import ACQUISITION_PORT -from linien_server.utils import flash_fpga, start_nginx, stop_nginx class AcquisitionConnectionError(Exception): @@ -173,3 +175,17 @@ def set_raw_acquisition(self, enabled, decimation=None): def set_dual_channel(self, enabled): self.parent_conn.send((AcquisitionProcessSignals.SET_DUAL_CHANNEL, enabled)) + + +def stop_nginx(): + subprocess.Popen(["systemctl", "stop", "redpitaya_nginx.service"]).wait() + subprocess.Popen(["systemctl", "stop", "redpitaya_scpi.service"]).wait() + + +def start_nginx(): + subprocess.Popen(["systemctl", "start", "redpitaya_nginx.service"]) + + +def flash_fpga(): + filepath = Path(__file__).parent / "linien.bin" + shutil.copy(str(filepath.resolve()), "/dev/xdevcfg") diff --git a/linien-server/linien_server/autolock/robust.py b/linien-server/linien_server/autolock/robust.py index 73551d48..0140ebf7 100644 --- a/linien-server/linien_server/autolock/robust.py +++ b/linien-server/linien_server/autolock/robust.py @@ -34,7 +34,6 @@ sign, sum_up_spectrum, ) -from linien_server.utils import sweep_speed_to_time class LockPositionNotFound(Exception): @@ -275,3 +274,12 @@ def get_lock_position_from_autolock_instructions( return idx + final_wait_time raise LockPositionNotFound() + + +def sweep_speed_to_time(sweep_speed): + """Sweep speed is an arbitrary unit (cf. `parameters.py`). + This function converts it to the duration of the sweep in seconds. + """ + f_real = 3.8e3 / (2**sweep_speed) + duration = 1 / f_real + return duration diff --git a/linien-server/linien_server/registers.py b/linien-server/linien_server/registers.py index 0e3ea7fb..ad9d8d12 100644 --- a/linien-server/linien_server/registers.py +++ b/linien-server/linien_server/registers.py @@ -29,7 +29,6 @@ from .acquisition import AcquisitionMaster from .csr import PitayaCSR from .iir_coeffs import make_filter -from .utils import twos_complement class Registers: @@ -376,3 +375,11 @@ def set_iir(self, iir_name, *args): # do it too often self.acquisition.set_iir_csr(iir_name, *args) self._iir_cache[iir_name] = args + + +def twos_complement(num, N_bits): + max_ = 1 << (N_bits - 1) + full = 2 * max_ + if num < 0: + num += full + return num diff --git a/linien-server/linien_server/utils.py b/linien-server/linien_server/utils.py deleted file mode 100644 index 38c20c5c..00000000 --- a/linien-server/linien_server/utils.py +++ /dev/null @@ -1,54 +0,0 @@ -# Copyright 2018-2022 Benjamin Wiegand -# Copyright 2021-2022 Bastian Leykauf -# -# This file is part of Linien and based on redpid. -# -# Linien is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# Linien is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with Linien. If not, see . - -import shutil -import subprocess -from pathlib import Path - - -def stop_nginx(): - subprocess.Popen(["systemctl", "stop", "redpitaya_nginx.service"]).wait() - subprocess.Popen(["systemctl", "stop", "redpitaya_scpi.service"]).wait() - - -def start_nginx(): - subprocess.Popen(["systemctl", "start", "redpitaya_nginx.service"]) - - -def flash_fpga(): - filepath = Path(__file__).parent / "linien.bin" - shutil.copy(str(filepath.resolve()), "/dev/xdevcfg") - - -def twos_complement(num, N_bits): - max_ = 1 << (N_bits - 1) - full = 2 * max_ - - if num < 0: - num += full - - return num - - -def sweep_speed_to_time(sweep_speed): - """Sweep speed is an arbitrary unit (cf. `parameters.py`). - This function converts it to the duration of the sweep in seconds. - """ - f_real = 3.8e3 / (2**sweep_speed) - duration = 1 / f_real - return duration From f57ba02f0275031545dcdc919fb04b368cfc93d0 Mon Sep 17 00:00:00 2001 From: Bastian Leykauf Date: Tue, 28 Mar 2023 08:13:37 +0200 Subject: [PATCH 04/14] create acqusition submodule --- .../linien_server/acquisition/__init__.py | 0 .../controller.py} | 12 ++++----- .../service.py} | 6 ++--- linien-server/linien_server/registers.py | 26 +++++++++---------- linien-server/linien_server/server.py | 10 ++++--- 5 files changed, 27 insertions(+), 27 deletions(-) create mode 100644 linien-server/linien_server/acquisition/__init__.py rename linien-server/linien_server/{acquisition.py => acquisition/controller.py} (95%) rename linien-server/linien_server/{acquisition_process.py => acquisition/service.py} (98%) diff --git a/linien-server/linien_server/acquisition/__init__.py b/linien-server/linien_server/acquisition/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/linien-server/linien_server/acquisition.py b/linien-server/linien_server/acquisition/controller.py similarity index 95% rename from linien-server/linien_server/acquisition.py rename to linien-server/linien_server/acquisition/controller.py index c6e6bca6..0cc07cba 100644 --- a/linien-server/linien_server/acquisition.py +++ b/linien-server/linien_server/acquisition/controller.py @@ -27,6 +27,7 @@ import rpyc from linien_common.config import ACQUISITION_PORT +from linien_server.acquisition_process import AcquisitionService class AcquisitionConnectionError(Exception): @@ -46,7 +47,7 @@ class AcquisitionProcessSignals(Enum): SET_DUAL_CHANNEL = 9 -class AcquisitionMaster: +class AcquisitionController: def __init__(self, use_ssh, host): self.on_new_data_received = None @@ -58,7 +59,7 @@ def receive_acquired_data(conn): self.parent_conn, child_conn = Pipe() process = Process( - target=self.connect_acquisition_process, args=(child_conn, use_ssh, host) + target=self.connect_acquisition_service, args=(child_conn, use_ssh, host) ) process.daemon = True process.start() @@ -77,7 +78,7 @@ def receive_acquired_data(conn): def run_data_acquisition(self, on_new_data_received): self.on_new_data_received = on_new_data_received - def connect_acquisition_process(self, pipe, use_ssh, host): + def connect_acquisition_service(self, pipe, use_ssh, host): if use_ssh: # for debugging, acquisition process may be launched manually on the server # and rpyc can be used to connect to it @@ -85,11 +86,9 @@ def connect_acquisition_process(self, pipe, use_ssh, host): acquisition = acquisition_rpyc.root else: # This is what happens in production mode - from linien_server.acquisition_process import DataAcquisitionService - stop_nginx() flash_fpga() - acquisition = DataAcquisitionService() + acquisition = AcquisitionService() # tell the main thread that we're ready pipe.send(True) @@ -140,7 +139,6 @@ def connect_acquisition_process(self, pipe, use_ssh, host): def shutdown(self): if self.parent_conn: self.parent_conn.send((AcquisitionProcessSignals.SHUTDOWN,)) - start_nginx() def set_sweep_speed(self, speed): diff --git a/linien-server/linien_server/acquisition_process.py b/linien-server/linien_server/acquisition/service.py similarity index 98% rename from linien-server/linien_server/acquisition_process.py rename to linien-server/linien_server/acquisition/service.py index 7e08b277..3359e570 100644 --- a/linien-server/linien_server/acquisition_process.py +++ b/linien-server/linien_server/acquisition/service.py @@ -38,7 +38,7 @@ def shutdown(): os._exit(0) -class DataAcquisitionService(Service): +class AcquisitionService(Service): def __init__(self): self.red_pitaya = RedPitaya() self.csr = PythonCSR(self.red_pitaya) @@ -50,7 +50,7 @@ def __init__(self): self.data_hash = None self.data_uuid = None - super(DataAcquisitionService, self).__init__() + super(AcquisitionService, self).__init__() self.locked = False self.exposed_set_sweep_speed(9) @@ -278,5 +278,5 @@ def read_data_raw(self, offset, addr, data_length): if __name__ == "__main__": - t = OneShotServer(DataAcquisitionService(), port=ACQUISITION_PORT) + t = OneShotServer(AcquisitionService(), port=ACQUISITION_PORT) t.start() diff --git a/linien-server/linien_server/registers.py b/linien-server/linien_server/registers.py index ad9d8d12..ab61313a 100644 --- a/linien-server/linien_server/registers.py +++ b/linien-server/linien_server/registers.py @@ -26,7 +26,7 @@ ) from linien_common.config import DEFAULT_SWEEP_SPEED -from .acquisition import AcquisitionMaster +from .acquisition.controller import AcquisitionController from .csr import PitayaCSR from .iir_coeffs import make_filter @@ -43,7 +43,7 @@ def __init__(self, host=None, user=None, password=None): self.host = host self.user = user self.password = password - self.acquisition = None + self.acquisition_controller = None self._last_sweep_speed = None self._last_raw_acquisition_settings = None @@ -57,27 +57,27 @@ def connect(self, control, parameters): self.csr = PitayaCSR() def lock_status_changed(v): - if self.acquisition is not None: - self.acquisition.lock_status_changed(v) + if self.acquisition_controller is not None: + self.acquisition_controller.lock_status_changed(v) self.parameters.lock.on_change(lock_status_changed) def fetch_additional_signals_changed(v): - if self.acquisition is not None: - self.acquisition.fetch_additional_signals_changed(v) + if self.acquisition_controller is not None: + self.acquisition_controller.fetch_additional_signals_changed(v) self.parameters.fetch_additional_signals.on_change( fetch_additional_signals_changed ) def dual_channel_changed(dual_channel): - if self.acquisition is not None: - self.acquisition.set_dual_channel(dual_channel) + if self.acquisition_controller is not None: + self.acquisition_controller.set_dual_channel(dual_channel) self.parameters.dual_channel.on_change(dual_channel_changed) use_ssh = self.host is not None and self.host not in ("localhost", "127.0.0.1") - self.acquisition = AcquisitionMaster(use_ssh, self.host) + self.acquisition_controller = AcquisitionController(use_ssh, self.host) def write_registers(self): """Writes data from `parameters` to the FPGA.""" @@ -216,7 +216,7 @@ def write_registers(self): sweep_changed = params["sweep_speed"] != self._last_sweep_speed if sweep_changed: self._last_sweep_speed = params["sweep_speed"] - self.acquisition.set_sweep_speed(params["sweep_speed"]) + self.acquisition_controller.set_sweep_speed(params["sweep_speed"]) raw_acquisition_settings = ( params["acquisition_raw_enabled"], @@ -224,7 +224,7 @@ def write_registers(self): ) if raw_acquisition_settings != self._last_raw_acquisition_settings: self._last_raw_acquisition_settings = raw_acquisition_settings - self.acquisition.set_raw_acquisition(*raw_acquisition_settings) + self.acquisition_controller.set_raw_acquisition(*raw_acquisition_settings) fpga_base_freq = 125e6 @@ -367,13 +367,13 @@ def set_slow_pid(self, strength, slope, reset=None): self.set("slow_chain_pid_reset", reset) def set(self, key, value): - self.acquisition.set_csr(key, value) + self.acquisition_controller.set_csr(key, value) def set_iir(self, iir_name, *args): if self._iir_cache.get(iir_name) != args: # as setting iir parameters takes some time, take care that we don't # do it too often - self.acquisition.set_iir_csr(iir_name, *args) + self.acquisition_controller.set_iir_csr(iir_name, *args) self._iir_cache[iir_name] = args diff --git a/linien-server/linien_server/server.py b/linien-server/linien_server/server.py index b398d99f..7da4ddc4 100644 --- a/linien-server/linien_server/server.py +++ b/linien-server/linien_server/server.py @@ -140,7 +140,9 @@ def on_new_data_received(is_raw, plot_data, data_uuid): self.parameters.acquisition_raw_data.value = plot_data # each time new data is acquired, this function is called - self.registers.acquisition.on_new_data_received = on_new_data_received + self.registers.acquisition_controller.on_new_data_received = ( + on_new_data_received + ) self.pause_acquisition() self.continue_acquisition() @@ -240,7 +242,7 @@ def exposed_start_lock(self): def exposed_shutdown(self): """Kills the server.""" - self.registers.acquisition.shutdown() + self.registers.acquisition_controller.shutdown() _thread.interrupt_main() # we use SystemExit instead of os._exit because we want to call atexit # handlers @@ -272,14 +274,14 @@ def pause_acquisition(self): `continue_acquisition`.""" self.parameters.pause_acquisition.value = True self.data_uuid = random() - self.registers.acquisition.pause_acquisition() + self.registers.acquisition_controller.pause_acquisition() def continue_acquisition(self): """Continue acquisition after a short delay, when we are sure that the new parameters values have been written to the FPGA and that data that is now recorded is recorded with the correct parameters.""" self.parameters.pause_acquisition.value = False - self.registers.acquisition.continue_acquisition(self.data_uuid) + self.registers.acquisition_controller.continue_acquisition(self.data_uuid) class FakeRedPitayaControl(BaseService): From c3b3ddb6c6a26c21a466ffbab52ca7dfeb1e5c7a Mon Sep 17 00:00:00 2001 From: Bastian Leykauf Date: Tue, 28 Mar 2023 08:31:17 +0200 Subject: [PATCH 05/14] move imports to top of files --- .../linien_server/acquisition/controller.py | 2 +- .../linien_server/acquisition/service.py | 2 +- linien-server/linien_server/server.py | 75 +++++++++---------- 3 files changed, 37 insertions(+), 42 deletions(-) diff --git a/linien-server/linien_server/acquisition/controller.py b/linien-server/linien_server/acquisition/controller.py index 0cc07cba..0613e73b 100644 --- a/linien-server/linien_server/acquisition/controller.py +++ b/linien-server/linien_server/acquisition/controller.py @@ -27,7 +27,7 @@ import rpyc from linien_common.config import ACQUISITION_PORT -from linien_server.acquisition_process import AcquisitionService +from linien_server.acquisition.service import AcquisitionService class AcquisitionConnectionError(Exception): diff --git a/linien-server/linien_server/acquisition/service.py b/linien-server/linien_server/acquisition/service.py index 3359e570..8fa800db 100644 --- a/linien-server/linien_server/acquisition/service.py +++ b/linien-server/linien_server/acquisition/service.py @@ -30,7 +30,7 @@ from rpyc import Service from rpyc.utils.server import OneShotServer -from .csr import PythonCSR +from ..csr import PythonCSR def shutdown(): diff --git a/linien-server/linien_server/server.py b/linien-server/linien_server/server.py index 7da4ddc4..3daeba36 100644 --- a/linien-server/linien_server/server.py +++ b/linien-server/linien_server/server.py @@ -21,7 +21,7 @@ import pickle import sys import threading -from random import random +from random import randint, random from time import sleep import click @@ -43,6 +43,7 @@ PIDOptimization, PSDAcquisition, ) +from linien_server.registers import Registers from rpyc.utils.authenticators import AuthenticationError from rpyc.utils.server import ThreadedServer @@ -65,6 +66,9 @@ def on_disconnect(self, client): uuid = self._uuid_mapping[client] self.parameters.unregister_remote_listeners(uuid) + def exposed_get_server_version(self): + return __version__ + def exposed_get_param(self, param_name): return pack(self.parameters._get_param(param_name).value) @@ -94,19 +98,19 @@ def __init__(self, **kwargs): super().__init__() - from linien_server.registers import Registers - self.registers = Registers(**kwargs) self.registers.connect(self, self.parameters) def run_acquiry_loop(self): - """Starts a background process that keeps polling control and error - signal. Every received value is pushed to `parameters.to_plot`.""" + """ + Startsa background process that keeps polling control and error signal. Every + received value is pushed to `parameters.to_plot`. + """ def on_new_data_received(is_raw, plot_data, data_uuid): - # When a parameter is changed, `pause_acquisition` is set. - # This means that the we should skip new data until we are sure that - # it was recorded with the new settings. + # When a parameter is changed, `pause_acquisition` is set. This means that + # the we should skip new data until we are sure that it was recorded with + # the new settings. if not self.parameters.pause_acquisition.value: if data_uuid != self.data_uuid: return @@ -241,18 +245,12 @@ def exposed_start_lock(self): self.continue_acquisition() def exposed_shutdown(self): - """Kills the server.""" + """Kill the server.""" self.registers.acquisition_controller.shutdown() _thread.interrupt_main() - # we use SystemExit instead of os._exit because we want to call atexit - # handlers + # we use SystemExit instead of os._exit because we want to call atexit handlers raise SystemExit - def exposed_get_server_version(self): - import linien_server - - return linien_server.__version__ - def exposed_get_restorable_parameters(self): return self.parameters._restorable_parameters @@ -263,28 +261,34 @@ def exposed_continue_acquisition(self): self.continue_acquisition() def exposed_set_csr_direct(self, k, v): - """Directly sets a CSR register. This method is intended for debugging. - Normally, the FPGA should be controlled via manipulation of parameters.""" + """ + Directly sets a CSR register. This method is intended for debugging. Normally, + the FPGA should be controlled via manipulation of parameters. + """ self.registers.set(k, v) def pause_acquisition(self): - """Pause continuous acquisition. Call this before changing a parameter - that alters the error / control signal. This way, no inconsistent signals - reach the application. After setting the new parameter values, call - `continue_acquisition`.""" + """ + Pause continuous acquisition. Call this before changing a parameter that alters + the error / control signal. This way, no inconsistent signals reach the + application. After setting the new parameter values, call + `continue_acquisition`. + """ self.parameters.pause_acquisition.value = True self.data_uuid = random() self.registers.acquisition_controller.pause_acquisition() def continue_acquisition(self): - """Continue acquisition after a short delay, when we are sure that the - new parameters values have been written to the FPGA and that data that - is now recorded is recorded with the correct parameters.""" + """ + Continue acquisition after a short delay, when we are sure that the new + parameters values have been written to the FPGA and that data that is now + recorded is recorded with the correct parameters. + """ self.parameters.pause_acquisition.value = False self.registers.acquisition_controller.continue_acquisition(self.data_uuid) -class FakeRedPitayaControl(BaseService): +class FakeRedPitayaControlService(BaseService): def __init__(self): super().__init__() self.exposed_is_locked = None @@ -293,10 +297,6 @@ def exposed_write_registers(self): pass def run_acquiry_loop(self): - import threading - from random import randint - from time import sleep - def run(): while True: max_ = randint(0, 8191) @@ -332,11 +332,6 @@ def exposed_start_optimization(self, x0, x1, spectrum): def exposed_get_restorable_parameters(self): return self.parameters._restorable_parameters - def exposed_get_server_version(self): - import linien_server - - return linien_server.__version__ - def pause_acquisition(self): pass @@ -363,7 +358,7 @@ def run_server(port, fake=False, remote_rp=False): if fake: print("starting fake server") - control = FakeRedPitayaControl() + control = FakeRedPitayaControlService() else: if remote_rp is not None: assert ( @@ -391,12 +386,12 @@ def username_and_password_authenticator(sock): # variable secret = os.environ.get("LINIEN_AUTH_HASH") - # client always sends auth hash, even if we run in non-auth mode - # --> always read 64 bytes, otherwise rpyc connection can't be established + # client always sends auth hash, even if we run in non-auth mode --> always read + # 64 bytes, otherwise rpyc connection can't be established received = sock.recv(64) - # as a protection against brute force, we don't accept requests after - # too many failed auth requests + # as a protection against brute force, we don't accept requests after too many + # failed auth requests if failed_auth_counter["c"] > 1000: print("received too many failed auth requests!") sys.exit(1) From faf9f87a8e52eaf253fa48285893bb759e48fd41 Mon Sep 17 00:00:00 2001 From: Bastian Leykauf Date: Tue, 28 Mar 2023 08:39:11 +0200 Subject: [PATCH 06/14] turn receive_acquired_data into method --- .../linien_server/acquisition/controller.py | 38 +++++++++---------- 1 file changed, 19 insertions(+), 19 deletions(-) diff --git a/linien-server/linien_server/acquisition/controller.py b/linien-server/linien_server/acquisition/controller.py index 0613e73b..6937c7e3 100644 --- a/linien-server/linien_server/acquisition/controller.py +++ b/linien-server/linien_server/acquisition/controller.py @@ -51,33 +51,33 @@ class AcquisitionController: def __init__(self, use_ssh, host): self.on_new_data_received = None - def receive_acquired_data(conn): - while True: - is_raw, received_data, data_uuid = conn.recv() - if self.on_new_data_received is not None: - self.on_new_data_received(is_raw, received_data, data_uuid) - self.parent_conn, child_conn = Pipe() - process = Process( + acqusition_service_process = Process( target=self.connect_acquisition_service, args=(child_conn, use_ssh, host) ) - process.daemon = True - process.start() + acqusition_service_process.daemon = True + acqusition_service_process.start() # wait until connection is established self.parent_conn.recv() - thread = threading.Thread( - target=receive_acquired_data, args=(self.parent_conn,) + receive_data_thread = threading.Thread( + target=self.receive_acquired_data, args=(self.parent_conn,) ) - thread.daemon = True - thread.start() + receive_data_thread.daemon = True + receive_data_thread.start() atexit.register(self.shutdown) def run_data_acquisition(self, on_new_data_received): self.on_new_data_received = on_new_data_received + def receive_acquired_data(self, conn): + while True: + is_raw, received_data, data_uuid = conn.recv() + if self.on_new_data_received is not None: + self.on_new_data_received(is_raw, received_data, data_uuid) + def connect_acquisition_service(self, pipe, use_ssh, host): if use_ssh: # for debugging, acquisition process may be launched manually on the server @@ -136,6 +136,12 @@ def connect_acquisition_service(self, pipe, use_ssh, host): sleep(0.05) + def pause_acquisition(self): + self.parent_conn.send((AcquisitionProcessSignals.PAUSE_ACQUISIITON, True)) + + def continue_acquisition(self, uuid): + self.parent_conn.send((AcquisitionProcessSignals.CONTINUE_ACQUISITION, uuid)) + def shutdown(self): if self.parent_conn: self.parent_conn.send((AcquisitionProcessSignals.SHUTDOWN,)) @@ -158,12 +164,6 @@ def set_csr(self, key, value): def set_iir_csr(self, *args): self.parent_conn.send((AcquisitionProcessSignals.SET_IIR_CSR, args)) - def pause_acquisition(self): - self.parent_conn.send((AcquisitionProcessSignals.PAUSE_ACQUISIITON, True)) - - def continue_acquisition(self, uuid): - self.parent_conn.send((AcquisitionProcessSignals.CONTINUE_ACQUISITION, uuid)) - def set_raw_acquisition(self, enabled, decimation=None): if decimation is None: decimation = 0 From 8b44918aaa08f28929a66f605d5d78ace9d2476d Mon Sep 17 00:00:00 2001 From: Bastian Leykauf Date: Tue, 28 Mar 2023 08:40:07 +0200 Subject: [PATCH 07/14] remove unused method --- linien-server/linien_server/acquisition/controller.py | 3 --- 1 file changed, 3 deletions(-) diff --git a/linien-server/linien_server/acquisition/controller.py b/linien-server/linien_server/acquisition/controller.py index 6937c7e3..5104537c 100644 --- a/linien-server/linien_server/acquisition/controller.py +++ b/linien-server/linien_server/acquisition/controller.py @@ -69,9 +69,6 @@ def __init__(self, use_ssh, host): atexit.register(self.shutdown) - def run_data_acquisition(self, on_new_data_received): - self.on_new_data_received = on_new_data_received - def receive_acquired_data(self, conn): while True: is_raw, received_data, data_uuid = conn.recv() From 977ccf28a8699000e9e3a59c8819704f7d56b92b Mon Sep 17 00:00:00 2001 From: Bastian Leykauf Date: Tue, 28 Mar 2023 09:02:53 +0200 Subject: [PATCH 08/14] unify method names --- .../linien_server/acquisition/controller.py | 32 +++++++------- linien-server/linien_server/parameters.py | 16 +++---- linien-server/linien_server/registers.py | 42 +++++++++---------- 3 files changed, 45 insertions(+), 45 deletions(-) diff --git a/linien-server/linien_server/acquisition/controller.py b/linien-server/linien_server/acquisition/controller.py index 5104537c..88948326 100644 --- a/linien-server/linien_server/acquisition/controller.py +++ b/linien-server/linien_server/acquisition/controller.py @@ -80,12 +80,12 @@ def connect_acquisition_service(self, pipe, use_ssh, host): # for debugging, acquisition process may be launched manually on the server # and rpyc can be used to connect to it acquisition_rpyc = rpyc.connect(host, ACQUISITION_PORT) - acquisition = acquisition_rpyc.root + acquisition_service = acquisition_rpyc.root else: # This is what happens in production mode stop_nginx() flash_fpga() - acquisition = AcquisitionService() + acquisition_service = AcquisitionService() # tell the main thread that we're ready pipe.send(True) @@ -101,23 +101,23 @@ def connect_acquisition_service(self, pipe, use_ssh, host): raise SystemExit() elif data[0] == AcquisitionProcessSignals.SET_SWEEP_SPEED: speed = data[1] - acquisition.exposed_set_sweep_speed(speed) + acquisition_service.exposed_set_sweep_speed(speed) elif data[0] == AcquisitionProcessSignals.SET_LOCK_STATUS: - acquisition.exposed_set_lock_status(data[1]) + acquisition_service.exposed_set_lock_status(data[1]) elif data[0] == AcquisitionProcessSignals.FETCH_QUADRATURES: - acquisition.exposed_set_fetch_additional_signals(data[1]) + acquisition_service.exposed_set_fetch_additional_signals(data[1]) elif data[0] == AcquisitionProcessSignals.SET_RAW_ACQUISITION: - acquisition.exposed_set_raw_acquisition(data[1]) + acquisition_service.exposed_set_raw_acquisition(data[1]) elif data[0] == AcquisitionProcessSignals.SET_DUAL_CHANNEL: - acquisition.exposed_set_dual_channel(data[1]) + acquisition_service.exposed_set_dual_channel(data[1]) elif data[0] == AcquisitionProcessSignals.SET_CSR: - acquisition.exposed_set_csr(*data[1]) + acquisition_service.exposed_set_csr(*data[1]) elif data[0] == AcquisitionProcessSignals.SET_IIR_CSR: - acquisition.exposed_set_iir_csr(*data[1]) + acquisition_service.exposed_set_iir_csr(*data[1]) elif data[0] == AcquisitionProcessSignals.PAUSE_ACQUISIITON: - acquisition.exposed_pause_acquisition() + acquisition_service.exposed_pause_acquisition() elif data[0] == AcquisitionProcessSignals.CONTINUE_ACQUISITION: - acquisition.exposed_continue_acquisition(data[1]) + acquisition_service.exposed_continue_acquisition(data[1]) # load acquired data and send it to the main thread ( @@ -126,7 +126,7 @@ def connect_acquisition_service(self, pipe, use_ssh, host): data_was_raw, new_data, data_uuid, - ) = acquisition.exposed_return_data(last_hash) + ) = acquisition_service.exposed_return_data(last_hash) if new_data_returned: last_hash = new_hash pipe.send((data_was_raw, new_data, data_uuid)) @@ -147,11 +147,11 @@ def shutdown(self): def set_sweep_speed(self, speed): self.parent_conn.send((AcquisitionProcessSignals.SET_SWEEP_SPEED, speed)) - def lock_status_changed(self, status): + def set_lock_status(self, status): if self.parent_conn: self.parent_conn.send((AcquisitionProcessSignals.SET_LOCK_STATUS, status)) - def fetch_additional_signals_changed(self, status): + def fetch_additional_signals(self, status): if self.parent_conn: self.parent_conn.send((AcquisitionProcessSignals.FETCH_QUADRATURES, status)) @@ -161,9 +161,7 @@ def set_csr(self, key, value): def set_iir_csr(self, *args): self.parent_conn.send((AcquisitionProcessSignals.SET_IIR_CSR, args)) - def set_raw_acquisition(self, enabled, decimation=None): - if decimation is None: - decimation = 0 + def set_raw_acquisition(self, enabled, decimation=0): self.parent_conn.send( (AcquisitionProcessSignals.SET_RAW_ACQUISITION, (enabled, decimation)) ) diff --git a/linien-server/linien_server/parameters.py b/linien-server/linien_server/parameters.py index 1e940d09..f8dbfc08 100644 --- a/linien-server/linien_server/parameters.py +++ b/linien-server/linien_server/parameters.py @@ -51,8 +51,8 @@ def __init__( self._value = start self._start = start self._listeners = set() - self._collapsed_sync = collapsed_sync self.exposed_can_be_cached = sync + self._collapsed_sync = collapsed_sync @property def value(self): @@ -80,6 +80,9 @@ def on_change(self, function, call_listener_with_first_value=True): if self._value is not None: function(self._value) + def register_remote_listener(self, remote_uuid): + pass + def remove_listener(self, function): if function in self._listeners: self._listeners.remove(function) @@ -87,9 +90,6 @@ def remove_listener(self, function): def exposed_reset(self): self.value = self._start - def register_remote_listener(self, remote_uuid): - pass - class Parameters: """ @@ -590,9 +590,11 @@ def get_all_parameters(self): yield name, element def init_parameter_sync(self, uuid): - """To be called by a remote client: Yields all parameters as well - as their values and if the parameters are suited to be cached registers - a listener that pushes changes of these parameters to the client.""" + """ + To be called by a remote client: Yields all parameters as well as their values + and if the parameters are suited to be cached registers a listener that pushes + changes of these parameters to the client. + """ for name, element in self.get_all_parameters(): yield name, element, element.value, element.exposed_can_be_cached if element.exposed_can_be_cached: diff --git a/linien-server/linien_server/registers.py b/linien-server/linien_server/registers.py index ab61313a..1961b60a 100644 --- a/linien-server/linien_server/registers.py +++ b/linien-server/linien_server/registers.py @@ -56,25 +56,25 @@ def connect(self, control, parameters): self.csr = PitayaCSR() - def lock_status_changed(v): + def on_lock_status_changed(v): if self.acquisition_controller is not None: - self.acquisition_controller.lock_status_changed(v) + self.acquisition_controller.set_lock_status(v) - self.parameters.lock.on_change(lock_status_changed) + self.parameters.lock.on_change(on_lock_status_changed) - def fetch_additional_signals_changed(v): + def on_fetch_additional_signals_changed(v): if self.acquisition_controller is not None: - self.acquisition_controller.fetch_additional_signals_changed(v) + self.acquisition_controller.fetch_additional_signals(v) self.parameters.fetch_additional_signals.on_change( - fetch_additional_signals_changed + on_fetch_additional_signals_changed ) - def dual_channel_changed(dual_channel): + def on_dual_channel_changed(dual_channel): if self.acquisition_controller is not None: self.acquisition_controller.set_dual_channel(dual_channel) - self.parameters.dual_channel.on_change(dual_channel_changed) + self.parameters.dual_channel.on_change(on_dual_channel_changed) use_ssh = self.host is not None and self.host not in ("localhost", "127.0.0.1") self.acquisition_controller = AcquisitionController(use_ssh, self.host) @@ -98,8 +98,8 @@ def write_registers(self): self.control.exposed_is_locked = lock new = dict( - # sweep run is 1 by default. The gateware automatically takes care - # of stopping the sweep run after `request_lock` is set by setting + # sweep run is 1 by default. The gateware automatically takes care of + # stopping the sweep run after `request_lock` is set by setting # `sweep.clear` logic_sweep_run=1, logic_sweep_pause=int(params["sweep_pause"]), @@ -186,8 +186,8 @@ def write_registers(self): } ) else: - # display both demodulated error signals (if dual channel mode) - # OR: display demodulated error signal 1 + monitor signal + # display both demodulated error signals (if dual channel mode) OR: display + # demodulated error signal 1 + monitor signal new.update( { "scopegen_adc_a_sel": self.csr.signal("fast_a_out_i"), @@ -239,9 +239,9 @@ def write_registers(self): self.set(k, int(v)) if not lock and sweep_changed: - # reset sweep for a short time if the scan range was changed - # this is needed because otherwise it may take too long before - # the new scan range is reached --> no scope trigger is sent + # reset sweep for a short time if the scan range was changed this is needed + # because otherwise it may take too long before the new scan range is + # reached --> no scope trigger is sent self.set("logic_sweep_run", 0) self.set("logic_sweep_run", 1) @@ -296,10 +296,10 @@ def channel_polarity(channel): ) # if the filter frequency is too low (< 10Hz), the IIR doesn't - # work properly anymore. In that case, don't filter. - # This is also helpful if the raw (not demodulated) signal - # should be displayed which can be achieved by setting - # modulation frequency to 0. + # work properly anymore. In that case, don't filter. This is + # also helpful if the raw (not demodulated) signal should be + # displayed which can be achieved by setting modulation + # frequency to 0. if filter_frequency < 10: filter_enabled = False else: @@ -371,8 +371,8 @@ def set(self, key, value): def set_iir(self, iir_name, *args): if self._iir_cache.get(iir_name) != args: - # as setting iir parameters takes some time, take care that we don't - # do it too often + # as setting iir parameters takes some time, take care that we don't do it + # too often self.acquisition_controller.set_iir_csr(iir_name, *args) self._iir_cache[iir_name] = args From 396afef58fd247828649d240fc2200dceb0136d0 Mon Sep 17 00:00:00 2001 From: Bastian Leykauf Date: Tue, 28 Mar 2023 11:13:16 +0200 Subject: [PATCH 09/14] remove unused exception --- linien-server/linien_server/acquisition/controller.py | 4 ---- 1 file changed, 4 deletions(-) diff --git a/linien-server/linien_server/acquisition/controller.py b/linien-server/linien_server/acquisition/controller.py index 88948326..c0ef3ae9 100644 --- a/linien-server/linien_server/acquisition/controller.py +++ b/linien-server/linien_server/acquisition/controller.py @@ -30,10 +30,6 @@ from linien_server.acquisition.service import AcquisitionService -class AcquisitionConnectionError(Exception): - pass - - class AcquisitionProcessSignals(Enum): SHUTDOWN = 0 SET_SWEEP_SPEED = 2 From 35380a1a4dc559480379dbd99ddbe12211544e8a Mon Sep 17 00:00:00 2001 From: Bastian Leykauf Date: Tue, 28 Mar 2023 14:45:50 +0200 Subject: [PATCH 10/14] use better function names --- .../linien_server/acquisition/service.py | 222 +++++++++--------- linien-server/linien_server/server.py | 167 ++++++------- 2 files changed, 189 insertions(+), 200 deletions(-) diff --git a/linien-server/linien_server/acquisition/service.py b/linien-server/linien_server/acquisition/service.py index 8fa800db..be5aab3d 100644 --- a/linien-server/linien_server/acquisition/service.py +++ b/linien-server/linien_server/acquisition/service.py @@ -40,6 +40,8 @@ def shutdown(): class AcquisitionService(Service): def __init__(self): + super(AcquisitionService, self).__init__() + self.red_pitaya = RedPitaya() self.csr = PythonCSR(self.red_pitaya) self.csr_queue = [] @@ -50,8 +52,6 @@ def __init__(self): self.data_hash = None self.data_uuid = None - super(AcquisitionService, self).__init__() - self.locked = False self.exposed_set_sweep_speed(9) # when self.locked is set to True, this doesn't mean that the lock is really on. @@ -73,126 +73,55 @@ def __init__(self): self.run() def run(self): - def run_acquiry_loop(): - while True: - while self.csr_queue: - key, value = self.csr_queue.pop(0) - self.csr.set(key, value) - - while self.csr_iir_queue: - args = self.csr_iir_queue.pop(0) - self.csr.set_iir(*args) - - if self.locked and not self.confirmed_that_in_lock: - self.confirmed_that_in_lock = self.csr.get( - "logic_autolock_lock_running" - ) - if not self.confirmed_that_in_lock: - sleep(0.05) - continue - - if self.acquisition_paused: - sleep(0.05) - continue - - # copied from https://github.com/RedPitaya/RedPitaya/blob/14cca62dd58f29826ee89f4b28901602f5cdb1d8/api/src/oscilloscope.c#L115 # noqa: E501 - # check whether scope was triggered - not_triggered = (self.red_pitaya.scope.read(0x1 << 2) & 0x4) > 0 - if not_triggered: - sleep(0.05) - continue - - data, is_raw = self.read_data() - - if self.acquisition_paused: - # it may seem strange that we check this here a second time. Reason: - # `read_data` takes some time and if in the mean time acquisition - # was paused, we do not want to send the data - continue - - if self.skip_next_data: - self.skip_next_data = False - else: - self.data = pickle.dumps(data) - self.data_was_raw = is_raw - self.data_hash = random() - - self.program_acquisition_and_rearm() - - self.thread = threading.Thread(target=run_acquiry_loop, args=()) + self.thread = threading.Thread(target=self.acquisition_loop, args=()) self.thread.daemon = True self.thread.start() - def program_acquisition_and_rearm(self, trigger_delay=16384): - """Programs the acquisition settings and rearms acquisition.""" - if not self.locked: - target_decimation = 2 ** (self.sweep_speed + int(np.log2(DECIMATION))) + def acquisition_loop(self): + while True: + while self.csr_queue: + key, value = self.csr_queue.pop(0) + self.csr.set(key, value) - self.red_pitaya.scope.data_decimation = target_decimation - self.red_pitaya.scope.trigger_delay = int(trigger_delay / DECIMATION) - 1 - - elif self.raw_acquisition_enabled: - self.red_pitaya.scope.data_decimation = 2**self.raw_acquisition_decimation - self.red_pitaya.scope.trigger_delay = trigger_delay - - else: - self.red_pitaya.scope.data_decimation = 1 - self.red_pitaya.scope.trigger_delay = int(trigger_delay / DECIMATION) - 1 + while self.csr_iir_queue: + args = self.csr_iir_queue.pop(0) + self.csr.set_iir(*args) - # trigger_source=6 means external trigger positive edge - self.red_pitaya.scope.rearm(trigger_source=6) - - def exposed_return_data(self, last_hash): - no_data_available = self.data_hash is None - data_not_changed = self.data_hash == last_hash - if data_not_changed or no_data_available or self.acquisition_paused: - return False, None, None, None, None - else: - return True, self.data_hash, self.data_was_raw, self.data, self.data_uuid - - def exposed_set_sweep_speed(self, speed): - self.sweep_speed = speed - # if a slow acqisition is currently running and we change the sweep speed we - # don't want to wait until it finishes - self.program_acquisition_and_rearm() - - def exposed_set_lock_status(self, locked): - self.locked = locked - self.confirmed_that_in_lock = False + if self.locked and not self.confirmed_that_in_lock: + self.confirmed_that_in_lock = self.csr.get( + "logic_autolock_lock_running" + ) + if not self.confirmed_that_in_lock: + sleep(0.05) + continue - def exposed_set_fetch_additional_signals(self, fetch): - self.fetch_additional_signals = fetch + if self.acquisition_paused: + sleep(0.05) + continue - def exposed_set_raw_acquisition(self, data): - self.raw_acquisition_enabled = data[0] - self.raw_acquisition_decimation = data[1] + # copied from https://github.com/RedPitaya/RedPitaya/blob/14cca62dd58f29826ee89f4b28901602f5cdb1d8/api/src/oscilloscope.c#L115 # noqa: E501 + # check whether scope was triggered + not_triggered = (self.red_pitaya.scope.read(0x1 << 2) & 0x4) > 0 + if not_triggered: + sleep(0.05) + continue - def exposed_set_dual_channel(self, dual_channel): - self.dual_channel = dual_channel + data, is_raw = self.read_data() - def exposed_set_csr(self, key, value): - self.csr_queue.append((key, value)) + if self.acquisition_paused: + # it may seem strange that we check this here a second time. Reason: + # `read_data` takes some time and if in the mean time acquisition + # was paused, we do not want to send the data + continue - def exposed_set_iir_csr(self, *args): - self.csr_iir_queue.append(args) + if self.skip_next_data: + self.skip_next_data = False + else: + self.data = pickle.dumps(data) + self.data_was_raw = is_raw + self.data_hash = random() - def exposed_pause_acquisition(self): - self.acquisition_paused = True - self.data_hash = None - self.data = None - - def exposed_continue_acquisition(self, uuid): - self.program_acquisition_and_rearm() - sleep(0.01) - # resetting data here is not strictly required but we want to be on the safe - # side - self.data_hash = None - self.data = None - self.acquisition_paused = False - self.data_uuid = uuid - # if we are sweeping, we have to skip one data set because an incomplete sweep - # may have been recorded. When locked, this does not matter - self.skip_next_data = not self.confirmed_that_in_lock + self.program_acquisition_and_rearm() def read_data(self): write_pointer = self.red_pitaya.scope.write_pointer_trigger @@ -276,6 +205,77 @@ def read_data_raw(self, offset, addr, data_length): return signals + def program_acquisition_and_rearm(self, trigger_delay=16384): + """Programs the acquisition settings and rearms acquisition.""" + if not self.locked: + target_decimation = 2 ** (self.sweep_speed + int(np.log2(DECIMATION))) + + self.red_pitaya.scope.data_decimation = target_decimation + self.red_pitaya.scope.trigger_delay = int(trigger_delay / DECIMATION) - 1 + + elif self.raw_acquisition_enabled: + self.red_pitaya.scope.data_decimation = 2**self.raw_acquisition_decimation + self.red_pitaya.scope.trigger_delay = trigger_delay + + else: + self.red_pitaya.scope.data_decimation = 1 + self.red_pitaya.scope.trigger_delay = int(trigger_delay / DECIMATION) - 1 + + # trigger_source=6 means external trigger positive edge + self.red_pitaya.scope.rearm(trigger_source=6) + + def exposed_return_data(self, last_hash): + no_data_available = self.data_hash is None + data_not_changed = self.data_hash == last_hash + if data_not_changed or no_data_available or self.acquisition_paused: + return False, None, None, None, None + else: + return True, self.data_hash, self.data_was_raw, self.data, self.data_uuid + + def exposed_set_sweep_speed(self, speed): + self.sweep_speed = speed + # if a slow acqisition is currently running and we change the sweep speed we + # don't want to wait until it finishes + self.program_acquisition_and_rearm() + + def exposed_set_lock_status(self, locked): + self.locked = locked + self.confirmed_that_in_lock = False + + def exposed_set_fetch_additional_signals(self, fetch): + self.fetch_additional_signals = fetch + + def exposed_set_raw_acquisition(self, data): + self.raw_acquisition_enabled = data[0] + self.raw_acquisition_decimation = data[1] + + def exposed_set_dual_channel(self, dual_channel): + self.dual_channel = dual_channel + + def exposed_set_csr(self, key, value): + self.csr_queue.append((key, value)) + + def exposed_set_iir_csr(self, *args): + self.csr_iir_queue.append(args) + + def exposed_pause_acquisition(self): + self.acquisition_paused = True + self.data_hash = None + self.data = None + + def exposed_continue_acquisition(self, uuid): + self.program_acquisition_and_rearm() + sleep(0.01) + # resetting data here is not strictly required but we want to be on the safe + # side + self.data_hash = None + self.data = None + self.acquisition_paused = False + self.data_uuid = uuid + # if we are sweeping, we have to skip one data set because an incomplete sweep + # may have been recorded. When locked, this does not matter + self.skip_next_data = not self.confirmed_that_in_lock + if __name__ == "__main__": t = OneShotServer(AcquisitionService(), port=ACQUISITION_PORT) diff --git a/linien-server/linien_server/server.py b/linien-server/linien_server/server.py index 3daeba36..faa31f6d 100644 --- a/linien-server/linien_server/server.py +++ b/linien-server/linien_server/server.py @@ -96,61 +96,61 @@ def __init__(self, **kwargs): self._cached_data = {} self.exposed_is_locked = None - super().__init__() + super(RedPitayaControlService, self).__init__() self.registers = Registers(**kwargs) self.registers.connect(self, self.parameters) + self._connect_acquisition_to_parameters() + self._start_periodic_timer() + self.exposed_write_registers() - def run_acquiry_loop(self): + def _connect_acquisition_to_parameters(self): """ - Startsa background process that keeps polling control and error signal. Every - received value is pushed to `parameters.to_plot`. + Connect the acquisition loopo to the parameters: Every received value is pushed + to `parameters.to_plot`. """ - - def on_new_data_received(is_raw, plot_data, data_uuid): - # When a parameter is changed, `pause_acquisition` is set. This means that - # the we should skip new data until we are sure that it was recorded with - # the new settings. - if not self.parameters.pause_acquisition.value: - if data_uuid != self.data_uuid: - return - - data_loaded = pickle.loads(plot_data) - - if not is_raw: - is_locked = self.parameters.lock.value - - if not check_plot_data(is_locked, data_loaded): - print( - "warning: incorrect data received for lock state, ignoring!" - ) - return - - self.parameters.to_plot.value = plot_data - self._generate_signal_stats(data_loaded) - - # update signal history (if in locked state) - ( - self.parameters.control_signal_history.value, - self.parameters.monitor_signal_history.value, - ) = update_signal_history( - self.parameters.control_signal_history.value, - self.parameters.monitor_signal_history.value, - data_loaded, - is_locked, - self.parameters.control_signal_history_length.value, - ) - else: - self.parameters.acquisition_raw_data.value = plot_data - # each time new data is acquired, this function is called self.registers.acquisition_controller.on_new_data_received = ( - on_new_data_received + self._on_new_data_received ) self.pause_acquisition() self.continue_acquisition() - def run_periodic_timer(self): + def _on_new_data_received(self, is_raw, plot_data, data_uuid): + # When a parameter is changed, `pause_acquisition` is set. This means that + # the we should skip new data until we are sure that it was recorded with + # the new settings. + if not self.parameters.pause_acquisition.value: + if data_uuid != self.data_uuid: + return + + data_loaded = pickle.loads(plot_data) + + if not is_raw: + is_locked = self.parameters.lock.value + + if not check_plot_data(is_locked, data_loaded): + print("warning: incorrect data received for lock state, ignoring!") + return + + self.parameters.to_plot.value = plot_data + self._generate_signal_stats(data_loaded) + + # update signal history (if in locked state) + ( + self.parameters.control_signal_history.value, + self.parameters.monitor_signal_history.value, + ) = update_signal_history( + self.parameters.control_signal_history.value, + self.parameters.monitor_signal_history.value, + data_loaded, + is_locked, + self.parameters.control_signal_history_length.value, + ) + else: + self.parameters.acquisition_raw_data.value = plot_data + + def _start_periodic_timer(self): """ Start a timer that increases the `ping` parameter once per second. Its purpose is to allow for periodic tasks on the server: just register an `on_change` @@ -293,11 +293,13 @@ def __init__(self): super().__init__() self.exposed_is_locked = None + self._connect_acquisition_to_parameters() + def exposed_write_registers(self): pass - def run_acquiry_loop(self): - def run(): + def _connect_acquisition_to_parameters(self): + def write_random_data_to_parameters(): while True: max_ = randint(0, 8191) gen = lambda: np.array([randint(-max_, max_) for _ in range(N_POINTS)]) @@ -311,12 +313,9 @@ def run(): ) sleep(0.1) - t = threading.Thread(target=run) - t.daemon = True - t.start() - - def run_periodic_timer(self): - pass + thread = threading.Thread(target=write_random_data_to_parameters) + thread.daemon = True + thread.start() def exposed_shutdown(self): _thread.interrupt_main() @@ -339,6 +338,31 @@ def continue_acquisition(self): pass +def authenticate_username_and_password(sock): + failed_auth_counter = {"c": 0} + # when a client starts the server, it supplies this hash via an environment + # variable + secret = os.environ.get("LINIEN_AUTH_HASH") + # client always sends auth hash, even if we run in non-auth mode --> always read + # 64 bytes, otherwise rpyc connection can't be established + received = sock.recv(64) + # as a protection against brute force, we don't accept requests after too many + # failed auth requests + if failed_auth_counter["c"] > 1000: + print("received too many failed auth requests!") + sys.exit(1) + + if secret is None: + print("warning: no authentication set up") + else: + if received != secret.encode(): + print("received invalid credentials: ", received) + failed_auth_counter["c"] += 1 + raise AuthenticationError("invalid username / password") + print("authentication successful") + return sock, None + + @click.command() @click.version_option(__version__) @click.argument("port", default=DEFAULT_SERVER_PORT, type=int, required=False) @@ -375,48 +399,13 @@ def run_server(port, fake=False, remote_rp=False): else: control = RedPitayaControlService() - control.run_acquiry_loop() - control.run_periodic_timer() - control.exposed_write_registers() - - failed_auth_counter = {"c": 0} - - def username_and_password_authenticator(sock): - # when a client starts the server, it supplies this hash via an environment - # variable - secret = os.environ.get("LINIEN_AUTH_HASH") - - # client always sends auth hash, even if we run in non-auth mode --> always read - # 64 bytes, otherwise rpyc connection can't be established - received = sock.recv(64) - - # as a protection against brute force, we don't accept requests after too many - # failed auth requests - if failed_auth_counter["c"] > 1000: - print("received too many failed auth requests!") - sys.exit(1) - - if secret is None: - print("warning: no authentication set up") - else: - if received != secret.encode(): - print("received invalid credentials: ", received) - - failed_auth_counter["c"] += 1 - - raise AuthenticationError("invalid username / password") - - print("authentication successful") - - return sock, None - - t = ThreadedServer( + thread = ThreadedServer( control, port=port, - authenticator=username_and_password_authenticator, + authenticator=authenticate_username_and_password, protocol_config={"allow_pickle": True}, ) - t.start() + thread.start() if __name__ == "__main__": From 71232e5d408718d7a396e1d1f0d4203a53ba56ea Mon Sep 17 00:00:00 2001 From: Bastian Leykauf Date: Wed, 29 Mar 2023 10:05:38 +0000 Subject: [PATCH 11/14] fix path to gateware --- linien-server/linien_server/acquisition/controller.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/linien-server/linien_server/acquisition/controller.py b/linien-server/linien_server/acquisition/controller.py index c0ef3ae9..68dc7893 100644 --- a/linien-server/linien_server/acquisition/controller.py +++ b/linien-server/linien_server/acquisition/controller.py @@ -176,5 +176,5 @@ def start_nginx(): def flash_fpga(): - filepath = Path(__file__).parent / "linien.bin" + filepath = Path(__file__).parents[1] / "linien.bin" shutil.copy(str(filepath.resolve()), "/dev/xdevcfg") From 6cf86a4e04190a2f17c2457ed8b6d463dae74423 Mon Sep 17 00:00:00 2001 From: Bastian Leykauf Date: Wed, 29 Mar 2023 12:58:32 +0200 Subject: [PATCH 12/14] fix relative import --- linien-server/linien_server/acquisition/service.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/linien-server/linien_server/acquisition/service.py b/linien-server/linien_server/acquisition/service.py index be5aab3d..b42ad0c0 100644 --- a/linien-server/linien_server/acquisition/service.py +++ b/linien-server/linien_server/acquisition/service.py @@ -26,12 +26,11 @@ import numpy as np from linien_common.common import DECIMATION, MAX_N_POINTS, N_POINTS from linien_common.config import ACQUISITION_PORT +from linien_server.csr import PythonCSR from pyrp3.board import RedPitaya from rpyc import Service from rpyc.utils.server import OneShotServer -from ..csr import PythonCSR - def shutdown(): _thread.interrupt_main() @@ -279,4 +278,5 @@ def exposed_continue_acquisition(self, uuid): if __name__ == "__main__": t = OneShotServer(AcquisitionService(), port=ACQUISITION_PORT) + print("Starting AcquisitionService on port " + ACQUISITION_PORT) t.start() From fdfe051bed61d6d359d843d7fee513c05d02f5f9 Mon Sep 17 00:00:00 2001 From: Bastian Leykauf Date: Wed, 29 Mar 2023 13:03:56 +0200 Subject: [PATCH 13/14] fix print statement --- linien-server/linien_server/acquisition/service.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/linien-server/linien_server/acquisition/service.py b/linien-server/linien_server/acquisition/service.py index b42ad0c0..4cc4aa23 100644 --- a/linien-server/linien_server/acquisition/service.py +++ b/linien-server/linien_server/acquisition/service.py @@ -278,5 +278,5 @@ def exposed_continue_acquisition(self, uuid): if __name__ == "__main__": t = OneShotServer(AcquisitionService(), port=ACQUISITION_PORT) - print("Starting AcquisitionService on port " + ACQUISITION_PORT) + print("Starting AcquisitionService on port " + str(ACQUISITION_PORT)) t.start() From caf303c5e12cc44ad78ed35ce32f5d3edadfbdd4 Mon Sep 17 00:00:00 2001 From: Bastian Leykauf Date: Wed, 29 Mar 2023 13:14:48 +0200 Subject: [PATCH 14/14] move import to fix fake server and remote-rp --- linien-server/linien_server/acquisition/controller.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/linien-server/linien_server/acquisition/controller.py b/linien-server/linien_server/acquisition/controller.py index 68dc7893..4e09a08c 100644 --- a/linien-server/linien_server/acquisition/controller.py +++ b/linien-server/linien_server/acquisition/controller.py @@ -27,7 +27,6 @@ import rpyc from linien_common.config import ACQUISITION_PORT -from linien_server.acquisition.service import AcquisitionService class AcquisitionProcessSignals(Enum): @@ -79,6 +78,8 @@ def connect_acquisition_service(self, pipe, use_ssh, host): acquisition_service = acquisition_rpyc.root else: # This is what happens in production mode + from linien_server.acquisition.service import AcquisitionService + stop_nginx() flash_fpga() acquisition_service = AcquisitionService()