diff --git a/linien-server/linien_server/acquisition/__init__.py b/linien-server/linien_server/acquisition/__init__.py
new file mode 100644
index 00000000..e69de29b
diff --git a/linien-server/linien_server/acquisition.py b/linien-server/linien_server/acquisition/controller.py
similarity index 68%
rename from linien-server/linien_server/acquisition.py
rename to linien-server/linien_server/acquisition/controller.py
index b42c6edc..4e09a08c 100644
--- a/linien-server/linien_server/acquisition.py
+++ b/linien-server/linien_server/acquisition/controller.py
@@ -17,18 +17,16 @@
# along with Linien. If not, see .
import atexit
+import shutil
+import subprocess
import threading
from enum import Enum
from multiprocessing import Pipe, Process
+from pathlib import Path
from time import sleep
import rpyc
from linien_common.config import ACQUISITION_PORT
-from linien_server.utils import flash_fpga, start_nginx, stop_nginx
-
-
-class AcquisitionConnectionError(Exception):
- pass
class AcquisitionProcessSignals(Enum):
@@ -44,50 +42,47 @@ class AcquisitionProcessSignals(Enum):
SET_DUAL_CHANNEL = 9
-class AcquisitionMaster:
+class AcquisitionController:
def __init__(self, use_ssh, host):
self.on_new_data_received = None
- def receive_acquired_data(conn):
- while True:
- is_raw, received_data, data_uuid = conn.recv()
- if self.on_new_data_received is not None:
- self.on_new_data_received(is_raw, received_data, data_uuid)
-
self.parent_conn, child_conn = Pipe()
- process = Process(
- target=self.connect_acquisition_process, args=(child_conn, use_ssh, host)
+ acqusition_service_process = Process(
+ target=self.connect_acquisition_service, args=(child_conn, use_ssh, host)
)
- process.daemon = True
- process.start()
+ acqusition_service_process.daemon = True
+ acqusition_service_process.start()
# wait until connection is established
self.parent_conn.recv()
- thread = threading.Thread(
- target=receive_acquired_data, args=(self.parent_conn,)
+ receive_data_thread = threading.Thread(
+ target=self.receive_acquired_data, args=(self.parent_conn,)
)
- thread.daemon = True
- thread.start()
+ receive_data_thread.daemon = True
+ receive_data_thread.start()
atexit.register(self.shutdown)
- def run_data_acquisition(self, on_new_data_received):
- self.on_new_data_received = on_new_data_received
+ def receive_acquired_data(self, conn):
+ while True:
+ is_raw, received_data, data_uuid = conn.recv()
+ if self.on_new_data_received is not None:
+ self.on_new_data_received(is_raw, received_data, data_uuid)
- def connect_acquisition_process(self, pipe, use_ssh, host):
+ def connect_acquisition_service(self, pipe, use_ssh, host):
if use_ssh:
# for debugging, acquisition process may be launched manually on the server
# and rpyc can be used to connect to it
acquisition_rpyc = rpyc.connect(host, ACQUISITION_PORT)
- acquisition = acquisition_rpyc.root
+ acquisition_service = acquisition_rpyc.root
else:
# This is what happens in production mode
- from linien_server.acquisition_process import DataAcquisitionService
+ from linien_server.acquisition.service import AcquisitionService
stop_nginx()
flash_fpga()
- acquisition = DataAcquisitionService()
+ acquisition_service = AcquisitionService()
# tell the main thread that we're ready
pipe.send(True)
@@ -103,23 +98,23 @@ def connect_acquisition_process(self, pipe, use_ssh, host):
raise SystemExit()
elif data[0] == AcquisitionProcessSignals.SET_SWEEP_SPEED:
speed = data[1]
- acquisition.exposed_set_sweep_speed(speed)
+ acquisition_service.exposed_set_sweep_speed(speed)
elif data[0] == AcquisitionProcessSignals.SET_LOCK_STATUS:
- acquisition.exposed_set_lock_status(data[1])
+ acquisition_service.exposed_set_lock_status(data[1])
elif data[0] == AcquisitionProcessSignals.FETCH_QUADRATURES:
- acquisition.exposed_set_fetch_additional_signals(data[1])
+ acquisition_service.exposed_set_fetch_additional_signals(data[1])
elif data[0] == AcquisitionProcessSignals.SET_RAW_ACQUISITION:
- acquisition.exposed_set_raw_acquisition(data[1])
+ acquisition_service.exposed_set_raw_acquisition(data[1])
elif data[0] == AcquisitionProcessSignals.SET_DUAL_CHANNEL:
- acquisition.exposed_set_dual_channel(data[1])
+ acquisition_service.exposed_set_dual_channel(data[1])
elif data[0] == AcquisitionProcessSignals.SET_CSR:
- acquisition.exposed_set_csr(*data[1])
+ acquisition_service.exposed_set_csr(*data[1])
elif data[0] == AcquisitionProcessSignals.SET_IIR_CSR:
- acquisition.exposed_set_iir_csr(*data[1])
+ acquisition_service.exposed_set_iir_csr(*data[1])
elif data[0] == AcquisitionProcessSignals.PAUSE_ACQUISIITON:
- acquisition.exposed_pause_acquisition()
+ acquisition_service.exposed_pause_acquisition()
elif data[0] == AcquisitionProcessSignals.CONTINUE_ACQUISITION:
- acquisition.exposed_continue_acquisition(data[1])
+ acquisition_service.exposed_continue_acquisition(data[1])
# load acquired data and send it to the main thread
(
@@ -128,27 +123,32 @@ def connect_acquisition_process(self, pipe, use_ssh, host):
data_was_raw,
new_data,
data_uuid,
- ) = acquisition.exposed_return_data(last_hash)
+ ) = acquisition_service.exposed_return_data(last_hash)
if new_data_returned:
last_hash = new_hash
pipe.send((data_was_raw, new_data, data_uuid))
sleep(0.05)
+ def pause_acquisition(self):
+ self.parent_conn.send((AcquisitionProcessSignals.PAUSE_ACQUISIITON, True))
+
+ def continue_acquisition(self, uuid):
+ self.parent_conn.send((AcquisitionProcessSignals.CONTINUE_ACQUISITION, uuid))
+
def shutdown(self):
if self.parent_conn:
self.parent_conn.send((AcquisitionProcessSignals.SHUTDOWN,))
-
start_nginx()
def set_sweep_speed(self, speed):
self.parent_conn.send((AcquisitionProcessSignals.SET_SWEEP_SPEED, speed))
- def lock_status_changed(self, status):
+ def set_lock_status(self, status):
if self.parent_conn:
self.parent_conn.send((AcquisitionProcessSignals.SET_LOCK_STATUS, status))
- def fetch_additional_signals_changed(self, status):
+ def fetch_additional_signals(self, status):
if self.parent_conn:
self.parent_conn.send((AcquisitionProcessSignals.FETCH_QUADRATURES, status))
@@ -158,18 +158,24 @@ def set_csr(self, key, value):
def set_iir_csr(self, *args):
self.parent_conn.send((AcquisitionProcessSignals.SET_IIR_CSR, args))
- def pause_acquisition(self):
- self.parent_conn.send((AcquisitionProcessSignals.PAUSE_ACQUISIITON, True))
-
- def continue_acquisition(self, uuid):
- self.parent_conn.send((AcquisitionProcessSignals.CONTINUE_ACQUISITION, uuid))
-
- def set_raw_acquisition(self, enabled, decimation=None):
- if decimation is None:
- decimation = 0
+ def set_raw_acquisition(self, enabled, decimation=0):
self.parent_conn.send(
(AcquisitionProcessSignals.SET_RAW_ACQUISITION, (enabled, decimation))
)
def set_dual_channel(self, enabled):
self.parent_conn.send((AcquisitionProcessSignals.SET_DUAL_CHANNEL, enabled))
+
+
+def stop_nginx():
+ subprocess.Popen(["systemctl", "stop", "redpitaya_nginx.service"]).wait()
+ subprocess.Popen(["systemctl", "stop", "redpitaya_scpi.service"]).wait()
+
+
+def start_nginx():
+ subprocess.Popen(["systemctl", "start", "redpitaya_nginx.service"])
+
+
+def flash_fpga():
+ filepath = Path(__file__).parents[1] / "linien.bin"
+ shutil.copy(str(filepath.resolve()), "/dev/xdevcfg")
diff --git a/linien-server/linien_server/acquisition_process.py b/linien-server/linien_server/acquisition/service.py
similarity index 80%
rename from linien-server/linien_server/acquisition_process.py
rename to linien-server/linien_server/acquisition/service.py
index 7e08b277..4cc4aa23 100644
--- a/linien-server/linien_server/acquisition_process.py
+++ b/linien-server/linien_server/acquisition/service.py
@@ -26,20 +26,21 @@
import numpy as np
from linien_common.common import DECIMATION, MAX_N_POINTS, N_POINTS
from linien_common.config import ACQUISITION_PORT
+from linien_server.csr import PythonCSR
from pyrp3.board import RedPitaya
from rpyc import Service
from rpyc.utils.server import OneShotServer
-from .csr import PythonCSR
-
def shutdown():
_thread.interrupt_main()
os._exit(0)
-class DataAcquisitionService(Service):
+class AcquisitionService(Service):
def __init__(self):
+ super(AcquisitionService, self).__init__()
+
self.red_pitaya = RedPitaya()
self.csr = PythonCSR(self.red_pitaya)
self.csr_queue = []
@@ -50,8 +51,6 @@ def __init__(self):
self.data_hash = None
self.data_uuid = None
- super(DataAcquisitionService, self).__init__()
-
self.locked = False
self.exposed_set_sweep_speed(9)
# when self.locked is set to True, this doesn't mean that the lock is really on.
@@ -73,126 +72,55 @@ def __init__(self):
self.run()
def run(self):
- def run_acquiry_loop():
- while True:
- while self.csr_queue:
- key, value = self.csr_queue.pop(0)
- self.csr.set(key, value)
-
- while self.csr_iir_queue:
- args = self.csr_iir_queue.pop(0)
- self.csr.set_iir(*args)
-
- if self.locked and not self.confirmed_that_in_lock:
- self.confirmed_that_in_lock = self.csr.get(
- "logic_autolock_lock_running"
- )
- if not self.confirmed_that_in_lock:
- sleep(0.05)
- continue
-
- if self.acquisition_paused:
- sleep(0.05)
- continue
-
- # copied from https://github.com/RedPitaya/RedPitaya/blob/14cca62dd58f29826ee89f4b28901602f5cdb1d8/api/src/oscilloscope.c#L115 # noqa: E501
- # check whether scope was triggered
- not_triggered = (self.red_pitaya.scope.read(0x1 << 2) & 0x4) > 0
- if not_triggered:
- sleep(0.05)
- continue
-
- data, is_raw = self.read_data()
-
- if self.acquisition_paused:
- # it may seem strange that we check this here a second time. Reason:
- # `read_data` takes some time and if in the mean time acquisition
- # was paused, we do not want to send the data
- continue
-
- if self.skip_next_data:
- self.skip_next_data = False
- else:
- self.data = pickle.dumps(data)
- self.data_was_raw = is_raw
- self.data_hash = random()
-
- self.program_acquisition_and_rearm()
-
- self.thread = threading.Thread(target=run_acquiry_loop, args=())
+ self.thread = threading.Thread(target=self.acquisition_loop, args=())
self.thread.daemon = True
self.thread.start()
- def program_acquisition_and_rearm(self, trigger_delay=16384):
- """Programs the acquisition settings and rearms acquisition."""
- if not self.locked:
- target_decimation = 2 ** (self.sweep_speed + int(np.log2(DECIMATION)))
-
- self.red_pitaya.scope.data_decimation = target_decimation
- self.red_pitaya.scope.trigger_delay = int(trigger_delay / DECIMATION) - 1
-
- elif self.raw_acquisition_enabled:
- self.red_pitaya.scope.data_decimation = 2**self.raw_acquisition_decimation
- self.red_pitaya.scope.trigger_delay = trigger_delay
-
- else:
- self.red_pitaya.scope.data_decimation = 1
- self.red_pitaya.scope.trigger_delay = int(trigger_delay / DECIMATION) - 1
-
- # trigger_source=6 means external trigger positive edge
- self.red_pitaya.scope.rearm(trigger_source=6)
-
- def exposed_return_data(self, last_hash):
- no_data_available = self.data_hash is None
- data_not_changed = self.data_hash == last_hash
- if data_not_changed or no_data_available or self.acquisition_paused:
- return False, None, None, None, None
- else:
- return True, self.data_hash, self.data_was_raw, self.data, self.data_uuid
-
- def exposed_set_sweep_speed(self, speed):
- self.sweep_speed = speed
- # if a slow acqisition is currently running and we change the sweep speed we
- # don't want to wait until it finishes
- self.program_acquisition_and_rearm()
+ def acquisition_loop(self):
+ while True:
+ while self.csr_queue:
+ key, value = self.csr_queue.pop(0)
+ self.csr.set(key, value)
- def exposed_set_lock_status(self, locked):
- self.locked = locked
- self.confirmed_that_in_lock = False
+ while self.csr_iir_queue:
+ args = self.csr_iir_queue.pop(0)
+ self.csr.set_iir(*args)
- def exposed_set_fetch_additional_signals(self, fetch):
- self.fetch_additional_signals = fetch
+ if self.locked and not self.confirmed_that_in_lock:
+ self.confirmed_that_in_lock = self.csr.get(
+ "logic_autolock_lock_running"
+ )
+ if not self.confirmed_that_in_lock:
+ sleep(0.05)
+ continue
- def exposed_set_raw_acquisition(self, data):
- self.raw_acquisition_enabled = data[0]
- self.raw_acquisition_decimation = data[1]
+ if self.acquisition_paused:
+ sleep(0.05)
+ continue
- def exposed_set_dual_channel(self, dual_channel):
- self.dual_channel = dual_channel
+ # copied from https://github.com/RedPitaya/RedPitaya/blob/14cca62dd58f29826ee89f4b28901602f5cdb1d8/api/src/oscilloscope.c#L115 # noqa: E501
+ # check whether scope was triggered
+ not_triggered = (self.red_pitaya.scope.read(0x1 << 2) & 0x4) > 0
+ if not_triggered:
+ sleep(0.05)
+ continue
- def exposed_set_csr(self, key, value):
- self.csr_queue.append((key, value))
+ data, is_raw = self.read_data()
- def exposed_set_iir_csr(self, *args):
- self.csr_iir_queue.append(args)
+ if self.acquisition_paused:
+ # it may seem strange that we check this here a second time. Reason:
+ # `read_data` takes some time and if in the mean time acquisition
+ # was paused, we do not want to send the data
+ continue
- def exposed_pause_acquisition(self):
- self.acquisition_paused = True
- self.data_hash = None
- self.data = None
+ if self.skip_next_data:
+ self.skip_next_data = False
+ else:
+ self.data = pickle.dumps(data)
+ self.data_was_raw = is_raw
+ self.data_hash = random()
- def exposed_continue_acquisition(self, uuid):
- self.program_acquisition_and_rearm()
- sleep(0.01)
- # resetting data here is not strictly required but we want to be on the safe
- # side
- self.data_hash = None
- self.data = None
- self.acquisition_paused = False
- self.data_uuid = uuid
- # if we are sweeping, we have to skip one data set because an incomplete sweep
- # may have been recorded. When locked, this does not matter
- self.skip_next_data = not self.confirmed_that_in_lock
+ self.program_acquisition_and_rearm()
def read_data(self):
write_pointer = self.red_pitaya.scope.write_pointer_trigger
@@ -276,7 +204,79 @@ def read_data_raw(self, offset, addr, data_length):
return signals
+ def program_acquisition_and_rearm(self, trigger_delay=16384):
+ """Programs the acquisition settings and rearms acquisition."""
+ if not self.locked:
+ target_decimation = 2 ** (self.sweep_speed + int(np.log2(DECIMATION)))
+
+ self.red_pitaya.scope.data_decimation = target_decimation
+ self.red_pitaya.scope.trigger_delay = int(trigger_delay / DECIMATION) - 1
+
+ elif self.raw_acquisition_enabled:
+ self.red_pitaya.scope.data_decimation = 2**self.raw_acquisition_decimation
+ self.red_pitaya.scope.trigger_delay = trigger_delay
+
+ else:
+ self.red_pitaya.scope.data_decimation = 1
+ self.red_pitaya.scope.trigger_delay = int(trigger_delay / DECIMATION) - 1
+
+ # trigger_source=6 means external trigger positive edge
+ self.red_pitaya.scope.rearm(trigger_source=6)
+
+ def exposed_return_data(self, last_hash):
+ no_data_available = self.data_hash is None
+ data_not_changed = self.data_hash == last_hash
+ if data_not_changed or no_data_available or self.acquisition_paused:
+ return False, None, None, None, None
+ else:
+ return True, self.data_hash, self.data_was_raw, self.data, self.data_uuid
+
+ def exposed_set_sweep_speed(self, speed):
+ self.sweep_speed = speed
+ # if a slow acqisition is currently running and we change the sweep speed we
+ # don't want to wait until it finishes
+ self.program_acquisition_and_rearm()
+
+ def exposed_set_lock_status(self, locked):
+ self.locked = locked
+ self.confirmed_that_in_lock = False
+
+ def exposed_set_fetch_additional_signals(self, fetch):
+ self.fetch_additional_signals = fetch
+
+ def exposed_set_raw_acquisition(self, data):
+ self.raw_acquisition_enabled = data[0]
+ self.raw_acquisition_decimation = data[1]
+
+ def exposed_set_dual_channel(self, dual_channel):
+ self.dual_channel = dual_channel
+
+ def exposed_set_csr(self, key, value):
+ self.csr_queue.append((key, value))
+
+ def exposed_set_iir_csr(self, *args):
+ self.csr_iir_queue.append(args)
+
+ def exposed_pause_acquisition(self):
+ self.acquisition_paused = True
+ self.data_hash = None
+ self.data = None
+
+ def exposed_continue_acquisition(self, uuid):
+ self.program_acquisition_and_rearm()
+ sleep(0.01)
+ # resetting data here is not strictly required but we want to be on the safe
+ # side
+ self.data_hash = None
+ self.data = None
+ self.acquisition_paused = False
+ self.data_uuid = uuid
+ # if we are sweeping, we have to skip one data set because an incomplete sweep
+ # may have been recorded. When locked, this does not matter
+ self.skip_next_data = not self.confirmed_that_in_lock
+
if __name__ == "__main__":
- t = OneShotServer(DataAcquisitionService(), port=ACQUISITION_PORT)
+ t = OneShotServer(AcquisitionService(), port=ACQUISITION_PORT)
+ print("Starting AcquisitionService on port " + str(ACQUISITION_PORT))
t.start()
diff --git a/linien-server/linien_server/autolock/robust.py b/linien-server/linien_server/autolock/robust.py
index 73551d48..0140ebf7 100644
--- a/linien-server/linien_server/autolock/robust.py
+++ b/linien-server/linien_server/autolock/robust.py
@@ -34,7 +34,6 @@
sign,
sum_up_spectrum,
)
-from linien_server.utils import sweep_speed_to_time
class LockPositionNotFound(Exception):
@@ -275,3 +274,12 @@ def get_lock_position_from_autolock_instructions(
return idx + final_wait_time
raise LockPositionNotFound()
+
+
+def sweep_speed_to_time(sweep_speed):
+ """Sweep speed is an arbitrary unit (cf. `parameters.py`).
+ This function converts it to the duration of the sweep in seconds.
+ """
+ f_real = 3.8e3 / (2**sweep_speed)
+ duration = 1 / f_real
+ return duration
diff --git a/linien-server/linien_server/approach_line.py b/linien-server/linien_server/optimization/approach_line.py
similarity index 100%
rename from linien-server/linien_server/approach_line.py
rename to linien-server/linien_server/optimization/approach_line.py
diff --git a/linien-server/linien_server/optimization/optimization.py b/linien-server/linien_server/optimization/optimization.py
index a2912fac..2938201d 100644
--- a/linien-server/linien_server/optimization/optimization.py
+++ b/linien-server/linien_server/optimization/optimization.py
@@ -21,9 +21,10 @@
import numpy as np
from linien_common.common import determine_shift_by_correlation, get_lock_point
-from linien_server.approach_line import Approacher
-from linien_server.optimization.engine import OptimizerEngine
-from linien_server.optimization.utils import FINAL_ZOOM_FACTOR
+
+from .approach_line import Approacher
+from .engine import OptimizerEngine
+from .utils import FINAL_ZOOM_FACTOR
class OptimizeSpectroscopy:
diff --git a/linien-server/linien_server/parameter_store.py b/linien-server/linien_server/parameter_store.py
deleted file mode 100644
index 21012164..00000000
--- a/linien-server/linien_server/parameter_store.py
+++ /dev/null
@@ -1,84 +0,0 @@
-# Copyright 2018-2022 Benjamin Wiegand
-# Copyright 2021-2022 Bastian Leykauf
-#
-# This file is part of Linien and based on redpid.
-#
-# Linien is free software: you can redistribute it and/or modify
-# it under the terms of the GNU General Public License as published by
-# the Free Software Foundation, either version 3 of the License, or
-# (at your option) any later version.
-#
-# Linien is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with Linien. If not, see .
-
-import atexit
-import pickle
-from time import time
-
-import linien_server
-
-PARAMETER_STORE_FN = "/linien_parameters.pickle"
-
-
-class ParameterStore:
- """This class installs an `atexit` listener that persists parameters to disk
- when the server shuts down. Once it restarts the parameters are restored."""
-
- def __init__(self, parameters):
- self.parameters = parameters
- self.restore_parameters()
- self.setup_listener()
-
- def setup_listener(self):
- """Listen for shutdown"""
- atexit.register(self.save_parameters)
-
- def restore_parameters(self):
- """When the server starts, this method restores previously saved
- parameters (if any)."""
- try:
- with open(PARAMETER_STORE_FN, "rb") as f:
- data = pickle.load(f)
- except (FileNotFoundError, pickle.UnpicklingError, EOFError):
- return
-
- print("restore parameters")
-
- for param_name, value in data["parameters"].items():
- try:
- getattr(self.parameters, param_name).value = value
- except AttributeError:
- # ignore parameters that don't exist (anymore)
- continue
-
- def save_parameters(self):
- """Gather all parameters and store them on disk."""
- print("save parameters")
- parameters = {}
-
- for param_name in self.parameters._restorable_parameters:
- param = getattr(self.parameters, param_name)
- parameters[param_name] = param.value
-
- try:
- with open(PARAMETER_STORE_FN, "wb") as f:
- pickle.dump(
- {
- "parameters": parameters,
- "time": time(),
- "version": linien_server.__version__,
- },
- f,
- )
- except PermissionError:
- # this may happen if the server doesn't run on RedPitaya but on the
- # developer's machine. As it is not a critical problem, just print
- # the exception and ignore it
- from traceback import print_exc
-
- print_exc()
diff --git a/linien-server/linien_server/parameters.py b/linien-server/linien_server/parameters.py
index 89449f91..f8dbfc08 100644
--- a/linien-server/linien_server/parameters.py
+++ b/linien-server/linien_server/parameters.py
@@ -17,18 +17,81 @@
# You should have received a copy of the GNU General Public License
# along with Linien. If not, see .
+import atexit
+import pickle
+from time import time
+
+import linien_server
from linien_common.common import (
AUTO_DETECT_AUTOLOCK_MODE,
FAST_AUTOLOCK,
PSD_ALGORITHM_LPSD,
MHz,
Vpp,
+ pack,
)
from linien_common.config import DEFAULT_COLORS, N_COLORS
-from linien_server.parameters_base import BaseParameters, Parameter
-class Parameters(BaseParameters):
+class Parameter:
+ """Represents a single parameter and is used by `Parameters`."""
+
+ def __init__(
+ self,
+ min_=None,
+ max_=None,
+ start=None,
+ wrap=False,
+ sync=True,
+ collapsed_sync=True,
+ ):
+ self.min = min_
+ self.max = max_
+ self.wrap = wrap
+ self._value = start
+ self._start = start
+ self._listeners = set()
+ self.exposed_can_be_cached = sync
+ self._collapsed_sync = collapsed_sync
+
+ @property
+ def value(self):
+ return self._value
+
+ @value.setter
+ def value(self, value):
+ # check bounds
+ if self.min is not None and value < self.min:
+ value = self.min if not self.wrap else self.max
+ if self.max is not None and value > self.max:
+ value = self.max if not self.wrap else self.min
+
+ self._value = value
+
+ # we copy it because a listener could remove a listener --> this would
+ # cause an error in this loop
+ for listener in self._listeners.copy():
+ listener(value)
+
+ def on_change(self, function, call_listener_with_first_value=True):
+ self._listeners.add(function)
+
+ if call_listener_with_first_value:
+ if self._value is not None:
+ function(self._value)
+
+ def register_remote_listener(self, remote_uuid):
+ pass
+
+ def remove_listener(self, function):
+ if function in self._listeners:
+ self._listeners.remove(function)
+
+ def exposed_reset(self):
+ self.value = self._start
+
+
+class Parameters:
"""
This class defines the parameters of the Linien server. They represent the public
interface and can be used to control the behavior of the server.
@@ -52,7 +115,8 @@ class Parameters(BaseParameters):
"""
def __init__(self):
- super().__init__()
+ self._remote_listener_queue = {}
+ self._remote_listener_callbacks = {}
# parameters whose values are saved by the client and restored if the client
# connects to the RedPitaya with no server running:
@@ -165,7 +229,7 @@ def __init__(self):
self.control_channel = Parameter(start=1, min_=0, max_=1)
"""
- Configures the output of the lock signal. A value of 0 means FAST OUT 1 and a
+ Configures the output of the lock signal. A value of 0 means FAST OUT 1 and a
value of 1 corresponds to FAST OUT 2
"""
@@ -174,7 +238,7 @@ def __init__(self):
Configures the output of the slow PID control:
0 --> FAST OUT 1
1 --> FAST OUT 2
- 2 --> ANALOG OUT 0 (slow channel)
+ 2 --> ANALOG OUT 0 (slow channel)
"""
self.gpio_p_out = Parameter(start=0, min_=0, max_=0b11111111)
@@ -328,7 +392,7 @@ def __init__(self):
corresponds to equal ratio
-128 only channel A being active
128 only channel B being active
- Integer values [-128, ..., 128] are allowed.
+ Integer values [-128, ..., 128] are allowed.
"""
# The following parameters exist twice, i.e. once per channel
@@ -370,7 +434,7 @@ def __init__(self):
automatically determine suitable filter for a given modulation frequency or
whether the user may configure the filters himself. If automatic mode is
enabled, two low pass filters are installed with a frequency of half the
- modulation frequency.
+ modulation frequency.
"""
for filter_i in (1, 2):
@@ -406,7 +470,7 @@ def __init__(self):
"""
After combining channels A and B and before passing the result to the PID,
`combined_offset` is added. It uses the same units as the channel offsets, i.e.
- a value of -8191 shifts the data down by 1V, a value of +8191 moves it up.
+ a value of -8191 shifts the data down by 1V, a value of +8191 moves it up.
"""
self.p = Parameter(start=50, max_=8191)
@@ -515,3 +579,126 @@ def __init__(self):
"plot_color_%d" % color_idx,
Parameter(start=DEFAULT_COLORS[color_idx]),
)
+
+ def __iter__(self):
+ for name, param in self.get_all_parameters():
+ yield name, param.value
+
+ def get_all_parameters(self):
+ for name, element in self.__dict__.items():
+ if isinstance(element, Parameter):
+ yield name, element
+
+ def init_parameter_sync(self, uuid):
+ """
+ To be called by a remote client: Yields all parameters as well as their values
+ and if the parameters are suited to be cached registers a listener that pushes
+ changes of these parameters to the client.
+ """
+ for name, element in self.get_all_parameters():
+ yield name, element, element.value, element.exposed_can_be_cached
+ if element.exposed_can_be_cached:
+ self.register_remote_listener(uuid, name)
+
+ def register_remote_listener(self, uuid, param_name):
+ self._remote_listener_queue.setdefault(uuid, [])
+ self._remote_listener_callbacks.setdefault(uuid, [])
+
+ def on_change(value, uuid=uuid, param_name=param_name):
+ if uuid in self._remote_listener_queue:
+ self._remote_listener_queue[uuid].append((param_name, value))
+
+ param = getattr(self, param_name)
+ param.on_change(on_change)
+
+ self._remote_listener_callbacks[uuid].append((param, on_change))
+
+ def unregister_remote_listeners(self, uuid):
+ for param, callback in self._remote_listener_callbacks[uuid]:
+ param.remove_listener(callback)
+
+ del self._remote_listener_queue[uuid]
+ del self._remote_listener_callbacks[uuid]
+
+ def get_listener_queue(self, uuid):
+ queue = self._remote_listener_queue.get(uuid, [])
+ self._remote_listener_queue[uuid] = []
+
+ # filter out multiple values for collapsible parameters
+ already_has_value = []
+ for idx in reversed(range(len(queue))):
+ param_name, value = queue[idx]
+ if self._get_param(param_name)._collapsed_sync:
+ if param_name in already_has_value:
+ del queue[idx]
+ else:
+ already_has_value.append(param_name)
+
+ return pack(queue)
+
+ def _get_param(self, param_name):
+ param = getattr(self, param_name)
+ assert isinstance(param, Parameter)
+ return param
+
+
+PARAMETER_STORE_FN = "/linien_parameters.pickle"
+
+
+class ParameterStore:
+ """This class installs an `atexit` listener that persists parameters to disk
+ when the server shuts down. Once it restarts the parameters are restored."""
+
+ def __init__(self, parameters):
+ self.parameters = parameters
+ self.restore_parameters()
+ self.setup_listener()
+
+ def setup_listener(self):
+ """Listen for shutdown"""
+ atexit.register(self.save_parameters)
+
+ def restore_parameters(self):
+ """When the server starts, this method restores previously saved
+ parameters (if any)."""
+ try:
+ with open(PARAMETER_STORE_FN, "rb") as f:
+ data = pickle.load(f)
+ except (FileNotFoundError, pickle.UnpicklingError, EOFError):
+ return
+
+ print("restore parameters")
+
+ for param_name, value in data["parameters"].items():
+ try:
+ getattr(self.parameters, param_name).value = value
+ except AttributeError:
+ # ignore parameters that don't exist (anymore)
+ continue
+
+ def save_parameters(self):
+ """Gather all parameters and store them on disk."""
+ print("save parameters")
+ parameters = {}
+
+ for param_name in self.parameters._restorable_parameters:
+ param = getattr(self.parameters, param_name)
+ parameters[param_name] = param.value
+
+ try:
+ with open(PARAMETER_STORE_FN, "wb") as f:
+ pickle.dump(
+ {
+ "parameters": parameters,
+ "time": time(),
+ "version": linien_server.__version__,
+ },
+ f,
+ )
+ except PermissionError:
+ # this may happen if the server doesn't run on RedPitaya but on the
+ # developer's machine. As it is not a critical problem, just print
+ # the exception and ignore it
+ from traceback import print_exc
+
+ print_exc()
diff --git a/linien-server/linien_server/parameters_base.py b/linien-server/linien_server/parameters_base.py
deleted file mode 100644
index a7bef3e8..00000000
--- a/linien-server/linien_server/parameters_base.py
+++ /dev/null
@@ -1,161 +0,0 @@
-# Copyright 2018-2022 Benjamin Wiegand
-#
-# This file is part of Linien and based on redpid.
-#
-# Linien is free software: you can redistribute it and/or modify
-# it under the terms of the GNU General Public License as published by
-# the Free Software Foundation, either version 3 of the License, or
-# (at your option) any later version.
-#
-# Linien is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with Linien. If not, see .
-
-from linien_common.common import pack
-
-
-class Parameter:
- """Represents a single parameter and is used by `Parameters`."""
-
- def __init__(
- self,
- min_=None,
- max_=None,
- start=None,
- wrap=False,
- sync=True,
- collapsed_sync=True,
- ):
- self.min = min_
- self.max = max_
- self.wrap = wrap
- self._value = start
- self._start = start
- self._listeners = set()
- self._collapsed_sync = collapsed_sync
- self.exposed_can_be_cached = sync
-
- @property
- def value(self):
- return self._value
-
- @value.setter
- def value(self, value):
- # check bounds
- if self.min is not None and value < self.min:
- value = self.min if not self.wrap else self.max
- if self.max is not None and value > self.max:
- value = self.max if not self.wrap else self.min
-
- self._value = value
-
- # we copy it because a listener could remove a listener --> this would
- # cause an error in this loop
- for listener in self._listeners.copy():
- listener(value)
-
- def on_change(self, function, call_listener_with_first_value=True):
- self._listeners.add(function)
-
- if call_listener_with_first_value:
- if self._value is not None:
- function(self._value)
-
- def remove_listener(self, function):
- if function in self._listeners:
- self._listeners.remove(function)
-
- def exposed_reset(self):
- self.value = self._start
-
- def register_remote_listener(self, remote_uuid):
- pass
-
-
-class BaseParameters:
- """Represents a set of parameters. In an actual program, it should be
- sub-classed like this:
-
- class MyParameters(BaseParameters):
- def __init__(self):
- self.param1 = Parameter(min_=12, max_=24)
-
- Parameters can be changed like this:
-
- p = MyParameters(...)
- p.param1.value = 123
-
- You can register callback functions like this:
-
- def on_change(value):
- # do something
-
- p.param1.on_change(on_change)
- """
-
- def __init__(self):
- self._remote_listener_queue = {}
- self._remote_listener_callbacks = {}
-
- def get_all_parameters(self):
- for name, element in self.__dict__.items():
- if isinstance(element, Parameter):
- yield name, element
-
- def init_parameter_sync(self, uuid):
- """To be called by a remote client: Yields all parameters as well
- as their values and if the parameters are suited to be cached registers
- a listener that pushes changes of these parameters to the client."""
- for name, element in self.get_all_parameters():
- yield name, element, element.value, element.exposed_can_be_cached
- if element.exposed_can_be_cached:
- self.register_remote_listener(uuid, name)
-
- def register_remote_listener(self, uuid, param_name):
- self._remote_listener_queue.setdefault(uuid, [])
- self._remote_listener_callbacks.setdefault(uuid, [])
-
- def on_change(value, uuid=uuid, param_name=param_name):
- if uuid in self._remote_listener_queue:
- self._remote_listener_queue[uuid].append((param_name, value))
-
- param = getattr(self, param_name)
- param.on_change(on_change)
-
- self._remote_listener_callbacks[uuid].append((param, on_change))
-
- def unregister_remote_listeners(self, uuid):
- for param, callback in self._remote_listener_callbacks[uuid]:
- param.remove_listener(callback)
-
- del self._remote_listener_queue[uuid]
- del self._remote_listener_callbacks[uuid]
-
- def get_listener_queue(self, uuid):
- queue = self._remote_listener_queue.get(uuid, [])
- self._remote_listener_queue[uuid] = []
-
- # filter out multiple values for collapsible parameters
- already_has_value = []
- for idx in reversed(range(len(queue))):
- param_name, value = queue[idx]
- if self._get_param(param_name)._collapsed_sync:
- if param_name in already_has_value:
- del queue[idx]
- else:
- already_has_value.append(param_name)
-
- return pack(queue)
-
- def __iter__(self):
- for name, param in self.get_all_parameters():
- yield name, param.value
-
- def _get_param(self, param_name):
- param = getattr(self, param_name)
- assert isinstance(param, Parameter)
- return param
diff --git a/linien-server/linien_server/registers.py b/linien-server/linien_server/registers.py
index 0e3ea7fb..1961b60a 100644
--- a/linien-server/linien_server/registers.py
+++ b/linien-server/linien_server/registers.py
@@ -26,10 +26,9 @@
)
from linien_common.config import DEFAULT_SWEEP_SPEED
-from .acquisition import AcquisitionMaster
+from .acquisition.controller import AcquisitionController
from .csr import PitayaCSR
from .iir_coeffs import make_filter
-from .utils import twos_complement
class Registers:
@@ -44,7 +43,7 @@ def __init__(self, host=None, user=None, password=None):
self.host = host
self.user = user
self.password = password
- self.acquisition = None
+ self.acquisition_controller = None
self._last_sweep_speed = None
self._last_raw_acquisition_settings = None
@@ -57,28 +56,28 @@ def connect(self, control, parameters):
self.csr = PitayaCSR()
- def lock_status_changed(v):
- if self.acquisition is not None:
- self.acquisition.lock_status_changed(v)
+ def on_lock_status_changed(v):
+ if self.acquisition_controller is not None:
+ self.acquisition_controller.set_lock_status(v)
- self.parameters.lock.on_change(lock_status_changed)
+ self.parameters.lock.on_change(on_lock_status_changed)
- def fetch_additional_signals_changed(v):
- if self.acquisition is not None:
- self.acquisition.fetch_additional_signals_changed(v)
+ def on_fetch_additional_signals_changed(v):
+ if self.acquisition_controller is not None:
+ self.acquisition_controller.fetch_additional_signals(v)
self.parameters.fetch_additional_signals.on_change(
- fetch_additional_signals_changed
+ on_fetch_additional_signals_changed
)
- def dual_channel_changed(dual_channel):
- if self.acquisition is not None:
- self.acquisition.set_dual_channel(dual_channel)
+ def on_dual_channel_changed(dual_channel):
+ if self.acquisition_controller is not None:
+ self.acquisition_controller.set_dual_channel(dual_channel)
- self.parameters.dual_channel.on_change(dual_channel_changed)
+ self.parameters.dual_channel.on_change(on_dual_channel_changed)
use_ssh = self.host is not None and self.host not in ("localhost", "127.0.0.1")
- self.acquisition = AcquisitionMaster(use_ssh, self.host)
+ self.acquisition_controller = AcquisitionController(use_ssh, self.host)
def write_registers(self):
"""Writes data from `parameters` to the FPGA."""
@@ -99,8 +98,8 @@ def write_registers(self):
self.control.exposed_is_locked = lock
new = dict(
- # sweep run is 1 by default. The gateware automatically takes care
- # of stopping the sweep run after `request_lock` is set by setting
+ # sweep run is 1 by default. The gateware automatically takes care of
+ # stopping the sweep run after `request_lock` is set by setting
# `sweep.clear`
logic_sweep_run=1,
logic_sweep_pause=int(params["sweep_pause"]),
@@ -187,8 +186,8 @@ def write_registers(self):
}
)
else:
- # display both demodulated error signals (if dual channel mode)
- # OR: display demodulated error signal 1 + monitor signal
+ # display both demodulated error signals (if dual channel mode) OR: display
+ # demodulated error signal 1 + monitor signal
new.update(
{
"scopegen_adc_a_sel": self.csr.signal("fast_a_out_i"),
@@ -217,7 +216,7 @@ def write_registers(self):
sweep_changed = params["sweep_speed"] != self._last_sweep_speed
if sweep_changed:
self._last_sweep_speed = params["sweep_speed"]
- self.acquisition.set_sweep_speed(params["sweep_speed"])
+ self.acquisition_controller.set_sweep_speed(params["sweep_speed"])
raw_acquisition_settings = (
params["acquisition_raw_enabled"],
@@ -225,7 +224,7 @@ def write_registers(self):
)
if raw_acquisition_settings != self._last_raw_acquisition_settings:
self._last_raw_acquisition_settings = raw_acquisition_settings
- self.acquisition.set_raw_acquisition(*raw_acquisition_settings)
+ self.acquisition_controller.set_raw_acquisition(*raw_acquisition_settings)
fpga_base_freq = 125e6
@@ -240,9 +239,9 @@ def write_registers(self):
self.set(k, int(v))
if not lock and sweep_changed:
- # reset sweep for a short time if the scan range was changed
- # this is needed because otherwise it may take too long before
- # the new scan range is reached --> no scope trigger is sent
+ # reset sweep for a short time if the scan range was changed this is needed
+ # because otherwise it may take too long before the new scan range is
+ # reached --> no scope trigger is sent
self.set("logic_sweep_run", 0)
self.set("logic_sweep_run", 1)
@@ -297,10 +296,10 @@ def channel_polarity(channel):
)
# if the filter frequency is too low (< 10Hz), the IIR doesn't
- # work properly anymore. In that case, don't filter.
- # This is also helpful if the raw (not demodulated) signal
- # should be displayed which can be achieved by setting
- # modulation frequency to 0.
+ # work properly anymore. In that case, don't filter. This is
+ # also helpful if the raw (not demodulated) signal should be
+ # displayed which can be achieved by setting modulation
+ # frequency to 0.
if filter_frequency < 10:
filter_enabled = False
else:
@@ -368,11 +367,19 @@ def set_slow_pid(self, strength, slope, reset=None):
self.set("slow_chain_pid_reset", reset)
def set(self, key, value):
- self.acquisition.set_csr(key, value)
+ self.acquisition_controller.set_csr(key, value)
def set_iir(self, iir_name, *args):
if self._iir_cache.get(iir_name) != args:
- # as setting iir parameters takes some time, take care that we don't
- # do it too often
- self.acquisition.set_iir_csr(iir_name, *args)
+ # as setting iir parameters takes some time, take care that we don't do it
+ # too often
+ self.acquisition_controller.set_iir_csr(iir_name, *args)
self._iir_cache[iir_name] = args
+
+
+def twos_complement(num, N_bits):
+ max_ = 1 << (N_bits - 1)
+ full = 2 * max_
+ if num < 0:
+ num += full
+ return num
diff --git a/linien-server/linien_server/server.py b/linien-server/linien_server/server.py
index 0868e8eb..faa31f6d 100644
--- a/linien-server/linien_server/server.py
+++ b/linien-server/linien_server/server.py
@@ -21,7 +21,7 @@
import pickle
import sys
import threading
-from random import random
+from random import randint, random
from time import sleep
import click
@@ -38,12 +38,12 @@
from linien_server import __version__
from linien_server.autolock.autolock import Autolock
from linien_server.optimization.optimization import OptimizeSpectroscopy
-from linien_server.parameter_store import ParameterStore
-from linien_server.parameters import Parameters
+from linien_server.parameters import Parameters, ParameterStore
from linien_server.pid_optimization.pid_optimization import (
PIDOptimization,
PSDAcquisition,
)
+from linien_server.registers import Registers
from rpyc.utils.authenticators import AuthenticationError
from rpyc.utils.server import ThreadedServer
@@ -66,6 +66,9 @@ def on_disconnect(self, client):
uuid = self._uuid_mapping[client]
self.parameters.unregister_remote_listeners(uuid)
+ def exposed_get_server_version(self):
+ return __version__
+
def exposed_get_param(self, param_name):
return pack(self.parameters._get_param(param_name).value)
@@ -93,59 +96,61 @@ def __init__(self, **kwargs):
self._cached_data = {}
self.exposed_is_locked = None
- super().__init__()
-
- from linien_server.registers import Registers
+ super(RedPitayaControlService, self).__init__()
self.registers = Registers(**kwargs)
self.registers.connect(self, self.parameters)
+ self._connect_acquisition_to_parameters()
+ self._start_periodic_timer()
+ self.exposed_write_registers()
- def run_acquiry_loop(self):
- """Starts a background process that keeps polling control and error
- signal. Every received value is pushed to `parameters.to_plot`."""
-
- def on_new_data_received(is_raw, plot_data, data_uuid):
- # When a parameter is changed, `pause_acquisition` is set.
- # This means that the we should skip new data until we are sure that
- # it was recorded with the new settings.
- if not self.parameters.pause_acquisition.value:
- if data_uuid != self.data_uuid:
- return
-
- data_loaded = pickle.loads(plot_data)
-
- if not is_raw:
- is_locked = self.parameters.lock.value
-
- if not check_plot_data(is_locked, data_loaded):
- print(
- "warning: incorrect data received for lock state, ignoring!"
- )
- return
-
- self.parameters.to_plot.value = plot_data
- self._generate_signal_stats(data_loaded)
-
- # update signal history (if in locked state)
- (
- self.parameters.control_signal_history.value,
- self.parameters.monitor_signal_history.value,
- ) = update_signal_history(
- self.parameters.control_signal_history.value,
- self.parameters.monitor_signal_history.value,
- data_loaded,
- is_locked,
- self.parameters.control_signal_history_length.value,
- )
- else:
- self.parameters.acquisition_raw_data.value = plot_data
-
+ def _connect_acquisition_to_parameters(self):
+ """
+ Connect the acquisition loopo to the parameters: Every received value is pushed
+ to `parameters.to_plot`.
+ """
# each time new data is acquired, this function is called
- self.registers.acquisition.on_new_data_received = on_new_data_received
+ self.registers.acquisition_controller.on_new_data_received = (
+ self._on_new_data_received
+ )
self.pause_acquisition()
self.continue_acquisition()
- def run_periodic_timer(self):
+ def _on_new_data_received(self, is_raw, plot_data, data_uuid):
+ # When a parameter is changed, `pause_acquisition` is set. This means that
+ # the we should skip new data until we are sure that it was recorded with
+ # the new settings.
+ if not self.parameters.pause_acquisition.value:
+ if data_uuid != self.data_uuid:
+ return
+
+ data_loaded = pickle.loads(plot_data)
+
+ if not is_raw:
+ is_locked = self.parameters.lock.value
+
+ if not check_plot_data(is_locked, data_loaded):
+ print("warning: incorrect data received for lock state, ignoring!")
+ return
+
+ self.parameters.to_plot.value = plot_data
+ self._generate_signal_stats(data_loaded)
+
+ # update signal history (if in locked state)
+ (
+ self.parameters.control_signal_history.value,
+ self.parameters.monitor_signal_history.value,
+ ) = update_signal_history(
+ self.parameters.control_signal_history.value,
+ self.parameters.monitor_signal_history.value,
+ data_loaded,
+ is_locked,
+ self.parameters.control_signal_history_length.value,
+ )
+ else:
+ self.parameters.acquisition_raw_data.value = plot_data
+
+ def _start_periodic_timer(self):
"""
Start a timer that increases the `ping` parameter once per second. Its purpose
is to allow for periodic tasks on the server: just register an `on_change`
@@ -240,18 +245,12 @@ def exposed_start_lock(self):
self.continue_acquisition()
def exposed_shutdown(self):
- """Kills the server."""
- self.registers.acquisition.shutdown()
+ """Kill the server."""
+ self.registers.acquisition_controller.shutdown()
_thread.interrupt_main()
- # we use SystemExit instead of os._exit because we want to call atexit
- # handlers
+ # we use SystemExit instead of os._exit because we want to call atexit handlers
raise SystemExit
- def exposed_get_server_version(self):
- import linien_server
-
- return linien_server.__version__
-
def exposed_get_restorable_parameters(self):
return self.parameters._restorable_parameters
@@ -262,41 +261,45 @@ def exposed_continue_acquisition(self):
self.continue_acquisition()
def exposed_set_csr_direct(self, k, v):
- """Directly sets a CSR register. This method is intended for debugging.
- Normally, the FPGA should be controlled via manipulation of parameters."""
+ """
+ Directly sets a CSR register. This method is intended for debugging. Normally,
+ the FPGA should be controlled via manipulation of parameters.
+ """
self.registers.set(k, v)
def pause_acquisition(self):
- """Pause continuous acquisition. Call this before changing a parameter
- that alters the error / control signal. This way, no inconsistent signals
- reach the application. After setting the new parameter values, call
- `continue_acquisition`."""
+ """
+ Pause continuous acquisition. Call this before changing a parameter that alters
+ the error / control signal. This way, no inconsistent signals reach the
+ application. After setting the new parameter values, call
+ `continue_acquisition`.
+ """
self.parameters.pause_acquisition.value = True
self.data_uuid = random()
- self.registers.acquisition.pause_acquisition()
+ self.registers.acquisition_controller.pause_acquisition()
def continue_acquisition(self):
- """Continue acquisition after a short delay, when we are sure that the
- new parameters values have been written to the FPGA and that data that
- is now recorded is recorded with the correct parameters."""
+ """
+ Continue acquisition after a short delay, when we are sure that the new
+ parameters values have been written to the FPGA and that data that is now
+ recorded is recorded with the correct parameters.
+ """
self.parameters.pause_acquisition.value = False
- self.registers.acquisition.continue_acquisition(self.data_uuid)
+ self.registers.acquisition_controller.continue_acquisition(self.data_uuid)
-class FakeRedPitayaControl(BaseService):
+class FakeRedPitayaControlService(BaseService):
def __init__(self):
super().__init__()
self.exposed_is_locked = None
+ self._connect_acquisition_to_parameters()
+
def exposed_write_registers(self):
pass
- def run_acquiry_loop(self):
- import threading
- from random import randint
- from time import sleep
-
- def run():
+ def _connect_acquisition_to_parameters(self):
+ def write_random_data_to_parameters():
while True:
max_ = randint(0, 8191)
gen = lambda: np.array([randint(-max_, max_) for _ in range(N_POINTS)])
@@ -310,12 +313,9 @@ def run():
)
sleep(0.1)
- t = threading.Thread(target=run)
- t.daemon = True
- t.start()
-
- def run_periodic_timer(self):
- pass
+ thread = threading.Thread(target=write_random_data_to_parameters)
+ thread.daemon = True
+ thread.start()
def exposed_shutdown(self):
_thread.interrupt_main()
@@ -331,11 +331,6 @@ def exposed_start_optimization(self, x0, x1, spectrum):
def exposed_get_restorable_parameters(self):
return self.parameters._restorable_parameters
- def exposed_get_server_version(self):
- import linien_server
-
- return linien_server.__version__
-
def pause_acquisition(self):
pass
@@ -343,6 +338,31 @@ def continue_acquisition(self):
pass
+def authenticate_username_and_password(sock):
+ failed_auth_counter = {"c": 0}
+ # when a client starts the server, it supplies this hash via an environment
+ # variable
+ secret = os.environ.get("LINIEN_AUTH_HASH")
+ # client always sends auth hash, even if we run in non-auth mode --> always read
+ # 64 bytes, otherwise rpyc connection can't be established
+ received = sock.recv(64)
+ # as a protection against brute force, we don't accept requests after too many
+ # failed auth requests
+ if failed_auth_counter["c"] > 1000:
+ print("received too many failed auth requests!")
+ sys.exit(1)
+
+ if secret is None:
+ print("warning: no authentication set up")
+ else:
+ if received != secret.encode():
+ print("received invalid credentials: ", received)
+ failed_auth_counter["c"] += 1
+ raise AuthenticationError("invalid username / password")
+ print("authentication successful")
+ return sock, None
+
+
@click.command()
@click.version_option(__version__)
@click.argument("port", default=DEFAULT_SERVER_PORT, type=int, required=False)
@@ -362,7 +382,7 @@ def run_server(port, fake=False, remote_rp=False):
if fake:
print("starting fake server")
- control = FakeRedPitayaControl()
+ control = FakeRedPitayaControlService()
else:
if remote_rp is not None:
assert (
@@ -379,48 +399,13 @@ def run_server(port, fake=False, remote_rp=False):
else:
control = RedPitayaControlService()
- control.run_acquiry_loop()
- control.run_periodic_timer()
- control.exposed_write_registers()
-
- failed_auth_counter = {"c": 0}
-
- def username_and_password_authenticator(sock):
- # when a client starts the server, it supplies this hash via an environment
- # variable
- secret = os.environ.get("LINIEN_AUTH_HASH")
-
- # client always sends auth hash, even if we run in non-auth mode
- # --> always read 64 bytes, otherwise rpyc connection can't be established
- received = sock.recv(64)
-
- # as a protection against brute force, we don't accept requests after
- # too many failed auth requests
- if failed_auth_counter["c"] > 1000:
- print("received too many failed auth requests!")
- sys.exit(1)
-
- if secret is None:
- print("warning: no authentication set up")
- else:
- if received != secret.encode():
- print("received invalid credentials: ", received)
-
- failed_auth_counter["c"] += 1
-
- raise AuthenticationError("invalid username / password")
-
- print("authentication successful")
-
- return sock, None
-
- t = ThreadedServer(
+ thread = ThreadedServer(
control,
port=port,
- authenticator=username_and_password_authenticator,
+ authenticator=authenticate_username_and_password,
protocol_config={"allow_pickle": True},
)
- t.start()
+ thread.start()
if __name__ == "__main__":
diff --git a/linien-server/linien_server/utils.py b/linien-server/linien_server/utils.py
deleted file mode 100644
index 38c20c5c..00000000
--- a/linien-server/linien_server/utils.py
+++ /dev/null
@@ -1,54 +0,0 @@
-# Copyright 2018-2022 Benjamin Wiegand
-# Copyright 2021-2022 Bastian Leykauf
-#
-# This file is part of Linien and based on redpid.
-#
-# Linien is free software: you can redistribute it and/or modify
-# it under the terms of the GNU General Public License as published by
-# the Free Software Foundation, either version 3 of the License, or
-# (at your option) any later version.
-#
-# Linien is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with Linien. If not, see .
-
-import shutil
-import subprocess
-from pathlib import Path
-
-
-def stop_nginx():
- subprocess.Popen(["systemctl", "stop", "redpitaya_nginx.service"]).wait()
- subprocess.Popen(["systemctl", "stop", "redpitaya_scpi.service"]).wait()
-
-
-def start_nginx():
- subprocess.Popen(["systemctl", "start", "redpitaya_nginx.service"])
-
-
-def flash_fpga():
- filepath = Path(__file__).parent / "linien.bin"
- shutil.copy(str(filepath.resolve()), "/dev/xdevcfg")
-
-
-def twos_complement(num, N_bits):
- max_ = 1 << (N_bits - 1)
- full = 2 * max_
-
- if num < 0:
- num += full
-
- return num
-
-
-def sweep_speed_to_time(sweep_speed):
- """Sweep speed is an arbitrary unit (cf. `parameters.py`).
- This function converts it to the duration of the sweep in seconds.
- """
- f_real = 3.8e3 / (2**sweep_speed)
- duration = 1 / f_real
- return duration
diff --git a/tests/test_approacher.py b/tests/test_approacher.py
index ede173c8..2e506f49 100644
--- a/tests/test_approacher.py
+++ b/tests/test_approacher.py
@@ -18,7 +18,7 @@
import numpy as np
from linien_common.common import get_lock_point
-from linien_server.approach_line import Approacher
+from linien_server.optimization.approach_line import Approacher
from linien_server.parameters import Parameters
Y_SHIFT = 4000