No spreading if a node is selected for lease request due to locality #22015

Merged Feb 3, 2022 · 7 commits
Changes from 5 commits
49 changes: 44 additions & 5 deletions python/ray/tests/test_scheduling.py
@@ -260,6 +260,36 @@ def f(x):
ray.get(f.remote(dep), timeout=30)


def test_spread_scheduling_overrides_locality_aware_scheduling(ray_start_cluster):
# This test ensures that an explicit SPREAD scheduling strategy takes
# priority over locality-aware scheduling, i.e. the lease request is sent
# to the local raylet rather than the locality-favored raylet.
cluster = ray_start_cluster
local_node = cluster.add_node(
num_cpus=8,
_system_config={
"worker_lease_timeout_milliseconds": 0,
"max_direct_call_object_size": 0,
"locality_aware_leasing_enabled": True,
},
)
ray.init(address=cluster.address)
cluster.add_node(num_cpus=8, resources={"pin": 1})
cluster.wait_for_nodes()

@ray.remote(resources={"pin": 1})
def non_local():
return ray.worker.global_worker.node.unique_id

@ray.remote(scheduling_strategy="SPREAD")
def f(x):
return ray.worker.global_worker.node.unique_id

# Test that task f() runs on the local node
# even though the non-local node holds its dependencies.
assert ray.get(f.remote(non_local.remote())) == local_node.unique_id


def test_locality_aware_leasing(ray_start_cluster):
# This test ensures that a task will run where its task dependencies are
# located. We run an initial non_local() task that is pinned to a
@@ -275,13 +305,21 @@ def test_locality_aware_leasing(ray_start_cluster):
_system_config={
"worker_lease_timeout_milliseconds": 0,
"max_direct_call_object_size": 0,
# Needed because the above test sets this to False.
"locality_aware_leasing_enabled": True,
"scheduler_spread_threshold": 0.1,
},
)
# Use a custom resource for pinning tasks to a node.
non_local_node = cluster.add_node(num_cpus=1, resources={"pin": 1})
ray.init(address=cluster.address)
# Use a custom resource for pinning tasks to a node.
non_local_node = cluster.add_node(num_cpus=2, resources={"pin": 2})
cluster.wait_for_nodes()

@ray.remote(num_cpus=1, resources={"pin": 1})
class Actor:
def ping(self):
pass

actor = Actor.remote()
ray.get(actor.ping.remote())

@ray.remote(resources={"pin": 1})
def non_local():
@@ -291,7 +329,8 @@ def non_local():
def f(x):
return ray.worker.global_worker.node.unique_id

# Test that task f() runs on the same node as non_local().
# Test that task f() runs on the same node as non_local()
# even though the local node has lower critical resource utilization.
assert ray.get(f.remote(non_local.remote())) == non_local_node.unique_id


4 changes: 2 additions & 2 deletions src/mock/ray/core_worker/lease_policy.h
@@ -39,8 +39,8 @@ namespace core {

class MockLeasePolicyInterface : public LeasePolicyInterface {
public:
MOCK_METHOD(rpc::Address, GetBestNodeForTask, (const TaskSpecification &spec),
(override));
MOCK_METHOD((std::pair<rpc::Address, bool>), GetBestNodeForTask,
(const TaskSpecification &spec), (override));
};

} // namespace core
8 changes: 4 additions & 4 deletions src/mock/ray/raylet_client/raylet_client.h
@@ -33,13 +33,13 @@ class MockWorkerLeaseInterface : public WorkerLeaseInterface {
void, RequestWorkerLease,
(const ray::TaskSpecification &resource_spec, bool grant_or_reject,
const ray::rpc::ClientCallback<ray::rpc::RequestWorkerLeaseReply> &callback,
const int64_t backlog_size),
const int64_t backlog_size, const bool is_selected_based_on_locality),
(override));
MOCK_METHOD(
void, RequestWorkerLease,
(const rpc::TaskSpec &task_spec, bool grant_or_reject,
const ray::rpc::ClientCallback<ray::rpc::RequestWorkerLeaseReply> &callback,
const int64_t backlog_size),
const int64_t backlog_size, const bool is_selected_based_on_locality),
(override));
MOCK_METHOD(ray::Status, ReturnWorker,
(int worker_port, const WorkerID &worker_id, bool disconnect_worker),
@@ -124,13 +124,13 @@ class MockRayletClientInterface : public RayletClientInterface {
void, RequestWorkerLease,
(const ray::TaskSpecification &resource_spec, bool grant_or_reject,
const ray::rpc::ClientCallback<ray::rpc::RequestWorkerLeaseReply> &callback,
const int64_t backlog_size),
const int64_t backlog_size, const bool is_selected_based_on_locality),
(override));
MOCK_METHOD(
void, RequestWorkerLease,
(const rpc::TaskSpec &resource_spec, bool grant_or_reject,
const ray::rpc::ClientCallback<ray::rpc::RequestWorkerLeaseReply> &callback,
const int64_t backlog_size),
const int64_t backlog_size, const bool is_selected_based_on_locality),
(override));

MOCK_METHOD(ray::Status, ReturnWorker,
19 changes: 14 additions & 5 deletions src/ray/core_worker/lease_policy.cc
@@ -17,13 +17,21 @@
namespace ray {
namespace core {

rpc::Address LocalityAwareLeasePolicy::GetBestNodeForTask(const TaskSpecification &spec) {
std::pair<rpc::Address, bool> LocalityAwareLeasePolicy::GetBestNodeForTask(
const TaskSpecification &spec) {
if (spec.GetMessage().scheduling_strategy().scheduling_strategy_case() !=
rpc::SchedulingStrategy::SchedulingStrategyCase::kDefaultSchedulingStrategy) {
// An explicit scheduling strategy takes priority over
// locality-aware scheduling.
return std::make_pair(fallback_rpc_address_, false);
}

if (auto node_id = GetBestNodeIdForTask(spec)) {
if (auto addr = node_addr_factory_(node_id.value())) {
return addr.value();
return std::make_pair(addr.value(), true);
}
}
return fallback_rpc_address_;
return std::make_pair(fallback_rpc_address_, false);
}

/// Criteria for "best" node: The node with the most object bytes (from object_ids) local.
@@ -54,9 +62,10 @@ absl::optional<NodeID> LocalityAwareLeasePolicy::GetBestNodeIdForTask(
return max_bytes_node;
}
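
Most of the GetBestNodeIdForTask body is elided in this view. As a rough, illustrative sketch of the "most object bytes local" criterion described above — the LocalityData struct and the locality_of accessor are hypothetical stand-ins for Ray's locality bookkeeping, while NodeID/ObjectID are the usual Ray ID types:

#include <cstdint>
#include <functional>
#include <vector>
#include "absl/container/flat_hash_map.h"
#include "absl/container/flat_hash_set.h"
#include "absl/types/optional.h"
#include "ray/common/id.h"  // NodeID, ObjectID

// Hypothetical stand-in for Ray's per-object locality record.
struct LocalityData {
  uint64_t object_size;
  absl::flat_hash_set<ray::NodeID> nodes_containing_object;
};

absl::optional<ray::NodeID> BestNodeByLocalBytes(
    const std::vector<ray::ObjectID> &deps,
    const std::function<absl::optional<LocalityData>(const ray::ObjectID &)>
        &locality_of) {
  // Tally how many dependency bytes each node already holds locally.
  absl::flat_hash_map<ray::NodeID, uint64_t> bytes_on_node;
  for (const auto &obj : deps) {
    if (auto data = locality_of(obj)) {
      for (const auto &node_id : data->nodes_containing_object) {
        bytes_on_node[node_id] += data->object_size;
      }
    }
  }
  // The node with the most local dependency bytes wins; absl::nullopt
  // means there is no locality signal, so the caller falls back.
  absl::optional<ray::NodeID> best;
  uint64_t best_bytes = 0;
  for (const auto &[node_id, bytes] : bytes_on_node) {
    if (bytes > best_bytes) {
      best_bytes = bytes;
      best = node_id;
    }
  }
  return best;
}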

rpc::Address LocalLeasePolicy::GetBestNodeForTask(const TaskSpecification &spec) {
std::pair<rpc::Address, bool> LocalLeasePolicy::GetBestNodeForTask(
const TaskSpecification &spec) {
// Always return the local node.
return local_node_rpc_address_;
return std::make_pair(local_node_rpc_address_, false);
}

} // namespace core
9 changes: 6 additions & 3 deletions src/ray/core_worker/lease_policy.h
@@ -41,7 +41,8 @@ class LocalityDataProviderInterface {
class LeasePolicyInterface {
public:
/// Get the address of the best worker node for a lease request for the provided task.
virtual rpc::Address GetBestNodeForTask(const TaskSpecification &spec) = 0;
virtual std::pair<rpc::Address, bool> GetBestNodeForTask(
const TaskSpecification &spec) = 0;

virtual ~LeasePolicyInterface() {}
};
@@ -63,7 +64,8 @@ class LocalityAwareLeasePolicy : public LeasePolicyInterface {
~LocalityAwareLeasePolicy() {}

/// Get the address of the best worker node for a lease request for the provided task.
rpc::Address GetBestNodeForTask(const TaskSpecification &spec);
std::pair<rpc::Address, bool> GetBestNodeForTask(
const TaskSpecification &spec) override;

private:
/// Get the best worker node for a lease request for the provided task.
@@ -89,7 +91,8 @@ class LocalLeasePolicy : public LeasePolicyInterface {
~LocalLeasePolicy() {}

/// Get the address of the local node for a lease request for the provided task.
rpc::Address GetBestNodeForTask(const TaskSpecification &spec);
std::pair<rpc::Address, bool> GetBestNodeForTask(
const TaskSpecification &spec) override;

private:
/// RPC address of the local node.
15 changes: 6 additions & 9 deletions src/ray/core_worker/test/direct_task_transport_test.cc
@@ -161,18 +161,15 @@ class MockRayletClient : public WorkerLeaseInterface {
void RequestWorkerLease(
const TaskSpecification &resource_spec, bool grant_or_reject,
const rpc::ClientCallback<rpc::RequestWorkerLeaseReply> &callback,
const int64_t backlog_size) override {
num_workers_requested += 1;
if (grant_or_reject) {
num_grant_or_reject_leases_requested += 1;
}
callbacks.push_back(callback);
const int64_t backlog_size, const bool is_selected_based_on_locality) override {
RequestWorkerLease(resource_spec.GetMessage(), grant_or_reject, callback,
backlog_size, is_selected_based_on_locality);
}

void RequestWorkerLease(
const rpc::TaskSpec &task_spec, bool grant_or_reject,
const ray::rpc::ClientCallback<ray::rpc::RequestWorkerLeaseReply> &callback,
const int64_t backlog_size = -1) override {
const int64_t backlog_size, const bool is_selected_based_on_locality) override {
num_workers_requested += 1;
callbacks.push_back(callback);
}
@@ -303,9 +300,9 @@ class MockLeasePolicy : public LeasePolicyInterface {
fallback_rpc_address_.set_raylet_id(node_id.Binary());
}

rpc::Address GetBestNodeForTask(const TaskSpecification &spec) {
std::pair<rpc::Address, bool> GetBestNodeForTask(const TaskSpecification &spec) {
num_lease_policy_consults++;
return fallback_rpc_address_;
return std::make_pair(fallback_rpc_address_, false);
};

~MockLeasePolicy() {}
31 changes: 24 additions & 7 deletions src/ray/core_worker/test/lease_policy_test.cc
@@ -27,6 +27,9 @@ TaskSpecification CreateFakeTask(std::vector<ObjectID> deps) {
spec.GetMutableMessage().add_args()->mutable_object_ref()->set_object_id(
dep.Binary());
}
spec.GetMutableMessage()
.mutable_scheduling_strategy()
->mutable_default_scheduling_strategy();
return spec;
}

@@ -67,9 +70,11 @@ TEST(LocalLeasePolicyTest, TestReturnFallback) {
ObjectID obj2 = ObjectID::FromRandom();
std::vector<ObjectID> deps{obj1, obj2};
auto task_spec = CreateFakeTask(deps);
rpc::Address best_node_address = local_lease_policy.GetBestNodeForTask(task_spec);
auto [best_node_address, is_selected_based_on_locality] =
local_lease_policy.GetBestNodeForTask(task_spec);
// Test that fallback node was chosen.
ASSERT_EQ(NodeID::FromBinary(best_node_address.raylet_id()), fallback_node);
ASSERT_FALSE(is_selected_based_on_locality);
}

TEST(LocalityAwareLeasePolicyTest, TestBestLocalityDominatingNode) {
mock_locality_data_provider, MockNodeAddrFactory, fallback_rpc_address);
std::vector<ObjectID> deps{obj1, obj2};
auto task_spec = CreateFakeTask(deps);
rpc::Address best_node_address = locality_lease_policy.GetBestNodeForTask(task_spec);
auto [best_node_address, is_selected_based_on_locality] =
locality_lease_policy.GetBestNodeForTask(task_spec);
// Locality data provider should be called once for each dependency.
ASSERT_EQ(mock_locality_data_provider->num_locality_data_fetches, deps.size());
// Test that best node was chosen.
ASSERT_EQ(NodeID::FromBinary(best_node_address.raylet_id()), best_node);
ASSERT_TRUE(is_selected_based_on_locality);
}

TEST(LocalityAwareLeasePolicyTest, TestBestLocalityBiggerObject) {
mock_locality_data_provider, MockNodeAddrFactory, fallback_rpc_address);
std::vector<ObjectID> deps{obj1, obj2};
auto task_spec = CreateFakeTask(deps);
rpc::Address best_node_address = locality_lease_policy.GetBestNodeForTask(task_spec);
auto [best_node_address, is_selected_based_on_locality] =
locality_lease_policy.GetBestNodeForTask(task_spec);
// Locality data provider should be called once for each dependency.
ASSERT_EQ(mock_locality_data_provider->num_locality_data_fetches, deps.size());
// Test that best node was chosen.
ASSERT_EQ(NodeID::FromBinary(best_node_address.raylet_id()), best_node);
ASSERT_TRUE(is_selected_based_on_locality);
}

TEST(LocalityAwareLeasePolicyTest, TestBestLocalityBetterNode) {
mock_locality_data_provider, MockNodeAddrFactory, fallback_rpc_address);
std::vector<ObjectID> deps{obj1, obj2, obj3};
auto task_spec = CreateFakeTask(deps);
rpc::Address best_node_address = locality_lease_policy.GetBestNodeForTask(task_spec);
auto [best_node_address, is_selected_based_on_locality] =
locality_lease_policy.GetBestNodeForTask(task_spec);
// Locality data provider should be called once for each dependency.
ASSERT_EQ(mock_locality_data_provider->num_locality_data_fetches, deps.size());
// Test that best node was chosen.
ASSERT_EQ(NodeID::FromBinary(best_node_address.raylet_id()), best_node);
ASSERT_TRUE(is_selected_based_on_locality);
}

TEST(LocalityAwareLeasePolicyTest, TestBestLocalityFallbackNoLocations) {
mock_locality_data_provider, MockNodeAddrFactory, fallback_rpc_address);
std::vector<ObjectID> deps{obj1, obj2};
auto task_spec = CreateFakeTask(deps);
rpc::Address best_node_address = locality_lease_policy.GetBestNodeForTask(task_spec);
auto [best_node_address, is_selected_based_on_locality] =
locality_lease_policy.GetBestNodeForTask(task_spec);
// Locality data provider should be called once for each dependency.
ASSERT_EQ(mock_locality_data_provider->num_locality_data_fetches, deps.size());
// Test that fallback node was chosen.
ASSERT_EQ(NodeID::FromBinary(best_node_address.raylet_id()), fallback_node);
ASSERT_FALSE(is_selected_based_on_locality);
}

TEST(LocalityAwareLeasePolicyTest, TestBestLocalityFallbackNoDeps) {
// No task dependencies.
std::vector<ObjectID> deps;
auto task_spec = CreateFakeTask(deps);
rpc::Address best_node_address = locality_lease_policy.GetBestNodeForTask(task_spec);
auto [best_node_address, is_selected_based_on_locality] =
locality_lease_policy.GetBestNodeForTask(task_spec);
// Locality data provider should be called once for each dependency.
ASSERT_EQ(mock_locality_data_provider->num_locality_data_fetches, deps.size());
// Test that fallback node was chosen.
ASSERT_EQ(NodeID::FromBinary(best_node_address.raylet_id()), fallback_node);
ASSERT_FALSE(is_selected_based_on_locality);
}

TEST(LocalityAwareLeasePolicyTest, TestBestLocalityFallbackAddrFetchFail) {
mock_locality_data_provider, MockNodeAddrFactoryAlwaysNull, fallback_rpc_address);
std::vector<ObjectID> deps{obj1, obj2};
auto task_spec = CreateFakeTask(deps);
rpc::Address best_node_address = locality_lease_policy.GetBestNodeForTask(task_spec);
auto [best_node_address, is_selected_based_on_locality] =
locality_lease_policy.GetBestNodeForTask(task_spec);
// Locality data provider should be called once for each dependency.
ASSERT_EQ(mock_locality_data_provider->num_locality_data_fetches, deps.size());
// Test that fallback node was chosen.
ASSERT_EQ(NodeID::FromBinary(best_node_address.raylet_id()), fallback_node);
ASSERT_FALSE(is_selected_based_on_locality);
}

} // namespace core
6 changes: 4 additions & 2 deletions src/ray/core_worker/transport/direct_task_transport.cc
@@ -332,9 +332,11 @@ void CoreWorkerDirectTaskSubmitter::RequestNewWorkerIfNeeded(
const TaskSpecification resource_spec = TaskSpecification(resource_spec_msg);
rpc::Address best_node_address;
const bool is_spillback = (raylet_address != nullptr);
bool is_selected_based_on_locality = false;
if (raylet_address == nullptr) {
// If no raylet address is given, find the best worker for our next lease request.
best_node_address = lease_policy_->GetBestNodeForTask(resource_spec);
std::tie(best_node_address, is_selected_based_on_locality) =
lease_policy_->GetBestNodeForTask(resource_spec);

Contributor: Shouldn't we only run this logic if the scheduling strategy is DEFAULT?

Collaborator (author): Currently that check is inside the lease policy: it checks the scheduling strategy and decides whether it needs to run the locality logic.
raylet_address = &best_node_address;
}
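
For readers following the thread above: the guard the author refers to is the one added to LocalityAwareLeasePolicy::GetBestNodeForTask in the lease_policy.cc hunk earlier in this diff. Condensed, it reads:

// Any non-default scheduling strategy (e.g. SPREAD) bypasses
// locality-aware selection and reports false for the new flag.
if (spec.GetMessage().scheduling_strategy().scheduling_strategy_case() !=
    rpc::SchedulingStrategy::SchedulingStrategyCase::kDefaultSchedulingStrategy) {
  return std::make_pair(fallback_rpc_address_, false);
}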

@@ -475,7 +477,7 @@ void CoreWorkerDirectTaskSubmitter::RequestNewWorkerIfNeeded(
}
}
},
task_queue.size());
task_queue.size(), is_selected_based_on_locality);
scheduling_key_entry.pending_lease_requests.emplace(task_id, *raylet_address);
ReportWorkerBacklogIfNeeded(scheduling_key);
}
4 changes: 2 additions & 2 deletions src/ray/gcs/gcs_server/test/gcs_server_test_util.h
@@ -79,15 +79,15 @@ struct GcsServerMocker {
void RequestWorkerLease(
const ray::TaskSpecification &resource_spec, bool grant_or_reject,
const rpc::ClientCallback<rpc::RequestWorkerLeaseReply> &callback,
const int64_t backlog_size = -1) override {
const int64_t backlog_size, const bool is_selected_based_on_locality) override {
num_workers_requested += 1;
callbacks.push_back(callback);
}

void RequestWorkerLease(
const rpc::TaskSpec &spec, bool grant_or_reject,
const rpc::ClientCallback<rpc::RequestWorkerLeaseReply> &callback,
const int64_t backlog_size = -1) override {
const int64_t backlog_size, const bool is_selected_based_on_locality) override {
num_workers_requested += 1;
callbacks.push_back(callback);
}
2 changes: 2 additions & 0 deletions src/ray/protobuf/node_manager.proto
@@ -48,6 +48,8 @@ message RequestWorkerLeaseRequest {
// locally schedulable or reject the request.
// Else, the raylet may return another raylet at which to retry the request.
bool grant_or_reject = 3;
// If true, the current raylet was selected due to the locality of the task's arguments.
bool is_selected_based_on_locality = 4;
}

message RequestWorkerLeaseReply {
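
To illustrate the intended receiver-side semantics of the new field, here is a minimal sketch — a hypothetical helper, not the actual Ray scheduler code; local_utilization and spread_threshold are stand-ins for the scheduler_spread_threshold mechanics exercised in the Python test above:

// Sketch only: how a raylet-side policy could honor the flag. When the
// core worker picked this node because the task's arguments live here,
// skip the spread heuristic so the task is not moved off its dependencies.
bool ShouldKeepTaskLocal(bool is_selected_based_on_locality,
                         double local_utilization,
                         double spread_threshold) {
  if (is_selected_based_on_locality) {
    return true;  // Locality-selected: never spread away.
  }
  return local_utilization < spread_threshold;  // Normal spread behavior.
}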
5 changes: 3 additions & 2 deletions src/ray/raylet/node_manager.cc
@@ -1609,8 +1609,9 @@ void NodeManager::HandleRequestWorkerLease(const rpc::RequestWorkerLeaseRequest
send_reply_callback(status, success, failure);
};

cluster_task_manager_->QueueAndScheduleTask(task, request.grant_or_reject(), reply,
send_reply_callback_wrapper);
cluster_task_manager_->QueueAndScheduleTask(task, request.grant_or_reject(),
request.is_selected_based_on_locality(),
reply, send_reply_callback_wrapper);
}

void NodeManager::HandlePrepareBundleResources(