Skip to content

Commit

Permalink
feat: update to safe-ds 0.17.1 + server refactor (#37)
Browse files Browse the repository at this point in the history
### Summary of Changes

- updated to safe-ds 0.17.1 + dependencies
- temporarily disable python 3.12
- changed image encoding to match new safe-ds version
- removed jpeg test
- refactored server stuff from main.py into a new class in server.py

---------

Co-authored-by: megalinter-bot <[email protected]>
  • Loading branch information
WinPlay02 and megalinter-bot authored Jan 14, 2024
1 parent cae05c5 commit 1bcad07
Show file tree
Hide file tree
Showing 9 changed files with 1,394 additions and 1,084 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/main.yml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ jobs:
matrix:
python-version:
- '3.11'
- '3.12'
# - '3.12'
uses: lars-reimann/.github/.github/workflows/poetry-codecov-reusable.yml@main
with:
working-directory: .
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/pr.yml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ jobs:
matrix:
python-version:
- '3.11'
- '3.12'
# - '3.12'
uses: lars-reimann/.github/.github/workflows/poetry-codecov-reusable.yml@main
with:
working-directory: .
Expand Down
2,004 changes: 1,151 additions & 853 deletions poetry.lock

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@ packages = [
safe-ds-runner = "safeds_runner.main:main"

[tool.poetry.dependencies]
python = "^3.11,<3.13"
safe-ds = ">=0.14,<0.17"
python = "^3.11,<3.12"
safe-ds = ">=0.17,<0.18"
flask = "^3.0.0"
flask-cors = "^4.0.0"
flask-sock = "^0.7.0"
Expand Down
9 changes: 2 additions & 7 deletions src/safeds_runner/server/json_encoder.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
from typing import Any

from safeds.data.image.containers import Image
from safeds.data.image.typing import ImageFormat
from safeds.data.tabular.containers import Table


Expand Down Expand Up @@ -42,10 +41,6 @@ def default(self, o: Any) -> Any:
for key in dict_with_nan_infinity
}
if isinstance(o, Image):
# Send images together with their format
match o.format:
case ImageFormat.JPEG:
return {"format": o.format.value, "bytes": str(base64.encodebytes(o._repr_jpeg_()), "utf-8")}
case ImageFormat.PNG:
return {"format": o.format.value, "bytes": str(base64.encodebytes(o._repr_png_()), "utf-8")}
# Send images together with their format, by default images are encoded only as PNG
return {"format": "png", "bytes": str(base64.encodebytes(o._repr_png_()), "utf-8")}
return json.JSONEncoder.default(self, o)
191 changes: 7 additions & 184 deletions src/safeds_runner/server/main.py
Original file line number Diff line number Diff line change
@@ -1,190 +1,11 @@
"""Module containing the main entry point, for starting the Safe-DS runner."""

import json
import logging
import sys

import flask.app
import flask_sock
import simple_websocket
from flask import Flask
from flask_cors import CORS
from flask_sock import Sock

from safeds_runner.server import messages
from safeds_runner.server.json_encoder import SafeDsEncoder
from safeds_runner.server.messages import (
Message,
create_placeholder_value,
message_type_placeholder_value,
parse_validate_message,
)
from safeds_runner.server.pipeline_manager import PipelineManager


def create_flask_app(testing: bool = False) -> flask.app.App:
"""
Create a flask app, that handles all requests.
Parameters
----------
testing : bool
Whether the app should run in a testing context.
Returns
-------
flask.app.App
Flask app.
"""
flask_app = Flask(__name__)
# Websocket Configuration
flask_app.config["SOCK_SERVER_OPTIONS"] = {"ping_interval": 25}
flask_app.config["TESTING"] = testing

# Allow access from VSCode extension
CORS(flask_app, resources={r"/*": {"origins": "vscode-webview://*"}})
return flask_app


def create_flask_websocket(flask_app: flask.app.App) -> flask_sock.Sock:
"""
Create a flask websocket extension.
Parameters
----------
flask_app: flask.app.App
Flask App Instance.
Returns
-------
flask_sock.Sock
Websocket extension for the provided flask app.
"""
return Sock(flask_app)


app = create_flask_app()
sock = create_flask_websocket(app)
app_pipeline_manager = PipelineManager()


@sock.route("/WSMain")
def _ws_main(ws: simple_websocket.Server) -> None:
ws_main(ws, app_pipeline_manager) # pragma: no cover


def ws_main(ws: simple_websocket.Server, pipeline_manager: PipelineManager) -> None:
"""
Handle websocket requests to the WSMain endpoint.
This function handles the bidirectional communication between the runner and the VS Code extension.
Parameters
----------
ws : simple_websocket.Server
Websocket Connection, provided by flask.
pipeline_manager : PipelineManager
Manager used to execute pipelines on, and retrieve placeholders from
"""
logging.debug("Request to WSRunProgram")
pipeline_manager.connect(ws)
while True:
# This would be a JSON message
received_message: str = ws.receive()
if received_message is None:
logging.debug("Received EOF, closing connection")
pipeline_manager.disconnect(ws)
ws.close()
return
logging.debug("Received Message: %s", received_message)
received_object, error_detail, error_short = parse_validate_message(received_message)
if received_object is None:
logging.error(error_detail)
pipeline_manager.disconnect(ws)
ws.close(message=error_short)
return
match received_object.type:
case "shutdown":
logging.debug("Requested shutdown...")
pipeline_manager.shutdown()
sys.exit(0)
case "program":
program_data, invalid_message = messages.validate_program_message_data(received_object.data)
if program_data is None:
logging.error("Invalid message data specified in: %s (%s)", received_message, invalid_message)
pipeline_manager.disconnect(ws)
ws.close(None, invalid_message)
return
# This should only be called from the extension as it is a security risk
pipeline_manager.execute_pipeline(program_data, received_object.id)
case "placeholder_query":
# For this query, a response can be directly sent to the requesting connection
placeholder_query_data, invalid_message = messages.validate_placeholder_query_message_data(
received_object.data,
)
if placeholder_query_data is None:
logging.error("Invalid message data specified in: %s (%s)", received_message, invalid_message)
pipeline_manager.disconnect(ws)
ws.close(None, invalid_message)
return
placeholder_type, placeholder_value = pipeline_manager.get_placeholder(
received_object.id,
placeholder_query_data,
)
# send back a value message
if placeholder_type is not None:
try:
broadcast_message(
[ws],
Message(
message_type_placeholder_value,
received_object.id,
create_placeholder_value(placeholder_query_data, placeholder_type, placeholder_value),
),
)
except TypeError as _encoding_error:
# if the value can't be encoded send back that the value exists but is not displayable
broadcast_message(
[ws],
Message(
message_type_placeholder_value,
received_object.id,
create_placeholder_value(placeholder_query_data, placeholder_type, "<Not displayable>"),
),
)
else:
# Send back empty type / value, to communicate that no placeholder exists (yet)
# Use name from query to allow linking a response to a request on the peer
broadcast_message(
[ws],
Message(
message_type_placeholder_value,
received_object.id,
create_placeholder_value(placeholder_query_data, "", ""),
),
)
case _:
if received_object.type not in messages.message_types:
logging.warning("Invalid message type: %s", received_object.type)


def broadcast_message(connections: list[simple_websocket.Server], message: Message) -> None:
"""
Send any message to all the provided connections (to the VS Code extension).
Parameters
----------
connections : list[simple_websocket.Server]
List of Websocket connections that should receive the message.
message : Message
Object that will be sent.
"""
message_encoded = json.dumps(message.to_dict(), cls=SafeDsEncoder)
for connection in connections:
connection.send(message_encoded)


def start_server(port: int) -> None: # pragma: no cover
def start_server(port: int) -> None:
"""Start the Safe-DS Runner server."""
# Allow prints to be unbuffered by default
import builtins
Expand All @@ -194,12 +15,14 @@ def start_server(port: int) -> None: # pragma: no cover

logging.getLogger().setLevel(logging.DEBUG)
# Startup early, so our multiprocessing setup works
app_pipeline_manager = PipelineManager()
app_pipeline_manager.startup()
from gevent.monkey import patch_all
from gevent.pywsgi import WSGIServer

# Patch WebSockets to work in parallel
patch_all()
logging.info("Starting Safe-DS Runner on port %s", str(port))
# Only bind to host=127.0.0.1. Connections from other devices should not be accepted
WSGIServer(("127.0.0.1", port), app, spawn=8).serve_forever()

from safeds_runner.server.server import SafeDsServer

safeds_server = SafeDsServer(app_pipeline_manager) # pragma: no cover
safeds_server.listen(port) # pragma: no cover
Loading

0 comments on commit 1bcad07

Please sign in to comment.