Skip to content

Commit

Permalink
[core] Fix bug in fusion for spilled objects (ray-project#22571)
Browse files Browse the repository at this point in the history
Whenever we spill, we try to spill all spillable objects. We also try to fuse small objects together to reduce total IOPS. If there aren't enough objects in the object store to meet the fusion threshold, we spill the objects anyway to avoid liveness issues.

However, the current logic always spills once we reach the end of the spillable objects or once we've reached the fusion threshold. This can produce lots of unfused objects if they are created concurrently with the spill.

This PR changes the spill logic: once we reach the end of the spillable objects, if the last batch of spilled objects is under the fusion threshold, we'll only spill it if we don't have other spills pending too. This gives the pending spills time to finish, and then we can re-evaluate whether it's necessary to spill the remaining objects. Liveness is also preserved.
  • Loading branch information
stephanie-wang authored and simonsays1980 committed Feb 27, 2022
1 parent ad41236 commit a370099
Show file tree
Hide file tree
Showing 5 changed files with 181 additions and 3 deletions.
1 change: 1 addition & 0 deletions python/ray/tests/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ py_test_module_list(
"test_worker_capping.py",
"test_object_spilling_2.py",
"test_object_spilling_3.py",
"test_object_spilling_no_asan.py",
"test_object_manager.py",
"test_multi_tenancy.py",
"test_scheduling.py",
Expand Down
48 changes: 48 additions & 0 deletions python/ray/tests/test_object_spilling_no_asan.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
import numpy as np
import pytest
import os
import sys

import ray


# NOTE(swang): This test currently fails in ASAN mode because it tests a
# performance issue that is likely sensitive to timing.
def test_spill_fusion(object_spilling_config):
# Limit our object store to 75 MiB of memory.
object_spilling_config, temp_folder = object_spilling_config
min_spilling_size = 10 * 1024 * 1024
ray.init(
num_cpus=1,
object_store_memory=75 * 1024 * 1024,
_system_config={
"max_io_workers": 1,
"object_spilling_config": object_spilling_config,
"min_spilling_size": min_spilling_size,
"object_spilling_threshold": 0.8,
# Set the timeout between create retries high so that this test
# passes in ASAN and debug mode.
"object_store_full_delay_ms": 1000,
},
)

object_size = 1024 * 1024
# Fill up the object store 4 times with small objects.
# We trigger spilling at 80% and the min spill size is
# about 10 objects.
xs = [ray.put(np.zeros(object_size // 8)) for _ in range(300)] # noqa: F841

spill_dir = os.path.join(temp_folder, ray.ray_constants.DEFAULT_OBJECT_PREFIX)
under_min, over_min = 0, 0
for filename in os.listdir(spill_dir):
size = os.stat(os.path.join(spill_dir, filename)).st_size
if size < min_spilling_size:
under_min += 1
else:
over_min += 1
# We should almost always spill fused objects.
assert over_min > under_min


if __name__ == "__main__":
sys.exit(pytest.main(["-sv", __file__]))
10 changes: 10 additions & 0 deletions src/ray/raylet/local_object_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,16 @@ bool LocalObjectManager::SpillObjectsOfSize(int64_t num_bytes_to_spill) {
counts += 1;
}
if (!objects_to_spill.empty()) {
if (it == pinned_objects_.end() && bytes_to_spill <= num_bytes_to_spill &&
!objects_pending_spill_.empty()) {
// We have gone through all spillable objects but we have not yet reached
// the minimum bytes to spill and we are already spilling other objects.
// Let those spill requests finish before we try to spill the current
// objects. This gives us some time to decide whether we really need to
// spill the current objects or if we can afford to wait for additional
// objects to fuse with.
return false;
}
RAY_LOG(DEBUG) << "Spilling objects of total size " << bytes_to_spill
<< " num objects " << objects_to_spill.size();
auto start_time = absl::GetCurrentTimeNanos();
Expand Down
2 changes: 2 additions & 0 deletions src/ray/raylet/local_object_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -316,7 +316,9 @@ class LocalObjectManager {
/// The last time a restore log finished.
int64_t last_restore_log_ns_ = 0;

friend class LocalObjectManagerTestWithMinSpillingSize;
friend class LocalObjectManagerTest;
friend class LocalObjectManagerFusedTest;
};

}; // namespace raylet
Expand Down
123 changes: 120 additions & 3 deletions src/ray/raylet/test/local_object_manager_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -283,9 +283,9 @@ class MockObjectBuffer : public Buffer {
std::shared_ptr<absl::flat_hash_map<ObjectID, int>> unpins_;
};

class LocalObjectManagerTest : public ::testing::Test {
class LocalObjectManagerTestWithMinSpillingSize {
public:
LocalObjectManagerTest()
LocalObjectManagerTestWithMinSpillingSize(int64_t min_spilling_size)
: subscriber_(std::make_shared<MockSubscriber>()),
owner_client(std::make_shared<MockWorkerClient>()),
client_pool([&](const rpc::Address &addr) { return owner_client; }),
Expand All @@ -295,7 +295,7 @@ class LocalObjectManagerTest : public ::testing::Test {
manager_node_id_, "address", 1234, free_objects_batch_size,
/*free_objects_period_ms=*/1000, worker_pool, client_pool,
/*max_io_workers=*/2,
/*min_spilling_size=*/0,
/*min_spilling_size=*/min_spilling_size,
/*is_external_storage_type_fs=*/true,
/*max_fused_object_count*/ max_fused_object_count_,
/*on_objects_freed=*/
Expand Down Expand Up @@ -349,6 +349,18 @@ class LocalObjectManagerTest : public ::testing::Test {
std::unordered_set<ObjectID> unevictable_objects_;
};

class LocalObjectManagerTest : public LocalObjectManagerTestWithMinSpillingSize,
public ::testing::Test {
public:
LocalObjectManagerTest() : LocalObjectManagerTestWithMinSpillingSize(0) {}
};

class LocalObjectManagerFusedTest : public LocalObjectManagerTestWithMinSpillingSize,
public ::testing::Test {
public:
LocalObjectManagerFusedTest() : LocalObjectManagerTestWithMinSpillingSize(100) {}
};

TEST_F(LocalObjectManagerTest, TestPin) {
rpc::Address owner_address;
owner_address.set_worker_id(WorkerID::FromRandom().Binary());
Expand Down Expand Up @@ -1288,6 +1300,111 @@ TEST_F(LocalObjectManagerTest, TestDuplicatePinAndSpill) {
AssertNoLeaks();
}

TEST_F(LocalObjectManagerFusedTest, TestMinSpillingSize) {
rpc::Address owner_address;
owner_address.set_worker_id(WorkerID::FromRandom().Binary());

std::vector<ObjectID> object_ids;
std::vector<std::unique_ptr<RayObject>> objects;
int64_t total_size = 0;
int64_t object_size = 52;

for (size_t i = 0; i < 3; i++) {
ObjectID object_id = ObjectID::FromRandom();
object_ids.push_back(object_id);
auto data_buffer = std::make_shared<MockObjectBuffer>(object_size, object_id, unpins);
total_size += object_size;
auto object = std::make_unique<RayObject>(data_buffer, nullptr,
std::vector<rpc::ObjectReference>());
objects.push_back(std::move(object));
}
manager.PinObjectsAndWaitForFree(object_ids, std::move(objects), owner_address);
manager.SpillObjectUptoMaxThroughput();
// Only 2 of the objects should be spilled.
ASSERT_TRUE(worker_pool.FlushPopSpillWorkerCallbacks());
ASSERT_FALSE(worker_pool.FlushPopSpillWorkerCallbacks());
for (const auto &id : object_ids) {
ASSERT_EQ((*unpins)[id], 0);
}
manager.SpillObjectUptoMaxThroughput();
ASSERT_FALSE(worker_pool.FlushPopSpillWorkerCallbacks());

// Check that half the objects get spilled and the URLs get added to the
// global object directory.
std::vector<std::string> urls;
urls.push_back(BuildURL("url1"));
urls.push_back(BuildURL("url2"));
EXPECT_CALL(worker_pool, PushSpillWorker(_));
// Objects should get freed even though we didn't wait for the owner's notice
// to evict.
ASSERT_TRUE(worker_pool.io_worker_client->ReplySpillObjects(urls));
for (size_t i = 0; i < urls.size(); i++) {
ASSERT_TRUE(owner_client->ReplyAddSpilledUrl());
}
ASSERT_EQ(owner_client->object_urls.size(), 2);
int num_unpinned = 0;
for (const auto &id : object_ids) {
if ((*unpins)[id] == 1) {
num_unpinned++;
}
}
ASSERT_EQ(num_unpinned, 2);

// We will spill the last object, even though we're under the min spilling
// size, because they are the only spillable objects.
manager.SpillObjectUptoMaxThroughput();
ASSERT_TRUE(worker_pool.FlushPopSpillWorkerCallbacks());
ASSERT_FALSE(worker_pool.FlushPopSpillWorkerCallbacks());
}

TEST_F(LocalObjectManagerFusedTest, TestMinSpillingSizeMaxFusionCount) {
rpc::Address owner_address;
owner_address.set_worker_id(WorkerID::FromRandom().Binary());

std::vector<ObjectID> object_ids;
std::vector<std::unique_ptr<RayObject>> objects;
int64_t total_size = 0;
// 20 of these objects are needed to hit the min spilling size, but
// max_fused_object_count=15.
int64_t object_size = 5;

for (size_t i = 0; i < 40; i++) {
ObjectID object_id = ObjectID::FromRandom();
object_ids.push_back(object_id);
auto data_buffer = std::make_shared<MockObjectBuffer>(object_size, object_id, unpins);
total_size += object_size;
auto object = std::make_unique<RayObject>(data_buffer, nullptr,
std::vector<rpc::ObjectReference>());
objects.push_back(std::move(object));
}
manager.PinObjectsAndWaitForFree(object_ids, std::move(objects), owner_address);
manager.SpillObjectUptoMaxThroughput();
// First two spill batches succeed because they have at least 15 objects.
ASSERT_TRUE(worker_pool.FlushPopSpillWorkerCallbacks());
ASSERT_TRUE(worker_pool.FlushPopSpillWorkerCallbacks());
// Last spill batch fails because we have 10 objects and their total size is
// less than 100.
ASSERT_FALSE(worker_pool.FlushPopSpillWorkerCallbacks());

std::vector<std::string> urls;
for (int i = 0; i < 15; i++) {
urls.push_back(BuildURL("url", i));
}
EXPECT_CALL(worker_pool, PushSpillWorker(_)).Times(2);
ASSERT_TRUE(worker_pool.io_worker_client->ReplySpillObjects(urls));
ASSERT_TRUE(worker_pool.io_worker_client->ReplySpillObjects(urls));
for (size_t i = 0; i < urls.size(); i++) {
ASSERT_TRUE(owner_client->ReplyAddSpilledUrl());
ASSERT_TRUE(owner_client->ReplyAddSpilledUrl());
}

// We will spill the last objects even though we're under the min spilling
// size because they are the only spillable objects.
manager.SpillObjectUptoMaxThroughput();
ASSERT_TRUE(worker_pool.FlushPopSpillWorkerCallbacks());
ASSERT_FALSE(worker_pool.FlushPopSpillWorkerCallbacks());
}

} // namespace raylet

} // namespace ray
Expand Down

0 comments on commit a370099

Please sign in to comment.