Skip to content

Commit

Permalink
wip psp3
Browse files Browse the repository at this point in the history
Signed-off-by: Tim Paine <[email protected]>
  • Loading branch information
timkpaine committed Nov 16, 2024
1 parent 6bf0853 commit 1225ddc
Show file tree
Hide file tree
Showing 4 changed files with 78 additions and 30 deletions.
9 changes: 4 additions & 5 deletions js/src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,19 +11,18 @@ function removeTrailingSlash(url) {
return url.replace(/\/$/, "");
}

window.addEventListener("load", async () => {
document.addEventListener("DOMContentLoaded", async () => {
const workspace = document.querySelector("perspective-workspace");
const saveButton = document.getElementById("save-layout-button");
const protocol = window.location.protocol === "https:" ? "wss:" : "ws:";
const websocket = perspective.websocket(
const websocket = await perspective.websocket(
`${protocol}//${window.location.host}${removeTrailingSlash(
window.location.pathname,
)}/ws`,
);
const registeredTables = new Set();
const worker = perspective.worker();

const updateTables = async () => {
console.log("HERE");
const updateTables = async () => {
const response = await fetch(
`${removeTrailingSlash(window.location.href)}/tables`,
);
Expand Down
18 changes: 11 additions & 7 deletions raydar/dashboard/demo.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,17 @@
import ray
import starlette

from ray.serve.handle import DeploymentResponse
from .server import PerspectiveProxyRayServer, PerspectiveRayServer


@ray.remote
def test_job(backoff, tablename, proxy):
def test_job(backoff):
start = time.time()
time.sleep(backoff)
end = time.time()
runtime = end - start
data = dict(start=start, end=end, runtime=runtime, backoff=backoff, random=random.random())
return proxy.remote("update", tablename, data)
return dict(start=start, end=end, runtime=runtime, backoff=backoff, random=random.random())


if __name__ == "__main__":
Expand All @@ -24,10 +24,10 @@ def test_job(backoff, tablename, proxy):
host = "127.0.0.1" # NOTE: change if you run on another machine
port = 8989
ray.init(dashboard_host=host, dashboard_port=port, runtime_env={"py_modules": [starlette]})
ray.serve.start(http_options={"host": host, "port": port})
ray.serve.start(http_options={"host": host, "port": port+1})

webserver = ray.serve.run(PerspectiveRayServer.bind(), name="webserver", route_prefix="/")
proxy_server = ray.serve.run(PerspectiveProxyRayServer.bind(webserver), name="proxy", route_prefix="/proxy")
proxy_server: DeploymentResponse = ray.serve.run(PerspectiveProxyRayServer.bind(webserver), name="proxy", route_prefix="/proxy")

# setup perspective table
proxy_server.remote(
Expand All @@ -42,7 +42,11 @@ def test_job(backoff, tablename, proxy):
},
)

# launch jobs
# # launch jobs
while True:
test_job.remote(backoff=random.random(), tablename="data", proxy=proxy_server)
proxy_server.remote(
"update",
"data",
test_job.remote(backoff=random.random())
)
time.sleep(0.5)
64 changes: 46 additions & 18 deletions raydar/dashboard/server.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import logging
from asyncio import get_event_loop
from os import environ
from os.path import abspath, dirname, join
from threading import Thread
Expand All @@ -7,21 +8,31 @@
from fastapi import FastAPI, HTTPException, Request, Response, WebSocket, WebSocketDisconnect
from fastapi.responses import FileResponse
from fastapi.staticfiles import StaticFiles
from perspective import PerspectiveManager, PerspectiveStarletteHandler, Table
from packaging.version import parse
from perspective import __version__ as perspective_version
from pydantic import BaseModel, Field
from ray.serve import Application, deployment, ingress

if parse(perspective_version) >= parse("3"):
from perspective import Server as PerspectiveServer, Table
from perspective.handlers.starlette import PerspectiveStarletteHandler

PERSPECTIVE_3 = True
else:
from perspective import PerspectiveManager as PerspectiveServer, Table
from perspective.handlers import PerspectiveStarletteHandler

PERSPECTIVE_3 = False

class PerspectiveRayServerArgs(BaseModel):
name: str = Field(default="Perspective")
manager: str = Field(default_factory=PerspectiveManager)


def perspective_thread(manager: PerspectiveManager):
def perspective_thread(client):
from asyncio import new_event_loop

psp_loop = new_event_loop()
manager.set_loop_callback(psp_loop.call_soon_threadsafe)
client.set_loop_callback(psp_loop.call_soon_threadsafe)
psp_loop.run_forever()


Expand All @@ -37,35 +48,57 @@ class PerspectiveRayServer:
def __init__(self, args: PerspectiveRayServerArgs = None):
logger.setLevel(logging.ERROR)
args = args or PerspectiveRayServerArgs()
self._manager = args.manager
self._schemas = {}
self._tables = {}
self._psp_thread = Thread(target=perspective_thread, args=(self._manager,), daemon=True)
self._psp_thread.start()
self._psp_server = None

def _init(self):
if self._psp_server is None:
self._psp_server = PerspectiveServer()
if PERSPECTIVE_3:
self._psp_client = self._psp_server.new_local_client()
else:
self._psp_client = self._psp_server
self._psp_thread = Thread(target=perspective_thread, args=(self._psp_client,), daemon=True)
self._psp_thread.start()

def new_table(self, tablename: str, schema) -> None:
self._init()
if tablename in self._schemas:
return self._schemas[tablename]
self._schemas[tablename] = schema
self._tables[tablename] = Table(schema)
self._manager.host_table(tablename, self._tables[tablename])
if PERSPECTIVE_3:
self._psp_client.table(schema, name=tablename)
else:
self._tables[tablename] = Table(schema)
self._psp_client.host_table(tablename, self._tables[tablename])

def clear_table(self, tablename: str, schema) -> None:
self._init()
if tablename in self._tables:
self._tables[tablename].clear()

@app.get("/")
async def site(self):
return FileResponse(join(static_files_dir, "index.html"))
def update(self, tablename: str, data):
self._init()
if isinstance(data, dict):
data = [data]
self._tables[tablename].update(data)

@app.websocket("/ws")
async def ws(self, ws: WebSocket):
handler = PerspectiveStarletteHandler(manager=self._manager, websocket=ws)
if PERSPECTIVE_3:
handler = PerspectiveStarletteHandler(perspective_server=self._psp_server, websocket=ws)
else:
handler = PerspectiveStarletteHandler(manager=self._psp_server, websocket=ws)
try:
await handler.run()
except WebSocketDisconnect:
...

@app.get("/")
async def site(self):
return FileResponse(join(static_files_dir, "index.html"))

@app.get("/tables")
async def tables(self):
return list(self._schemas.keys())
Expand Down Expand Up @@ -110,11 +143,6 @@ async def update_table_rest(self, tablename: str, request: Request) -> Response:
except BaseException as exception:
raise HTTPException(503, f"Exception during data ingestion: {tablename} / {format_exc()}") from exception

def update(self, tablename: str, data):
if isinstance(data, dict):
data = [data]
self._tables[tablename].update(data)


@deployment(name="Perspective_Proxy_Server")
class PerspectiveProxyRayServer:
Expand Down
17 changes: 17 additions & 0 deletions raydar/tests/test_server.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
import time

import pytest
import ray
import requests

from raydar import RayTaskTracker, setup_proxy_server


class TestRayServer:
def test_get_proxy_server(self, proxy_server):
proxy_server.remote("new", "test_table", dict(a="string", b="integer", c="float", d="datetime"))
time.sleep(2)
proxy_server.remote("update", "test_table", [dict(a="foo", b=1, c=1.0, d=time.time())])
time.sleep(2)
response = requests.get("http://localhost:8000/tables")
assert eval(response.text) == ["test_table"]

0 comments on commit 1225ddc

Please sign in to comment.