Skip to content

Commit

Permalink
Unify AddSpilledUrl into UpdateObjectLocationBatch RPC (#23872)
Browse files Browse the repository at this point in the history
- Logically these two rpcs are about notifying the owner about the object location changes, so we should just have one rpc for that purpose. This prevents out-of-order updates seen by the owner (i.e. receiving object removed from object store before spill update). Also by using UpdateObjectLocationBatch, we get batch update for free.
- Maintain a FIFO order for object location updates so we won't have starvation.
  • Loading branch information
jjyao authored Apr 18, 2022
1 parent cb02e2f commit 5d7f45f
Show file tree
Hide file tree
Showing 19 changed files with 351 additions and 256 deletions.
6 changes: 0 additions & 6 deletions src/mock/ray/core_worker/core_worker.h
Original file line number Diff line number Diff line change
Expand Up @@ -141,12 +141,6 @@ class MockCoreWorker : public CoreWorker {
rpc::SpillObjectsReply *reply,
rpc::SendReplyCallback send_reply_callback),
(override));
MOCK_METHOD(void,
HandleAddSpilledUrl,
(const rpc::AddSpilledUrlRequest &request,
rpc::AddSpilledUrlReply *reply,
rpc::SendReplyCallback send_reply_callback),
(override));
MOCK_METHOD(void,
HandleRestoreSpilledObjects,
(const rpc::RestoreSpilledObjectsRequest &request,
Expand Down
5 changes: 0 additions & 5 deletions src/mock/ray/rpc/worker/core_worker_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -115,11 +115,6 @@ class MockCoreWorkerClientInterface : public ray::pubsub::MockSubscriberClientIn
(const DeleteSpilledObjectsRequest &request,
const ClientCallback<DeleteSpilledObjectsReply> &callback),
(override));
MOCK_METHOD(void,
AddSpilledUrl,
(const AddSpilledUrlRequest &request,
const ClientCallback<AddSpilledUrlReply> &callback),
(override));
MOCK_METHOD(void,
PlasmaObjectReady,
(const PlasmaObjectReadyRequest &request,
Expand Down
68 changes: 38 additions & 30 deletions src/ray/core_worker/core_worker.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2886,19 +2886,32 @@ void CoreWorker::HandleUpdateObjectLocationBatch(
return;
}
const auto &node_id = NodeID::FromBinary(request.node_id());
const auto &object_location_states = request.object_location_states();

for (const auto &object_location_state : object_location_states) {
const auto &object_id = ObjectID::FromBinary(object_location_state.object_id());
const auto &state = object_location_state.state();
const auto &object_location_updates = request.object_location_updates();

for (const auto &object_location_update : object_location_updates) {
const auto &object_id = ObjectID::FromBinary(object_location_update.object_id());

if (object_location_update.has_spilled_location_update()) {
AddSpilledObjectLocationOwner(
object_id,
object_location_update.spilled_location_update().spilled_url(),
object_location_update.spilled_location_update().spilled_to_local_storage()
? node_id
: NodeID::Nil());
}

if (state == rpc::ObjectLocationState::ADDED) {
AddObjectLocationOwner(object_id, node_id);
} else if (state == rpc::ObjectLocationState::REMOVED) {
RemoveObjectLocationOwner(object_id, node_id);
} else {
RAY_LOG(FATAL) << "Invalid object location state " << state
<< " has been received.";
if (object_location_update.has_plasma_location_update()) {
if (object_location_update.plasma_location_update() ==
rpc::ObjectPlasmaLocationUpdate::ADDED) {
AddObjectLocationOwner(object_id, node_id);
} else if (object_location_update.plasma_location_update() ==
rpc::ObjectPlasmaLocationUpdate::REMOVED) {
RemoveObjectLocationOwner(object_id, node_id);
} else {
RAY_LOG(FATAL) << "Invalid object plasma location update "
<< object_location_update.plasma_location_update()
<< " has been received.";
}
}
}

Expand All @@ -2907,6 +2920,19 @@ void CoreWorker::HandleUpdateObjectLocationBatch(
/*failure_callback_on_reply*/ nullptr);
}

void CoreWorker::AddSpilledObjectLocationOwner(const ObjectID &object_id,
const std::string &spilled_url,
const NodeID &spilled_node_id) {
RAY_LOG(DEBUG) << "Received object spilled location update for object " << object_id
<< ", which has been spilled to " << spilled_url << " on node "
<< spilled_node_id;
auto reference_exists =
reference_counter_->HandleObjectSpilled(object_id, spilled_url, spilled_node_id);
if (!reference_exists) {
RAY_LOG(DEBUG) << "Object " << object_id << " not found";
}
}

void CoreWorker::AddObjectLocationOwner(const ObjectID &object_id,
const NodeID &node_id) {
if (gcs_client_->Nodes().Get(node_id, /*filter_dead_nodes=*/true) == nullptr) {
Expand Down Expand Up @@ -3158,24 +3184,6 @@ void CoreWorker::HandleSpillObjects(const rpc::SpillObjectsRequest &request,
}
}

void CoreWorker::HandleAddSpilledUrl(const rpc::AddSpilledUrlRequest &request,
rpc::AddSpilledUrlReply *reply,
rpc::SendReplyCallback send_reply_callback) {
const ObjectID object_id = ObjectID::FromBinary(request.object_id());
const std::string &spilled_url = request.spilled_url();
const NodeID node_id = NodeID::FromBinary(request.spilled_node_id());
RAY_LOG(DEBUG) << "Received AddSpilledUrl request for object " << object_id
<< ", which has been spilled to " << spilled_url << " on node "
<< node_id;
auto reference_exists = reference_counter_->HandleObjectSpilled(
object_id, spilled_url, node_id, request.size());
Status status =
reference_exists
? Status::OK()
: Status::ObjectNotFound("Object " + object_id.Hex() + " not found");
send_reply_callback(status, nullptr, nullptr);
}

void CoreWorker::HandleRestoreSpilledObjects(
const rpc::RestoreSpilledObjectsRequest &request,
rpc::RestoreSpilledObjectsReply *reply,
Expand Down
9 changes: 4 additions & 5 deletions src/ray/core_worker/core_worker.h
Original file line number Diff line number Diff line change
Expand Up @@ -753,11 +753,6 @@ class CoreWorker : public rpc::CoreWorkerServiceHandler {
rpc::SpillObjectsReply *reply,
rpc::SendReplyCallback send_reply_callback) override;

// Add spilled URL to owned reference.
void HandleAddSpilledUrl(const rpc::AddSpilledUrlRequest &request,
rpc::AddSpilledUrlReply *reply,
rpc::SendReplyCallback send_reply_callback) override;

// Restore objects from external storage.
void HandleRestoreSpilledObjects(const rpc::RestoreSpilledObjectsRequest &request,
rpc::RestoreSpilledObjectsReply *reply,
Expand Down Expand Up @@ -1002,6 +997,10 @@ class CoreWorker : public rpc::CoreWorkerServiceHandler {
/// messages.
void ProcessPubsubCommands(const Commands &commands, const NodeID &subscriber_id);

void AddSpilledObjectLocationOwner(const ObjectID &object_id,
const std::string &spilled_url,
const NodeID &spilled_node_id);

void AddObjectLocationOwner(const ObjectID &object_id, const NodeID &node_id);

void RemoveObjectLocationOwner(const ObjectID &object_id, const NodeID &node_id);
Expand Down
15 changes: 1 addition & 14 deletions src/ray/core_worker/reference_count.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1216,19 +1216,9 @@ absl::optional<absl::flat_hash_set<NodeID>> ReferenceCounter::GetObjectLocations
return it->second.locations;
}

size_t ReferenceCounter::GetObjectSize(const ObjectID &object_id) const {
absl::MutexLock lock(&mutex_);
auto it = object_id_refs_.find(object_id);
if (it == object_id_refs_.end()) {
return 0;
}
return it->second.object_size;
}

bool ReferenceCounter::HandleObjectSpilled(const ObjectID &object_id,
const std::string spilled_url,
const NodeID &spilled_node_id,
int64_t size) {
const NodeID &spilled_node_id) {
absl::MutexLock lock(&mutex_);
auto it = object_id_refs_.find(object_id);
if (it == object_id_refs_.end()) {
Expand All @@ -1253,9 +1243,6 @@ bool ReferenceCounter::HandleObjectSpilled(const ObjectID &object_id,
if (!spilled_node_id.IsNil()) {
it->second.spilled_node_id = spilled_node_id;
}
if (size > 0) {
it->second.object_size = size;
}
PushToLocationSubscribers(it);
} else {
RAY_LOG(DEBUG) << "Object " << object_id << " spilled to dead node "
Expand Down
10 changes: 1 addition & 9 deletions src/ray/core_worker/reference_count.h
Original file line number Diff line number Diff line change
Expand Up @@ -431,25 +431,17 @@ class ReferenceCounter : public ReferenceCounterInterface,
rpc::WorkerObjectLocationsPubMessage *object_info)
LOCKS_EXCLUDED(mutex_);

/// Get an object's size. This will return 0 if the object is out of scope.
///
/// \param[in] object_id The object whose size to get.
/// \return Object size, or 0 if the object is out of scope.
size_t GetObjectSize(const ObjectID &object_id) const;

/// Handle an object has been spilled to external storage.
///
/// This notifies the primary raylet that the object is safe to release and
/// records the spill URL, spill node ID, and updated object size.
/// \param[in] object_id The object that has been spilled.
/// \param[in] spilled_url The URL to which the object has been spilled.
/// \param[in] spilled_node_id The ID of the node on which the object was spilled.
/// \param[in] size The size of the object.
/// \return True if the reference exists and is in scope, false otherwise.
bool HandleObjectSpilled(const ObjectID &object_id,
const std::string spilled_url,
const NodeID &spilled_node_id,
int64_t size);
const NodeID &spilled_node_id);

/// Get locality data for object. This is used by the leasing policy to implement
/// locality-aware leasing.
Expand Down
12 changes: 6 additions & 6 deletions src/ray/core_worker/test/core_worker_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -101,12 +101,12 @@ class CoreWorkerTest : public ::testing::Test {
// start raylet on each node. Assign each node with different resources so that
// a task can be scheduled to the desired node.
for (int i = 0; i < num_nodes; i++) {
raylet_socket_names_[i] =
TestSetupUtil::StartRaylet("127.0.0.1",
node_manager_port + i,
"127.0.0.1:6379",
"\"CPU,4.0,resource" + std::to_string(i) + ",10\"",
&raylet_store_socket_names_[i]);
raylet_socket_names_[i] = TestSetupUtil::StartRaylet(
"127.0.0.1",
node_manager_port + i,
"127.0.0.1:6379",
"\"CPU,4.0,object_store_memory,100,resource" + std::to_string(i) + ",10\"",
&raylet_store_socket_names_[i]);
}
}

Expand Down
25 changes: 25 additions & 0 deletions src/ray/core_worker/test/reference_count_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -618,6 +618,31 @@ TEST_F(ReferenceCountTest, TestReferenceStats) {
rc->RemoveLocalReference(id2, nullptr);
}

TEST_F(ReferenceCountTest, TestHandleObjectSpilled) {
ObjectID obj1 = ObjectID::FromRandom();
NodeID node1 = NodeID::FromRandom();
rpc::Address address;
address.set_ip_address("1234");

int64_t object_size = 100;
rc->AddOwnedObject(obj1,
{},
address,
"file1.py:42",
object_size,
false,
/*add_local_ref=*/true,
absl::optional<NodeID>(node1));
rc->HandleObjectSpilled(obj1, "url1", node1);
rpc::WorkerObjectLocationsPubMessage object_info;
Status status = rc->FillObjectInformation(obj1, &object_info);
ASSERT_TRUE(status.ok());
ASSERT_EQ(object_info.object_size(), object_size);
ASSERT_EQ(object_info.spilled_url(), "url1");
ASSERT_EQ(object_info.spilled_node_id(), node1.Binary());
rc->RemoveLocalReference(obj1, nullptr);
}

// Tests fetching of locality data from reference table.
TEST_F(ReferenceCountTest, TestGetLocalityData) {
ObjectID obj1 = ObjectID::FromRandom();
Expand Down
14 changes: 14 additions & 0 deletions src/ray/object_manager/object_directory.h
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,20 @@ class IObjectDirectory {
const NodeID &node_id,
const ObjectInfo &object_info) = 0;

/// Report object spilled to external storage.
///
/// \param object_id The object id that was spilled.
/// \param node_id The node id corresponding to this node.
/// \param owner_address The address of the owner of this node.
/// \param spilled_url The url of the spilled location.
/// \param spilled_to_local_storage Whether the object is spilled to
/// local storage or cloud storage.
virtual void ReportObjectSpilled(const ObjectID &object_id,
const NodeID &node_id,
const rpc::Address &owner_address,
const std::string &spilled_url,
const bool spilled_to_local_storage) = 0;

/// Record metrics.
virtual void RecordMetrics(uint64_t duration_ms) = 0;

Expand Down
75 changes: 59 additions & 16 deletions src/ray/object_manager/ownership_based_object_directory.cc
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,13 @@ void OwnershipBasedObjectDirectory::ReportObjectAdded(const ObjectID &object_id,
return;
}
metrics_num_object_locations_added_++;
location_buffers_[worker_id][object_id] = rpc::ObjectLocationState::ADDED;
const bool existing_object = location_buffers_[worker_id].second.contains(object_id);
rpc::ObjectLocationUpdate &update = location_buffers_[worker_id].second[object_id];
update.set_object_id(object_id.Binary());
update.set_plasma_location_update(rpc::ObjectPlasmaLocationUpdate::ADDED);
if (!existing_object) {
location_buffers_[worker_id].first.emplace_back(object_id);
}
SendObjectLocationUpdateBatchIfNeeded(worker_id, node_id, owner_address);
}

Expand All @@ -146,9 +152,45 @@ void OwnershipBasedObjectDirectory::ReportObjectRemoved(const ObjectID &object_i
return;
}
metrics_num_object_locations_removed_++;
location_buffers_[worker_id][object_id] = rpc::ObjectLocationState::REMOVED;
const bool existing_object = location_buffers_[worker_id].second.contains(object_id);
rpc::ObjectLocationUpdate &update = location_buffers_[worker_id].second[object_id];
update.set_object_id(object_id.Binary());
update.set_plasma_location_update(rpc::ObjectPlasmaLocationUpdate::REMOVED);
if (!existing_object) {
location_buffers_[worker_id].first.emplace_back(object_id);
}
SendObjectLocationUpdateBatchIfNeeded(worker_id, node_id, owner_address);
}

void OwnershipBasedObjectDirectory::ReportObjectSpilled(
const ObjectID &object_id,
const NodeID &node_id,
const rpc::Address &owner_address,
const std::string &spilled_url,
const bool spilled_to_local_storage) {
RAY_LOG(DEBUG) << "Sending spilled URL " << spilled_url << " for object " << object_id
<< " to owner " << WorkerID::FromBinary(owner_address.worker_id());

const WorkerID worker_id = WorkerID::FromBinary(owner_address.worker_id());
auto owner_client = GetClient(owner_address);
if (owner_client == nullptr) {
RAY_LOG(DEBUG) << "Object " << object_id << " does not have owner. "
<< "ReportObjectSpilled becomes a no-op. "
<< "This should only happen for Plasma store warmup objects.";
return;
}

const bool existing_object = location_buffers_[worker_id].second.contains(object_id);
rpc::ObjectLocationUpdate &update = location_buffers_[worker_id].second[object_id];
update.set_object_id(object_id.Binary());
update.mutable_spilled_location_update()->set_spilled_url(spilled_url);
update.mutable_spilled_location_update()->set_spilled_to_local_storage(
spilled_to_local_storage);
if (!existing_object) {
location_buffers_[worker_id].first.emplace_back(object_id);
}
SendObjectLocationUpdateBatchIfNeeded(worker_id, node_id, owner_address);
};
}

void OwnershipBasedObjectDirectory::SendObjectLocationUpdateBatchIfNeeded(
const WorkerID &worker_id, const NodeID &node_id, const rpc::Address &owner_address) {
Expand All @@ -164,28 +206,29 @@ void OwnershipBasedObjectDirectory::SendObjectLocationUpdateBatchIfNeeded(
return;
}

const auto &object_state_buffers = location_buffer_it->second;
RAY_CHECK(object_state_buffers.size() != 0);
auto &object_queue = location_buffer_it->second.first;
auto &object_map = location_buffer_it->second.second;
RAY_CHECK_EQ(object_queue.size(), object_map.size());
RAY_CHECK_NE(object_queue.size(), 0u);

rpc::UpdateObjectLocationBatchRequest request;
request.set_intended_worker_id(worker_id.Binary());
request.set_node_id(node_id.Binary());
auto object_state_buffers_it = object_state_buffers.begin();
auto object_queue_it = object_queue.begin();
auto batch_size = 0;
while (object_state_buffers_it != object_state_buffers.end() &&
while (object_queue_it != object_queue.end() &&
batch_size < kMaxObjectReportBatchSize) {
const auto &object_id = object_state_buffers_it->first;
const auto &object_state = object_state_buffers_it->second;

auto state = request.add_object_location_states();
state->set_object_id(object_id.Binary());
state->set_state(object_state);
auto update = request.add_object_location_updates();
const auto &object_id = *object_queue_it;
*update = std::move(object_map.at(object_id));
object_map.erase(object_id);
batch_size++;
object_state_buffers_it++;
object_queue_it++;
}
location_buffer_it->second.erase(object_state_buffers.begin(), object_state_buffers_it);
object_queue.erase(object_queue.begin(), object_queue_it);

if (object_state_buffers.size() == 0) {
RAY_CHECK_EQ(object_queue.size(), object_map.size());
if (object_queue.size() == 0) {
location_buffers_.erase(location_buffer_it);
}

Expand Down
Loading

0 comments on commit 5d7f45f

Please sign in to comment.