Round robin during spread scheduling #19968

Merged 10 commits on Dec 23, 2021
3 changes: 2 additions & 1 deletion python/ray/_private/services.py
@@ -382,7 +382,8 @@ def validate_redis_address(address):

redis_address_parts = redis_address.split(":")
if len(redis_address_parts) != 2:
raise ValueError("Malformed address. Expected '<host>:<port>'.")
raise ValueError(f"Malformed address. Expected '<host>:<port>',"
f" but got {redis_address} from {address}.")
redis_ip = redis_address_parts[0]
try:
redis_port = int(redis_address_parts[1])
2 changes: 2 additions & 0 deletions python/ray/data/impl/shuffle.py
@@ -29,6 +29,8 @@ def simple_shuffle(input_blocks: BlockList,
map_ray_remote_args = {}
if reduce_ray_remote_args is None:
reduce_ray_remote_args = {}
if "scheduling_strategy" not in reduce_ray_remote_args:
reduce_ray_remote_args["scheduling_strategy"] = "SPREAD"
input_num_blocks = len(input_blocks)
if _spread_resource_prefix is not None:
# Use given spread resource prefix for round-robin resource-based
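The guarded default above means shuffle reduce tasks are spread across nodes unless the caller already chose a strategy. Below is a minimal sketch of the same idiom in user code; it assumes a Ray version that accepts scheduling_strategy="SPREAD" in task options, and the task name reduce_block and its body are hypothetical.

import ray

ray.init()

@ray.remote
def reduce_block(*parts):
    # Hypothetical reducer used only for illustration; real shuffle
    # reducers merge the mapped blocks.
    return sum(parts)

def submit_reduce(parts, reduce_ray_remote_args=None):
    reduce_ray_remote_args = dict(reduce_ray_remote_args or {})
    # Default to SPREAD, but let an explicit caller choice win.
    reduce_ray_remote_args.setdefault("scheduling_strategy", "SPREAD")
    return reduce_block.options(**reduce_ray_remote_args).remote(*parts)

print(ray.get(submit_reduce([1, 2, 3])))  # 6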
2 changes: 2 additions & 0 deletions python/ray/data/read_api.py
@@ -195,6 +195,8 @@ def remote_read(i: int, task: ReadTask) -> MaybeBlockPartition:
# Note that the too many workers warning triggers at 4x subscription,
# so we go at 0.5 to avoid the warning message.
ray_remote_args["num_cpus"] = 0.5
if "scheduling_strategy" not in ray_remote_args:
ray_remote_args["scheduling_strategy"] = "SPREAD"
remote_read = cached_remote_fn(remote_read)

if _spread_resource_prefix is not None:
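Read tasks get the same guarded default, so a caller can still override it through ray_remote_args, which the touched read APIs already accept. A short usage sketch; the path /tmp/data is hypothetical, and "DEFAULT" is assumed to be an accepted strategy string in the installed Ray version.

import ray

ray.init()

# /tmp/data is a hypothetical Parquet directory.
# Read tasks now default to SPREAD across the cluster.
ds = ray.data.read_parquet("/tmp/data")

# Explicitly opt back into Ray's default (bin-packing) scheduling.
ds_packed = ray.data.read_parquet(
    "/tmp/data", ray_remote_args={"scheduling_strategy": "DEFAULT"})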
28 changes: 18 additions & 10 deletions python/ray/data/tests/test_dataset.py
@@ -3615,13 +3615,14 @@ def range(n, parallelism=200):
assert r1 == ds


def test_random_shuffle_spread(ray_start_cluster):
@pytest.mark.parametrize("use_spread_resource_prefix", [False, True])
def test_random_shuffle_spread(ray_start_cluster, use_spread_resource_prefix):
cluster = ray_start_cluster
cluster.add_node(
resources={"foo": 100},
resources={"bar:1": 100},
num_cpus=10,
_system_config={"max_direct_call_object_size": 0})
cluster.add_node(resources={"bar:1": 100})
cluster.add_node(resources={"bar:2": 100})
cluster.add_node(resources={"bar:2": 100}, num_cpus=10)
cluster.add_node(resources={"bar:3": 100}, num_cpus=0)

ray.init(cluster.address)
@@ -3634,7 +3635,9 @@ def get_node_id():
node2_id = ray.get(get_node_id.options(resources={"bar:2": 1}).remote())

ds = ray.data.range(
100, parallelism=2).random_shuffle(_spread_resource_prefix="bar:")
100, parallelism=2).random_shuffle(
_spread_resource_prefix=(
"bar:" if use_spread_resource_prefix else None))
blocks = ds.get_internal_block_refs()
ray.wait(blocks, num_returns=len(blocks), fetch_local=False)
location_data = ray.experimental.get_object_locations(blocks)
@@ -3644,13 +3647,15 @@ def get_node_id():
assert set(locations) == {node1_id, node2_id}


def test_parquet_read_spread(ray_start_cluster, tmp_path):
@pytest.mark.parametrize("use_spread_resource_prefix", [False, True])
def test_parquet_read_spread(ray_start_cluster, tmp_path,
use_spread_resource_prefix):
cluster = ray_start_cluster
cluster.add_node(
resources={"foo": 100},
resources={"bar:1": 100},
num_cpus=10,
_system_config={"max_direct_call_object_size": 0})
cluster.add_node(resources={"bar:1": 100})
cluster.add_node(resources={"bar:2": 100})
cluster.add_node(resources={"bar:2": 100}, num_cpus=10)
cluster.add_node(resources={"bar:3": 100}, num_cpus=0)

ray.init(cluster.address)
@@ -3673,7 +3678,10 @@ def get_node_id():
path2 = os.path.join(data_path, "test2.parquet")
df2.to_parquet(path2)

ds = ray.data.read_parquet(data_path, _spread_resource_prefix="bar:")
ds = ray.data.read_parquet(
data_path,
_spread_resource_prefix=("bar:"
if use_spread_resource_prefix else None))

# Force reads.
blocks = ds.get_internal_block_refs()
2 changes: 1 addition & 1 deletion python/ray/util/scheduling_strategies.py
@@ -12,7 +12,7 @@


@PublicAPI(stability="beta")
class PlacementGroupSchedulingStrategy(object):
class PlacementGroupSchedulingStrategy:
"""Placement group based scheduling strategy.

Attributes:
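For context, PlacementGroupSchedulingStrategy is the class-based counterpart of the string strategies ("DEFAULT", "SPREAD") used elsewhere in this PR. Below is a hedged sketch of both forms; it assumes the placement_group keyword shown here matches the installed Ray version.

import ray
from ray.util.placement_group import placement_group
from ray.util.scheduling_strategies import PlacementGroupSchedulingStrategy

ray.init()

@ray.remote
def f():
    return "ok"

# String strategy: spread tasks across nodes.
spread_ref = f.options(scheduling_strategy="SPREAD").remote()

# Placement-group strategy: run the task inside a reserved bundle.
pg = placement_group([{"CPU": 1}])
ray.get(pg.ready())
pg_ref = f.options(
    scheduling_strategy=PlacementGroupSchedulingStrategy(
        placement_group=pg)).remote()

print(ray.get([spread_ref, pg_ref]))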
12 changes: 5 additions & 7 deletions release/nightly_tests/dataset/pipelined_training.py
@@ -218,8 +218,7 @@ def create_torch_iterator(split, batch_size, rank=None):
def create_dataset(files, num_workers=4, epochs=50, num_windows=1):
if num_windows > 1:
num_rows = ray.data.read_parquet(
files, _spread_resource_prefix="node:").count(
) # This should only read Parquet metadata.
files).count() # This should only read Parquet metadata.
file_splits = np.array_split(files, num_windows)

class Windower:
@@ -235,20 +234,19 @@ def __next__(self):
raise StopIteration()
split = file_splits[self.i % num_windows]
self.i += 1
return lambda: ray.data.read_parquet(
list(split), _spread_resource_prefix="node:")
return lambda: ray.data.read_parquet(list(split))

pipe = DatasetPipeline.from_iterable(Windower())
split_indices = [
i * num_rows // num_windows // num_workers
for i in range(1, num_workers)
]
pipe = pipe.random_shuffle_each_window(_spread_resource_prefix="node:")
pipe = pipe.random_shuffle_each_window()
pipe_shards = pipe.split_at_indices(split_indices)
else:
ds = ray.data.read_parquet(files, _spread_resource_prefix="node:")
ds = ray.data.read_parquet(files)
pipe = ds.repeat(epochs)
pipe = pipe.random_shuffle_each_window(_spread_resource_prefix="node:")
pipe = pipe.random_shuffle_each_window()
pipe_shards = pipe.split(num_workers, equal=True)
return pipe_shards

47 changes: 21 additions & 26 deletions src/ray/raylet/scheduling/scheduling_policy.cc
@@ -40,13 +40,6 @@ int64_t SchedulingPolicy::HybridPolicyWithFilter(
const ResourceRequest &resource_request, float spread_threshold, bool force_spillback,
bool require_available, std::function<bool(int64_t)> is_node_available,
NodeFilter node_filter) {
// Step 1: Generate the traversal order. We guarantee that the first node is local, to
// encourage local scheduling. The rest of the traversal order should be globally
// consistent, to encourage using "warm" workers.
std::vector<int64_t> round;
round.reserve(nodes_.size());
const auto local_it = nodes_.find(local_node_id_);
RAY_CHECK(local_it != nodes_.end());
auto predicate = [node_filter, &is_node_available](
int64_t node_id, const NodeResources &node_resources) {
if (!is_node_available(node_id)) {
Expand All @@ -63,38 +56,37 @@ int64_t SchedulingPolicy::HybridPolicyWithFilter(
return !has_gpu;
};

const auto &local_node_view = local_it->second.GetLocalView();
// If we should include local node at all, make sure it is at the front of the list
// so that
// 1. It's first in traversal order.
// 2. It's easy to avoid sorting it.
if (predicate(local_node_id_, local_node_view) && !force_spillback) {
round.push_back(local_node_id_);
}
// Step 1: Generate the traversal order. We guarantee that the first node is local, to
// encourage local scheduling. The rest of the traversal order should be globally
// consistent, to encourage using "warm" workers.
std::vector<int64_t> round;
round.reserve(nodes_.size());
round.emplace_back(local_node_id_);

const auto start_index = round.size();
for (const auto &pair : nodes_) {
if (pair.first != local_node_id_ &&
predicate(pair.first, pair.second.GetLocalView())) {
round.push_back(pair.first);
if (pair.first != local_node_id_) {
round.emplace_back(pair.first);
}
}
// Sort all the nodes, making sure that if we added the local node in front, it stays in
// place.
std::sort(round.begin() + start_index, round.end());
// Sort all the nodes, making sure that the local node stays in front.
std::sort(round.begin() + 1, round.end());

int64_t best_node_id = -1;
float best_utilization_score = INFINITY;
bool best_is_available = false;

// Step 2: Perform the round robin.
auto round_it = round.begin();
for (; round_it != round.end(); round_it++) {
const auto &node_id = *round_it;
size_t round_index = spread_threshold == 0 ? spread_scheduling_next_index_ : 0;
for (size_t i = 0; i < round.size(); ++i, ++round_index) {
const auto &node_id = round[round_index % round.size()];
const auto &it = nodes_.find(node_id);
RAY_CHECK(it != nodes_.end());
const auto &node = it->second;
if (!node.GetLocalView().IsFeasible(resource_request)) {
if (node_id == local_node_id_ && force_spillback) {
continue;
}
if (!predicate(node_id, node.GetLocalView()) ||
!node.GetLocalView().IsFeasible(resource_request)) {
continue;
}

@@ -138,6 +130,9 @@ int64_t SchedulingPolicy::HybridPolicyWithFilter(
}

if (update_best_node) {
if (spread_threshold == 0) {
spread_scheduling_next_index_ = ((round_index + 1) % round.size());
}
best_node_id = node_id;
best_utilization_score = critical_resource_utilization;
best_is_available = is_available;
4 changes: 4 additions & 0 deletions src/ray/raylet/scheduling/scheduling_policy.h
@@ -70,6 +70,10 @@ class SchedulingPolicy {
/// List of nodes in the clusters and their resources organized as a map.
/// The key of the map is the node ID.
const absl::flat_hash_map<int64_t, Node> &nodes_;
// The node index at which to start the round robin for spread scheduling.
// The index may be inaccurate when nodes are added or removed dynamically,
// but it should still be better than always scanning from 0 for spread scheduling.
size_t spread_scheduling_next_index_ = 0;

enum class NodeFilter {
/// Default scheduling.
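To make the C++ change easier to follow, here is a small Python model of the spread path only (spread_threshold == 0). Utilization scoring, feasibility checks, and node filters from HybridPolicyWithFilter are deliberately omitted, so this is a sketch of the round-robin bookkeeping, not the actual scheduler.

class SpreadRoundRobin:
    """Models the persisted spread_scheduling_next_index_ cursor."""

    def __init__(self, local_node_id, node_ids):
        # Local node first, then a globally consistent (sorted) order.
        self.round = [local_node_id] + sorted(
            n for n in node_ids if n != local_node_id)
        # Persisted across requests, like spread_scheduling_next_index_.
        self.next_index = 0

    def pick(self, is_schedulable):
        n = len(self.round)
        for i in range(n):
            idx = (self.next_index + i) % n
            node_id = self.round[idx]
            if is_schedulable(node_id):
                # The next spread request starts scanning after this pick.
                self.next_index = (idx + 1) % n
                return node_id
        return None  # No schedulable node.

# Mirrors SpreadSchedulingTest below: successive requests rotate through
# local -> remote_1 -> remote_2 -> local instead of always scanning from 0.
rr = SpreadRoundRobin(0, [0, 1, 2])
print([rr.pick(lambda _: True) for _ in range(4)])  # [0, 1, 2, 0]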
31 changes: 31 additions & 0 deletions src/ray/raylet/scheduling/scheduling_policy_test.cc
@@ -348,6 +348,37 @@ TEST_F(SchedulingPolicyTest, ForceSpillbackOnlyFeasibleLocallyTest) {
ASSERT_EQ(to_schedule, -1);
}

TEST_F(SchedulingPolicyTest, SpreadSchedulingTest) {
// Test to make sure we are doing round robin for spread scheduling.
StringIdMap map;
ResourceRequest req = ResourceMapToResourceRequest(map, {{"CPU", 1}}, false);
int64_t local_node = 0;
int64_t remote_node_1 = 1;
int64_t remote_node_2 = 2;

absl::flat_hash_map<int64_t, Node> nodes;
nodes.emplace(local_node, CreateNodeResources(20, 20, 0, 0, 0, 0));
nodes.emplace(remote_node_1, CreateNodeResources(20, 20, 0, 0, 0, 0));
nodes.emplace(remote_node_2, CreateNodeResources(20, 20, 0, 0, 0, 0));

raylet_scheduling_policy::SchedulingPolicy scheduling_policy(local_node, nodes);

int64_t to_schedule =
scheduling_policy.HybridPolicy(req, 0, false, false, [](auto) { return true; });
ASSERT_EQ(to_schedule, local_node);

nodes.emplace(local_node, CreateNodeResources(19, 20, 0, 0, 0, 0));
to_schedule =
scheduling_policy.HybridPolicy(req, 0, false, false, [](auto) { return true; });
ASSERT_EQ(to_schedule, remote_node_1);

// Make sure we don't always scan from beginning for spread scheduling.
nodes.emplace(local_node, CreateNodeResources(20, 20, 0, 0, 0, 0));
to_schedule =
scheduling_policy.HybridPolicy(req, 0, false, false, [](auto) { return true; });
ASSERT_EQ(to_schedule, remote_node_2);
}

TEST_F(SchedulingPolicyTest, NonGpuNodePreferredSchedulingTest) {
// Prefer to schedule on CPU nodes first.
// GPU nodes should be preferred as a last resort.