Skip to content

Commit

Permalink
Revert "[core] Fix bug in fusion for spilled objects (#22571)" (#22694)
Browse files Browse the repository at this point in the history
Makes 2 tests flaky
  • Loading branch information
rkooo567 authored Feb 28, 2022
1 parent e84e967 commit 08374e8
Show file tree
Hide file tree
Showing 5 changed files with 3 additions and 181 deletions.
1 change: 0 additions & 1 deletion python/ray/tests/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,6 @@ py_test_module_list(
"test_worker_capping.py",
"test_object_spilling_2.py",
"test_object_spilling_3.py",
"test_object_spilling_no_asan.py",
"test_object_manager.py",
"test_multi_tenancy.py",
"test_scheduling.py",
Expand Down
48 changes: 0 additions & 48 deletions python/ray/tests/test_object_spilling_no_asan.py

This file was deleted.

10 changes: 0 additions & 10 deletions src/ray/raylet/local_object_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -188,16 +188,6 @@ bool LocalObjectManager::SpillObjectsOfSize(int64_t num_bytes_to_spill) {
counts += 1;
}
if (!objects_to_spill.empty()) {
if (it == pinned_objects_.end() && bytes_to_spill <= num_bytes_to_spill &&
!objects_pending_spill_.empty()) {
// We have gone through all spillable objects but we have not yet reached
// the minimum bytes to spill and we are already spilling other objects.
// Let those spill requests finish before we try to spill the current
// objects. This gives us some time to decide whether we really need to
// spill the current objects or if we can afford to wait for additional
// objects to fuse with.
return false;
}
RAY_LOG(DEBUG) << "Spilling objects of total size " << bytes_to_spill
<< " num objects " << objects_to_spill.size();
auto start_time = absl::GetCurrentTimeNanos();
Expand Down
2 changes: 0 additions & 2 deletions src/ray/raylet/local_object_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -316,9 +316,7 @@ class LocalObjectManager {
/// The last time a restore log finished.
int64_t last_restore_log_ns_ = 0;

friend class LocalObjectManagerTestWithMinSpillingSize;
friend class LocalObjectManagerTest;
friend class LocalObjectManagerFusedTest;
};

}; // namespace raylet
Expand Down
123 changes: 3 additions & 120 deletions src/ray/raylet/test/local_object_manager_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -283,9 +283,9 @@ class MockObjectBuffer : public Buffer {
std::shared_ptr<absl::flat_hash_map<ObjectID, int>> unpins_;
};

class LocalObjectManagerTestWithMinSpillingSize {
class LocalObjectManagerTest : public ::testing::Test {
public:
LocalObjectManagerTestWithMinSpillingSize(int64_t min_spilling_size)
LocalObjectManagerTest()
: subscriber_(std::make_shared<MockSubscriber>()),
owner_client(std::make_shared<MockWorkerClient>()),
client_pool([&](const rpc::Address &addr) { return owner_client; }),
Expand All @@ -295,7 +295,7 @@ class LocalObjectManagerTestWithMinSpillingSize {
manager_node_id_, "address", 1234, free_objects_batch_size,
/*free_objects_period_ms=*/1000, worker_pool, client_pool,
/*max_io_workers=*/2,
/*min_spilling_size=*/min_spilling_size,
/*min_spilling_size=*/0,
/*is_external_storage_type_fs=*/true,
/*max_fused_object_count*/ max_fused_object_count_,
/*on_objects_freed=*/
Expand Down Expand Up @@ -349,18 +349,6 @@ class LocalObjectManagerTestWithMinSpillingSize {
std::unordered_set<ObjectID> unevictable_objects_;
};

class LocalObjectManagerTest : public LocalObjectManagerTestWithMinSpillingSize,
public ::testing::Test {
public:
LocalObjectManagerTest() : LocalObjectManagerTestWithMinSpillingSize(0) {}
};

class LocalObjectManagerFusedTest : public LocalObjectManagerTestWithMinSpillingSize,
public ::testing::Test {
public:
LocalObjectManagerFusedTest() : LocalObjectManagerTestWithMinSpillingSize(100) {}
};

TEST_F(LocalObjectManagerTest, TestPin) {
rpc::Address owner_address;
owner_address.set_worker_id(WorkerID::FromRandom().Binary());
Expand Down Expand Up @@ -1300,111 +1288,6 @@ TEST_F(LocalObjectManagerTest, TestDuplicatePinAndSpill) {
AssertNoLeaks();
}

TEST_F(LocalObjectManagerFusedTest, TestMinSpillingSize) {
rpc::Address owner_address;
owner_address.set_worker_id(WorkerID::FromRandom().Binary());

std::vector<ObjectID> object_ids;
std::vector<std::unique_ptr<RayObject>> objects;
int64_t total_size = 0;
int64_t object_size = 52;

for (size_t i = 0; i < 3; i++) {
ObjectID object_id = ObjectID::FromRandom();
object_ids.push_back(object_id);
auto data_buffer = std::make_shared<MockObjectBuffer>(object_size, object_id, unpins);
total_size += object_size;
auto object = std::make_unique<RayObject>(data_buffer, nullptr,
std::vector<rpc::ObjectReference>());
objects.push_back(std::move(object));
}
manager.PinObjectsAndWaitForFree(object_ids, std::move(objects), owner_address);
manager.SpillObjectUptoMaxThroughput();
// Only 2 of the objects should be spilled.
ASSERT_TRUE(worker_pool.FlushPopSpillWorkerCallbacks());
ASSERT_FALSE(worker_pool.FlushPopSpillWorkerCallbacks());
for (const auto &id : object_ids) {
ASSERT_EQ((*unpins)[id], 0);
}
manager.SpillObjectUptoMaxThroughput();
ASSERT_FALSE(worker_pool.FlushPopSpillWorkerCallbacks());

// Check that half the objects get spilled and the URLs get added to the
// global object directory.
std::vector<std::string> urls;
urls.push_back(BuildURL("url1"));
urls.push_back(BuildURL("url2"));
EXPECT_CALL(worker_pool, PushSpillWorker(_));
// Objects should get freed even though we didn't wait for the owner's notice
// to evict.
ASSERT_TRUE(worker_pool.io_worker_client->ReplySpillObjects(urls));
for (size_t i = 0; i < urls.size(); i++) {
ASSERT_TRUE(owner_client->ReplyAddSpilledUrl());
}
ASSERT_EQ(owner_client->object_urls.size(), 2);
int num_unpinned = 0;
for (const auto &id : object_ids) {
if ((*unpins)[id] == 1) {
num_unpinned++;
}
}
ASSERT_EQ(num_unpinned, 2);

// We will spill the last object, even though we're under the min spilling
// size, because they are the only spillable objects.
manager.SpillObjectUptoMaxThroughput();
ASSERT_TRUE(worker_pool.FlushPopSpillWorkerCallbacks());
ASSERT_FALSE(worker_pool.FlushPopSpillWorkerCallbacks());
}

TEST_F(LocalObjectManagerFusedTest, TestMinSpillingSizeMaxFusionCount) {
rpc::Address owner_address;
owner_address.set_worker_id(WorkerID::FromRandom().Binary());

std::vector<ObjectID> object_ids;
std::vector<std::unique_ptr<RayObject>> objects;
int64_t total_size = 0;
// 20 of these objects are needed to hit the min spilling size, but
// max_fused_object_count=15.
int64_t object_size = 5;

for (size_t i = 0; i < 40; i++) {
ObjectID object_id = ObjectID::FromRandom();
object_ids.push_back(object_id);
auto data_buffer = std::make_shared<MockObjectBuffer>(object_size, object_id, unpins);
total_size += object_size;
auto object = std::make_unique<RayObject>(data_buffer, nullptr,
std::vector<rpc::ObjectReference>());
objects.push_back(std::move(object));
}
manager.PinObjectsAndWaitForFree(object_ids, std::move(objects), owner_address);
manager.SpillObjectUptoMaxThroughput();
// First two spill batches succeed because they have at least 15 objects.
ASSERT_TRUE(worker_pool.FlushPopSpillWorkerCallbacks());
ASSERT_TRUE(worker_pool.FlushPopSpillWorkerCallbacks());
// Last spill batch fails because we have 10 objects and their total size is
// less than 100.
ASSERT_FALSE(worker_pool.FlushPopSpillWorkerCallbacks());

std::vector<std::string> urls;
for (int i = 0; i < 15; i++) {
urls.push_back(BuildURL("url", i));
}
EXPECT_CALL(worker_pool, PushSpillWorker(_)).Times(2);
ASSERT_TRUE(worker_pool.io_worker_client->ReplySpillObjects(urls));
ASSERT_TRUE(worker_pool.io_worker_client->ReplySpillObjects(urls));
for (size_t i = 0; i < urls.size(); i++) {
ASSERT_TRUE(owner_client->ReplyAddSpilledUrl());
ASSERT_TRUE(owner_client->ReplyAddSpilledUrl());
}

// We will spill the last objects even though we're under the min spilling
// size because they are the only spillable objects.
manager.SpillObjectUptoMaxThroughput();
ASSERT_TRUE(worker_pool.FlushPopSpillWorkerCallbacks());
ASSERT_FALSE(worker_pool.FlushPopSpillWorkerCallbacks());
}

} // namespace raylet

} // namespace ray
Expand Down

0 comments on commit 08374e8

Please sign in to comment.