Skip to content

Commit

Permalink
[core] Spill at least the object fusion size instead of at most (#22750)
Browse files Browse the repository at this point in the history
Copied from #22571:

Whenever we spill, we try to spill all spillable objects. We also try to fuse small objects together to reduce total IOPS. If there aren't enough objects in the object store to meet the fusion threshold, we spill the objects anyway to avoid liveness issues. However, currently we spill at most the object fusion size when instead we should be spilling at least the fusion size. Then we use the max number of fused objects as a cap.

This PR fixes the fusion behavior so that we always spill at least the fusion size. If we reach the end of the spillable objects while still under the fusion threshold, we spill them only if no other spills are pending. Otherwise we wait, which gives the pending spills time to finish, and then we can re-evaluate whether it's necessary to spill the remaining objects. Liveness is also preserved.

Increases some test timeouts to allow tests to pass.
  • Loading branch information
stephanie-wang authored Apr 5, 2022
1 parent ca6dfc8 commit 1c972d5
Show file tree
Hide file tree
Showing 9 changed files with 255 additions and 99 deletions.
10 changes: 9 additions & 1 deletion python/ray/_private/test_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -1205,7 +1205,15 @@ def ok():
if "Restored" in s:
return False
if spilled:
if "Spilled {} MiB".format(spilled) not in s:
if not isinstance(spilled, list):
spilled_lst = [spilled]
else:
spilled_lst = spilled
found = False
for n in spilled_lst:
if "Spilled {} MiB".format(n) in s:
found = True
if not found:
return False
else:
if "Spilled" in s:
Expand Down
2 changes: 1 addition & 1 deletion python/ray/internal/internal_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ def get_store_stats(state, node_manager_address=None, node_manager_port=None):
stub = node_manager_pb2_grpc.NodeManagerServiceStub(channel)
reply = stub.FormatGlobalMemoryInfo(
node_manager_pb2.FormatGlobalMemoryInfoRequest(include_memory_info=False),
timeout=30.0,
timeout=60.0,
)
return store_stats_summary(reply)

Expand Down
1 change: 1 addition & 0 deletions python/ray/tests/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,7 @@ py_test_module_list(
"test_output.py",
"test_failure_4.py",
"test_object_spilling.py",
"test_object_spilling_no_asan.py",
"test_object_spilling_2.py",
"test_object_spilling_3.py",
"test_plasma_unlimited.py",
Expand Down
4 changes: 2 additions & 2 deletions python/ray/tests/test_object_spilling.py
Original file line number Diff line number Diff line change
Expand Up @@ -243,9 +243,9 @@ def depends(arg):


@pytest.mark.skipif(platform.system() == "Windows", reason="Hangs on Windows.")
def test_spill_objects_automatically(object_spilling_config, shutdown_only):
def test_spill_objects_automatically(fs_only_object_spilling_config, shutdown_only):
# Limit our object store to 75 MiB of memory.
object_spilling_config, _ = object_spilling_config
object_spilling_config, _ = fs_only_object_spilling_config
address = ray.init(
num_cpus=1,
object_store_memory=75 * 1024 * 1024,
Expand Down
50 changes: 50 additions & 0 deletions python/ray/tests/test_object_spilling_no_asan.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
import numpy as np
import platform
import pytest
import os
import sys

import ray


# NOTE(swang): This test currently fails in ASAN mode because it tests a
# performance issue that is likely sensitive to timing.
@pytest.mark.skipif(platform.system() == "Windows", reason="Hangs on Windows.")
def test_spill_fusion(fs_only_object_spilling_config, shutdown_only):
    """Check that small objects are mostly spilled as fused files.

    Puts many 1 MiB objects into a 75 MiB object store configured with a
    10 MiB minimum spilling size, then inspects the sizes of the files in
    the spill directory. A file smaller than two objects cannot hold a
    fused batch, so such files must be the minority.
    """
    # Limit our object store to 75 MiB of memory.
    object_spilling_config, temp_folder = fs_only_object_spilling_config
    min_spilling_size = 10 * 1024 * 1024
    ray.init(
        num_cpus=1,
        object_store_memory=75 * 1024 * 1024,
        _system_config={
            "max_io_workers": 1,
            "object_spilling_config": object_spilling_config,
            "min_spilling_size": min_spilling_size,
            "object_spilling_threshold": 0.8,
            # Set the timeout between create retries high so that this test
            # passes in ASAN and debug mode.
            "object_store_full_delay_ms": 1000,
        },
    )

    object_size = 1024 * 1024
    # Fill up the object store 4 times with small objects.
    # We trigger spilling at 80% and the min spill size is
    # about 10 objects.
    # np.zeros defaults to float64 (8 bytes per element), so object_size // 8
    # elements make each object object_size bytes.
    xs = [ray.put(np.zeros(object_size // 8)) for _ in range(300)]  # noqa: F841

    spill_dir = os.path.join(temp_folder, ray.ray_constants.DEFAULT_OBJECT_PREFIX)
    under_min, over_min = 0, 0
    for filename in os.listdir(spill_dir):
        size = os.stat(os.path.join(spill_dir, filename)).st_size
        # Compare in bytes: a file holding fewer than two objects is unfused.
        # The previous threshold (2 * object_size // 8) was smaller than a
        # single object, so no file could ever be counted as under_min.
        if size < 2 * object_size:
            under_min += 1
        else:
            over_min += 1
    # We should almost always spill fused objects.
    assert over_min > under_min


if __name__ == "__main__":
sys.exit(pytest.main(["-sv", __file__]))
2 changes: 1 addition & 1 deletion python/ray/tests/test_plasma_unlimited.py
Original file line number Diff line number Diff line change
Expand Up @@ -250,7 +250,7 @@ def test_plasma_allocate(shutdown_only):
__ = ray.put(data) # noqa

# Check fourth object allocate in memory.
check_spilled_mb(address, spilled=180)
check_spilled_mb(address, spilled=[90, 180])


if __name__ == "__main__":
Expand Down
106 changes: 58 additions & 48 deletions src/ray/raylet/local_object_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -181,64 +181,74 @@ bool LocalObjectManager::SpillObjectsOfSize(int64_t num_bytes_to_spill) {
auto it = pinned_objects_.begin();
std::vector<ObjectID> objects_to_spill;
int64_t counts = 0;
while (bytes_to_spill <= num_bytes_to_spill && it != pinned_objects_.end() &&
counts < max_fused_object_count_) {
while (it != pinned_objects_.end() && counts < max_fused_object_count_) {
if (is_plasma_object_spillable_(it->first)) {
bytes_to_spill += it->second->GetSize();
objects_to_spill.push_back(it->first);
}
it++;
counts += 1;
}
if (!objects_to_spill.empty()) {
RAY_LOG(DEBUG) << "Spilling objects of total size " << bytes_to_spill
<< " num objects " << objects_to_spill.size();
auto start_time = absl::GetCurrentTimeNanos();
SpillObjectsInternal(
objects_to_spill,
[this, bytes_to_spill, objects_to_spill, start_time](const Status &status) {
if (!status.ok()) {
RAY_LOG(DEBUG) << "Failed to spill objects: " << status.ToString();
} else {
auto now = absl::GetCurrentTimeNanos();
RAY_LOG(DEBUG) << "Spilled " << bytes_to_spill << " bytes in "
<< (now - start_time) / 1e6 << "ms";
spilled_bytes_total_ += bytes_to_spill;
spilled_objects_total_ += objects_to_spill.size();
// Adjust throughput timing to account for concurrent spill operations.
spill_time_total_s_ +=
(now - std::max(start_time, last_spill_finish_ns_)) / 1e9;
if (now - last_spill_log_ns_ > 1e9) {
last_spill_log_ns_ = now;
std::stringstream msg;
// Keep :info_message: in sync with LOG_PREFIX_INFO_MESSAGE in
// ray_constants.py.
msg << ":info_message:Spilled "
<< static_cast<int>(spilled_bytes_total_ / (1024 * 1024)) << " MiB, "
<< spilled_objects_total_ << " objects, write throughput "
<< static_cast<int>(spilled_bytes_total_ / (1024 * 1024) /
spill_time_total_s_)
<< " MiB/s.";
if (next_spill_error_log_bytes_ > 0 &&
spilled_bytes_total_ >= next_spill_error_log_bytes_) {
// Add an advisory the first time this is logged.
if (next_spill_error_log_bytes_ ==
RayConfig::instance().verbose_spill_logs()) {
msg << " Set RAY_verbose_spill_logs=0 to disable this message.";
}
// Exponential backoff on the spill messages.
next_spill_error_log_bytes_ *= 2;
RAY_LOG(ERROR) << msg.str();
} else {
RAY_LOG(INFO) << msg.str();
if (objects_to_spill.empty()) {
return false;
}

if (it == pinned_objects_.end() && bytes_to_spill < num_bytes_to_spill &&
!objects_pending_spill_.empty()) {
// We have gone through all spillable objects but we have not yet reached
// the minimum bytes to spill and we are already spilling other objects.
// Let those spill requests finish before we try to spill the current
// objects. This gives us some time to decide whether we really need to
// spill the current objects or if we can afford to wait for additional
// objects to fuse with.
return false;
}
RAY_LOG(DEBUG) << "Spilling objects of total size " << bytes_to_spill << " num objects "
<< objects_to_spill.size();
auto start_time = absl::GetCurrentTimeNanos();
SpillObjectsInternal(
objects_to_spill,
[this, bytes_to_spill, objects_to_spill, start_time](const Status &status) {
if (!status.ok()) {
RAY_LOG(DEBUG) << "Failed to spill objects: " << status.ToString();
} else {
auto now = absl::GetCurrentTimeNanos();
RAY_LOG(DEBUG) << "Spilled " << bytes_to_spill << " bytes in "
<< (now - start_time) / 1e6 << "ms";
spilled_bytes_total_ += bytes_to_spill;
spilled_objects_total_ += objects_to_spill.size();
// Adjust throughput timing to account for concurrent spill operations.
spill_time_total_s_ +=
(now - std::max(start_time, last_spill_finish_ns_)) / 1e9;
if (now - last_spill_log_ns_ > 1e9) {
last_spill_log_ns_ = now;
std::stringstream msg;
// Keep :info_message: in sync with LOG_PREFIX_INFO_MESSAGE in
// ray_constants.py.
msg << ":info_message:Spilled "
<< static_cast<int>(spilled_bytes_total_ / (1024 * 1024)) << " MiB, "
<< spilled_objects_total_ << " objects, write throughput "
<< static_cast<int>(spilled_bytes_total_ / (1024 * 1024) /
spill_time_total_s_)
<< " MiB/s.";
if (next_spill_error_log_bytes_ > 0 &&
spilled_bytes_total_ >= next_spill_error_log_bytes_) {
// Add an advisory the first time this is logged.
if (next_spill_error_log_bytes_ ==
RayConfig::instance().verbose_spill_logs()) {
msg << " Set RAY_verbose_spill_logs=0 to disable this message.";
}
// Exponential backoff on the spill messages.
next_spill_error_log_bytes_ *= 2;
RAY_LOG(ERROR) << msg.str();
} else {
RAY_LOG(INFO) << msg.str();
}
last_spill_finish_ns_ = now;
}
});
return true;
}
return false;
last_spill_finish_ns_ = now;
}
});
return true;
}

void LocalObjectManager::SpillObjects(const std::vector<ObjectID> &object_ids,
Expand Down
13 changes: 9 additions & 4 deletions src/ray/raylet/local_object_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -154,15 +154,18 @@ class LocalObjectManager {
std::string DebugString() const;

private:
FRIEND_TEST(LocalObjectManagerTest, TestSpillObjectsOfSize);
FRIEND_TEST(LocalObjectManagerTest, TestSpillObjectsOfSizeZero);
FRIEND_TEST(LocalObjectManagerTest, TestSpillUptoMaxFuseCount);
FRIEND_TEST(LocalObjectManagerTest,
TestSpillObjectsOfSizeNumBytesToSpillHigherThanMinBytesToSpill);
FRIEND_TEST(LocalObjectManagerTest, TestSpillObjectNotEvictable);

/// Asynchronously spill objects when space is needed.
/// The callback tries to spill objects as much as num_bytes_to_spill and returns
/// true if we could spill the corresponding bytes.
/// Asynchronously spill objects when space is needed. The callback tries to
/// spill at least num_bytes_to_spill and returns true if we found objects to
/// spill.
/// If fewer than num_bytes_to_spill bytes of spillable objects can be found
/// and other objects are already being spilled, this will return false to
/// give the currently spilling objects time to finish.
/// NOTE(sang): If 0 is given, this method spills a single object.
///
/// \param num_bytes_to_spill The total number of bytes to spill.
Expand Down Expand Up @@ -336,7 +339,9 @@ class LocalObjectManager {
/// The last time a restore log finished.
int64_t last_restore_log_ns_ = 0;

friend class LocalObjectManagerTestWithMinSpillingSize;
friend class LocalObjectManagerTest;
friend class LocalObjectManagerFusedTest;
};

}; // namespace raylet
Expand Down
Loading

0 comments on commit 1c972d5

Please sign in to comment.