Skip to content

Commit

Permalink
Merge pull request #321 from linien-org/feature/simplify-server-struc…
Browse files Browse the repository at this point in the history
…ture

Simplify server structure
  • Loading branch information
bleykauf authored Mar 29, 2023
2 parents 35de606 + 22bccbe commit b31e28a
Show file tree
Hide file tree
Showing 13 changed files with 528 additions and 633 deletions.
Empty file.
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,16 @@
# along with Linien. If not, see <http://www.gnu.org/licenses/>.

import atexit
import shutil
import subprocess
import threading
from enum import Enum
from multiprocessing import Pipe, Process
from pathlib import Path
from time import sleep

import rpyc
from linien_common.config import ACQUISITION_PORT
from linien_server.utils import flash_fpga, start_nginx, stop_nginx


class AcquisitionConnectionError(Exception):
pass


class AcquisitionProcessSignals(Enum):
Expand All @@ -44,50 +42,47 @@ class AcquisitionProcessSignals(Enum):
SET_DUAL_CHANNEL = 9


class AcquisitionMaster:
class AcquisitionController:
def __init__(self, use_ssh, host):
self.on_new_data_received = None

def receive_acquired_data(conn):
while True:
is_raw, received_data, data_uuid = conn.recv()
if self.on_new_data_received is not None:
self.on_new_data_received(is_raw, received_data, data_uuid)

