Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use websockets for json communication #4490

Merged
merged 41 commits into from
Sep 10, 2023

Conversation

AndrewJGaut
Copy link
Contributor

@AndrewJGaut AndrewJGaut commented Jun 14, 2023

This PR addresses #4431. It modifies JSON messaging between the REST server / bundle manager and workers.

An in-depth document describing these changes and their motivation is given here.

Summary

Previously, messages were sent to workers in the HTTP response to worker checkins. However, this was inefficient; a server thread would try to send the message through an AF Unix socket corresponding to the worker using TCP, and so if the sending and receiving threads were not synchronized, the message would not be sent. The need for synchronization made server to worker communication highly inefficient. To make this slightly more efficient, a websocket server was added and would send a ping to the worker meant to receive the message to cause it to checkin immediately so that the receiving server thread would start listening as soon as possible on the AF Unix socket to receive the message. However, desynchronization and corresponding inefficiency persisted.

After this PR, JSON messages will be sent to workers immediately through the websocket server. This means that:
(1) Workers will no longer receive any messages through HTTP responses to the checkin.
(2) No synchronization is required between server threads for JSON messaging.
(3) The websocket server no longer uses the ping functionality.

We also add in authentication of workers and server to the websocket server. (The need for this is described here.)

Speedup

When looking at the time tests, the only test that uses worker communication is the run test. For that test, before the websockets change, it would take over 11 seconds (11.86 seconds here, 11.44 seconds here). After the websockets change, we get 5.57 seconds. That's a speedup of over 2x! (To see the time in those links, click on Run tests using Docker runtime and then search cl info -f name and it will be the number right above the line that comes up.)

Future Work / TODO

I might do these in a future PR:

  • Only release lock once worker is done with whatever it's doing
  • More fine-grained locking for the worker (lock per bundle uuid)
  • Dynamic resizing of number of websockets for worker depending on 'demand'

codalab/bin/ws_server.py Outdated Show resolved Hide resolved
codalab/bin/ws_server.py Outdated Show resolved Hide resolved
@epicfaace
Copy link
Member

"working quite well now" 👀 👀

@AndrewJGaut AndrewJGaut marked this pull request as ready for review July 16, 2023 16:37
codalab/bin/ws_server.py Outdated Show resolved Hide resolved
@AndrewJGaut AndrewJGaut marked this pull request as ready for review July 17, 2023 01:49
codalab/bin/ws_server.py Outdated Show resolved Hide resolved
codalab/bin/ws_server.py Outdated Show resolved Hide resolved
return

logger.warning(f"All websockets for worker {worker_id} are currently busy.")
await server_websocket.close(1013, f"All websockets for worker {worker_id} are currently busy.")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what is 1013?

Copy link
Contributor Author

@AndrewJGaut AndrewJGaut Aug 16, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm... weird. I see this listed in several other places (e.g. here) as Try again later, but it's not in the RFC.

Perhaps I should change this to a different status code, but none of the ones listed in the RFC are fitting... what do you think?

codalab/bin/ws_server.py Outdated Show resolved Hide resolved
codalab/lib/codalab_manager.py Show resolved Hide resolved
logging.info('Unable to reach worker')

def _get_read_response_stream(self, response_socket_id):
with closing(self._worker_model.start_listening(response_socket_id)) as sock:
header_message = self._worker_model.get_json_message(sock, 60)
header_message = self._worker_model.recv_json_message_with_sock(sock, 60)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

rename to get_json_message?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I do want to keep the with_sock portion (I changed it to with_unix_socket in this newest version) since it helps make clear that it's using AF_Unix sockets. However, I could change recv back to get -- I thought recv was clearer since that's typically the API for getting data from sockets.

Let me know. I can definitely change this.

codalab/lib/download_manager.py Outdated Show resolved Hide resolved
codalab/model/bundle_model.py Show resolved Hide resolved
codalab/worker/main.py Outdated Show resolved Hide resolved
codalab/bin/ws_server.py Outdated Show resolved Hide resolved
codalab/bin/ws_server.py Outdated Show resolved Hide resolved
codalab/model/bundle_model.py Show resolved Hide resolved
Copy link
Member

@epicfaace epicfaace left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks good to me. This new architecture is all documented in your doc, correct? Can you link to it? (maybe we also add a link to it as a comment in the code in ws_server.py so we never lose it)

@@ -836,7 +852,7 @@ def netcat_fn():
break
total_data.append(data)
s.close()
reply(None, {}, b''.join(total_data))
reply(None, {}, io.BytesIO(b''.join(total_data)))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need this change?

@@ -136,6 +140,11 @@ def __init__(

self.ws_server = ws_server

assert (
num_coroutines > 0 and type(num_coroutines) is int
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need this?

@epicfaace
Copy link
Member

@AndrewJGaut is this ready to merge?

@AndrewJGaut
Copy link
Contributor Author

Looks good to me. This new architecture is all documented in your doc, correct? Can you link to it? (maybe we also add a link to it as a comment in the code in ws_server.py so we never lose it)

I figured it was enough to link the doc in the PR description, since it's stated pretty clearly. Do you think we should in the code as well? I can do that.

@AndrewJGaut
Copy link
Contributor Author

@AndrewJGaut is this ready to merge?

Let me test it out in dev one more time to be sure.

@AndrewJGaut
Copy link
Contributor Author

@AndrewJGaut is this ready to merge?

Let me test it out in dev one more time to be sure.

Confirmed it's good to merge.

@AndrewJGaut AndrewJGaut merged commit 7c460c1 into master Sep 10, 2023
63 checks passed
@AndrewJGaut AndrewJGaut deleted the use-websockets-for-json-communication branch September 10, 2023 19:24
@epicfaace epicfaace mentioned this pull request Sep 26, 2023
AndrewJGaut added a commit that referenced this pull request Oct 7, 2023
epicfaace pushed a commit that referenced this pull request Oct 10, 2023
@AndrewJGaut AndrewJGaut restored the use-websockets-for-json-communication branch October 11, 2023 03:00
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants