diff --git a/src/mock/ray/core_worker/core_worker.h b/src/mock/ray/core_worker/core_worker.h index 49736faf22b0..01afa150cc46 100644 --- a/src/mock/ray/core_worker/core_worker.h +++ b/src/mock/ray/core_worker/core_worker.h @@ -141,12 +141,6 @@ class MockCoreWorker : public CoreWorker { rpc::SpillObjectsReply *reply, rpc::SendReplyCallback send_reply_callback), (override)); - MOCK_METHOD(void, - HandleAddSpilledUrl, - (const rpc::AddSpilledUrlRequest &request, - rpc::AddSpilledUrlReply *reply, - rpc::SendReplyCallback send_reply_callback), - (override)); MOCK_METHOD(void, HandleRestoreSpilledObjects, (const rpc::RestoreSpilledObjectsRequest &request, diff --git a/src/mock/ray/rpc/worker/core_worker_client.h b/src/mock/ray/rpc/worker/core_worker_client.h index 7b67aac504a4..a101da1c19e5 100644 --- a/src/mock/ray/rpc/worker/core_worker_client.h +++ b/src/mock/ray/rpc/worker/core_worker_client.h @@ -115,11 +115,6 @@ class MockCoreWorkerClientInterface : public ray::pubsub::MockSubscriberClientIn (const DeleteSpilledObjectsRequest &request, const ClientCallback &callback), (override)); - MOCK_METHOD(void, - AddSpilledUrl, - (const AddSpilledUrlRequest &request, - const ClientCallback &callback), - (override)); MOCK_METHOD(void, PlasmaObjectReady, (const PlasmaObjectReadyRequest &request, diff --git a/src/ray/core_worker/core_worker.cc b/src/ray/core_worker/core_worker.cc index 98a2671387e0..fe956edfbc1a 100644 --- a/src/ray/core_worker/core_worker.cc +++ b/src/ray/core_worker/core_worker.cc @@ -2886,19 +2886,32 @@ void CoreWorker::HandleUpdateObjectLocationBatch( return; } const auto &node_id = NodeID::FromBinary(request.node_id()); - const auto &object_location_states = request.object_location_states(); - - for (const auto &object_location_state : object_location_states) { - const auto &object_id = ObjectID::FromBinary(object_location_state.object_id()); - const auto &state = object_location_state.state(); + const auto &object_location_updates = 
request.object_location_updates(); + + for (const auto &object_location_update : object_location_updates) { + const auto &object_id = ObjectID::FromBinary(object_location_update.object_id()); + + if (object_location_update.has_spilled_location_update()) { + AddSpilledObjectLocationOwner( + object_id, + object_location_update.spilled_location_update().spilled_url(), + object_location_update.spilled_location_update().spilled_to_local_storage() + ? node_id + : NodeID::Nil()); + } - if (state == rpc::ObjectLocationState::ADDED) { - AddObjectLocationOwner(object_id, node_id); - } else if (state == rpc::ObjectLocationState::REMOVED) { - RemoveObjectLocationOwner(object_id, node_id); - } else { - RAY_LOG(FATAL) << "Invalid object location state " << state - << " has been received."; + if (object_location_update.has_plasma_location_update()) { + if (object_location_update.plasma_location_update() == + rpc::ObjectPlasmaLocationUpdate::ADDED) { + AddObjectLocationOwner(object_id, node_id); + } else if (object_location_update.plasma_location_update() == + rpc::ObjectPlasmaLocationUpdate::REMOVED) { + RemoveObjectLocationOwner(object_id, node_id); + } else { + RAY_LOG(FATAL) << "Invalid object plasma location update " + << object_location_update.plasma_location_update() + << " has been received."; + } } } @@ -2907,6 +2920,19 @@ void CoreWorker::HandleUpdateObjectLocationBatch( /*failure_callback_on_reply*/ nullptr); } +void CoreWorker::AddSpilledObjectLocationOwner(const ObjectID &object_id, + const std::string &spilled_url, + const NodeID &spilled_node_id) { + RAY_LOG(DEBUG) << "Received object spilled location update for object " << object_id + << ", which has been spilled to " << spilled_url << " on node " + << spilled_node_id; + auto reference_exists = + reference_counter_->HandleObjectSpilled(object_id, spilled_url, spilled_node_id); + if (!reference_exists) { + RAY_LOG(DEBUG) << "Object " << object_id << " not found"; + } +} + void 
CoreWorker::AddObjectLocationOwner(const ObjectID &object_id, const NodeID &node_id) { if (gcs_client_->Nodes().Get(node_id, /*filter_dead_nodes=*/true) == nullptr) { @@ -3158,24 +3184,6 @@ void CoreWorker::HandleSpillObjects(const rpc::SpillObjectsRequest &request, } } -void CoreWorker::HandleAddSpilledUrl(const rpc::AddSpilledUrlRequest &request, - rpc::AddSpilledUrlReply *reply, - rpc::SendReplyCallback send_reply_callback) { - const ObjectID object_id = ObjectID::FromBinary(request.object_id()); - const std::string &spilled_url = request.spilled_url(); - const NodeID node_id = NodeID::FromBinary(request.spilled_node_id()); - RAY_LOG(DEBUG) << "Received AddSpilledUrl request for object " << object_id - << ", which has been spilled to " << spilled_url << " on node " - << node_id; - auto reference_exists = reference_counter_->HandleObjectSpilled( - object_id, spilled_url, node_id, request.size()); - Status status = - reference_exists - ? Status::OK() - : Status::ObjectNotFound("Object " + object_id.Hex() + " not found"); - send_reply_callback(status, nullptr, nullptr); -} - void CoreWorker::HandleRestoreSpilledObjects( const rpc::RestoreSpilledObjectsRequest &request, rpc::RestoreSpilledObjectsReply *reply, diff --git a/src/ray/core_worker/core_worker.h b/src/ray/core_worker/core_worker.h index f81ae0e0cf76..e98fb6942936 100644 --- a/src/ray/core_worker/core_worker.h +++ b/src/ray/core_worker/core_worker.h @@ -753,11 +753,6 @@ class CoreWorker : public rpc::CoreWorkerServiceHandler { rpc::SpillObjectsReply *reply, rpc::SendReplyCallback send_reply_callback) override; - // Add spilled URL to owned reference. - void HandleAddSpilledUrl(const rpc::AddSpilledUrlRequest &request, - rpc::AddSpilledUrlReply *reply, - rpc::SendReplyCallback send_reply_callback) override; - // Restore objects from external storage. 
void HandleRestoreSpilledObjects(const rpc::RestoreSpilledObjectsRequest &request, rpc::RestoreSpilledObjectsReply *reply, @@ -1002,6 +997,10 @@ class CoreWorker : public rpc::CoreWorkerServiceHandler { /// messages. void ProcessPubsubCommands(const Commands &commands, const NodeID &subscriber_id); + void AddSpilledObjectLocationOwner(const ObjectID &object_id, + const std::string &spilled_url, + const NodeID &spilled_node_id); + void AddObjectLocationOwner(const ObjectID &object_id, const NodeID &node_id); void RemoveObjectLocationOwner(const ObjectID &object_id, const NodeID &node_id); diff --git a/src/ray/core_worker/reference_count.cc b/src/ray/core_worker/reference_count.cc index cd124e80d613..1ee295b640b1 100644 --- a/src/ray/core_worker/reference_count.cc +++ b/src/ray/core_worker/reference_count.cc @@ -1216,19 +1216,9 @@ absl::optional> ReferenceCounter::GetObjectLocations return it->second.locations; } -size_t ReferenceCounter::GetObjectSize(const ObjectID &object_id) const { - absl::MutexLock lock(&mutex_); - auto it = object_id_refs_.find(object_id); - if (it == object_id_refs_.end()) { - return 0; - } - return it->second.object_size; -} - bool ReferenceCounter::HandleObjectSpilled(const ObjectID &object_id, const std::string spilled_url, - const NodeID &spilled_node_id, - int64_t size) { + const NodeID &spilled_node_id) { absl::MutexLock lock(&mutex_); auto it = object_id_refs_.find(object_id); if (it == object_id_refs_.end()) { @@ -1253,9 +1243,6 @@ bool ReferenceCounter::HandleObjectSpilled(const ObjectID &object_id, if (!spilled_node_id.IsNil()) { it->second.spilled_node_id = spilled_node_id; } - if (size > 0) { - it->second.object_size = size; - } PushToLocationSubscribers(it); } else { RAY_LOG(DEBUG) << "Object " << object_id << " spilled to dead node " diff --git a/src/ray/core_worker/reference_count.h b/src/ray/core_worker/reference_count.h index bb4fd70ef305..25f539069e2a 100644 --- a/src/ray/core_worker/reference_count.h +++ 
b/src/ray/core_worker/reference_count.h @@ -431,12 +431,6 @@ class ReferenceCounter : public ReferenceCounterInterface, rpc::WorkerObjectLocationsPubMessage *object_info) LOCKS_EXCLUDED(mutex_); - /// Get an object's size. This will return 0 if the object is out of scope. - /// - /// \param[in] object_id The object whose size to get. - /// \return Object size, or 0 if the object is out of scope. - size_t GetObjectSize(const ObjectID &object_id) const; - /// Handle an object has been spilled to external storage. /// /// This notifies the primary raylet that the object is safe to release and @@ -444,12 +438,10 @@ class ReferenceCounter : public ReferenceCounterInterface, /// \param[in] object_id The object that has been spilled. /// \param[in] spilled_url The URL to which the object has been spilled. /// \param[in] spilled_node_id The ID of the node on which the object was spilled. - /// \param[in] size The size of the object. /// \return True if the reference exists and is in scope, false otherwise. bool HandleObjectSpilled(const ObjectID &object_id, const std::string spilled_url, - const NodeID &spilled_node_id, - int64_t size); + const NodeID &spilled_node_id); /// Get locality data for object. This is used by the leasing policy to implement /// locality-aware leasing. diff --git a/src/ray/core_worker/test/core_worker_test.cc b/src/ray/core_worker/test/core_worker_test.cc index 3f12fc7aeb2d..968fec57e7bc 100644 --- a/src/ray/core_worker/test/core_worker_test.cc +++ b/src/ray/core_worker/test/core_worker_test.cc @@ -101,12 +101,12 @@ class CoreWorkerTest : public ::testing::Test { // start raylet on each node. Assign each node with different resources so that // a task can be scheduled to the desired node. 
for (int i = 0; i < num_nodes; i++) { - raylet_socket_names_[i] = - TestSetupUtil::StartRaylet("127.0.0.1", - node_manager_port + i, - "127.0.0.1:6379", - "\"CPU,4.0,resource" + std::to_string(i) + ",10\"", - &raylet_store_socket_names_[i]); + raylet_socket_names_[i] = TestSetupUtil::StartRaylet( + "127.0.0.1", + node_manager_port + i, + "127.0.0.1:6379", + "\"CPU,4.0,object_store_memory,100,resource" + std::to_string(i) + ",10\"", + &raylet_store_socket_names_[i]); } } diff --git a/src/ray/core_worker/test/reference_count_test.cc b/src/ray/core_worker/test/reference_count_test.cc index 8c36823ae130..fdb11d0d4690 100644 --- a/src/ray/core_worker/test/reference_count_test.cc +++ b/src/ray/core_worker/test/reference_count_test.cc @@ -618,6 +618,31 @@ TEST_F(ReferenceCountTest, TestReferenceStats) { rc->RemoveLocalReference(id2, nullptr); } +TEST_F(ReferenceCountTest, TestHandleObjectSpilled) { + ObjectID obj1 = ObjectID::FromRandom(); + NodeID node1 = NodeID::FromRandom(); + rpc::Address address; + address.set_ip_address("1234"); + + int64_t object_size = 100; + rc->AddOwnedObject(obj1, + {}, + address, + "file1.py:42", + object_size, + false, + /*add_local_ref=*/true, + absl::optional(node1)); + rc->HandleObjectSpilled(obj1, "url1", node1); + rpc::WorkerObjectLocationsPubMessage object_info; + Status status = rc->FillObjectInformation(obj1, &object_info); + ASSERT_TRUE(status.ok()); + ASSERT_EQ(object_info.object_size(), object_size); + ASSERT_EQ(object_info.spilled_url(), "url1"); + ASSERT_EQ(object_info.spilled_node_id(), node1.Binary()); + rc->RemoveLocalReference(obj1, nullptr); +} + // Tests fetching of locality data from reference table. 
TEST_F(ReferenceCountTest, TestGetLocalityData) { ObjectID obj1 = ObjectID::FromRandom(); diff --git a/src/ray/object_manager/object_directory.h b/src/ray/object_manager/object_directory.h index a99158393561..25d03980a5fb 100644 --- a/src/ray/object_manager/object_directory.h +++ b/src/ray/object_manager/object_directory.h @@ -120,6 +120,20 @@ class IObjectDirectory { const NodeID &node_id, const ObjectInfo &object_info) = 0; + /// Report object spilled to external storage. + /// + /// \param object_id The object id that was spilled. + /// \param node_id The node id corresponding to this node. + /// \param owner_address The address of the owner of this object. + /// \param spilled_url The url of the spilled location. + /// \param spilled_to_local_storage Whether the object is spilled to + /// local storage or cloud storage. + virtual void ReportObjectSpilled(const ObjectID &object_id, + const NodeID &node_id, + const rpc::Address &owner_address, + const std::string &spilled_url, + const bool spilled_to_local_storage) = 0; + /// Record metrics. 
virtual void RecordMetrics(uint64_t duration_ms) = 0; diff --git a/src/ray/object_manager/ownership_based_object_directory.cc b/src/ray/object_manager/ownership_based_object_directory.cc index 129e7d3525e4..7bf539a24a64 100644 --- a/src/ray/object_manager/ownership_based_object_directory.cc +++ b/src/ray/object_manager/ownership_based_object_directory.cc @@ -129,7 +129,13 @@ void OwnershipBasedObjectDirectory::ReportObjectAdded(const ObjectID &object_id, return; } metrics_num_object_locations_added_++; - location_buffers_[worker_id][object_id] = rpc::ObjectLocationState::ADDED; + const bool existing_object = location_buffers_[worker_id].second.contains(object_id); + rpc::ObjectLocationUpdate &update = location_buffers_[worker_id].second[object_id]; + update.set_object_id(object_id.Binary()); + update.set_plasma_location_update(rpc::ObjectPlasmaLocationUpdate::ADDED); + if (!existing_object) { + location_buffers_[worker_id].first.emplace_back(object_id); + } SendObjectLocationUpdateBatchIfNeeded(worker_id, node_id, owner_address); } @@ -146,9 +152,45 @@ void OwnershipBasedObjectDirectory::ReportObjectRemoved(const ObjectID &object_i return; } metrics_num_object_locations_removed_++; - location_buffers_[worker_id][object_id] = rpc::ObjectLocationState::REMOVED; + const bool existing_object = location_buffers_[worker_id].second.contains(object_id); + rpc::ObjectLocationUpdate &update = location_buffers_[worker_id].second[object_id]; + update.set_object_id(object_id.Binary()); + update.set_plasma_location_update(rpc::ObjectPlasmaLocationUpdate::REMOVED); + if (!existing_object) { + location_buffers_[worker_id].first.emplace_back(object_id); + } + SendObjectLocationUpdateBatchIfNeeded(worker_id, node_id, owner_address); +} + +void OwnershipBasedObjectDirectory::ReportObjectSpilled( + const ObjectID &object_id, + const NodeID &node_id, + const rpc::Address &owner_address, + const std::string &spilled_url, + const bool spilled_to_local_storage) { + RAY_LOG(DEBUG) << 
"Sending spilled URL " << spilled_url << " for object " << object_id + << " to owner " << WorkerID::FromBinary(owner_address.worker_id()); + + const WorkerID worker_id = WorkerID::FromBinary(owner_address.worker_id()); + auto owner_client = GetClient(owner_address); + if (owner_client == nullptr) { + RAY_LOG(DEBUG) << "Object " << object_id << " does not have owner. " + << "ReportObjectSpilled becomes a no-op. " + << "This should only happen for Plasma store warmup objects."; + return; + } + + const bool existing_object = location_buffers_[worker_id].second.contains(object_id); + rpc::ObjectLocationUpdate &update = location_buffers_[worker_id].second[object_id]; + update.set_object_id(object_id.Binary()); + update.mutable_spilled_location_update()->set_spilled_url(spilled_url); + update.mutable_spilled_location_update()->set_spilled_to_local_storage( + spilled_to_local_storage); + if (!existing_object) { + location_buffers_[worker_id].first.emplace_back(object_id); + } SendObjectLocationUpdateBatchIfNeeded(worker_id, node_id, owner_address); -}; +} void OwnershipBasedObjectDirectory::SendObjectLocationUpdateBatchIfNeeded( const WorkerID &worker_id, const NodeID &node_id, const rpc::Address &owner_address) { @@ -164,28 +206,29 @@ void OwnershipBasedObjectDirectory::SendObjectLocationUpdateBatchIfNeeded( return; } - const auto &object_state_buffers = location_buffer_it->second; - RAY_CHECK(object_state_buffers.size() != 0); + auto &object_queue = location_buffer_it->second.first; + auto &object_map = location_buffer_it->second.second; + RAY_CHECK_EQ(object_queue.size(), object_map.size()); + RAY_CHECK_NE(object_queue.size(), 0u); rpc::UpdateObjectLocationBatchRequest request; request.set_intended_worker_id(worker_id.Binary()); request.set_node_id(node_id.Binary()); - auto object_state_buffers_it = object_state_buffers.begin(); + auto object_queue_it = object_queue.begin(); auto batch_size = 0; - while (object_state_buffers_it != object_state_buffers.end() && + while 
(object_queue_it != object_queue.end() && batch_size < kMaxObjectReportBatchSize) { - const auto &object_id = object_state_buffers_it->first; - const auto &object_state = object_state_buffers_it->second; - - auto state = request.add_object_location_states(); - state->set_object_id(object_id.Binary()); - state->set_state(object_state); + auto update = request.add_object_location_updates(); + const auto &object_id = *object_queue_it; + *update = std::move(object_map.at(object_id)); + object_map.erase(object_id); batch_size++; - object_state_buffers_it++; + object_queue_it++; } - location_buffer_it->second.erase(object_state_buffers.begin(), object_state_buffers_it); + object_queue.erase(object_queue.begin(), object_queue_it); - if (object_state_buffers.size() == 0) { + RAY_CHECK_EQ(object_queue.size(), object_map.size()); + if (object_queue.size() == 0) { location_buffers_.erase(location_buffer_it); } diff --git a/src/ray/object_manager/ownership_based_object_directory.h b/src/ray/object_manager/ownership_based_object_directory.h index 26ef502c83ab..5ca0bd0f10d3 100644 --- a/src/ray/object_manager/ownership_based_object_directory.h +++ b/src/ray/object_manager/ownership_based_object_directory.h @@ -77,6 +77,12 @@ class OwnershipBasedObjectDirectory : public IObjectDirectory { const NodeID &node_id, const ObjectInfo &object_info) override; + void ReportObjectSpilled(const ObjectID &object_id, + const NodeID &node_id, + const rpc::Address &owner_address, + const std::string &spilled_url, + const bool spilled_to_local_storage) override; + void RecordMetrics(uint64_t duration_ms) override; std::string DebugString() const override; @@ -126,7 +132,12 @@ class OwnershipBasedObjectDirectory : public IObjectDirectory { std::function mark_as_failed_; /// A buffer for batch object location updates. - absl::flat_hash_map> + /// owner id -> {(FIFO object queue (to avoid starvation), map for the latest update of + /// objects)}. 
Since absl::flat_hash_map doesn't maintain the insertion order, we use a + /// deque here to achieve FIFO. + absl::flat_hash_map, + absl::flat_hash_map>> location_buffers_; /// A set of in-flight UpdateObjectLocationBatch requests. diff --git a/src/ray/object_manager/test/ownership_based_object_directory_test.cc b/src/ray/object_manager/test/ownership_based_object_directory_test.cc index c77440f6c71e..0b952380a2b4 100644 --- a/src/ray/object_manager/test/ownership_based_object_directory_test.cc +++ b/src/ray/object_manager/test/ownership_based_object_directory_test.cc @@ -37,13 +37,13 @@ class MockWorkerClient : public rpc::CoreWorkerClientInterface { const rpc::UpdateObjectLocationBatchRequest &request, const rpc::ClientCallback &callback) override { const auto &worker_id = WorkerID::FromBinary(request.intended_worker_id()); - const auto &object_location_states = request.object_location_states(); + const auto &object_location_updates = request.object_location_updates(); - for (const auto &object_location_state : object_location_states) { - const auto &object_id = ObjectID::FromBinary(object_location_state.object_id()); - const auto &state = object_location_state.state(); + for (const auto &object_location_update : object_location_updates) { + const auto &object_id = ObjectID::FromBinary(object_location_update.object_id()); - buffered_object_locations_[worker_id][object_id] = state; + buffered_object_locations_[worker_id][object_id] = object_location_update; + object_location_updates_.emplace_back(object_location_update); } batch_sent++; callbacks.push_back(callback); @@ -61,27 +61,28 @@ class MockWorkerClient : public rpc::CoreWorkerClientInterface { return true; } - void AssertObjectIDState(const WorkerID &worker_id, - const ObjectID &object_id, - rpc::ObjectLocationState state) { + void AssertObjectPlasmaLocationUpdate(const WorkerID &worker_id, + const ObjectID &object_id, + rpc::ObjectPlasmaLocationUpdate update) { auto it = 
buffered_object_locations_.find(worker_id); RAY_CHECK(it != buffered_object_locations_.end()) << "Worker ID " << worker_id << " wasn't updated."; auto object_it = it->second.find(object_id); - RAY_CHECK(object_it->second == state) - << "Object ID " << object_id << "'s state " << object_it->second - << "is unexpected. Expected: " << state; + RAY_CHECK(object_it->second.has_plasma_location_update()); + RAY_CHECK_EQ(object_it->second.plasma_location_update(), update); } void Reset() { buffered_object_locations_.clear(); + object_location_updates_.clear(); callbacks.clear(); callback_invoked = 0; batch_sent = 0; } - absl::flat_hash_map> + absl::flat_hash_map> buffered_object_locations_; + std::vector object_location_updates_; std::deque> callbacks; int callback_invoked = 0; int batch_sent = 0; @@ -145,10 +146,10 @@ class OwnershipBasedObjectDirectoryTest : public ::testing::Test { return info; } - void AssertObjectIDState(const WorkerID &worker_id, - const ObjectID &object_id, - rpc::ObjectLocationState state) { - owner_client->AssertObjectIDState(worker_id, object_id, state); + void AssertObjectPlasmaLocationUpdate(const WorkerID &worker_id, + const ObjectID &object_id, + rpc::ObjectPlasmaLocationUpdate update) { + owner_client->AssertObjectPlasmaLocationUpdate(worker_id, object_id, update); } void AssertNoLeak() { @@ -199,9 +200,9 @@ TEST_F(OwnershipBasedObjectDirectoryTest, TestLocationUpdateBatchBasic) { auto object_info_added = CreateNewObjectInfo(owner_id); obod_.ReportObjectAdded( object_info_added.object_id, current_node_id, object_info_added); - AssertObjectIDState(object_info_added.owner_worker_id, - object_info_added.object_id, - rpc::ObjectLocationState::ADDED); + AssertObjectPlasmaLocationUpdate(object_info_added.owner_worker_id, + object_info_added.object_id, + rpc::ObjectPlasmaLocationUpdate::ADDED); ASSERT_TRUE(owner_client->ReplyUpdateObjectLocationBatch()); ASSERT_EQ(NumBatchRequestSent(), 1); ASSERT_EQ(NumBatchReplied(), 1); @@ -213,14 +214,60 @@ 
TEST_F(OwnershipBasedObjectDirectoryTest, TestLocationUpdateBatchBasic) { auto object_info_removed = CreateNewObjectInfo(owner_id); obod_.ReportObjectRemoved( object_info_removed.object_id, current_node_id, object_info_removed); - AssertObjectIDState(object_info_removed.owner_worker_id, - object_info_removed.object_id, - rpc::ObjectLocationState::REMOVED); + AssertObjectPlasmaLocationUpdate(object_info_removed.owner_worker_id, + object_info_removed.object_id, + rpc::ObjectPlasmaLocationUpdate::REMOVED); ASSERT_TRUE(owner_client->ReplyUpdateObjectLocationBatch()); ASSERT_EQ(NumBatchRequestSent(), 2); ASSERT_EQ(NumBatchReplied(), 2); AssertNoLeak(); } + + { + RAY_LOG(INFO) << "Object spilled basic."; + auto object_info_spilled = CreateNewObjectInfo(owner_id); + rpc::Address owner_address; + owner_address.set_worker_id(object_info_spilled.owner_worker_id.Binary()); + obod_.ReportObjectSpilled( + object_info_spilled.object_id, current_node_id, owner_address, "url1", true); + rpc::ObjectLocationUpdate update = + owner_client->buffered_object_locations_.at(object_info_spilled.owner_worker_id) + .at(object_info_spilled.object_id); + ASSERT_EQ(update.spilled_location_update().spilled_url(), "url1"); + ASSERT_EQ(update.spilled_location_update().spilled_to_local_storage(), true); + ASSERT_TRUE(owner_client->ReplyUpdateObjectLocationBatch()); + ASSERT_EQ(NumBatchRequestSent(), 3); + ASSERT_EQ(NumBatchReplied(), 3); + AssertNoLeak(); + } +} + +TEST_F(OwnershipBasedObjectDirectoryTest, TestLocationUpdateFIFOOrder) { + const auto owner_id = WorkerID::FromRandom(); + SendDummyBatch(owner_id); + + auto object_info_1 = CreateNewObjectInfo(owner_id); + auto object_info_2 = CreateNewObjectInfo(owner_id); + auto object_info_3 = CreateNewObjectInfo(owner_id); + obod_.ReportObjectAdded(object_info_1.object_id, current_node_id, object_info_1); + obod_.ReportObjectAdded(object_info_3.object_id, current_node_id, object_info_3); + obod_.ReportObjectAdded(object_info_2.object_id, 
current_node_id, object_info_2); + obod_.ReportObjectRemoved(object_info_1.object_id, current_node_id, object_info_1); + ASSERT_TRUE(owner_client->ReplyUpdateObjectLocationBatch()); + ASSERT_EQ(NumBatchReplied(), 1); + ASSERT_EQ(NumBatchRequestSent(), 2); + + ASSERT_EQ(owner_client->object_location_updates_.size(), 4); + ASSERT_EQ(owner_client->object_location_updates_[1].object_id(), + object_info_1.object_id.Binary()); + ASSERT_EQ(owner_client->object_location_updates_[2].object_id(), + object_info_3.object_id.Binary()); + ASSERT_EQ(owner_client->object_location_updates_[3].object_id(), + object_info_2.object_id.Binary()); + + ASSERT_TRUE(owner_client->ReplyUpdateObjectLocationBatch()); + ASSERT_EQ(NumBatchReplied(), 2); + AssertNoLeak(); } TEST_F(OwnershipBasedObjectDirectoryTest, TestLocationUpdateBufferedUpdate) { @@ -230,14 +277,23 @@ TEST_F(OwnershipBasedObjectDirectoryTest, TestLocationUpdateBufferedUpdate) { auto object_info = CreateNewObjectInfo(owner_id); obod_.ReportObjectAdded(object_info.object_id, current_node_id, object_info); obod_.ReportObjectRemoved(object_info.object_id, current_node_id, object_info); + rpc::Address owner_address; + owner_address.set_worker_id(object_info.owner_worker_id.Binary()); + obod_.ReportObjectSpilled( + object_info.object_id, current_node_id, owner_address, "url1", true); ASSERT_TRUE(owner_client->ReplyUpdateObjectLocationBatch()); ASSERT_EQ(NumBatchReplied(), 1); ASSERT_EQ(NumBatchRequestSent(), 2); // For the same object ID, it should report the latest result (which is REMOVED). 
- AssertObjectIDState(object_info.owner_worker_id, - object_info.object_id, - rpc::ObjectLocationState::REMOVED); + rpc::ObjectLocationUpdate update = + owner_client->buffered_object_locations_.at(object_info.owner_worker_id) + .at(object_info.object_id); + AssertObjectPlasmaLocationUpdate(object_info.owner_worker_id, + object_info.object_id, + rpc::ObjectPlasmaLocationUpdate::REMOVED); + ASSERT_EQ(update.spilled_location_update().spilled_url(), "url1"); + ASSERT_EQ(update.spilled_location_update().spilled_to_local_storage(), true); ASSERT_TRUE(owner_client->ReplyUpdateObjectLocationBatch()); ASSERT_EQ(NumBatchReplied(), 2); @@ -262,12 +318,12 @@ TEST_F(OwnershipBasedObjectDirectoryTest, ASSERT_TRUE(owner_client->ReplyUpdateObjectLocationBatch()); ASSERT_EQ(NumBatchReplied(), 2); // For the same object ID, it should report the latest result (which is REMOVED). - AssertObjectIDState(object_info.owner_worker_id, - object_info.object_id, - rpc::ObjectLocationState::REMOVED); - AssertObjectIDState(object_info_2.owner_worker_id, - object_info_2.object_id, - rpc::ObjectLocationState::ADDED); + AssertObjectPlasmaLocationUpdate(object_info.owner_worker_id, + object_info.object_id, + rpc::ObjectPlasmaLocationUpdate::REMOVED); + AssertObjectPlasmaLocationUpdate(object_info_2.owner_worker_id, + object_info_2.object_id, + rpc::ObjectPlasmaLocationUpdate::ADDED); AssertNoLeak(); } @@ -295,12 +351,12 @@ TEST_F(OwnershipBasedObjectDirectoryTest, TestLocationUpdateBufferedMultipleOwne ASSERT_EQ(NumBatchRequestSent(), 4); ASSERT_EQ(NumBatchReplied(), 2); // For the same object ID, it should report the latest result (which is REMOVED). 
- AssertObjectIDState(object_info.owner_worker_id, - object_info.object_id, - rpc::ObjectLocationState::REMOVED); - AssertObjectIDState(object_info_2.owner_worker_id, - object_info_2.object_id, - rpc::ObjectLocationState::ADDED); + AssertObjectPlasmaLocationUpdate(object_info.owner_worker_id, + object_info.object_id, + rpc::ObjectPlasmaLocationUpdate::REMOVED); + AssertObjectPlasmaLocationUpdate(object_info_2.owner_worker_id, + object_info_2.object_id, + rpc::ObjectPlasmaLocationUpdate::ADDED); // Clean up reply and check assert. // owner_1 batch replied @@ -327,9 +383,9 @@ TEST_F(OwnershipBasedObjectDirectoryTest, TestLocationUpdateOneInFlightRequest) ASSERT_TRUE(owner_client->ReplyUpdateObjectLocationBatch()); ASSERT_EQ(NumBatchRequestSent(), 2); - AssertObjectIDState(object_info.owner_worker_id, - object_info.object_id, - rpc::ObjectLocationState::REMOVED); + AssertObjectPlasmaLocationUpdate(object_info.owner_worker_id, + object_info.object_id, + rpc::ObjectPlasmaLocationUpdate::REMOVED); // After it is replied, if there's no more entry in the buffer, it doesn't send a new // request. @@ -367,9 +423,9 @@ TEST_F(OwnershipBasedObjectDirectoryTest, TestLocationUpdateMaxBatchSize) { // Check if object id states are updated properly. 
for (const auto &object_info : object_infos) { - AssertObjectIDState(object_info.owner_worker_id, - object_info.object_id, - rpc::ObjectLocationState::REMOVED); + AssertObjectPlasmaLocationUpdate(object_info.owner_worker_id, + object_info.object_id, + rpc::ObjectPlasmaLocationUpdate::REMOVED); } AssertNoLeak(); } diff --git a/src/ray/protobuf/core_worker.proto b/src/ray/protobuf/core_worker.proto index beaa9894b905..2de632eabf0b 100644 --- a/src/ray/protobuf/core_worker.proto +++ b/src/ray/protobuf/core_worker.proto @@ -186,23 +186,33 @@ message WaitForActorOutOfScopeReply { message UpdateObjectLocationBatchRequest { bytes intended_worker_id = 1; bytes node_id = 2; - repeated ObjectLocationStateUpdate object_location_states = 3; + repeated ObjectLocationUpdate object_location_updates = 3; } message UpdateObjectLocationBatchReply { } -enum ObjectLocationState { - // The latest object state. - // Object is added. +enum ObjectPlasmaLocationUpdate { + // Object is added to plasma store. ADDED = 0; - // Object is removed. + // Object is removed from plasma store. REMOVED = 1; } -message ObjectLocationStateUpdate { +message ObjectSpilledLocationUpdate { + // For objects that have been spilled to external storage, the URL from which + // they can be retrieved. + string spilled_url = 3; + // Whether the object is spilled to local storage or cloud storage. + bool spilled_to_local_storage = 4; +} + +message ObjectLocationUpdate { bytes object_id = 1; - ObjectLocationState state = 2; + // When it's set, it indicates whether the object is added or removed from plasma store. + optional ObjectPlasmaLocationUpdate plasma_location_update = 2; + // When it's set, it contains where the object is spilled to. + optional ObjectSpilledLocationUpdate spilled_location_update = 3; } message GetObjectLocationsOwnerRequest { @@ -308,22 +318,6 @@ message DeleteSpilledObjectsRequest { message DeleteSpilledObjectsReply { } -message AddSpilledUrlRequest { - // Object that was spilled. 
- bytes object_id = 1; - // For objects that have been spilled to external storage, the URL from which - // they can be retrieved. - string spilled_url = 2; - // The ID of the node that spilled the object. - // This will be Nil if the object was spilled to distributed external storage. - bytes spilled_node_id = 3; - // The size of the object in bytes. - int64 size = 4; -} - -message AddSpilledUrlReply { -} - message ExitRequest { } @@ -392,9 +386,6 @@ service CoreWorkerService { // Delete spilled objects from external storage. Caller: raylet; callee: I/O worker. rpc DeleteSpilledObjects(DeleteSpilledObjectsRequest) returns (DeleteSpilledObjectsReply); - // Add spilled URL, spilled node ID, and update object size for owned object. - // Caller: raylet; callee: owner worker. - rpc AddSpilledUrl(AddSpilledUrlRequest) returns (AddSpilledUrlReply); // Notification from raylet that an object ID is available in local plasma. rpc PlasmaObjectReady(PlasmaObjectReadyRequest) returns (PlasmaObjectReadyReply); // Request for a worker to exit. diff --git a/src/ray/raylet/local_object_manager.cc b/src/ray/raylet/local_object_manager.cc index 6e5bea888b76..d794e06b7bb8 100644 --- a/src/ray/raylet/local_object_manager.cc +++ b/src/ray/raylet/local_object_manager.cc @@ -359,10 +359,6 @@ void LocalObjectManager::OnObjectSpilled(const std::vector &object_ids const ObjectID &object_id = object_ids[i]; const std::string &object_url = worker_reply.spilled_objects_url(i); RAY_LOG(DEBUG) << "Object " << object_id << " spilled at " << object_url; - // Choose a node id to report. If an external storage type is not a filesystem, we - // don't need to report where this object is spilled. - const auto node_id_object_spilled = - is_external_storage_type_fs_ ? self_node_id_ : NodeID::Nil(); // Update the object_id -> url_ref_count to use it for deletion later. 
// We need to track the references here because a single file can contain @@ -388,12 +384,6 @@ void LocalObjectManager::OnObjectSpilled(const std::vector &object_ids objects_pending_spill_.erase(it); // Asynchronously Update the spilled URL. - rpc::AddSpilledUrlRequest request; - request.set_object_id(object_id.Binary()); - request.set_spilled_url(object_url); - request.set_spilled_node_id(node_id_object_spilled.Binary()); - request.set_size(object_size); - auto freed_it = local_objects_.find(object_id); if (freed_it == local_objects_.end() || freed_it->second.second) { RAY_LOG(DEBUG) << "Spilled object already freed, skipping send of spilled URL to " @@ -402,24 +392,8 @@ void LocalObjectManager::OnObjectSpilled(const std::vector &object_ids continue; } const auto &worker_addr = freed_it->second.first; - auto owner_client = owner_client_pool_.GetOrConnect(worker_addr); - RAY_LOG(DEBUG) << "Sending spilled URL " << object_url << " for object " << object_id - << " to owner " << WorkerID::FromBinary(worker_addr.worker_id()); - owner_client->AddSpilledUrl( - request, - [object_id, object_url](Status status, const rpc::AddSpilledUrlReply &reply) { - // TODO(sang): Currently we assume there's no network failure. We should handle - // it properly. - if (!status.ok()) { - RAY_LOG(DEBUG) - << "Failed to send spilled url for object " << object_id - << " to object directory, considering the object to have been freed: " - << status.ToString(); - } else { - RAY_LOG(DEBUG) << "Object " << object_id << " spilled to " << object_url - << " and object directory has been informed"; - } - }); + object_directory_->ReportObjectSpilled( + object_id, self_node_id_, worker_addr, object_url, is_external_storage_type_fs_); } } @@ -615,8 +589,8 @@ int64_t LocalObjectManager::GetPinnedBytes() const { return pinned_objects_size_; } // Report non-zero usage when there are spilled / spill-pending live objects, to - // prevent this node from being drained. 
Note that the value reported here is also used - // for scheduling. + // prevent this node from being drained. Note that the value reported here is also + // used for scheduling. return (spilled_objects_url_.empty() && objects_pending_spill_.empty()) ? 0 : 1; } diff --git a/src/ray/raylet/local_object_manager.h b/src/ray/raylet/local_object_manager.h index 3340653e9d26..76c2df7d44c1 100644 --- a/src/ray/raylet/local_object_manager.h +++ b/src/ray/raylet/local_object_manager.h @@ -22,6 +22,7 @@ #include "ray/common/ray_object.h" #include "ray/gcs/gcs_client/accessor.h" #include "ray/object_manager/common.h" +#include "ray/object_manager/object_directory.h" #include "ray/pubsub/subscriber.h" #include "ray/raylet/worker_pool.h" #include "ray/rpc/worker/core_worker_client_pool.h" @@ -50,7 +51,8 @@ class LocalObjectManager { int64_t max_fused_object_count, std::function &)> on_objects_freed, std::function is_plasma_object_spillable, - pubsub::SubscriberInterface *core_worker_subscriber) + pubsub::SubscriberInterface *core_worker_subscriber, + IObjectDirectory *object_directory) : self_node_id_(node_id), self_node_address_(self_node_address), self_node_port_(self_node_port), @@ -67,7 +69,8 @@ class LocalObjectManager { is_external_storage_type_fs_(is_external_storage_type_fs), max_fused_object_count_(max_fused_object_count), next_spill_error_log_bytes_(RayConfig::instance().verbose_spill_logs()), - core_worker_subscriber_(core_worker_subscriber) {} + core_worker_subscriber_(core_worker_subscriber), + object_directory_(object_directory) {} /// Pin objects. /// @@ -305,6 +308,9 @@ class LocalObjectManager { /// It is used to subscribe objects to evict. pubsub::SubscriberInterface *core_worker_subscriber_; + /// The object directory interface to access object information. 
+ IObjectDirectory *object_directory_; + /// /// Stats /// diff --git a/src/ray/raylet/node_manager.cc b/src/ray/raylet/node_manager.cc index 41af75aa0552..59ab05e1d006 100644 --- a/src/ray/raylet/node_manager.cc +++ b/src/ray/raylet/node_manager.cc @@ -319,7 +319,8 @@ NodeManager::NodeManager(instrumented_io_context &io_service, [this](const ObjectID &object_id) { return object_manager_.IsPlasmaObjectSpillable(object_id); }, - /*core_worker_subscriber_=*/core_worker_subscriber_.get()), + /*core_worker_subscriber_=*/core_worker_subscriber_.get(), + object_directory_.get()), high_plasma_storage_usage_(RayConfig::instance().high_plasma_storage_usage()), local_gc_run_time_ns_(absl::GetCurrentTimeNanos()), local_gc_throttler_(RayConfig::instance().local_gc_min_interval_s() * 1e9), diff --git a/src/ray/raylet/test/local_object_manager_test.cc b/src/ray/raylet/test/local_object_manager_test.cc index 37ab7ffb149f..76cc2b1f024d 100644 --- a/src/ray/raylet/test/local_object_manager_test.cc +++ b/src/ray/raylet/test/local_object_manager_test.cc @@ -25,6 +25,7 @@ #include "ray/rpc/grpc_client.h" #include "ray/rpc/worker/core_worker_client.h" #include "ray/rpc/worker/core_worker_client_pool.h" +#include "src/ray/object_manager/ownership_based_object_directory.h" #include "src/ray/protobuf/core_worker.grpc.pb.h" #include "src/ray/protobuf/core_worker.pb.h" @@ -108,26 +109,31 @@ class MockSubscriber : public pubsub::SubscriberInterface { class MockWorkerClient : public rpc::CoreWorkerClientInterface { public: - void AddSpilledUrl( - const rpc::AddSpilledUrlRequest &request, - const rpc::ClientCallback &callback) override { - object_urls.emplace(ObjectID::FromBinary(request.object_id()), request.spilled_url()); - spilled_url_callbacks.push_back(callback); + void UpdateObjectLocationBatch( + const rpc::UpdateObjectLocationBatchRequest &request, + const rpc::ClientCallback &callback) override { + for (const auto &object_location_update : request.object_location_updates()) { + 
ASSERT_TRUE(object_location_update.has_spilled_location_update()); + object_urls.emplace(ObjectID::FromBinary(object_location_update.object_id()), + object_location_update.spilled_location_update().spilled_url()); + } + update_object_location_batch_callbacks.push_back(callback); } - bool ReplyAddSpilledUrl(Status status = Status::OK()) { - if (spilled_url_callbacks.empty()) { + bool ReplyUpdateObjectLocationBatch(Status status = Status::OK()) { + if (update_object_location_batch_callbacks.empty()) { return false; } - auto callback = spilled_url_callbacks.front(); - auto reply = rpc::AddSpilledUrlReply(); + auto callback = update_object_location_batch_callbacks.front(); + auto reply = rpc::UpdateObjectLocationBatchReply(); callback(status, reply); - spilled_url_callbacks.pop_front(); + update_object_location_batch_callbacks.pop_front(); return true; } absl::flat_hash_map object_urls; - std::deque> spilled_url_callbacks; + std::deque> + update_object_location_batch_callbacks; }; class MockIOWorkerClient : public rpc::CoreWorkerClientInterface { @@ -298,6 +304,14 @@ class LocalObjectManagerTestWithMinSpillingSize { client_pool([&](const rpc::Address &addr) { return owner_client; }), manager_node_id_(NodeID::FromRandom()), max_fused_object_count_(max_fused_object_count), + gcs_client_(), + object_directory_(std::make_unique( + io_service_, + gcs_client_, + subscriber_.get(), + &client_pool, + /*max_object_report_batch_size=*/20000, + [](const ObjectID &object_id, const rpc::ErrorType &error_type) {})), manager( manager_node_id_, "address", @@ -320,7 +334,8 @@ class LocalObjectManagerTestWithMinSpillingSize { [&](const ray::ObjectID &object_id) { return unevictable_objects_.count(object_id) == 0; }, - /*core_worker_subscriber=*/subscriber_.get()), + /*core_worker_subscriber=*/subscriber_.get(), + object_directory_.get()), unpins(std::make_shared>()) { RayConfig::instance().initialize(R"({"object_spilling_config": "dummy"})"); } @@ -351,6 +366,8 @@ class 
LocalObjectManagerTestWithMinSpillingSize { MockIOWorkerPool worker_pool; NodeID manager_node_id_; size_t max_fused_object_count_; + std::shared_ptr gcs_client_; + std::unique_ptr object_directory_; LocalObjectManager manager; std::unordered_set freed; @@ -428,11 +445,14 @@ TEST_F(LocalObjectManagerTest, TestRestoreSpilledObject) { urls.push_back(BuildURL("url" + std::to_string(i))); } ASSERT_TRUE(worker_pool.io_worker_client->ReplySpillObjects(urls)); - for (size_t i = 0; i < object_ids.size(); i++) { - ASSERT_FALSE(manager.GetLocalSpilledObjectURL(object_ids[i]).empty()); + // The first update is sent out immediately and the remaining ones are batched + // since the first one is still in-flight. + for (size_t i = 0; i < 2; i++) { + ASSERT_TRUE(owner_client->ReplyUpdateObjectLocationBatch()); } for (size_t i = 0; i < object_ids.size(); i++) { - ASSERT_TRUE(owner_client->ReplyAddSpilledUrl()); + ASSERT_FALSE(manager.GetLocalSpilledObjectURL(object_ids[i]).empty()); + ASSERT_EQ(owner_client->object_urls[object_ids[i]], urls[i]); } // Then try restoring objects from local. 
@@ -494,8 +514,8 @@ TEST_F(LocalObjectManagerTest, TestExplicitSpill) { urls.push_back(BuildURL("url" + std::to_string(i))); } ASSERT_TRUE(worker_pool.io_worker_client->ReplySpillObjects(urls)); - for (size_t i = 0; i < object_ids.size(); i++) { - ASSERT_TRUE(owner_client->ReplyAddSpilledUrl()); + for (size_t i = 0; i < 2; i++) { + ASSERT_TRUE(owner_client->ReplyUpdateObjectLocationBatch()); } ASSERT_EQ(num_times_fired, 1); for (size_t i = 0; i < object_ids.size(); i++) { @@ -545,8 +565,8 @@ TEST_F(LocalObjectManagerTest, TestDuplicateSpill) { } EXPECT_CALL(worker_pool, PushSpillWorker(_)); ASSERT_TRUE(worker_pool.io_worker_client->ReplySpillObjects(urls)); - for (size_t i = 0; i < object_ids.size(); i++) { - ASSERT_TRUE(owner_client->ReplyAddSpilledUrl()); + for (size_t i = 0; i < 2; i++) { + ASSERT_TRUE(owner_client->ReplyUpdateObjectLocationBatch()); } ASSERT_EQ(num_times_fired, 1); for (size_t i = 0; i < object_ids.size(); i++) { @@ -584,7 +604,7 @@ TEST_F(LocalObjectManagerTest, TestSpillObjectsOfSizeZero) { EXPECT_CALL(worker_pool, PushSpillWorker(_)); const std::string url = BuildURL("url" + std::to_string(object_ids.size())); ASSERT_TRUE(worker_pool.io_worker_client->ReplySpillObjects({url})); - ASSERT_TRUE(owner_client->ReplyAddSpilledUrl()); + ASSERT_TRUE(owner_client->ReplyUpdateObjectLocationBatch()); ASSERT_FALSE(worker_pool.FlushPopSpillWorkerCallbacks()); } @@ -628,9 +648,7 @@ TEST_F(LocalObjectManagerTest, TestSpillUptoMaxFuseCount) { // Objects should get freed even though we didn't wait for the owner's notice // to evict. 
ASSERT_TRUE(worker_pool.io_worker_client->ReplySpillObjects(urls)); - for (size_t i = 0; i < urls.size(); i++) { - ASSERT_TRUE(owner_client->ReplyAddSpilledUrl()); - } + ASSERT_TRUE(owner_client->ReplyUpdateObjectLocationBatch()); ASSERT_EQ(owner_client->object_urls.size(), max_fused_object_count_); for (auto &object_url : owner_client->object_urls) { auto it = std::find(urls.begin(), urls.end(), object_url.second); @@ -704,7 +722,7 @@ TEST_F(LocalObjectManagerTest, TestSpillUptoMaxThroughput) { std::vector urls; urls.push_back(BuildURL("url" + std::to_string(0))); ASSERT_TRUE(worker_pool.io_worker_client->ReplySpillObjects({urls[0]})); - ASSERT_TRUE(owner_client->ReplyAddSpilledUrl()); + ASSERT_TRUE(owner_client->ReplyUpdateObjectLocationBatch()); // Make sure object is spilled. ASSERT_EQ(owner_client->object_urls.size(), 1); for (auto &object_url : owner_client->object_urls) { @@ -729,7 +747,7 @@ TEST_F(LocalObjectManagerTest, TestSpillUptoMaxThroughput) { } for (size_t i = 1; i < urls.size(); i++) { ASSERT_TRUE(worker_pool.io_worker_client->ReplySpillObjects({urls[i]})); - ASSERT_TRUE(owner_client->ReplyAddSpilledUrl()); + ASSERT_TRUE(owner_client->ReplyUpdateObjectLocationBatch()); } ASSERT_EQ(owner_client->object_urls.size(), 3); for (auto &object_url : owner_client->object_urls) { @@ -770,7 +788,7 @@ TEST_F(LocalObjectManagerTest, TestSpillError) { EXPECT_CALL(worker_pool, PushSpillWorker(_)); ASSERT_TRUE( worker_pool.io_worker_client->ReplySpillObjects({}, Status::IOError("error"))); - ASSERT_FALSE(owner_client->ReplyAddSpilledUrl()); + ASSERT_FALSE(owner_client->ReplyUpdateObjectLocationBatch()); ASSERT_EQ(num_times_fired, 1); ASSERT_EQ((*unpins)[object_id], 0); @@ -783,7 +801,7 @@ TEST_F(LocalObjectManagerTest, TestSpillError) { std::string url = BuildURL("url"); EXPECT_CALL(worker_pool, PushSpillWorker(_)); ASSERT_TRUE(worker_pool.io_worker_client->ReplySpillObjects({url})); - ASSERT_TRUE(owner_client->ReplyAddSpilledUrl()); + 
ASSERT_TRUE(owner_client->ReplyUpdateObjectLocationBatch()); ASSERT_EQ(owner_client->object_urls[object_id], url); ASSERT_EQ(num_times_fired, 2); ASSERT_EQ((*unpins)[object_id], 1); @@ -885,8 +903,8 @@ TEST_F(LocalObjectManagerTest, TestDeleteSpilledObjects) { urls.push_back(BuildURL("url" + std::to_string(i))); } ASSERT_TRUE(worker_pool.io_worker_client->ReplySpillObjects(urls)); - for (size_t i = 0; i < object_ids_to_spill.size(); i++) { - ASSERT_TRUE(owner_client->ReplyAddSpilledUrl()); + for (size_t i = 0; i < 2; i++) { + ASSERT_TRUE(owner_client->ReplyUpdateObjectLocationBatch()); } // All objects are out of scope now. @@ -938,8 +956,8 @@ TEST_F(LocalObjectManagerTest, TestDeleteURLRefCount) { /*num_objects*/ object_ids_to_spill.size())); } ASSERT_TRUE(worker_pool.io_worker_client->ReplySpillObjects(urls)); - for (size_t i = 0; i < object_ids_to_spill.size(); i++) { - ASSERT_TRUE(owner_client->ReplyAddSpilledUrl()); + for (size_t i = 0; i < 2; i++) { + ASSERT_TRUE(owner_client->ReplyUpdateObjectLocationBatch()); } // Everything is evicted except the last object. In this case, ref count is still > 0. @@ -1011,9 +1029,7 @@ TEST_F(LocalObjectManagerTest, TestDeleteSpillingObjectsBlocking) { // Spillset 1 objects are spilled. ASSERT_TRUE(worker_pool.io_worker_client->ReplySpillObjects(urls_spill_set_1)); - for (size_t i = 0; i < spill_set_1_size; i++) { - ASSERT_TRUE(owner_client->ReplyAddSpilledUrl()); - } + ASSERT_TRUE(owner_client->ReplyUpdateObjectLocationBatch()); // Every object has gone out of scope. for (size_t i = 0; i < spilled_urls_size; i++) { EXPECT_CALL(*subscriber_, Unsubscribe(_, _, object_ids[i].Binary())); @@ -1028,11 +1044,9 @@ TEST_F(LocalObjectManagerTest, TestDeleteSpillingObjectsBlocking) { // Now spilling is completely done. 
ASSERT_TRUE(worker_pool.io_worker_client->ReplySpillObjects(urls_spill_set_2)); - for (size_t i = 0; i < spill_set_2_size; i++) { - // These fail because the object is already freed, so the raylet does not - // send the RPC. - ASSERT_FALSE(owner_client->ReplyAddSpilledUrl()); - } + // These fail because the object is already freed, so the raylet does not + // send the RPC. + ASSERT_FALSE(owner_client->ReplyUpdateObjectLocationBatch()); // Every object is now deleted. manager.ProcessSpilledObjectsDeleteQueue(/* max_batch_size */ 30); @@ -1073,8 +1087,8 @@ TEST_F(LocalObjectManagerTest, TestDeleteMaxObjects) { urls.push_back(BuildURL("url" + std::to_string(i))); } ASSERT_TRUE(worker_pool.io_worker_client->ReplySpillObjects(urls)); - for (size_t i = 0; i < object_ids_to_spill.size(); i++) { - ASSERT_TRUE(owner_client->ReplyAddSpilledUrl()); + for (size_t i = 0; i < 2; i++) { + ASSERT_TRUE(owner_client->ReplyUpdateObjectLocationBatch()); } // Every reference has gone out of scope. @@ -1128,8 +1142,6 @@ TEST_F(LocalObjectManagerTest, TestDeleteURLRefCountRaceCondition) { } ASSERT_TRUE(worker_pool.io_worker_client->ReplySpillObjects(urls)); - // Imagine a scenario only the first location is updated to the owner. - ASSERT_TRUE(owner_client->ReplyAddSpilledUrl()); EXPECT_CALL(*subscriber_, Unsubscribe(_, _, object_ids[0].Binary())); ASSERT_TRUE(subscriber_->PublishObjectEviction()); // Delete operation is called. In this case, the file with the url should not be @@ -1140,7 +1152,6 @@ TEST_F(LocalObjectManagerTest, TestDeleteURLRefCountRaceCondition) { // Everything else is now deleted. 
for (size_t i = 1; i < free_objects_batch_size; i++) { - ASSERT_TRUE(owner_client->ReplyAddSpilledUrl()); EXPECT_CALL(*subscriber_, Unsubscribe(_, _, object_ids[i].Binary())); ASSERT_TRUE(subscriber_->PublishObjectEviction()); } @@ -1314,8 +1325,8 @@ TEST_F(LocalObjectManagerFusedTest, TestMinSpillingSize) { // Objects should get freed even though we didn't wait for the owner's notice // to evict. ASSERT_TRUE(worker_pool.io_worker_client->ReplySpillObjects(urls)); - for (size_t i = 0; i < urls.size(); i++) { - ASSERT_TRUE(owner_client->ReplyAddSpilledUrl()); + for (size_t i = 0; i < 2; i++) { + ASSERT_TRUE(owner_client->ReplyUpdateObjectLocationBatch()); } ASSERT_EQ(owner_client->object_urls.size(), 2); int num_unpinned = 0; @@ -1369,9 +1380,8 @@ TEST_F(LocalObjectManagerFusedTest, TestMinSpillingSizeMaxFusionCount) { EXPECT_CALL(worker_pool, PushSpillWorker(_)).Times(2); ASSERT_TRUE(worker_pool.io_worker_client->ReplySpillObjects(urls)); ASSERT_TRUE(worker_pool.io_worker_client->ReplySpillObjects(urls)); - for (size_t i = 0; i < urls.size(); i++) { - ASSERT_TRUE(owner_client->ReplyAddSpilledUrl()); - ASSERT_TRUE(owner_client->ReplyAddSpilledUrl()); + for (size_t i = 0; i < 2; i++) { + ASSERT_TRUE(owner_client->ReplyUpdateObjectLocationBatch()); } // We will spill the last objects even though we're under the min spilling @@ -1424,8 +1434,8 @@ TEST_F(LocalObjectManagerTest, TestPinBytes) { urls.push_back(BuildURL("url" + std::to_string(i))); } ASSERT_TRUE(worker_pool.io_worker_client->ReplySpillObjects(urls)); - for (size_t i = 0; i < object_ids.size(); i++) { - ASSERT_TRUE(owner_client->ReplyAddSpilledUrl()); + for (size_t i = 0; i < 2; i++) { + ASSERT_TRUE(owner_client->ReplyUpdateObjectLocationBatch()); } ASSERT_TRUE(spilled); diff --git a/src/ray/rpc/worker/core_worker_client.h b/src/ray/rpc/worker/core_worker_client.h index f63e75b1fff3..a2a76d00158f 100644 --- a/src/ray/rpc/worker/core_worker_client.h +++ b/src/ray/rpc/worker/core_worker_client.h @@ -182,9 
+182,6 @@ class CoreWorkerClientInterface : public pubsub::SubscriberClientInterface { const DeleteSpilledObjectsRequest &request, const ClientCallback &callback) {} - virtual void AddSpilledUrl(const AddSpilledUrlRequest &request, - const ClientCallback &callback) {} - virtual void PlasmaObjectReady(const PlasmaObjectReadyRequest &request, const ClientCallback &callback) { } @@ -309,12 +306,6 @@ class CoreWorkerClient : public std::enable_shared_from_this, /*method_timeout_ms*/ -1, override) - VOID_RPC_CLIENT_METHOD(CoreWorkerService, - AddSpilledUrl, - grpc_client_, - /*method_timeout_ms*/ -1, - override) - VOID_RPC_CLIENT_METHOD(CoreWorkerService, PlasmaObjectReady, grpc_client_, diff --git a/src/ray/rpc/worker/core_worker_server.h b/src/ray/rpc/worker/core_worker_server.h index 2f031397db95..18dc51b876f9 100644 --- a/src/ray/rpc/worker/core_worker_server.h +++ b/src/ray/rpc/worker/core_worker_server.h @@ -44,7 +44,6 @@ namespace rpc { RPC_SERVICE_HANDLER(CoreWorkerService, SpillObjects, -1) \ RPC_SERVICE_HANDLER(CoreWorkerService, RestoreSpilledObjects, -1) \ RPC_SERVICE_HANDLER(CoreWorkerService, DeleteSpilledObjects, -1) \ - RPC_SERVICE_HANDLER(CoreWorkerService, AddSpilledUrl, -1) \ RPC_SERVICE_HANDLER(CoreWorkerService, PlasmaObjectReady, -1) \ RPC_SERVICE_HANDLER(CoreWorkerService, Exit, -1) \ RPC_SERVICE_HANDLER(CoreWorkerService, AssignObjectOwner, -1) @@ -66,7 +65,6 @@ namespace rpc { DECLARE_VOID_RPC_SERVICE_HANDLER_METHOD(SpillObjects) \ DECLARE_VOID_RPC_SERVICE_HANDLER_METHOD(RestoreSpilledObjects) \ DECLARE_VOID_RPC_SERVICE_HANDLER_METHOD(DeleteSpilledObjects) \ - DECLARE_VOID_RPC_SERVICE_HANDLER_METHOD(AddSpilledUrl) \ DECLARE_VOID_RPC_SERVICE_HANDLER_METHOD(PlasmaObjectReady) \ DECLARE_VOID_RPC_SERVICE_HANDLER_METHOD(Exit) \ DECLARE_VOID_RPC_SERVICE_HANDLER_METHOD(AssignObjectOwner)