Use websockets to make on-demand worker file previews faster #4096
Conversation
codalab/worker/main.py
Outdated
@@ -138,7 +138,7 @@ def parse_args():
     '--checkin-frequency-seconds',
     help='Number of seconds to wait between worker check-ins',
     type=int,
-    default=5,
+    default=20,
I still think it would be nice to update the worker status at least once every 5 seconds...we don't need this to be 20, right?
No -- we can keep it at 5
logging.warn(
    f"Got websocket message, got data: {data}, going to check in now."
)
self.checkin()
For concurrency:
- hold a global lock when running checkin()
- also hold the lock in the worker loop 1) when running checkin() and 2) when processing bundles (sketched below)
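A minimal sketch of that suggestion, with stand-in bodies (the real worker methods are of course more involved; only the locking structure is the point here):

import threading
import time

lock = threading.Lock()  # global lock shared by both threads
runs = {}                # state touched by both threads

def checkin():
    # Stand-in for the real check-in: report status, receive actions.
    runs["last_checkin"] = time.time()

def process_runs():
    # Stand-in for bundle processing, which also mutates `runs`.
    runs["processed"] = runs.get("processed", 0) + 1

def listener():
    # Stand-in for the websocket listener: each "message" forces a check-in.
    for _ in range(3):
        time.sleep(0.1)
        with lock:       # hold the global lock around checkin()
            checkin()

def worker_loop():
    for _ in range(3):   # stand-in for `while not self.terminate`
        with lock:       # 1) check-in under the lock
            checkin()
        with lock:       # 2) bundle processing under the same lock
            process_runs()
        time.sleep(0.1)

t = threading.Thread(target=listener)
t.start()
worker_loop()
t.join()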
print("RSH") | ||
worker_id = await websocket.recv() | ||
logger.warn(f"Got a message from the rest server, to ping worker: {worker_id}.") | ||
|
document
@@ -154,6 +154,40 @@ containers, and periodically report on the status of the runs.
 A worker also has to respond to various commands such as reading files in the
 bundle while it's running, killing bundles, etc.

+All data transfer between the worker and the server happens through a process known as a check-in.
@AndrewJGaut here's an explanation of my approach here, would love any feedback!
…e checkin, but process_runs also accesses (and changes) the global variable self.runs. Updated to lock that function as well.
@epicfaace Can you please review the changes made since your last change? They are quite minor. Once you give the LGTM, I'll approve it and merge it.
lgtm
self._checkin_lock = Lock()
# This lock ensures the listening thread and the main thread don't
# simultaneously access the runs dictionary, which would cause race conditions.
self._lock = RLock()
Why use RLock instead of Lock?
It's just for elegance's sake.
RLock allows the lock to be acquired multiple times (provided it is also released precisely as many times as it is acquired). This is desirable here because, to avoid race conditions, we must call self.process_runs before the lock is released within the checkin() function. By using RLock, we allow the code to acquire the lock a second time so that we can run process_runs within checkin.
If we didn't do this, we could use a flag to tell process_runs we already acquired the lock and not acquire the lock in that case, but that seemed less elegant to me.
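A toy illustration of that reentrancy argument (method bodies are stand-ins, not the worker's actual code):

import threading

class Worker:
    def __init__(self):
        self._lock = threading.RLock()
        self.runs = {}

    def checkin(self):
        with self._lock:
            # ...exchange state with the server...
            # process_runs must run before this lock is released, so the same
            # thread acquires the lock a second time. A plain Lock would
            # deadlock on this inner acquisition; RLock permits it.
            self.process_runs()

    def process_runs(self):
        with self._lock:  # also called directly from the worker loop
            self.runs["n"] = self.runs.get("n", 0) + 1

w = Worker()
w.checkin()       # nested acquisition, allowed by RLock
w.process_runs()  # direct call, single acquisition
print(w.runs)     # {'n': 2}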
tests/cli/test_cli.py
Outdated
@@ -1724,7 +1724,7 @@ def test_run(ctx):
     # Test that bundle fails when run without sufficient time quota
     _run_command([cl, 'uedit', 'codalab', '--time-quota', '2'])
     uuid = _run_command([cl, 'run', 'sleep 100000'])
-    wait_until_state(uuid, State.KILLED, timeout_seconds=60)
+    wait_until_state(uuid, State.KILLED, timeout_seconds=63)
why from 60 to 63?
codalab/model/worker_model.py
Outdated
def send_json_message(self, socket_id, message, timeout_secs, autoretry=True):

def _ping_worker_ws(self, worker_id):
    async def ping_ws():
        async with websockets.connect("ws://ws-server:2901/main") as websocket:
We need to ensure this isn't hardcoded.
codalab/worker/worker.py
Outdated
while not self.terminate:
    logging.warn(f"Connecting anew to: ws://ws-server:2901/worker/{self.id}")
    async with websockets.connect(
        f"ws://ws-server:2901/worker/{self.id}", max_queue=1
We need to ensure this isn't hardcoded.
@AndrewJGaut my main feedback is that ws-server:2901 shouldn't be hardcoded. We should ensure we make use of the …
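For example, something along these lines, where the host comes from a flag or environment variable; the --ws-server flag and CODALAB_WS_SERVER variable below are hypothetical names, not ones from this PR:

import argparse
import os

parser = argparse.ArgumentParser()
parser.add_argument(
    '--ws-server',
    default=os.environ.get('CODALAB_WS_SERVER', 'ws://localhost:2901'),
    help='Base URL of the websocket server',
)
args = parser.parse_args()

worker_id = 'worker-1'  # stand-in
# Instead of hardcoding "ws://ws-server:2901/worker/{id}":
worker_ws_url = f"{args.ws_server}/worker/{worker_id}"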
Use websockets to make on-demand worker file previews faster. File previews (such as loading stdout) of a worker that is currently running a bundle now take ~0.5 seconds.
Fixes #4084. This PR is basically a POC of my comment #4084 (comment) -- I've added a thin websocket layer so that when a user requests to view one of the worker's files, the rest server pings the worker over the websocket and the worker checks in immediately, rather than waiting for its next periodic check-in.
This also gives us the latitude to change the worker's default checkin frequency from 5 seconds -> 20 seconds, further decreasing the load on the rest server.
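For reference, a condensed sketch of the two ends of that path, pieced together from the snippets quoted in the review above (the relay inside ws-server and all error handling are omitted; this is not the PR's exact code):

import websockets

WS_SERVER = "ws://ws-server:2901"  # see the TODO below about not hardcoding this

# Rest-server side: ping a worker by sending its id to the websocket server.
async def ping_worker(worker_id: str):
    async with websockets.connect(f"{WS_SERVER}/main") as websocket:
        await websocket.send(worker_id)

# Worker side: stay connected; any message means "check in right now".
async def listen(worker_id: str, checkin):
    while True:
        async with websockets.connect(
            f"{WS_SERVER}/worker/{worker_id}", max_queue=1
        ) as websocket:
            async for _message in websocket:
                checkin()

Each coroutine would be driven by an event loop (e.g. asyncio.run) in its own process: the rest server calls ping_worker on a file request, and the worker runs listen in its listener thread.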
TODOs:
- ws-server URL: make it configurable rather than hardcoded (see review feedback above)