[core] Task missing in a distributed cluster with multi-node synchronization using SignalActor #18465

Closed
1 of 2 tasks
whhu opened this issue Sep 9, 2021 · 8 comments
Labels
  • bug: Something that is supposed to be working; but isn't
  • stale: The issue is stale. It will be closed within 7 days unless there is further conversation
  • triage: Needs triage (eg: priority, bug/not-bug, and owning component)

Comments

@whhu

whhu commented Sep 9, 2021

What is the problem?

Tasks are delivered in an unbalanced manner, and one task goes missing in the process.
7 tasks are launched on 2 nodes, each initialized with 3 CPUs.
Ray simultaneously delivers 5 tasks to the head node and 1 task to the worker node; one task is missing.

The expected behavior would be to deliver 3 tasks to each node at first, and 1 task to either node after a short delay.
This is perhaps the same problem as #14863.

2021-09-09 02:24:14,897 INFO worker.py:825 -- Connecting to existing Ray cluster at address: 192.168.250.135:6379
ready...
set..
(pid=974) 2021-09-09 02:24:25.142375: 9.4
(pid=1020) 2021-09-09 02:24:25.142606: 8.7
(pid=1021) 2021-09-09 02:24:25.142853: 8.7
(pid=1069) 2021-09-09 02:24:25.142807: 7.4
(pid=1093) 2021-09-09 02:24:25.142034: 6.7
(pid=530, ip=192.168.250.143) 2021-09-09 02:24:25.142793: 8.0

Reproduction (REQUIRED)

The runtime is initialized from the Docker image rayproject/ray:1.6.0-py38-cpu with Python 3.8.5 and ray 1.6.0 installed.
A cluster of two nodes is initialized with the following commands, one on each node.

ray start --head --num-cpus=3
ray start --address=192.168.250.135:6379 --num-cpus=3

The head node launches the script below, which is modified from the v1.6.0 documentation.
time.sleep is inserted as a short delay, and print_msg is implemented to print a message with a timestamp.

import ray
import asyncio

import time
from datetime import datetime as dt

def print_msg(msg):
    print('{}: {}'.format(dt.now(), msg))

# Zero-CPU actor used to broadcast a start signal to the waiting tasks.
@ray.remote(num_cpus=0)
class SignalActor:
    def __init__(self):
        self.ready_event = asyncio.Event()

    def send(self, clear=False):
        self.ready_event.set()
        if clear:
            self.ready_event.clear()

    async def wait(self, should_wait=True):
        if should_wait:
            await self.ready_event.wait()

# Task that blocks until the signal is sent, prints its wait time, then sleeps 5 s.
@ray.remote
def wait_and_go(signal):
    start = time.time()
    ray.get(signal.wait.remote())
    end = time.time()
    print_msg('{:.1f}'.format(end - start))
    time.sleep(5)

ray.init(address='auto')
signal = SignalActor.remote()
tasks = [wait_and_go.remote(signal) for _ in range(7)]
print("ready...")

time.sleep(10)

print("set..")
ray.get(signal.send.remote())

ray.get(tasks)

If the code snippet cannot be run by itself, the issue will be closed with "needs-repro-script".

  • I have verified my script runs in a clean environment and reproduces the issue.
  • I have verified the issue also occurs with the latest wheels.
@whhu whhu added bug Something that is supposed to be working; but isn't triage Needs triage (eg: priority, bug/not-bug, and owning component) labels Sep 9, 2021
@ericl
Contributor

ericl commented Sep 10, 2021

Is the log message just missing because you exit the job immediately after the get? Try adding a sleep after the get.

Btw, Ray uses bin packing as its load-balancing policy, so what you describe is expected.
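
For example, the tail of the reproduction script could become something like the following (a minimal sketch; the 10-second grace period is an arbitrary choice):

print("set..")
ray.get(signal.send.remote())

ray.get(tasks)
time.sleep(10)  # keep the driver alive so worker stdout is still forwarded before exit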

@whhu
Author

whhu commented Sep 10, 2021

There are no more outputs even with an additional time.sleep.
It is hard to understand why more tasks than num-cpus are launched simultaneously.
There are always more tasks on the node that starts the Python script, regardless of whether it is the head or a worker node.
Could you please drop a hint on the load-balancing policy?

2021-09-09 21:13:52,314 INFO worker.py:825 -- Connecting to existing Ray cluster at address: 192.168.250.135:6379
ready...
set..
(pid=100, ip=192.168.250.143) 2021-09-09 21:14:02.600329: 8.0
(pid=159) 2021-09-09 21:14:02.598194: 9.4
(pid=183) 2021-09-09 21:14:02.597647: 8.0
(pid=182) 2021-09-09 21:14:02.599355: 8.7
(pid=184) 2021-09-09 21:14:02.598192: 8.7
(pid=251) 2021-09-09 21:14:02.599427: 7.4

@ericl
Contributor

ericl commented Sep 10, 2021

I tried it here using cluster test utils.

  • I see all 7 tasks launched just fine.
  • I did see more tasks than expected launched on the head node. This is because calling ray.wait frees the task's CPU slot, so the other tasks grab the freed slots. The load-balancing policy always prefers to place tasks on the same node if possible to increase locality. However, adding a small sleep(1) forces the tasks to be spread onto the worker nodes.
import ray
import asyncio

import time
from datetime import datetime as dt

def print_msg(msg):
    print('{}: {}'.format(dt.now(), msg))

@ray.remote(num_cpus=0)
class SignalActor:
    def __init__(self):
        self.ready_event = asyncio.Event()

    def send(self, clear=False):
        self.ready_event.set()
        if clear:
            self.ready_event.clear()

    async def wait(self, should_wait=True):
        if should_wait:
            await self.ready_event.wait()

@ray.remote
def wait_and_go(i, signal):
    print("wait_and_go", i, ray.runtime_context.get_runtime_context().node_id)
    start = time.time()
    time.sleep(1)  # <--- add a sleep
    ray.get(signal.wait.remote())
    end = time.time()
    print_msg('{:.1f}'.format(end - start))
    time.sleep(5)
    print("done", i)

from ray import cluster_utils
cluster = cluster_utils.Cluster()
cluster.add_node(num_cpus=3)
cluster.add_node(num_cpus=3)
cluster.add_node(num_cpus=3)

ray.init(address=cluster.address)
signal = SignalActor.remote()
tasks = [wait_and_go.remote(i, signal) for i in range(7)]
print("ready...")

time.sleep(2)

print("set..")
ray.get(signal.send.remote())

print("wait")
ray.get(tasks)
print("END")

@whhu
Author

whhu commented Sep 14, 2021

Thanks very much for the helpful cluster_utils. The additional time.sleep works fine, with 7 tasks delivered to 3 nodes limited to num_cpus=3 each.

There seems to be some over-scheduling, and I wonder whether this is the expected behavior.
When the last add_node is commented out, the cluster has 2 nodes with 3 CPUs each, so in my opinion the capacity of the cluster is 6. But 7 tasks are launched within 2 seconds, with 4 of them delivered to the same node. All tasks finish at almost the same time, with a difference of less than 1 second. Does this indicate that all tasks get the signal at the same time?

In my opinion, the last task should have started only after one of the first 6 tasks finished. The cluster should support 6 concurrent tasks, not 7.

I improved the snippet by replacing print with print_msg to mark the timestamps; a rough sketch of the change is shown below, followed by a sorted, simplified console output in which tasks 0-6 finish at 15:50:48.
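
Roughly, the changed parts look like this (a sketch; only the cluster size and the print calls differ from the version above):

cluster = cluster_utils.Cluster()
cluster.add_node(num_cpus=3)
cluster.add_node(num_cpus=3)  # the third add_node is left out

@ray.remote
def wait_and_go(i, signal):
    print_msg('wait_and_go {} {}'.format(i, ray.runtime_context.get_runtime_context().node_id))
    start = time.time()
    time.sleep(1)
    ray.get(signal.wait.remote())
    print_msg('{:.1f}'.format(time.time() - start))
    time.sleep(5)
    print_msg('done {}'.format(i))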

2021-09-14 15:50:41.074432: ready...
2021-09-14 15:50:41.379844: wait_and_go 0 NodeID(21448a73eb30c4adb778f65f9a9863a40a56cf0f26c06f8129c092ff)
2021-09-14 15:50:41.679673: wait_and_go 1 NodeID(21448a73eb30c4adb778f65f9a9863a40a56cf0f26c06f8129c092ff)
2021-09-14 15:50:41.988779: wait_and_go 2 NodeID(bf28d069074c4b74ee298533bca5cae7f256b910cab5cf4a0855439e)
2021-09-14 15:50:42.289914: wait_and_go 4 NodeID(21448a73eb30c4adb778f65f9a9863a40a56cf0f26c06f8129c092ff)
2021-09-14 15:50:42.290024: wait_and_go 3 NodeID(bf28d069074c4b74ee298533bca5cae7f256b910cab5cf4a0855439e)
2021-09-14 15:50:42.290515: wait_and_go 5 NodeID(bf28d069074c4b74ee298533bca5cae7f256b910cab5cf4a0855439e)
2021-09-14 15:50:42.706980: wait_and_go 6 NodeID(21448a73eb30c4adb778f65f9a9863a40a56cf0f26c06f8129c092ff)
2021-09-14 15:50:43.074825: set..
2021-09-14 15:50:43.079578: wait
2021-09-14 15:50:43.079589: 1.1
2021-09-14 15:50:43.079858: 1.7
2021-09-14 15:50:43.079858: 1.4
2021-09-14 15:50:43.330041: 1.0
2021-09-14 15:50:43.330312: 1.0
2021-09-14 15:50:43.330696: 1.0
2021-09-14 15:50:43.714716: 1.0
2021-09-14 15:50:48.083772: done 2
2021-09-14 15:50:48.084798: done 0
2021-09-14 15:50:48.084951: done 1
2021-09-14 15:50:48.332319: done 4
2021-09-14 15:50:48.334935: done 5
2021-09-14 15:50:48.335287: done 3
2021-09-14 15:50:48.718917: done 6
2021-09-14 15:50:48.720804: END

@ericl
Contributor

ericl commented Sep 14, 2021

Yep, the "over-scheduling" is due to the use of ray.wait inside a task. Ray releases a task's CPUs while it is blocked on waiting, so this can lead to some oversubscription of resources.

If this is problematic, one workaround is to use actors instead of tasks. Actors never release their CPUs, even when they are blocked on waiting.
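
For reference, here is a minimal sketch of the actor-based variant, reusing SignalActor and print_msg from the scripts above (the Waiter name and the choice of 6 actors, one per available CPU, are illustrative assumptions, not part of the original repro):

# Each Waiter actor reserves 1 CPU for its whole lifetime, even while it is
# blocked in ray.get, so at most 6 of them fit on the 2 x 3-CPU cluster.
@ray.remote(num_cpus=1)
class Waiter:
    def wait_and_go(self, i, signal):
        start = time.time()
        ray.get(signal.wait.remote())  # blocks, but the actor's CPU is not released
        print_msg('{} {:.1f}'.format(i, time.time() - start))
        time.sleep(5)
        return i

signal = SignalActor.remote()
waiters = [Waiter.remote() for _ in range(6)]  # one actor per available CPU
refs = [w.wait_and_go.remote(i, signal) for i, w in enumerate(waiters)]

time.sleep(10)
ray.get(signal.send.remote())
ray.get(refs)  # at most 6 items run concurrently; extra work queues for a free actor

The trade-off is that the actors hold their CPUs for their entire lifetime, so any extra work items have to be queued behind them (for example with ray.util.ActorPool) instead of being scheduled as independent tasks.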

@whhu
Author

whhu commented Sep 16, 2021

Thank you for the hint! It is very helpful for making sense of the Ray runtime.

@stale

stale bot commented Jan 14, 2022

Hi, I'm a bot from the Ray team :)

To help human contributors focus on more relevant issues, I will automatically add the stale label to issues that have had no activity for more than 4 months.

If there is no further activity in the next 14 days, the issue will be closed!

  • If you'd like to keep the issue open, just leave any comment, and the stale label will be removed!
  • If you'd like to get more attention to the issue, please tag one of Ray's contributors.

You can always ask for help on our discussion forum or Ray's public slack channel.

@stale stale bot added the stale The issue is stale. It will be closed within 7 days unless there are further conversation label Jan 14, 2022
@stale

stale bot commented Jan 29, 2022

Hi again! The issue will be closed because there has been no further activity in the 14 days since the last message.

Please feel free to reopen or open a new issue if you'd still like it to be addressed.

Again, you can always ask for help on our discussion forum or Ray's public slack channel.

Thanks again for opening the issue!

@stale stale bot closed this as completed Jan 29, 2022