Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Core][Spilled Object Leakage] More robust spilled object deletion #29014

Merged
merged 15 commits into from
Oct 7, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 13 additions & 2 deletions python/ray/_private/external_storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,9 @@ def delete_spilled_objects(self, urls: List[str]):
Args:
urls: URLs that store spilled object files.
NOTE: This function should not fail if some of the urls
do not exist.
"""

@abc.abstractmethod
Expand Down Expand Up @@ -331,7 +334,11 @@ def restore_spilled_objects(
def delete_spilled_objects(self, urls: List[str]):
for url in urls:
path = parse_url_with_offset(url.decode()).base_url
os.remove(path)
try:
scv119 marked this conversation as resolved.
Show resolved Hide resolved
os.remove(path)
except FileNotFoundError:
# Occurs when the urls are retried during worker crash/failure.
pass

def destroy_external_storage(self):
for directory_path in self._directory_paths:
Expand Down Expand Up @@ -418,7 +425,11 @@ def restore_spilled_objects(
def delete_spilled_objects(self, urls: List[str]):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Feel the API could be better by returning failure urls (e.g. failure in parsing, failure in delete file etc...)

for url in urls:
path = parse_url_with_offset(url.decode()).base_url
self._fs.delete_file(path)
try:
self._fs.delete_file(path)
except FileNotFoundError:
# Occurs when the urls are retried during worker crash/failure.
pass

def destroy_external_storage(self):
try:
Expand Down
38 changes: 38 additions & 0 deletions python/ray/tests/test_object_spilling_2.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,18 @@
import random
import subprocess
import sys
import tempfile

import numpy as np
import pytest

import ray
from ray._private.test_utils import run_string_as_driver, wait_for_condition
from ray.tests.test_object_spilling import assert_no_thrashing, is_dir_empty
from ray._private.external_storage import (
FileSystemStorage,
ExternalStorageRayStorageImpl,
)


def test_delete_objects(object_spilling_config, shutdown_only):
Expand Down Expand Up @@ -143,6 +148,39 @@ def wait_until_actor_dead():
assert_no_thrashing(address["address"])


@pytest.mark.skipif(platform.system() in ["Windows"], reason="Failing on Windows.")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

def test_delete_file_non_exists(shutdown_only, tmp_path):
ray.init(storage=str(tmp_path))

def create_spilled_files(num_files):
spilled_files = []
uris = []
for _ in range(3):
fd, path = tempfile.mkstemp()
with os.fdopen(fd, "w") as tmp:
tmp.write("stuff")
spilled_files.append(path)
uris.append((path + "?offset=0&size=10").encode("ascii"))
return spilled_files, uris

for storage in [
ExternalStorageRayStorageImpl("session"),
FileSystemStorage("/tmp"),
]:
spilled_files, uris = create_spilled_files(3)
storage.delete_spilled_objects(uris)
for file in spilled_files:
assert not os.path.exists(file)

# delete should succeed even if some files doesn't exist.
spilled_files1, uris1 = create_spilled_files(3)
spilled_files += spilled_files1
uris += uris1
storage.delete_spilled_objects(uris)
for file in spilled_files:
assert not os.path.exists(file)


@pytest.mark.skipif(
platform.system() in ["Windows"], reason="Failing on Windows and MacOS."
)
Expand Down
26 changes: 20 additions & 6 deletions src/ray/raylet/local_object_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -556,13 +556,14 @@ void LocalObjectManager::ProcessSpilledObjectsDeleteQueue(uint32_t max_batch_siz
spilled_object_pending_delete_.pop();
}
if (object_urls_to_delete.size() > 0) {
DeleteSpilledObjects(object_urls_to_delete);
DeleteSpilledObjects(std::move(object_urls_to_delete));
}
}

void LocalObjectManager::DeleteSpilledObjects(std::vector<std::string> &urls_to_delete) {
void LocalObjectManager::DeleteSpilledObjects(std::vector<std::string> urls_to_delete,
int64_t num_retries) {
io_worker_pool_.PopDeleteWorker(
[this, urls_to_delete](std::shared_ptr<WorkerInterface> io_worker) {
[this, urls_to_delete, num_retries](std::shared_ptr<WorkerInterface> io_worker) {
RAY_LOG(DEBUG) << "Sending delete spilled object request. Length: "
<< urls_to_delete.size();
rpc::DeleteSpilledObjectsRequest request;
Expand All @@ -571,12 +572,22 @@ void LocalObjectManager::DeleteSpilledObjects(std::vector<std::string> &urls_to_
}
io_worker->rpc_client()->DeleteSpilledObjects(
request,
[this, io_worker](const ray::Status &status,
const rpc::DeleteSpilledObjectsReply &reply) {
[this, urls_to_delete = std::move(urls_to_delete), num_retries, io_worker](
const ray::Status &status, const rpc::DeleteSpilledObjectsReply &reply) {
io_worker_pool_.PushDeleteWorker(io_worker);
if (!status.ok()) {
num_failed_deletion_requests_ += 1;
RAY_LOG(ERROR) << "Failed to send delete spilled object request: "
<< status.ToString();
<< status.ToString() << ", retry count: " << num_retries;

if (num_retries > 0) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we need back-off for retry?

// retry failed requests.
io_service_.post(
[this, urls_to_delete = std::move(urls_to_delete), num_retries]() {
DeleteSpilledObjects(urls_to_delete, num_retries - 1);
},
"LocaObjectManager.RetryDeleteSpilledObjects");
}
}
});
});
Expand Down Expand Up @@ -625,6 +636,9 @@ void LocalObjectManager::RecordMetrics() const {
ray::stats::STATS_object_store_memory.Record(
spilled_bytes_current_,
{{ray::stats::LocationKey.name(), ray::stats::kObjectLocSpilled}});

ray::stats::STATS_spill_manager_request_total.Record(num_failed_deletion_requests_,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm Wonder if we need more breakdown;;

type: spill/restore/delete
status: success/failed

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i could work on this as part of 2.2.

"FailedDeletion");
}

int64_t LocalObjectManager::GetPrimaryBytes() const {
Expand Down
17 changes: 16 additions & 1 deletion src/ray/raylet/local_object_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,9 @@ namespace ray {

namespace raylet {

/// The default number of retries when spilled object deletion failed.
const int64_t kDefaultSpilledObjectDeleteRetries = 3;

/// This class implements memory management for primary objects, objects that
/// have been freed, and objects that have been spilled.
class LocalObjectManager {
Expand All @@ -41,6 +44,7 @@ class LocalObjectManager {
const NodeID &node_id,
std::string self_node_address,
int self_node_port,
instrumented_io_context &io_service,
size_t free_objects_batch_size,
int64_t free_objects_period_ms,
IOWorkerPoolInterface &io_worker_pool,
Expand All @@ -56,6 +60,7 @@ class LocalObjectManager {
: self_node_id_(node_id),
self_node_address_(self_node_address),
self_node_port_(self_node_port),
io_service_(io_service),
free_objects_period_ms_(free_objects_period_ms),
free_objects_batch_size_(free_objects_batch_size),
io_worker_pool_(io_worker_pool),
Expand Down Expand Up @@ -186,6 +191,7 @@ class LocalObjectManager {
FRIEND_TEST(LocalObjectManagerTest,
TestSpillObjectsOfSizeNumBytesToSpillHigherThanMinBytesToSpill);
FRIEND_TEST(LocalObjectManagerTest, TestSpillObjectNotEvictable);
FRIEND_TEST(LocalObjectManagerTest, TestRetryDeleteSpilledObjects);

/// Asynchronously spill objects when space is needed. The callback tries to
/// spill at least num_bytes_to_spill and returns true if we found objects to
Expand Down Expand Up @@ -217,12 +223,18 @@ class LocalObjectManager {
/// Delete spilled objects stored in given urls.
///
/// \param urls_to_delete List of urls to delete from external storages.
void DeleteSpilledObjects(std::vector<std::string> &urls_to_delete);
/// \param num_retries Num of retries allowed in case of failure, zero or negative
/// means don't retry.
void DeleteSpilledObjects(std::vector<std::string> urls_to_delete,
int64_t num_retries = kDefaultSpilledObjectDeleteRetries);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

num_retries_left?


const NodeID self_node_id_;
const std::string self_node_address_;
const int self_node_port_;

/// The io_service/thread this class runs in.
instrumented_io_context &io_service_;

/// The period between attempts to eagerly evict objects from plasma.
const int64_t free_objects_period_ms_;

Expand Down Expand Up @@ -372,6 +384,9 @@ class LocalObjectManager {
/// The last time a restore log finished.
int64_t last_restore_log_ns_ = 0;

/// The number of failed deletion requests.
std::atomic<int64_t> num_failed_deletion_requests_ = 0;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Btw I think this doesn't have to be atomic (it is single threaded)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hmm i thought the io_worker->rpc_client()->DeleteSpilledObjects callback happens in a different thread.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe we should start document explicitly where callbacks are run w.r.t threading model lol


friend class LocalObjectManagerTestWithMinSpillingSize;
friend class LocalObjectManagerTest;
friend class LocalObjectManagerFusedTest;
Expand Down
1 change: 1 addition & 0 deletions src/ray/raylet/node_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -305,6 +305,7 @@ NodeManager::NodeManager(instrumented_io_context &io_service,
self_node_id_,
config.node_manager_address,
config.node_manager_port,
io_service_,
RayConfig::instance().free_objects_batch_size(),
RayConfig::instance().free_objects_period_milliseconds(),
worker_pool_,
Expand Down
30 changes: 30 additions & 0 deletions src/ray/raylet/test/local_object_manager_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,23 @@ class MockIOWorkerClient : public rpc::CoreWorkerClientInterface {
return deleted_urls_size;
}

int FailDeleteSpilledObject(Status status = Status::IOError("io error")) {
if (delete_callbacks.size() == 0) {
return 0;
}

auto callback = delete_callbacks.front();
auto reply = rpc::DeleteSpilledObjectsReply();
callback(status, reply);

auto &request = delete_requests.front();
int deleted_urls_size = request.spilled_objects_url_size();
delete_callbacks.pop_front();
delete_requests.pop_front();

return deleted_urls_size;
}

std::list<rpc::ClientCallback<rpc::SpillObjectsReply>> callbacks;
std::list<rpc::ClientCallback<rpc::DeleteSpilledObjectsReply>> delete_callbacks;
std::list<rpc::ClientCallback<rpc::RestoreSpilledObjectsReply>> restore_callbacks;
Expand Down Expand Up @@ -316,6 +333,7 @@ class LocalObjectManagerTestWithMinSpillingSize {
manager_node_id_,
"address",
1234,
io_service_,
free_objects_batch_size,
/*free_objects_period_ms=*/1000,
worker_pool,
Expand Down Expand Up @@ -1381,6 +1399,18 @@ TEST_F(LocalObjectManagerTest, TestDuplicatePinAndSpill) {
AssertNoLeaks();
}

TEST_F(LocalObjectManagerTest, TestRetryDeleteSpilledObjects) {
std::vector<std::string> urls_to_delete{{"url1"}};
manager.DeleteSpilledObjects(urls_to_delete, /*num_retries*/ 1);
ASSERT_EQ(1, worker_pool.io_worker_client->FailDeleteSpilledObject());
io_service_.run_one();
// assert the request is retried.
ASSERT_EQ(1, worker_pool.io_worker_client->FailDeleteSpilledObject());
// retry exhaused.
io_service_.run_one();
ASSERT_EQ(0, worker_pool.io_worker_client->FailDeleteSpilledObject());
}

TEST_F(LocalObjectManagerFusedTest, TestMinSpillingSize) {
rpc::Address owner_address;
owner_address.set_worker_id(WorkerID::FromRandom().Binary());
Expand Down