Skip to content

Commit

Permalink
Storages: Splitting DMFileReaderPool to reduce lock contention (#9126) (
Browse files Browse the repository at this point in the history
#9375)

close #9125

Co-authored-by: jinhelin <[email protected]>
Co-authored-by: JaySon <[email protected]>
  • Loading branch information
3 people authored Aug 27, 2024
1 parent a1aaf71 commit f63f87a
Show file tree
Hide file tree
Showing 2 changed files with 81 additions and 18 deletions.
72 changes: 57 additions & 15 deletions dbms/src/Storages/DeltaMerge/ReadThread/ColumnSharingCache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,22 +16,17 @@

namespace DB::DM
{
DMFileReaderPool & DMFileReaderPool::instance()
{
static DMFileReaderPool reader_pool;
return reader_pool;
}

void DMFileReaderPool::add(DMFileReader & reader)
void DMFileReaderPoolSharding::add(const String & path, DMFileReader & reader)
{
std::lock_guard lock(mtx);
readers[reader.path()].insert(&reader);
readers[path].insert(&reader);
}

void DMFileReaderPool::del(DMFileReader & reader)
void DMFileReaderPoolSharding::del(const String & path, DMFileReader & reader)
{
std::lock_guard lock(mtx);
auto itr = readers.find(reader.path());
auto itr = readers.find(path);
if (itr == readers.end())
{
return;
Expand All @@ -43,10 +38,16 @@ void DMFileReaderPool::del(DMFileReader & reader)
}
}

void DMFileReaderPool::set(DMFileReader & from_reader, int64_t col_id, size_t start, size_t count, ColumnPtr & col)
void DMFileReaderPoolSharding::set(
const String & path,
DMFileReader & from_reader,
int64_t col_id,
size_t start,
size_t count,
ColumnPtr & col)
{
std::lock_guard lock(mtx);
auto itr = readers.find(from_reader.path());
auto itr = readers.find(path);
if (itr == readers.end())
{
return;
Expand All @@ -62,17 +63,58 @@ void DMFileReaderPool::set(DMFileReader & from_reader, int64_t col_id, size_t st
}

// Check is there any concurrent DMFileReader with `from_reader`.
bool DMFileReaderPool::hasConcurrentReader(DMFileReader & from_reader)
bool DMFileReaderPoolSharding::hasConcurrentReader(const String & path)
{
std::lock_guard lock(mtx);
auto itr = readers.find(from_reader.path());
auto itr = readers.find(path);
return itr != readers.end() && itr->second.size() >= 2;
}

DMFileReader * DMFileReaderPool::get(const std::string & name)
DMFileReader * DMFileReaderPoolSharding::get(const std::string & path)
{
std::lock_guard lock(mtx);
auto itr = readers.find(name);
auto itr = readers.find(path);
return itr != readers.end() && !itr->second.empty() ? *(itr->second.begin()) : nullptr;
}

DMFileReaderPool & DMFileReaderPool::instance()
{
static DMFileReaderPool reader_pool;
return reader_pool;
}

DMFileReaderPoolSharding & DMFileReaderPool::getSharding(const String & path)
{
return shardings[std::hash<String>{}(path) % shardings.size()];
}

void DMFileReaderPool::add(DMFileReader & reader)
{
auto path = reader.path();
getSharding(path).add(path, reader);
}

void DMFileReaderPool::del(DMFileReader & reader)
{
auto path = reader.path();
getSharding(path).del(path, reader);
}

void DMFileReaderPool::set(DMFileReader & from_reader, int64_t col_id, size_t start, size_t count, ColumnPtr & col)
{
auto path = from_reader.path();
getSharding(path).set(path, from_reader, col_id, start, count, col);
}

// Check is there any concurrent DMFileReader with `from_reader`.
bool DMFileReaderPool::hasConcurrentReader(DMFileReader & from_reader)
{
auto path = from_reader.path();
return getSharding(path).hasConcurrentReader(path);
}

DMFileReader * DMFileReaderPool::get(const std::string & path)
{
return getSharding(path).get(path);
}
} // namespace DB::DM
27 changes: 24 additions & 3 deletions dbms/src/Storages/DeltaMerge/ReadThread/ColumnSharingCache.h
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,27 @@ class ColumnSharingCacheMap

class DMFileReader;

class DMFileReaderPoolSharding
{
public:
void add(const String & path, DMFileReader & reader);
void del(const String & path, DMFileReader & reader);
void set(
const String & path,
DMFileReader & from_reader,
int64_t col_id,
size_t start,
size_t count,
ColumnPtr & col);
bool hasConcurrentReader(const String & path);
// `get` is just for test.
DMFileReader * get(const std::string & path);

private:
std::mutex mtx;
std::unordered_map<std::string, std::unordered_set<DMFileReader *>> readers;
};

// DMFileReaderPool holds all the DMFileReader objects, so we can easily find DMFileReader objects with the same DMFile ID.
// When a DMFileReader object successfully reads a column's packs, it will try to put these packs into other DMFileReader objects' cache.
class DMFileReaderPool
Expand All @@ -226,14 +247,14 @@ class DMFileReaderPool
void set(DMFileReader & from_reader, int64_t col_id, size_t start, size_t count, ColumnPtr & col);
bool hasConcurrentReader(DMFileReader & from_reader);
// `get` is just for test.
DMFileReader * get(const std::string & name);
DMFileReader * get(const std::string & path);

private:
DMFileReaderPool() = default;
DMFileReaderPoolSharding & getSharding(const String & path);

private:
std::mutex mtx;
std::unordered_map<std::string, std::unordered_set<DMFileReader *>> readers;
std::array<DMFileReaderPoolSharding, 16> shardings;
};

} // namespace DB::DM

0 comments on commit f63f87a

Please sign in to comment.