[Core] Fix task submission never return when network partition happens #44692
Conversation
force-pushed from 06f7e99 to 3964008
force-pushed from 1a1043b to 5c3d99d
force-pushed from 4f3fa53 to be06d61
def check_task_not_running():
    output = head.exec_run(cmd="ray list tasks --format json")
    if output.exit_code == 0:
        tasks_json = json.loads(output.output)
        print("tasks_json:", json.dumps(tasks_json, indent=2))
        return all([task["state"] != "RUNNING" for task in tasks_json])
    return False


def check_task_state(n=0, state="RUNNING"):
    output = head.exec_run(cmd="ray list tasks --format json")
    if output.exit_code == 0:
        tasks_json = json.loads(output.output)
        print("tasks_json:", json.dumps(tasks_json, indent=2))
        return n == sum([task["state"] == state for task in tasks_json])
    return False
These could be deduplicated
They have some subtle differences.
For example, check_task_running() not only checks whether the tasks are running, it also checks that the number of task states equals 2. This ensures the network is stable and no task has failed.
Refactoring them into a common function would make the code more complicated than simply:
wait_for_condition(check_task_not_running)
Let's just keep it simple this way :)
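For context, a merged checker would presumably have to take the per-call logic as an argument; a hypothetical sketch (not part of the PR, `head` is the head-node docker container from the test):

import json
from ray._private.test_utils import wait_for_condition

def check_tasks(predicate):
    # Hypothetical merged helper; runs `ray list tasks` inside the head container.
    output = head.exec_run(cmd="ray list tasks --format json")
    if output.exit_code != 0:
        return False
    return predicate(json.loads(output.output))

# Each call site then has to restate what the dedicated helpers currently encode:
wait_for_condition(lambda: check_tasks(lambda ts: all(t["state"] != "RUNNING" for t in ts)))
wait_for_condition(lambda: check_tasks(lambda ts: sum(t["state"] == "RUNNING" for t in ts) == 2))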
force-pushed from e747893 to d31a85b
SLEEP_TASK_SCRIPTS = """
import ray
ray.init(address="localhost:6379")
nit: just ray.init() is fine; it will connect to the existing cluster.
Fixed
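For reference, the address-less form presumably looks like the following sketch; the task body is a stand-in, not the PR's actual script:

import ray

# With no address argument, ray.init() attaches to the already-running cluster
# inside the container instead of hard-coding localhost:6379.
ray.init()

@ray.remote
def sleep_task():
    import time
    time.sleep(10_000)  # placeholder long-running task

refs = [sleep_task.remote() for _ in range(2)]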
"RAY_grpc_client_keepalive_timeout_ms=1000", | ||
], | ||
) | ||
sleep(3) |
No need for this sleep since you have wait_for_condition later on.
Previously the gRPC client would only send 2 ping frames when there were no data/header frames to be sent. The keepalive interval is 1s, so after 3s it wouldn't send anything, which is why the test failed before the fix. That is what the time.sleep(3) is for.
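For illustration, the keepalive knobs are passed to the worker container as Ray environment variables. A hedged docker-py sketch follows; the image and container name are placeholders, and RAY_grpc_client_keepalive_time_ms is assumed by analogy with the timeout variable visible in the diff:

import docker  # docker-py, the same library behind head.exec_run / network.disconnect

client = docker.from_env()
worker = client.containers.run(
    "rayproject/ray:nightly",  # placeholder image
    name="ray-worker",         # placeholder name
    detach=True,
    environment=[
        "RAY_grpc_client_keepalive_time_ms=1000",     # ping every 1s when the channel is idle
        "RAY_grpc_client_keepalive_timeout_ms=1000",  # declare the peer dead 1s after a missed ack
    ],
)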
sleep(2)
# kill the worker to simulate the spot shutdown
worker.kill()
print("Killed worker")
sleep(2)
why these sleeps?
In these steps, we first observe that the node died. With the previous buggy behavior, the tasks would stay in the RUNNING state and hang forever. After the fix, we should observe the tasks transition to the PENDING_NODE_ASSIGNMENT state.
I also added comments explaining this.
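A sketch of what those checks look like with the helpers from the diff (the timeouts here are placeholders):

from ray._private.test_utils import wait_for_condition

# After the worker node is gone, nothing should still be RUNNING...
wait_for_condition(check_task_not_running, timeout=30)
# ...and the two tasks should be waiting to be rescheduled.
wait_for_condition(
    lambda: check_task_state(n=2, state="PENDING_NODE_ASSIGNMENT"), timeout=30
)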
force-pushed from e805df9 to fd88d43
# https://docker-py.readthedocs.io/en/stable/networks.html#docker.models.networks.Network.disconnect
network.disconnect(worker.name)
print("Disconnected network")
sleep(2)
Could you explain why we need to sleep 2 seconds here?
Because the keepalive interval is set to 1s and the timeout is set to 1s, after 2s the head node will know the worker is dead. From there on we can continue to run the checkers in the following code. Note that the worker.kill() is a bit unnecessary in this case, although we still want to simulate the shutdown.
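The arithmetic behind the 2-second figure, spelled out (values taken from the env vars used in this test):

from time import sleep

keepalive_time_ms = 1000     # ping sent after 1s of channel idleness
keepalive_timeout_ms = 1000  # peer declared dead 1s after an unanswered ping
worst_case_detection_s = (keepalive_time_ms + keepalive_timeout_ms) / 1000  # 2.0s
sleep(worst_case_detection_s)  # hence the sleep(2) before running the checkers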
@jjyao Ready to review. PTAL!
force-pushed from cbbed25 to 80a0a9d
Signed-off-by: hongchaodeng <[email protected]>
force-pushed from 80a0a9d to 4403051
This fixes the problem that the PushTask() gRPC call hangs when a network partition happens. The call hangs because, by default, gRPC sends only two ping frames and then sends nothing more if no data frames are sent. Meanwhile, the worker node takes a force shutdown and never sends a FIN back to the caller, so without ping frames gRPC cannot discover the disconnection.
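For readers unfamiliar with these gRPC settings, here is a hedged Python illustration of the knobs the description refers to; the actual fix lives in Ray's C++ gRPC client, and the target address below is a placeholder:

import grpc

channel = grpc.insecure_channel(
    "worker-node:10001",  # placeholder target
    options=[
        ("grpc.keepalive_time_ms", 1000),            # send a keepalive ping after 1s of inactivity
        ("grpc.keepalive_timeout_ms", 1000),         # consider the connection dead 1s after a missed ack
        ("grpc.http2.max_pings_without_data", 0),    # 0 = no limit; the default (2) is why pings stopped
        ("grpc.keepalive_permit_without_calls", 1),  # keep pinging even with no active RPC
    ],
)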