Skip to content

Commit

Permalink
KVStore: decouple flush region and CompactLog with a new FFI fn_try_f…
Browse files Browse the repository at this point in the history
…lush_data (#5283)

ref #5170
  • Loading branch information
CalvinNeo authored Jul 7, 2022
1 parent bf4764b commit 0aec04a
Show file tree
Hide file tree
Showing 6 changed files with 107 additions and 29 deletions.
88 changes: 60 additions & 28 deletions dbms/src/Storages/Transaction/KVStore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -327,6 +327,64 @@ void KVStore::persistRegion(const Region & region, const RegionTaskLock & region
LOG_FMT_DEBUG(log, "Persist {} done", region.toString(false));
}

bool KVStore::needFlushRegionData(UInt64 region_id, TMTContext & tmt)
{
auto region_task_lock = region_manager.genRegionTaskLock(region_id);
const RegionPtr curr_region_ptr = getRegion(region_id);
return canFlushRegionDataImpl(curr_region_ptr, false, false, tmt, region_task_lock);
}

bool KVStore::tryFlushRegionData(UInt64 region_id, bool try_until_succeed, TMTContext & tmt)
{
auto region_task_lock = region_manager.genRegionTaskLock(region_id);
const RegionPtr curr_region_ptr = getRegion(region_id);
return canFlushRegionDataImpl(curr_region_ptr, true, try_until_succeed, tmt, region_task_lock);
}

bool KVStore::canFlushRegionDataImpl(const RegionPtr & curr_region_ptr, UInt8 flush_if_possible, bool try_until_succeed, TMTContext & tmt, const RegionTaskLock & region_task_lock)
{
if (curr_region_ptr == nullptr)
{
throw Exception(fmt::format("region not found when trying flush", ErrorCodes::LOGICAL_ERROR));
}
auto & curr_region = *curr_region_ptr;

auto [rows, size_bytes] = curr_region.getApproxMemCacheInfo();

LOG_FMT_DEBUG(log, "{} approx mem cache info: rows {}, bytes {}", curr_region.toString(false), rows, size_bytes);

bool can_flush = false;
if (rows >= region_compact_log_min_rows.load(std::memory_order_relaxed)
|| size_bytes >= region_compact_log_min_bytes.load(std::memory_order_relaxed))
{
// if rows or bytes more than threshold, flush cache and persist mem data.
can_flush = true;
}
else
{
// if there is little data in mem, wait until time interval reached threshold.
// use random period so that lots of regions will not be persisted at same time.
auto compact_log_period = std::rand() % region_compact_log_period.load(std::memory_order_relaxed); // NOLINT
can_flush = !(curr_region.lastCompactLogTime() + Seconds{compact_log_period} > Clock::now());
}
if (can_flush && flush_if_possible)
{
LOG_FMT_DEBUG(log, "{} flush region due to can_flush_data", curr_region.toString(false));
if (tryFlushRegionCacheInStorage(tmt, curr_region, log, try_until_succeed))
{
persistRegion(curr_region, region_task_lock, "compact raft log");
curr_region.markCompactLog();
curr_region.cleanApproxMemCacheInfo();
return true;
}
else
{
return false;
}
}
return can_flush;
}

EngineStoreApplyRes KVStore::handleUselessAdminRaftCmd(
raft_cmdpb::AdminCmdType cmd_type,
UInt64 curr_region_id,
Expand Down Expand Up @@ -360,39 +418,13 @@ EngineStoreApplyRes KVStore::handleUselessAdminRaftCmd(
}
else
{
auto [rows, size_bytes] = curr_region.getApproxMemCacheInfo();

LOG_FMT_DEBUG(log, "{} approx mem cache info: rows {}, bytes {}", curr_region.toString(false), rows, size_bytes);

if (rows >= region_compact_log_min_rows.load(std::memory_order_relaxed)
|| size_bytes >= region_compact_log_min_bytes.load(std::memory_order_relaxed))
{
// if rows or bytes more than threshold, try to flush cache and persist mem data.
return true;
}
else
{
// if there is little data in mem, wait until time interval reached threshold.
// use random period so that lots of regions will not be persisted at same time.
auto compact_log_period = std::rand() % region_compact_log_period.load(std::memory_order_relaxed); // NOLINT
return !(curr_region.lastCompactLogTime() + Seconds{compact_log_period} > Clock::now());
}
return canFlushRegionDataImpl(curr_region_ptr, true, /* try_until_succeed */ false, tmt, region_task_lock);
}
};

if (check_sync_log())
{
if (tryFlushRegionCacheInStorage(tmt, curr_region, log, /* try_until_succeed */ false))
{
persistRegion(curr_region, region_task_lock, "compact raft log");
curr_region.markCompactLog();
curr_region.cleanApproxMemCacheInfo();
return EngineStoreApplyRes::Persist;
}
else
{
return EngineStoreApplyRes::None;
}
return EngineStoreApplyRes::Persist;
}
return EngineStoreApplyRes::None;
}
Expand Down
8 changes: 8 additions & 0 deletions dbms/src/Storages/Transaction/KVStore.h
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,9 @@ class KVStore final : private boost::noncopyable
TMTContext & tmt);
EngineStoreApplyRes handleWriteRaftCmd(const WriteCmdsView & cmds, UInt64 region_id, UInt64 index, UInt64 term, TMTContext & tmt);

