Skip to content

Commit

Permalink
fix: race condition when initializing multiprocessing manager (#26)
Browse files Browse the repository at this point in the history
Closes #18

### Summary of Changes

- use spawn instead of fork to not deadlock when running tests

---------

Co-authored-by: megalinter-bot <[email protected]>
  • Loading branch information
WinPlay02 and megalinter-bot authored Dec 5, 2023
1 parent 20e9aac commit fc5934f
Show file tree
Hide file tree
Showing 3 changed files with 15 additions and 7 deletions.
5 changes: 4 additions & 1 deletion src/safeds_runner/server/pipeline_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,14 @@ class PipelineManager:
"""

def __init__(self) -> None:
"""Create a new PipelineManager object, which needs to be started by calling startup()."""
"""Create a new PipelineManager object, which is lazily started, when needed."""
self._placeholder_map: dict = {}
self._websocket_target: simple_websocket.Server | None = None

@cached_property
def _multiprocessing_manager(self) -> SyncManager:
if multiprocessing.get_start_method() != "spawn":
multiprocessing.set_start_method("spawn", force=True)
return multiprocessing.Manager()

@cached_property
Expand All @@ -65,6 +67,7 @@ def _startup(self) -> None:
This method should not be called during the bootstrap phase of the python interpreter, as it leads to a crash.
"""
_mq = self._messages_queue # Initialize it here before starting a thread to avoid potential race condition
if not self._messages_queue_thread.is_alive():
self._messages_queue_thread.start()

Expand Down
1 change: 1 addition & 0 deletions tests/safeds_runner/server/test_runner_main.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@


def test_should_runner_start_successfully() -> None:
subprocess._USE_VFORK = False # Do not fork the subprocess as it is unsafe to do
process = subprocess.Popen(["poetry", "run", "safe-ds-runner", "start"], cwd=_project_root, stderr=subprocess.PIPE)
while process.poll() is None:
process_line = str(typing.cast(IO[bytes], process.stderr).readline(), "utf-8").strip()
Expand Down
16 changes: 10 additions & 6 deletions tests/safeds_runner/server/test_websocket_mock.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,11 @@ def __init__(self, messages: list[str]):
self.received: list[str] = []
self.close_reason: int | None = None
self.close_message: str | None = None
self.condition_variable = threading.Condition()
self.condition_variable = threading.Condition(lock=threading.Lock())

def send(self, msg: str) -> None:
self.received.append(msg)
with self.condition_variable:
self.received.append(msg)
self.condition_variable.notify_all()

def receive(self) -> str | None:
Expand All @@ -44,7 +44,11 @@ def wait_for_messages(self, wait_for_messages: int = 1) -> None:
with self.condition_variable:
if len(self.received) >= wait_for_messages:
return
self.condition_variable.wait()
self.condition_variable.wait(1.0) # this should not be needed, but it seems the process can get stuck

def get_next_received_message(self) -> str:
with self.condition_variable:
return self.received.pop(0)


@pytest.mark.parametrize(
Expand Down Expand Up @@ -208,7 +212,7 @@ def test_should_execute_pipeline_return_exception(
mock_connection = MockWebsocketConnection(messages)
ws_main(mock_connection, app_pipeline_manager)
mock_connection.wait_for_messages(1)
exception_message = Message.from_dict(json.loads(mock_connection.received.pop(0)))
exception_message = Message.from_dict(json.loads(mock_connection.get_next_received_message()))

assert exception_message.type == expected_response_runtime_error.type
assert exception_message.id == expected_response_runtime_error.id
Expand Down Expand Up @@ -293,7 +297,7 @@ def test_should_execute_pipeline_return_valid_placeholder(
# And compare with expected responses
while len(expected_responses) > 0:
mock_connection.wait_for_messages(1)
next_message = Message.from_dict(json.loads(mock_connection.received.pop(0)))
next_message = Message.from_dict(json.loads(mock_connection.get_next_received_message()))
assert next_message == expected_responses.pop(0)


Expand Down Expand Up @@ -360,5 +364,5 @@ def test_should_successfully_execute_simple_flow(messages: list[str], expected_r
mock_connection = MockWebsocketConnection(messages)
ws_main(mock_connection, app_pipeline_manager)
mock_connection.wait_for_messages(1)
query_result_invalid = Message.from_dict(json.loads(mock_connection.received.pop(0)))
query_result_invalid = Message.from_dict(json.loads(mock_connection.get_next_received_message()))
assert query_result_invalid == expected_response

0 comments on commit fc5934f

Please sign in to comment.