Skip to content

Commit

Permalink
[Core][Spilled Object Leakage] More robust spilled object deletion (#…
Browse files Browse the repository at this point in the history
…29014)

We have noticed spilled objects not deleted even if the job creating those objects finished execution. After reading the code my theory is that the object delegation worker failed in the middle of deleting spilled files, which doesn't handle well in today's spilled object deletion logic.

Though I no longer get a reproduction, (which I suspect due to the fix #26395), we can enhance the failure handle logic when object deletion failed.
  • Loading branch information
scv119 authored Oct 7, 2022
1 parent 3c58caa commit c40632e
Show file tree
Hide file tree
Showing 6 changed files with 118 additions and 9 deletions.
15 changes: 13 additions & 2 deletions python/ray/_private/external_storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,9 @@ def delete_spilled_objects(self, urls: List[str]):
Args:
urls: URLs that store spilled object files.
NOTE: This function should not fail if some of the urls
do not exist.
"""

@abc.abstractmethod
Expand Down Expand Up @@ -331,7 +334,11 @@ def restore_spilled_objects(
def delete_spilled_objects(self, urls: List[str]):
for url in urls:
path = parse_url_with_offset(url.decode()).base_url
os.remove(path)
try:
os.remove(path)
except FileNotFoundError:
# Occurs when the urls are retried during worker crash/failure.
pass

def destroy_external_storage(self):
for directory_path in self._directory_paths:
Expand Down Expand Up @@ -418,7 +425,11 @@ def restore_spilled_objects(
def delete_spilled_objects(self, urls: List[str]):
for url in urls:
path = parse_url_with_offset(url.decode()).base_url
self._fs.delete_file(path)
try:
self._fs.delete_file(path)
except FileNotFoundError:
# Occurs when the urls are retried during worker crash/failure.
pass

def destroy_external_storage(self):
try:
Expand Down
38 changes: 38 additions & 0 deletions python/ray/tests/test_object_spilling_2.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,18 @@
import random
import subprocess
import sys
import tempfile

import numpy as np
import pytest

import ray
from ray._private.test_utils import run_string_as_driver, wait_for_condition
from ray.tests.test_object_spilling import assert_no_thrashing, is_dir_empty
from ray._private.external_storage import (
FileSystemStorage,
ExternalStorageRayStorageImpl,
)


def test_delete_objects(object_spilling_config, shutdown_only):
Expand Down Expand Up @@ -143,6 +148,39 @@ def wait_until_actor_dead():
assert_no_thrashing(address["address"])


@pytest.mark.skipif(platform.system() in ["Windows"], reason="Failing on Windows.")
def test_delete_file_non_exists(shutdown_only, tmp_path):
ray.init(storage=str(tmp_path))

def create_spilled_files(num_files):
spilled_files = []
uris = []
for _ in range(3):
fd, path = tempfile.mkstemp()
with os.fdopen(fd, "w") as tmp:
tmp.write("stuff")
spilled_files.append(path)
uris.append((path + "?offset=0&size=10").encode("ascii"))
return spilled_files, uris

for storage in [
ExternalStorageRayStorageImpl("session"),
FileSystemStorage("/tmp"),
]:
spilled_files, uris = create_spilled_files(3)
storage.delete_spilled_objects(uris)
for file in spilled_files:
assert not os.path.exists(file)

# delete should succeed even if some files doesn't exist.
spilled_files1, uris1 = create_spilled_files(3)
spilled_files += spilled_files1
uris += uris1
storage.delete_spilled_objects(uris)
for file in spilled_files:
assert not os.path.exists(file)


@pytest.mark.skipif(
platform.system() in ["Windows"], reason="Failing on Windows and MacOS."
)
Expand Down
26 changes: 20 additions & 6 deletions src/ray/raylet/local_object_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -556,13 +556,14 @@ void LocalObjectManager::ProcessSpilledObjectsDeleteQueue(uint32_t max_batch_siz
spilled_object_pending_delete_.pop();
}
if (object_urls_to_delete.size() > 0) {
DeleteSpilledObjects(object_urls_to_delete);
DeleteSpilledObjects(std::move(object_urls_to_delete));
}
}

void LocalObjectManager::DeleteSpilledObjects(std::vector<std::string> &urls_to_delete) {
void LocalObjectManager::DeleteSpilledObjects(std::vector<std::string> urls_to_delete,
int64_t num_retries) {
io_worker_pool_.PopDeleteWorker(
[this, urls_to_delete](std::shared_ptr<WorkerInterface> io_worker) {
[this, urls_to_delete, num_retries](std::shared_ptr<WorkerInterface> io_worker) {
RAY_LOG(DEBUG) << "Sending delete spilled object request. Length: "
<< urls_to_delete.size();
rpc::DeleteSpilledObjectsRequest request;
Expand All @@ -571,12 +572,22 @@ void LocalObjectManager::DeleteSpilledObjects(std::vector<std::string> &urls_to_
}
io_worker->rpc_client()->DeleteSpilledObjects(
request,
[this, io_worker](const ray::Status &status,
const rpc::DeleteSpilledObjectsReply &reply) {
[this, urls_to_delete = std::move(urls_to_delete), num_retries, io_worker](
const ray::Status &status, const rpc::DeleteSpilledObjectsReply &reply) {
io_worker_pool_.PushDeleteWorker(io_worker);
if (!status.ok()) {
num_failed_deletion_requests_ += 1;
RAY_LOG(ERROR) << "Failed to send delete spilled object request: "
<< status.ToString();
<< status.ToString() << ", retry count: " << num_retries;

if (num_retries > 0) {
// retry failed requests.
io_service_.post(
[this, urls_to_delete = std::move(urls_to_delete), num_retries]() {
DeleteSpilledObjects(urls_to_delete, num_retries - 1);
},
"LocaObjectManager.RetryDeleteSpilledObjects");
}
}
});
});
Expand Down Expand Up @@ -625,6 +636,9 @@ void LocalObjectManager::RecordMetrics() const {
ray::stats::STATS_object_store_memory.Record(
spilled_bytes_current_,
{{ray::stats::LocationKey.name(), ray::stats::kObjectLocSpilled}});

ray::stats::STATS_spill_manager_request_total.Record(num_failed_deletion_requests_,
"FailedDeletion");
}

int64_t LocalObjectManager::GetPrimaryBytes() const {
Expand Down
17 changes: 16 additions & 1 deletion src/ray/raylet/local_object_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,9 @@ namespace ray {

namespace raylet {

/// The default number of retries when spilled object deletion failed.
const int64_t kDefaultSpilledObjectDeleteRetries = 3;

/// This class implements memory management for primary objects, objects that
/// have been freed, and objects that have been spilled.
class LocalObjectManager {
Expand All @@ -41,6 +44,7 @@ class LocalObjectManager {
const NodeID &node_id,
std::string self_node_address,
int self_node_port,
instrumented_io_context &io_service,
size_t free_objects_batch_size,
int64_t free_objects_period_ms,
IOWorkerPoolInterface &io_worker_pool,
Expand All @@ -56,6 +60,7 @@ class LocalObjectManager {
: self_node_id_(node_id),
self_node_address_(self_node_address),
self_node_port_(self_node_port),
io_service_(io_service),
free_objects_period_ms_(free_objects_period_ms),
free_objects_batch_size_(free_objects_batch_size),
io_worker_pool_(io_worker_pool),
Expand Down Expand Up @@ -186,6 +191,7 @@ class LocalObjectManager {
FRIEND_TEST(LocalObjectManagerTest,
TestSpillObjectsOfSizeNumBytesToSpillHigherThanMinBytesToSpill);
FRIEND_TEST(LocalObjectManagerTest, TestSpillObjectNotEvictable);
FRIEND_TEST(LocalObjectManagerTest, TestRetryDeleteSpilledObjects);

/// Asynchronously spill objects when space is needed. The callback tries to
/// spill at least num_bytes_to_spill and returns true if we found objects to
Expand Down Expand Up @@ -217,12 +223,18 @@ class LocalObjectManager {
/// Delete spilled objects stored in given urls.
///
/// \param urls_to_delete List of urls to delete from external storages.
void DeleteSpilledObjects(std::vector<std::string> &urls_to_delete);
/// \param num_retries Num of retries allowed in case of failure, zero or negative
/// means don't retry.
void DeleteSpilledObjects(std::vector<std::string> urls_to_delete,
int64_t num_retries = kDefaultSpilledObjectDeleteRetries);

const NodeID self_node_id_;
const std::string self_node_address_;
const int self_node_port_;

/// The io_service/thread this class runs in.
instrumented_io_context &io_service_;

/// The period between attempts to eagerly evict objects from plasma.
const int64_t free_objects_period_ms_;

Expand Down Expand Up @@ -372,6 +384,9 @@ class LocalObjectManager {
/// The last time a restore log finished.
int64_t last_restore_log_ns_ = 0;

/// The number of failed deletion requests.
std::atomic<int64_t> num_failed_deletion_requests_ = 0;

friend class LocalObjectManagerTestWithMinSpillingSize;
friend class LocalObjectManagerTest;
friend class LocalObjectManagerFusedTest;
Expand Down
1 change: 1 addition & 0 deletions src/ray/raylet/node_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -305,6 +305,7 @@ NodeManager::NodeManager(instrumented_io_context &io_service,
self_node_id_,
config.node_manager_address,
config.node_manager_port,
io_service_,
RayConfig::instance().free_objects_batch_size(),
RayConfig::instance().free_objects_period_milliseconds(),
worker_pool_,
Expand Down
30 changes: 30 additions & 0 deletions src/ray/raylet/test/local_object_manager_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,23 @@ class MockIOWorkerClient : public rpc::CoreWorkerClientInterface {
return deleted_urls_size;
}

int FailDeleteSpilledObject(Status status = Status::IOError("io error")) {
if (delete_callbacks.size() == 0) {
return 0;
}

auto callback = delete_callbacks.front();
auto reply = rpc::DeleteSpilledObjectsReply();
callback(status, reply);

auto &request = delete_requests.front();
int deleted_urls_size = request.spilled_objects_url_size();
delete_callbacks.pop_front();
delete_requests.pop_front();

return deleted_urls_size;
}

std::list<rpc::ClientCallback<rpc::SpillObjectsReply>> callbacks;
std::list<rpc::ClientCallback<rpc::DeleteSpilledObjectsReply>> delete_callbacks;
std::list<rpc::ClientCallback<rpc::RestoreSpilledObjectsReply>> restore_callbacks;
Expand Down Expand Up @@ -316,6 +333,7 @@ class LocalObjectManagerTestWithMinSpillingSize {
manager_node_id_,
"address",
1234,
io_service_,
free_objects_batch_size,
/*free_objects_period_ms=*/1000,
worker_pool,
Expand Down Expand Up @@ -1381,6 +1399,18 @@ TEST_F(LocalObjectManagerTest, TestDuplicatePinAndSpill) {
AssertNoLeaks();
}

TEST_F(LocalObjectManagerTest, TestRetryDeleteSpilledObjects) {
std::vector<std::string> urls_to_delete{{"url1"}};
manager.DeleteSpilledObjects(urls_to_delete, /*num_retries*/ 1);
ASSERT_EQ(1, worker_pool.io_worker_client->FailDeleteSpilledObject());
io_service_.run_one();
// assert the request is retried.
ASSERT_EQ(1, worker_pool.io_worker_client->FailDeleteSpilledObject());
// retry exhaused.
io_service_.run_one();
ASSERT_EQ(0, worker_pool.io_worker_client->FailDeleteSpilledObject());
}

TEST_F(LocalObjectManagerFusedTest, TestMinSpillingSize) {
rpc::Address owner_address;
owner_address.set_worker_id(WorkerID::FromRandom().Binary());
Expand Down

0 comments on commit c40632e

Please sign in to comment.