self.parent_conn, child_conn = Pipe()
process = Process(
target=self.connect_acquisition_process, args=(child_conn, use_ssh, host)
acqusition_service_process = Process(
target=self.connect_acquisition_service, args=(child_conn, use_ssh, host)
)
process.daemon = True
process.start()
acqusition_service_process.daemon = True
acqusition_service_process.start()

# wait until connection is established
self.parent_conn.recv()

thread = threading.Thread(
target=receive_acquired_data, args=(self.parent_conn,)
receive_data_thread = threading.Thread(
target=self.receive_acquired_data, args=(self.parent_conn,)
)
thread.daemon = True
thread.start()
receive_data_thread.daemon = True
receive_data_thread.start()

atexit.register(self.shutdown)

def run_data_acquisition(self, on_new_data_received):
self.on_new_data_received = on_new_data_received
def receive_acquired_data(self, conn):
while True:
is_raw, received_data, data_uuid = conn.recv()
if self.on_new_data_received is not None:
self.on_new_data_received(is_raw, received_data, data_uuid)

def connect_acquisition_process(self, pipe, use_ssh, host):
def connect_acquisition_service(self, pipe, use_ssh, host):
if use_ssh:
# for debugging, acquisition process may be launched manually on the server
# and rpyc can be used to connect to it
acquisition_rpyc = rpyc.connect(host, ACQUISITION_PORT)
acquisition = acquisition_rpyc.root
acquisition_service = acquisition_rpyc.root
else:
# This is what happens in production mode
from linien_server.acquisition_process import DataAcquisitionService
from linien_server.acquisition.service import AcquisitionService

stop_nginx()
flash_fpga()
acquisition = DataAcquisitionService()
acquisition_service = AcquisitionService()

# tell the main thread that we're ready
pipe.send(True)
Expand All @@ -103,23 +98,23 @@ def connect_acquisition_process(self, pipe, use_ssh, host):
raise SystemExit()
elif data[0] == AcquisitionProcessSignals.SET_SWEEP_SPEED:
speed = data[1]
acquisition.exposed_set_sweep_speed(speed)
acquisition_service.exposed_set_sweep_speed(speed)
elif data[0] == AcquisitionProcessSignals.SET_LOCK_STATUS:
acquisition.exposed_set_lock_status(data[1])
acquisition_service.exposed_set_lock_status(data[1])
elif data[0] == AcquisitionProcessSignals.FETCH_QUADRATURES:
acquisition.exposed_set_fetch_additional_signals(data[1])
acquisition_service.exposed_set_fetch_additional_signals(data[1])
elif data[0] == AcquisitionProcessSignals.SET_RAW_ACQUISITION:
acquisition.exposed_set_raw_acquisition(data[1])
acquisition_service.exposed_set_raw_acquisition(data[1])
elif data[0] == AcquisitionProcessSignals.SET_DUAL_CHANNEL:
acquisition.exposed_set_dual_channel(data[1])
acquisition_service.exposed_set_dual_channel(data[1])
elif data[0] == AcquisitionProcessSignals.SET_CSR:
acquisition.exposed_set_csr(*data[1])
acquisition_service.exposed_set_csr(*data[1])
elif data[0] == AcquisitionProcessSignals.SET_IIR_CSR:
acquisition.exposed_set_iir_csr(*data[1])
acquisition_service.exposed_set_iir_csr(*data[1])
elif data[0] == AcquisitionProcessSignals.PAUSE_ACQUISIITON:
acquisition.exposed_pause_acquisition()
acquisition_service.exposed_pause_acquisition()
elif data[0] == AcquisitionProcessSignals.CONTINUE_ACQUISITION:
acquisition.exposed_continue_acquisition(data[1])
acquisition_service.exposed_continue_acquisition(data[1])

# load acquired data and send it to the main thread
(
Expand All @@ -128,27 +123,32 @@ def connect_acquisition_process(self, pipe, use_ssh, host):
data_was_raw,
new_data,
data_uuid,
) = acquisition.exposed_return_data(last_hash)
) = acquisition_service.exposed_return_data(last_hash)
if new_data_returned:
last_hash = new_hash
pipe.send((data_was_raw, new_data, data_uuid))

sleep(0.05)

def pause_acquisition(self):
self.parent_conn.send((AcquisitionProcessSignals.PAUSE_ACQUISIITON, True))

def continue_acquisition(self, uuid):
self.parent_conn.send((AcquisitionProcessSignals.CONTINUE_ACQUISITION, uuid))

def shutdown(self):
if self.parent_conn:
self.parent_conn.send((AcquisitionProcessSignals.SHUTDOWN,))

start_nginx()

def set_sweep_speed(self, speed):
self.parent_conn.send((AcquisitionProcessSignals.SET_SWEEP_SPEED, speed))

def lock_status_changed(self, status):
def set_lock_status(self, status):
if self.parent_conn:
self.parent_conn.send((AcquisitionProcessSignals.SET_LOCK_STATUS, status))

def fetch_additional_signals_changed(self, status):
def fetch_additional_signals(self, status):
if self.parent_conn:
self.parent_conn.send((AcquisitionProcessSignals.FETCH_QUADRATURES, status))

Expand All @@ -158,18 +158,24 @@ def set_csr(self, key, value):
def set_iir_csr(self, *args):
self.parent_conn.send((AcquisitionProcessSignals.SET_IIR_CSR, args))

def pause_acquisition(self):
self.parent_conn.send((AcquisitionProcessSignals.PAUSE_ACQUISIITON, True))

def continue_acquisition(self, uuid):
self.parent_conn.send((AcquisitionProcessSignals.CONTINUE_ACQUISITION, uuid))

def set_raw_acquisition(self, enabled, decimation=None):
if decimation is None:
decimation = 0
def set_raw_acquisition(self, enabled, decimation=0):
self.parent_conn.send(
(AcquisitionProcessSignals.SET_RAW_ACQUISITION, (enabled, decimation))
)

def set_dual_channel(self, enabled):
self.parent_conn.send((AcquisitionProcessSignals.SET_DUAL_CHANNEL, enabled))


def stop_nginx():
subprocess.Popen(["systemctl", "stop", "redpitaya_nginx.service"]).wait()
subprocess.Popen(["systemctl", "stop", "redpitaya_scpi.service"]).wait()


def start_nginx():
subprocess.Popen(["systemctl", "start", "redpitaya_nginx.service"])


def flash_fpga():
filepath = Path(__file__).parents[1] / "linien.bin"
shutil.copy(str(filepath.resolve()), "/dev/xdevcfg")
Loading

0 comments on commit b31e28a

Please sign in to comment.