Skip to content

Commit

Permalink
storage: Force evict when downloading vector index files (#253)
Browse files Browse the repository at this point in the history
Signed-off-by: Wish <[email protected]>
  • Loading branch information
breezewish authored and Lloyd-Pottiger committed Aug 30, 2024
1 parent a844c27 commit 6850b6a
Show file tree
Hide file tree
Showing 3 changed files with 279 additions and 23 deletions.
123 changes: 112 additions & 11 deletions dbms/src/Storages/S3/FileCache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@
#include <chrono>
#include <cmath>
#include <filesystem>
#include <magic_enum.hpp>
#include <queue>

namespace ProfileEvents
{
Expand Down Expand Up @@ -210,7 +212,7 @@ FileSegmentPtr FileCache::get(const S3::S3FilenameView & s3_fname, const std::op
// We don't know the exact size of a object/file, but we need reserve space to save the object/file.
// A certain amount of space is reserved for each file type.
auto estimzted_size = filesize ? *filesize : getEstimatedSizeOfFileType(file_type);
if (!reserveSpaceImpl(file_type, estimzted_size, /*try_evict*/ true))
if (!reserveSpaceImpl(file_type, estimzted_size, EvictMode::TryEvict))
{
// Space not enough.
GET_METRIC(tiflash_storage_remote_cache, type_dtfile_full).Increment();
Expand Down Expand Up @@ -258,7 +260,7 @@ FileSegmentPtr FileCache::getOrWait(const S3::S3FilenameView & s3_fname, const s
GET_METRIC(tiflash_storage_remote_cache, type_dtfile_miss).Increment();

auto estimated_size = filesize ? *filesize : getEstimatedSizeOfFileType(file_type);
if (!reserveSpaceImpl(file_type, estimated_size, /*try_evict*/ true))
if (!reserveSpaceImpl(file_type, estimated_size, EvictMode::ForceEvict))
{
// Space not enough.
GET_METRIC(tiflash_storage_remote_cache, type_dtfile_full).Increment();
Expand Down Expand Up @@ -361,20 +363,25 @@ std::pair<Int64, std::list<String>::iterator> FileCache::removeImpl(
return {release_size, table.remove(s3_key)};
}

bool FileCache::reserveSpaceImpl(FileType reserve_for, UInt64 size, bool try_evict)
bool FileCache::reserveSpaceImpl(FileType reserve_for, UInt64 size, EvictMode evict)
{
if (cache_used + size <= cache_capacity)
{
cache_used += size;
CurrentMetrics::set(CurrentMetrics::DTFileCacheUsed, cache_used);
return true;
}
if (try_evict)
if (evict == EvictMode::TryEvict || evict == EvictMode::ForceEvict)
{
UInt64 min_evict_size = size - (cache_capacity - cache_used);
LOG_DEBUG(log, "tryEvictFile for {} min_evict_size={}", magic_enum::enum_name(reserve_for), min_evict_size);
tryEvictFile(reserve_for, min_evict_size);
return reserveSpaceImpl(reserve_for, size, /*try_evict*/ false);
LOG_DEBUG( //
log,
"tryEvictFile for {} min_evict_size={} evict_mode={}",
magic_enum::enum_name(reserve_for),
min_evict_size,
magic_enum::enum_name(evict));
tryEvictFile(reserve_for, min_evict_size, evict);
return reserveSpaceImpl(reserve_for, size, EvictMode::NoEvict);
}
return false;
}
Expand All @@ -396,8 +403,10 @@ std::vector<FileType> FileCache::getEvictFileTypes(FileType evict_for)
return evict_types;
}

void FileCache::tryEvictFile(FileType evict_for, UInt64 size)
void FileCache::tryEvictFile(FileType evict_for, UInt64 size, EvictMode evict)
{
RUNTIME_CHECK(evict != EvictMode::NoEvict);

auto file_types = getEvictFileTypes(evict_for);
for (auto evict_from : file_types)
{
Expand All @@ -414,9 +423,18 @@ void FileCache::tryEvictFile(FileType evict_for, UInt64 size)
}
else
{
size = 0;
break;
}
}

if (size > 0 && evict == EvictMode::ForceEvict)
{
// After a series of tryEvict, the space is still not sufficient,
// so we do a force eviction.
auto evicted_size = forceEvict(size);
LOG_DEBUG(log, "forceEvict required_size={} evicted_size={}", size, evicted_size);
}
}

UInt64 FileCache::tryEvictFrom(FileType evict_for, UInt64 size, FileType evict_from)
Expand Down Expand Up @@ -460,10 +478,93 @@ UInt64 FileCache::tryEvictFrom(FileType evict_for, UInt64 size, FileType evict_f
return total_released_size;
}

bool FileCache::reserveSpace(FileType reserve_for, UInt64 size, bool try_evict)
struct ForceEvictCandidate
{
UInt64 file_type_slot;
String s3_key;
FileSegmentPtr file_segment;
std::chrono::time_point<std::chrono::system_clock> last_access_time; // Order by this field
};

struct ForceEvictCandidateComparer
{
bool operator()(ForceEvictCandidate a, ForceEvictCandidate b) { return a.last_access_time > b.last_access_time; }
};

UInt64 FileCache::forceEvict(UInt64 size_to_evict)
{
if (size_to_evict == 0)
return 0;

// For a force evict, we simply evict from the oldest to the newest, until
// space is sufficient.

std::priority_queue<ForceEvictCandidate, std::vector<ForceEvictCandidate>, ForceEvictCandidateComparer>
evict_candidates;

// First, pick an item from all levels.

size_t total_released_size = 0;

constexpr auto all_file_types = magic_enum::enum_values<FileType>();
std::vector<std::list<String>::iterator> each_type_lru_iters; // Stores the iterator of next candicate to add
each_type_lru_iters.reserve(all_file_types.size());
for (const auto file_type : all_file_types)
{
auto file_type_slot = static_cast<UInt64>(file_type);
auto iter = tables[file_type_slot].begin();
if (iter != tables[file_type_slot].end())
{
const auto & s3_key = *iter;
const auto & f = tables[file_type_slot].get(s3_key, /*update_lru*/ false);
evict_candidates.emplace(ForceEvictCandidate{
.file_type_slot = file_type_slot,
.s3_key = s3_key,
.file_segment = f,
.last_access_time = f->getLastAccessTime(),
});
iter++;
}
each_type_lru_iters.emplace_back(iter);
}

// Then we iterate the heap to remove the file with oldest access time.

while (!evict_candidates.empty())
{
auto to_evict = evict_candidates.top(); // intentionally copy
evict_candidates.pop();

const auto file_type_slot = to_evict.file_type_slot;
if (each_type_lru_iters[file_type_slot] != tables[file_type_slot].end())
{
const auto s3_key = *each_type_lru_iters[file_type_slot];
const auto & f = tables[file_type_slot].get(s3_key, /*update_lru*/ false);
evict_candidates.emplace(ForceEvictCandidate{
.file_type_slot = file_type_slot,
.s3_key = s3_key,
.file_segment = f,
.last_access_time = f->getLastAccessTime(),
});
each_type_lru_iters[file_type_slot]++;
}

auto [released_size, next_itr] = removeImpl(tables[file_type_slot], to_evict.s3_key, to_evict.file_segment);
LOG_DEBUG(log, "ForceEvict {} size={}", to_evict.s3_key, released_size);
if (released_size >= 0) // removed
{
total_released_size += released_size;
if (total_released_size >= size_to_evict)
break;
}
}
return total_released_size;
}

bool FileCache::reserveSpace(FileType reserve_for, UInt64 size, EvictMode evict)
{
std::lock_guard lock(mtx);
return reserveSpaceImpl(reserve_for, size, try_evict);
return reserveSpaceImpl(reserve_for, size, evict);
}

void FileCache::releaseSpaceImpl(UInt64 size)
Expand Down Expand Up @@ -547,7 +648,7 @@ bool FileCache::finalizeReservedSize(FileType reserve_for, UInt64 reserved_size,
if (content_length > reserved_size)
{
// Need more space.
return reserveSpace(reserve_for, content_length - reserved_size, /*try_evict*/ true);
return reserveSpace(reserve_for, content_length - reserved_size, /*try_evict*/ EvictMode::TryEvict);
}
else if (content_length < reserved_size)
{
Expand Down
21 changes: 18 additions & 3 deletions dbms/src/Storages/S3/FileCache.h
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,12 @@ class FileSegment
return status;
}

auto getLastAccessTime() const
{
std::unique_lock lock(mtx);
return last_access_time;
}

private:
mutable std::mutex mtx;
const String local_fname;
Expand Down Expand Up @@ -303,14 +309,23 @@ class FileCache
static FileSegment::FileType getFileType(const String & fname);
static FileSegment::FileType getFileTypeOfColData(const std::filesystem::path & p);
bool canCache(FileSegment::FileType file_type) const;
bool reserveSpaceImpl(FileSegment::FileType reserve_for, UInt64 size, bool try_evict);

enum class EvictMode
{
NoEvict,
TryEvict,
ForceEvict,
};

bool reserveSpaceImpl(FileSegment::FileType reserve_for, UInt64 size, EvictMode evict);
void releaseSpaceImpl(UInt64 size);
void releaseSpace(UInt64 size);
bool reserveSpace(FileSegment::FileType reserve_for, UInt64 size, bool try_evict);
bool reserveSpace(FileSegment::FileType reserve_for, UInt64 size, EvictMode evict);
bool finalizeReservedSize(FileSegment::FileType reserve_for, UInt64 reserved_size, UInt64 content_length);
static std::vector<FileSegment::FileType> getEvictFileTypes(FileSegment::FileType evict_for);
void tryEvictFile(FileSegment::FileType evict_for, UInt64 size);
void tryEvictFile(FileSegment::FileType evict_for, UInt64 size, EvictMode evict);
UInt64 tryEvictFrom(FileSegment::FileType evict_for, UInt64 size, FileSegment::FileType evict_from);
UInt64 forceEvict(UInt64 size);

// This function is used for test.
std::vector<FileSegmentPtr> getAll();
Expand Down
Loading

0 comments on commit 6850b6a

Please sign in to comment.