
Question: Force tasks to be scheduled only on local node #5722

Closed
karstenkrispin opened this issue Sep 17, 2019 · 8 comments
Labels
question Just a question :) stale The issue is stale. It will be closed within 7 days unless there are further conversation

Comments

@karstenkrispin

Describe the problem

I would like to use Ray for parallelized execution, in particular to avoid using multiprocessing.

For example, when I open a list of compressed images, I would like to do this concurrently. However, it wouldn't make much sense to delegate this work to workers on other nodes.

Is there a way to force scheduling only on the local node?

@virtualluke
Contributor

We have had a similar need: workers with a critical requirement for data locality, much like what you describe.

We have addressed this by launching an actor on each node, where each actor holds a list of work specific to that node (i.e., the data is on that node). Each worker task checks which node it is on and asks the actor on that node for work.

Does that make sense? I can elaborate more if needed.
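A minimal sketch of the per-node dispatcher pattern described above, written as plain Python so it runs without a cluster. `NodeWorkQueue` stands in for the actor living on each node, and the `node_id` strings stand in for however a task discovers which node it is on; all of these names are hypothetical. In a real Ray program each queue would be an actor pinned to its node, and each worker would look up its own node at runtime.

```python
from collections import deque


class NodeWorkQueue:
    """Stand-in for the per-node actor: holds work whose data lives on one node."""

    def __init__(self, node_id, items):
        self.node_id = node_id
        self._items = deque(items)

    def next_item(self):
        # Hand out the next local work item, or None once the queue is drained.
        return self._items.popleft() if self._items else None


def worker(node_id, queues):
    """Stand-in for a worker task: it only ever asks the queue on its own node."""
    queue = queues[node_id]  # the dispatcher co-located with this worker
    done = []
    while (item := queue.next_item()) is not None:
        done.append((node_id, item))
    return done


# Hypothetical layout: each node owns the image files stored on its local disk.
queues = {
    "node-a": NodeWorkQueue("node-a", ["a1.png", "a2.png"]),
    "node-b": NodeWorkQueue("node-b", ["b1.png"]),
}
results = worker("node-a", queues) + worker("node-b", queues)
print(results)
```

The point of the pattern is that no work item ever crosses a node boundary: a worker can only drain the queue that lives next to its data.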

@richardliaw
Contributor

RLlib has a helper function called "create_colocated" to launch actors on the same node; you could do the same for your application.
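RLlib's helper is not reproduced here, but the general launch-and-filter idea behind co-locating actors can be sketched: launch actors, ask each which host it landed on, keep those on the target host, discard the rest, and retry until enough are kept. The plain-Python model below simulates placement with a hypothetical `place` callback (here alternating between two made-up IPs); it illustrates the idea under those assumptions and is not RLlib's implementation.

```python
import itertools


def create_colocated(target_host, count, place, max_rounds=10):
    """Launch-and-filter: keep only 'actors' that landed on target_host.

    `place` is a hypothetical callback standing in for the scheduler; it
    returns the host that a newly launched actor ended up on.
    """
    kept, discarded = [], []
    actor_ids = itertools.count()
    for _ in range(max_rounds):
        if len(kept) >= count:
            break
        # Launch a batch as large as the remaining shortfall.
        for _ in range(count - len(kept)):
            actor_id, host = next(actor_ids), place()
            (kept if host == target_host else discarded).append(actor_id)
    if len(kept) < count:
        raise RuntimeError(f"could not place {count} actors on {target_host}")
    return kept, discarded  # with real actors, the discarded ones would be killed


# Simulated placement that alternates between two hosts.
hosts = itertools.cycle(["10.0.0.1", "10.0.0.2"])
kept, discarded = create_colocated("10.0.0.1", 3, lambda: next(hosts))
print(kept, discarded)
```

The `max_rounds` cap matters: if the target host has no free capacity, the loop gives up instead of launching actors forever.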

@robertnishihara
Collaborator

This should be doable with custom resources. See https://ray.readthedocs.io/en/latest/configure.html#cluster-resources. Does this work for your use case?
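To make the custom-resource idea concrete, here is a toy, Ray-free model of resource-based placement: each node advertises a dict of resources, a task declares a demand, and the task can only run on a node whose resources cover that demand. Giving each node a unique resource name (the labels below are hypothetical) therefore pins any task that requests it to that node. This is not Ray's scheduler, only the matching rule it relies on.

```python
def feasible_nodes(demand, nodes):
    """Return names of nodes whose resources cover every demanded amount."""
    return [
        name
        for name, available in nodes.items()
        if all(available.get(res, 0) >= amount for res, amount in demand.items())
    ]


# Hypothetical cluster: every node has CPUs, plus one resource unique to it.
nodes = {
    "head": {"CPU": 4, "node_head": 1},
    "worker1": {"CPU": 8, "node_worker1": 1},
}

print(feasible_nodes({"CPU": 1}, nodes))  # any node will do
print(feasible_nodes({"CPU": 1, "node_worker1": 0.01}, nodes))  # only worker1
```

In an actual Ray cluster, such a unique resource would be declared through the `--resources` option of `ray start` on each node and requested with, for example, `@ray.remote(resources={"node_worker1": 0.01})`.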

@karstenkrispin
Author

karstenkrispin commented Sep 18, 2019

Hey all,
many thanks for your answers!

@virtualluke
Let me check whether the actor approach is usable in my case. If I have data that is static but should be kept, e.g., in the local plasma store, then an actor-per-node approach makes total sense to me. However, if the data needs to be processed on the fly and in parallel, the actor would itself need to schedule tasks, right? Those tasks would again be scheduled across the whole cluster, so as I understand it, the local-actor approach wouldn't work in that case either.

@robertnishihara
The way I understand it, this would work by giving each node a unique resource and then calling remote functions that require this resource?
How can I specify these resources on the fly, and differently on each node? If I have

import ray

@ray.remote
def my_task():
    pass

Is my_task._remote(resources={get_node_resource_name(): 1}) the way to go? I have only seen this function used with actors.

So yes, in principle this would work.

In the long run, I think it would be great to have a flag in ray.remote() that specifies whether a function should only run on the local node. Something like

@ray.remote(node_local=True)
def my_function():
    return 1

This approach would not require starting each node with a different custom resource attached, which would also make autoscaling easier. Does this make sense?

@richardliaw richardliaw added the question Just a question :) label Oct 7, 2019
@oliver-batchelor

oliver-batchelor commented Sep 19, 2020

I'm also very interested in this question (specifically, the most effective way to do this). It seems particularly relevant for use cases with slow Ethernet and large data transfers, where it is hard to get good performance without forcing parts of the system to execute locally.

@richardliaw
Contributor

richardliaw commented Sep 20, 2020

Here's a walkthrough of how one might do this:

import ray

ray.init()  # or ray.init(address="auto") to connect to an existing cluster

# Define a remote function (no arguments, matching the call below)
@ray.remote
def remote_func():
    print("hi")

# Obtain the local IP address
# (newer Ray releases expose this as ray.util.get_node_ip_address())
local_hostname = ray.services.get_node_ip_address()
assert isinstance(local_hostname, str)

# Ray has a predefined "node id" resource for locality placement
node_id = f"node:{local_hostname}"
# Check to make sure the node id resource exists
assert node_id in ray.cluster_resources()

# Create a remote function with the given resource label attached
local_remote_func = remote_func.options(resources={node_id: 0.01})

# Invoke the remote function
ray.get(local_remote_func.remote())
# prints "hi" from the local node
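Why request only 0.01 of the node resource? Each node's `node:<ip>` resource has a capacity of 1, so the requested fraction caps how many such tasks can hold it at once; with 0.01, up to 100 can run concurrently, which leaves the CPUs as the real limit. The toy calculation below (plain Python, with a hypothetical 8-CPU node and made-up IP) takes the minimum over every resource a task demands, using `fractions.Fraction` so that 0.01 is exact rather than a rounded float.

```python
from fractions import Fraction


def max_concurrent(demand, capacity):
    """How many copies of a task fit on one node at the same time.

    Amounts go through Fraction(str(...)) so values like 0.01 stay exact.
    """
    return min(
        int(Fraction(str(capacity[res])) // Fraction(str(amount)))
        for res, amount in demand.items()
    )


# Hypothetical 8-CPU node; Ray gives each node a "node:<ip>" resource of 1.
capacity = {"CPU": 8, "node:10.0.0.1": 1}

print(max_concurrent({"node:10.0.0.1": 0.01}, capacity))  # 100: label alone
print(max_concurrent({"CPU": 1, "node:10.0.0.1": 0.01}, capacity))  # 8: CPUs dominate
print(max_concurrent({"CPU": 1, "node:10.0.0.1": 1}, capacity))  # 1: full label serializes tasks
```

Remote functions request 1 CPU by default, so the second case is the realistic one: a small fraction pins tasks to the node without throttling them, whereas requesting the whole node resource would run them one at a time.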

@stale

stale bot commented Jan 18, 2021

Hi, I'm a bot from the Ray team :)

To help human contributors to focus on more relevant issues, I will automatically add the stale label to issues that have had no activity for more than 4 months.

If there is no further activity in the next 14 days, the issue will be closed!

  • If you'd like to keep the issue open, just leave any comment, and the stale label will be removed!
  • If you'd like to get more attention to the issue, please tag one of Ray's contributors.

You can always ask for help on our discussion forum or Ray's public slack channel.

@stale stale bot added the stale The issue is stale. It will be closed within 7 days unless there are further conversation label Jan 18, 2021
@stale

stale bot commented Feb 1, 2021

Hi again! The issue will be closed because there has been no more activity in the 14 days since the last message.

Please feel free to reopen or open a new issue if you'd still like it to be addressed.

Again, you can always ask for help on our discussion forum or Ray's public slack channel.

Thanks again for opening the issue!


5 participants