bool needFlushRegionData(UInt64 region_id, TMTContext & tmt);
bool tryFlushRegionData(UInt64 region_id, bool try_until_succeed, TMTContext & tmt);

void handleApplySnapshot(metapb::Region && region, uint64_t peer_id, const SSTViewVec, uint64_t index, uint64_t term, TMTContext & tmt);

std::vector<UInt64> /* */ preHandleSnapshotToFiles(
Expand Down Expand Up @@ -219,6 +222,11 @@ class KVStore final : private boost::noncopyable
UInt64 term,
TMTContext & tmt);

/// Notice that if flush_if_possible is set to false, we only check if a flush is allowed by rowsize/size/interval.
/// It will not check if a flush will eventually succeed.
/// In other words, `canFlushRegionDataImpl(flush_if_possible=true)` can return false.
bool canFlushRegionDataImpl(const RegionPtr & curr_region_ptr, UInt8 flush_if_possible, bool try_until_succeed, TMTContext & tmt, const RegionTaskLock & region_task_lock);

void persistRegion(const Region & region, const RegionTaskLock & region_task_lock, const char * caller);
void releaseReadIndexWorkers();
void handleDestroy(UInt64 region_id, TMTContext & tmt, const KVStoreTaskLock &);
Expand Down
28 changes: 28 additions & 0 deletions dbms/src/Storages/Transaction/ProxyFFI.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,34 @@ EngineStoreApplyRes HandleAdminRaftCmd(
}
}

uint8_t NeedFlushData(EngineStoreServerWrap * server, uint64_t region_id)
{
try
{
auto & kvstore = server->tmt->getKVStore();
return kvstore->needFlushRegionData(region_id, *server->tmt);
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
exit(-1);
}
}

uint8_t TryFlushData(EngineStoreServerWrap * server, uint64_t region_id, uint8_t until_succeed)
{
try
{
auto & kvstore = server->tmt->getKVStore();
return kvstore->tryFlushRegionData(region_id, until_succeed, *server->tmt);
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
exit(-1);
}
}

static_assert(sizeof(RaftStoreProxyFFIHelper) == sizeof(TiFlashRaftProxyHelper));
static_assert(alignof(RaftStoreProxyFFIHelper) == alignof(TiFlashRaftProxyHelper));

Expand Down
4 changes: 4 additions & 0 deletions dbms/src/Storages/Transaction/ProxyFFI.h
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,8 @@ EngineStoreApplyRes HandleAdminRaftCmd(
EngineStoreApplyRes HandleWriteRaftCmd(const EngineStoreServerWrap * server,
WriteCmdsView cmds,
RaftCmdHeader header);
uint8_t NeedFlushData(EngineStoreServerWrap * server, uint64_t region_id);
uint8_t TryFlushData(EngineStoreServerWrap * server, uint64_t region_id, uint8_t until_succeed);
void AtomicUpdateProxy(EngineStoreServerWrap * server, RaftStoreProxyFFIHelper * proxy);
void HandleDestroy(EngineStoreServerWrap * server, uint64_t region_id);
EngineStoreApplyRes HandleIngestSST(EngineStoreServerWrap * server, SSTViewVec snaps, RaftCmdHeader header);
Expand Down Expand Up @@ -158,6 +160,8 @@ inline EngineStoreServerHelper GetEngineStoreServerHelper(
.fn_gen_cpp_string = GenCppRawString,
.fn_handle_write_raft_cmd = HandleWriteRaftCmd,
.fn_handle_admin_raft_cmd = HandleAdminRaftCmd,
.fn_need_flush_data = NeedFlushData,
.fn_try_flush_data = TryFlushData,
.fn_atomic_update_proxy = AtomicUpdateProxy,
.fn_handle_destroy = HandleDestroy,
.fn_handle_ingest_sst = HandleIngestSST,
Expand Down
6 changes: 6 additions & 0 deletions dbms/src/Storages/Transaction/tests/gtest_kvstore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1179,6 +1179,12 @@ void RegionKVStoreTest::testKVStore()
ASSERT_EQ(e.message(), "unsupported admin command type InvalidAdmin");
}
}
{
// There shall be data to flush.
ASSERT_EQ(kvs.needFlushRegionData(19, ctx.getTMTContext()), true);
// Force flush until succeed only for testing.
ASSERT_EQ(kvs.tryFlushRegionData(19, true, ctx.getTMTContext()), true);
}
}

void test_mergeresult()
Expand Down

0 comments on commit 0aec04a

Please sign in to comment.