Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Simplify server structure #321

Merged
merged 15 commits into from
Mar 29, 2023
Empty file.
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,16 @@
# along with Linien. If not, see <http://www.gnu.org/licenses/>.

import atexit
import shutil
import subprocess
import threading
from enum import Enum
from multiprocessing import Pipe, Process
from pathlib import Path
from time import sleep

import rpyc
from linien_common.config import ACQUISITION_PORT
from linien_server.utils import flash_fpga, start_nginx, stop_nginx


class AcquisitionConnectionError(Exception):
pass


class AcquisitionProcessSignals(Enum):
Expand All @@ -44,50 +42,47 @@ class AcquisitionProcessSignals(Enum):
SET_DUAL_CHANNEL = 9


class AcquisitionMaster:
class AcquisitionController:
def __init__(self, use_ssh, host):
self.on_new_data_received = None

def receive_acquired_data(conn):
while True:
is_raw, received_data, data_uuid = conn.recv()
if self.on_new_data_received is not None:
self.on_new_data_received(is_raw, received_data, data_uuid)

self.parent_conn, child_conn = Pipe()
process = Process(
target=self.connect_acquisition_process, args=(child_conn, use_ssh, host)
acqusition_service_process = Process(
target=self.connect_acquisition_service, args=(child_conn, use_ssh, host)
)
process.daemon = True
process.start()
acqusition_service_process.daemon = True
acqusition_service_process.start()

# wait until connection is established
self.parent_conn.recv()

thread = threading.Thread(
target=receive_acquired_data, args=(self.parent_conn,)
receive_data_thread = threading.Thread(
target=self.receive_acquired_data, args=(self.parent_conn,)
)
thread.daemon = True
thread.start()
receive_data_thread.daemon = True
receive_data_thread.start()

atexit.register(self.shutdown)

def run_data_acquisition(self, on_new_data_received):
self.on_new_data_received = on_new_data_received
def receive_acquired_data(self, conn):
while True:
is_raw, received_data, data_uuid = conn.recv()
if self.on_new_data_received is not None:
self.on_new_data_received(is_raw, received_data, data_uuid)

def connect_acquisition_process(self, pipe, use_ssh, host):
def connect_acquisition_service(self, pipe, use_ssh, host):
if use_ssh:
# for debugging, acquisition process may be launched manually on the server
# and rpyc can be used to connect to it
acquisition_rpyc = rpyc.connect(host, ACQUISITION_PORT)
acquisition = acquisition_rpyc.root
acquisition_service = acquisition_rpyc.root
else:
# This is what happens in production mode
from linien_server.acquisition_process import DataAcquisitionService
from linien_server.acquisition.service import AcquisitionService

stop_nginx()
flash_fpga()
acquisition = DataAcquisitionService()
acquisition_service = AcquisitionService()

# tell the main thread that we're ready
pipe.send(True)
Expand All @@ -103,23 +98,23 @@ def connect_acquisition_process(self, pipe, use_ssh, host):
raise SystemExit()
elif data[0] == AcquisitionProcessSignals.SET_SWEEP_SPEED:
speed = data[1]
acquisition.exposed_set_sweep_speed(speed)
acquisition_service.exposed_set_sweep_speed(speed)
elif data[0] == AcquisitionProcessSignals.SET_LOCK_STATUS:
acquisition.exposed_set_lock_status(data[1])
acquisition_service.exposed_set_lock_status(data[1])
elif data[0] == AcquisitionProcessSignals.FETCH_QUADRATURES:
acquisition.exposed_set_fetch_additional_signals(data[1])
acquisition_service.exposed_set_fetch_additional_signals(data[1])
elif data[0] == AcquisitionProcessSignals.SET_RAW_ACQUISITION:
acquisition.exposed_set_raw_acquisition(data[1])
acquisition_service.exposed_set_raw_acquisition(data[1])
elif data[0] == AcquisitionProcessSignals.SET_DUAL_CHANNEL:
acquisition.exposed_set_dual_channel(data[1])
acquisition_service.exposed_set_dual_channel(data[1])
elif data[0] == AcquisitionProcessSignals.SET_CSR:
acquisition.exposed_set_csr(*data[1])
acquisition_service.exposed_set_csr(*data[1])
elif data[0] == AcquisitionProcessSignals.SET_IIR_CSR:
acquisition.exposed_set_iir_csr(*data[1])
acquisition_service.exposed_set_iir_csr(*data[1])
elif data[0] == AcquisitionProcessSignals.PAUSE_ACQUISIITON:
acquisition.exposed_pause_acquisition()
acquisition_service.exposed_pause_acquisition()
elif data[0] == AcquisitionProcessSignals.CONTINUE_ACQUISITION:
acquisition.exposed_continue_acquisition(data[1])
acquisition_service.exposed_continue_acquisition(data[1])

# load acquired data and send it to the main thread
(
Expand All @@ -128,27 +123,32 @@ def connect_acquisition_process(self, pipe, use_ssh, host):
data_was_raw,
new_data,
data_uuid,
) = acquisition.exposed_return_data(last_hash)
) = acquisition_service.exposed_return_data(last_hash)
if new_data_returned:
last_hash = new_hash
pipe.send((data_was_raw, new_data, data_uuid))

sleep(0.05)

def pause_acquisition(self):
self.parent_conn.send((AcquisitionProcessSignals.PAUSE_ACQUISIITON, True))

def continue_acquisition(self, uuid):
self.parent_conn.send((AcquisitionProcessSignals.CONTINUE_ACQUISITION, uuid))

def shutdown(self):
if self.parent_conn:
self.parent_conn.send((AcquisitionProcessSignals.SHUTDOWN,))

start_nginx()

def set_sweep_speed(self, speed):
self.parent_conn.send((AcquisitionProcessSignals.SET_SWEEP_SPEED, speed))

def lock_status_changed(self, status):
def set_lock_status(self, status):
if self.parent_conn:
self.parent_conn.send((AcquisitionProcessSignals.SET_LOCK_STATUS, status))

def fetch_additional_signals_changed(self, status):
def fetch_additional_signals(self, status):
if self.parent_conn:
self.parent_conn.send((AcquisitionProcessSignals.FETCH_QUADRATURES, status))

Expand All @@ -158,18 +158,24 @@ def set_csr(self, key, value):
def set_iir_csr(self, *args):
self.parent_conn.send((AcquisitionProcessSignals.SET_IIR_CSR, args))

def pause_acquisition(self):
self.parent_conn.send((AcquisitionProcessSignals.PAUSE_ACQUISIITON, True))

def continue_acquisition(self, uuid):
self.parent_conn.send((AcquisitionProcessSignals.CONTINUE_ACQUISITION, uuid))

def set_raw_acquisition(self, enabled, decimation=None):
if decimation is None:
decimation = 0
def set_raw_acquisition(self, enabled, decimation=0):
self.parent_conn.send(
(AcquisitionProcessSignals.SET_RAW_ACQUISITION, (enabled, decimation))
)

def set_dual_channel(self, enabled):
self.parent_conn.send((AcquisitionProcessSignals.SET_DUAL_CHANNEL, enabled))


def stop_nginx():
subprocess.Popen(["systemctl", "stop", "redpitaya_nginx.service"]).wait()
subprocess.Popen(["systemctl", "stop", "redpitaya_scpi.service"]).wait()


def start_nginx():
subprocess.Popen(["systemctl", "start", "redpitaya_nginx.service"])


def flash_fpga():
filepath = Path(__file__).parents[1] / "linien.bin"
shutil.copy(str(filepath.resolve()), "/dev/xdevcfg")
Loading