Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

FAP: compute RU and match index and term when handling snapshot #8716

Merged
merged 15 commits into from
Jan 29, 2024
7 changes: 7 additions & 0 deletions dbms/src/Common/TiFlashMetrics.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,13 @@ void TiFlashMetrics::addReplicaSyncRU(UInt32 keyspace_id, UInt64 ru)
counter->Increment(ru);
}

UInt64 TiFlashMetrics::debugQueryReplicaSyncRU(UInt32 keyspace_id)
{
std::unique_lock lock(replica_sync_ru_mtx);
auto * counter = getReplicaSyncRUCounter(keyspace_id, lock);
return counter->Value();
}

prometheus::Counter * TiFlashMetrics::getReplicaSyncRUCounter(UInt32 keyspace_id, std::unique_lock<std::mutex> &)
{
auto itr = registered_keyspace_sync_replica_ru.find(keyspace_id);
Expand Down
1 change: 1 addition & 0 deletions dbms/src/Common/TiFlashMetrics.h
Original file line number Diff line number Diff line change
Expand Up @@ -1078,6 +1078,7 @@ class TiFlashMetrics
static TiFlashMetrics & instance();

void addReplicaSyncRU(UInt32 keyspace_id, UInt64 ru);
UInt64 debugQueryReplicaSyncRU(UInt32 keyspace_id);

private:
TiFlashMetrics();
Expand Down
4 changes: 2 additions & 2 deletions dbms/src/Storages/DeltaMerge/DeltaMergeStore.h
Original file line number Diff line number Diff line change
Expand Up @@ -357,12 +357,12 @@ class DeltaMergeStore : private boost::noncopyable
return buildSegmentsFromCheckpointInfo(dm_context, range, checkpoint_info);
}

void ingestSegmentsFromCheckpointInfo(
UInt64 ingestSegmentsFromCheckpointInfo(
const DMContextPtr & dm_context,
const DM::RowKeyRange & range,
const CheckpointIngestInfoPtr & checkpoint_info);

void ingestSegmentsFromCheckpointInfo(
UInt64 ingestSegmentsFromCheckpointInfo(
const Context & db_context,
const DB::Settings & db_settings,
const DM::RowKeyRange & range,
Expand Down
19 changes: 14 additions & 5 deletions dbms/src/Storages/DeltaMerge/DeltaMergeStore_Ingest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1183,7 +1183,7 @@ Segments DeltaMergeStore::buildSegmentsFromCheckpointInfo(
return {};
}

void DeltaMergeStore::ingestSegmentsFromCheckpointInfo(
UInt64 DeltaMergeStore::ingestSegmentsFromCheckpointInfo(
const DMContextPtr & dm_context,
const DM::RowKeyRange & range,
const CheckpointIngestInfoPtr & checkpoint_info)
Expand All @@ -1202,18 +1202,25 @@ void DeltaMergeStore::ingestSegmentsFromCheckpointInfo(
"Ingest checkpoint from remote meet empty range, ignore, store_id={} region_id={}",
checkpoint_info->getRemoteStoreId(),
checkpoint_info->regionId());
return;
return 0;
}

auto restored_segments = checkpoint_info->getRestoredSegments();
auto updated_segments = ingestSegmentsUsingSplit(dm_context, range, restored_segments);
auto estimated_bytes = 0;

for (const auto & segment : restored_segments)
{
estimated_bytes += segment->getEstimatedBytes();
}

LOG_INFO(
log,
"Ingest checkpoint from remote done, store_id={} region_id={} n_segments={}",
"Ingest checkpoint from remote done, store_id={} region_id={} n_segments={} est_bytes={}",
checkpoint_info->getRemoteStoreId(),
checkpoint_info->regionId(),
restored_segments.size());

restored_segments.size(),
estimated_bytes);

WriteBatches wbs{*dm_context->storage_pool};
for (auto & segment : restored_segments)
Expand All @@ -1228,6 +1235,8 @@ void DeltaMergeStore::ingestSegmentsFromCheckpointInfo(
// TODO(fap) This could be executed in a dedicated thread if it consumes too much time.
for (auto & segment : updated_segments)
checkSegmentUpdate(dm_context, segment, ThreadType::Write, InputType::RaftSSTAndSnap);

return estimated_bytes;
}

} // namespace DM
Expand Down
15 changes: 13 additions & 2 deletions dbms/src/Storages/KVStore/FFI/ProxyFFI.h
Original file line number Diff line number Diff line change
Expand Up @@ -188,8 +188,19 @@ void HandleSafeTSUpdate(
uint64_t self_safe_ts,
uint64_t leader_safe_ts);
FastAddPeerRes FastAddPeer(EngineStoreServerWrap * server, uint64_t region_id, uint64_t new_peer_id);
uint8_t ApplyFapSnapshot(EngineStoreServerWrap * server, uint64_t region_id, uint64_t peer_id, uint8_t assert_exist);
FapSnapshotState QueryFapSnapshotState(EngineStoreServerWrap * server, uint64_t region_id, uint64_t peer_id);
uint8_t ApplyFapSnapshot(
EngineStoreServerWrap * server,
uint64_t region_id,
uint64_t peer_id,
uint8_t assert_exist,
uint64_t index,
uint64_t term);
FapSnapshotState QueryFapSnapshotState(
EngineStoreServerWrap * server,
uint64_t region_id,
uint64_t peer_id,
uint64_t index,
uint64_t term);
void ClearFapSnapshot(EngineStoreServerWrap * server, uint64_t region_id);
bool KvstoreRegionExists(EngineStoreServerWrap * server, uint64_t region_id);
}
Expand Down
10 changes: 9 additions & 1 deletion dbms/src/Storages/KVStore/MultiRaft/ApplySnapshot.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -208,10 +208,18 @@ void KVStore::onSnapshot(
}
else if constexpr (std::is_same_v<RegionPtrWrap, RegionPtrWithCheckpointInfo>)
{
dm_storage->ingestSegmentsFromCheckpointInfo(
auto ingested_bytes = dm_storage->ingestSegmentsFromCheckpointInfo(
new_key_range,
new_region_wrap.checkpoint_info,
context.getSettingsRef());
if (auto [count, is_syncing] = getTiFlashReplicaSyncInfo(dm_storage); is_syncing)
JinheLin marked this conversation as resolved.
Show resolved Hide resolved
{
// For write, 1 RU per KB. Reference: https://docs.pingcap.com/tidb/v7.0/tidb-resource-control
// Only calculate RU of one replica. So each replica reports 1/count consumptions.
TiFlashMetrics::instance().addReplicaSyncRU(
keyspace_id,
std::ceil(static_cast<double>(ingested_bytes) / 1024.0 / count));
}
}
else
{
Expand Down
62 changes: 53 additions & 9 deletions dbms/src/Storages/KVStore/MultiRaft/Disagg/FastAddPeer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -458,7 +458,14 @@ FastAddPeerRes FastAddPeerImpl(
}
}

uint8_t ApplyFapSnapshotImpl(TMTContext & tmt, TiFlashRaftProxyHelper * proxy_helper, UInt64 region_id, UInt64 peer_id)
uint8_t ApplyFapSnapshotImpl(
TMTContext & tmt,
TiFlashRaftProxyHelper * proxy_helper,
UInt64 region_id,
UInt64 peer_id,
bool assert_exist,
UInt64 index,
UInt64 term)
{
auto log = Logger::get("FastAddPeer");
Stopwatch watch_ingest;
Expand All @@ -467,6 +474,14 @@ uint8_t ApplyFapSnapshotImpl(TMTContext & tmt, TiFlashRaftProxyHelper * proxy_he
auto checkpoint_ingest_info = fap_ctx->getOrRestoreCheckpointIngestInfo(tmt, proxy_helper, region_id, peer_id);
if (!checkpoint_ingest_info)
{
if (assert_exist)
JinheLin marked this conversation as resolved.
Show resolved Hide resolved
{
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"Expected to have fap snapshot, region_id={}, peer_id={}",
region_id,
peer_id);
}
// If fap is enabled, and this region is not currently exists on proxy's side,
// proxy will check if we have a fap snapshot first.
// If we don't, the snapshot should be a regular snapshot.
Expand All @@ -487,6 +502,22 @@ uint8_t ApplyFapSnapshotImpl(TMTContext & tmt, TiFlashRaftProxyHelper * proxy_he
peer_id,
begin);
}
// `region_to_ingest` is not the region in kvstore.
auto region_to_ingest = checkpoint_ingest_info->getRegion();
RUNTIME_CHECK(region_to_ingest != nullptr);
if (!(region_to_ingest->appliedIndex() == index && region_to_ingest->appliedIndexTerm() == term))
{
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"Mismatched region and term, expected=({},{}) actual=({},{}) region_id={} peer_id={} begin_time={}",
index,
term,
region_to_ingest->appliedIndex(),
region_to_ingest->appliedIndexTerm(),
region_id,
peer_id,
begin);
}
LOG_INFO(log, "Begin apply fap snapshot, region_id={} peer_id={} begin_time={}", region_id, peer_id, begin);
// If there is `checkpoint_ingest_info`, it is exactly the data we want to ingest. Consider two scene:
// 1. If there was a failed FAP which failed to clean, its data will be overwritten by current FAP which has finished phase 1.
Expand All @@ -509,7 +540,12 @@ uint8_t ApplyFapSnapshotImpl(TMTContext & tmt, TiFlashRaftProxyHelper * proxy_he
}
}

FapSnapshotState QueryFapSnapshotState(EngineStoreServerWrap * server, uint64_t region_id, uint64_t peer_id)
FapSnapshotState QueryFapSnapshotState(
EngineStoreServerWrap * server,
uint64_t region_id,
uint64_t peer_id,
uint64_t index,
uint64_t term)
{
try
{
Expand All @@ -519,10 +555,14 @@ FapSnapshotState QueryFapSnapshotState(EngineStoreServerWrap * server, uint64_t
return FapSnapshotState::Other;
auto fap_ctx = server->tmt->getContext().getSharedContextDisagg()->fap_context;
// We just restore it, since if there is, it will soon be used.
if (fap_ctx->getOrRestoreCheckpointIngestInfo(*(server->tmt), server->proxy_helper, region_id, peer_id)
!= nullptr)
if (auto ptr
= fap_ctx->getOrRestoreCheckpointIngestInfo(*(server->tmt), server->proxy_helper, region_id, peer_id);
ptr != nullptr)
{
return FapSnapshotState::Persisted;
RUNTIME_CHECK(ptr->getRegion() != nullptr);
if (ptr->getRegion()->appliedIndex() == index && ptr->getRegion()->appliedIndexTerm() == term)
return FapSnapshotState::Persisted;
return FapSnapshotState::NotFound;
}
return FapSnapshotState::NotFound;
}
Expand All @@ -535,17 +575,21 @@ FapSnapshotState QueryFapSnapshotState(EngineStoreServerWrap * server, uint64_t
}
}

uint8_t ApplyFapSnapshot(EngineStoreServerWrap * server, uint64_t region_id, uint64_t peer_id, uint8_t assert_exist)
uint8_t ApplyFapSnapshot(
EngineStoreServerWrap * server,
uint64_t region_id,
uint64_t peer_id,
uint8_t assert_exist,
uint64_t index,
uint64_t term)
{
// TODO(fap) use assert_exist to check.
UNUSED(assert_exist);
try
{
RUNTIME_CHECK_MSG(server->tmt, "TMTContext is null");
RUNTIME_CHECK_MSG(server->proxy_helper, "proxy_helper is null");
if (!server->tmt->getContext().getSharedContextDisagg()->isDisaggregatedStorageMode())
return false;
return ApplyFapSnapshotImpl(*server->tmt, server->proxy_helper, region_id, peer_id);
return ApplyFapSnapshotImpl(*server->tmt, server->proxy_helper, region_id, peer_id, assert_exist, index, term);
}
catch (...)
{
Expand Down
9 changes: 8 additions & 1 deletion dbms/src/Storages/KVStore/MultiRaft/Disagg/FastAddPeer.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,5 +46,12 @@ FastAddPeerRes FastAddPeerImpl(
UInt64 region_id,
UInt64 new_peer_id,
UInt64 start_time);
uint8_t ApplyFapSnapshotImpl(TMTContext & tmt, TiFlashRaftProxyHelper * proxy_helper, UInt64 region_id, UInt64 peer_id);
uint8_t ApplyFapSnapshotImpl(
TMTContext & tmt,
TiFlashRaftProxyHelper * proxy_helper,
UInt64 region_id,
UInt64 peer_id,
bool assert_exist,
UInt64 index,
UInt64 term);
} // namespace DB
52 changes: 49 additions & 3 deletions dbms/src/Storages/KVStore/tests/gtest_kvstore_fast_add_peer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include <Storages/KVStore/MultiRaft/Disagg/FastAddPeer.h>
#include <Storages/KVStore/MultiRaft/Disagg/FastAddPeerCache.h>
#include <Storages/KVStore/MultiRaft/Disagg/FastAddPeerContext.h>
#include <Storages/KVStore/Types.h>
#include <Storages/KVStore/Utils/AsyncTasks.h>
#include <Storages/KVStore/tests/kvstore_helper.h>
#include <Storages/Page/V3/Universal/UniversalPageStorage.h>
Expand Down Expand Up @@ -174,6 +175,7 @@ class RegionKVStoreTestFAP : public KVStoreTestBase

protected:
UInt64 upload_sequence = 1000;
UInt64 table_id;

private:
ContextPtr context;
Expand Down Expand Up @@ -353,7 +355,7 @@ CheckpointRegionInfoAndData RegionKVStoreTestFAP::prepareForRestart(FAPTestOpt o
global_context.getTMTContext().debugSetKVStore(kvstore);
auto page_storage = global_context.getWriteNodePageStorage();

TableID table_id = proxy_instance->bootstrapTable(global_context, kvs, global_context.getTMTContext());
table_id = proxy_instance->bootstrapTable(global_context, kvs, global_context.getTMTContext());
auto fap_context = global_context.getSharedContextDisagg()->fap_context;
proxy_instance->bootstrapWithRegion(kvs, global_context.getTMTContext(), region_id, std::nullopt);
auto proxy_helper = proxy_instance->generateProxyHelper();
Expand All @@ -365,6 +367,7 @@ CheckpointRegionInfoAndData RegionKVStoreTestFAP::prepareForRestart(FAPTestOpt o
UInt64 index = 0;
if (!opt.persist_empty_segment)
{
LOG_DEBUG(log, "Do write to the region");
auto k1 = RecordKVFormat::genKey(table_id, 1, 111);
auto && [value_write1, value_default1] = proxy_instance->generateTiKVKeyValue(111, 999);
UInt64 term = 0;
Expand Down Expand Up @@ -415,6 +418,15 @@ CheckpointRegionInfoAndData RegionKVStoreTestFAP::prepareForRestart(FAPTestOpt o
return mock_data;
}

// This function get tiflash replica count from local schema.
void setTiFlashReplicaSyncInfo(StorageDeltaMergePtr & dm_storage)
{
auto table_info = dm_storage->getTableInfo();
table_info.replica_info.count = 1;
table_info.replica_info.available = false;
dm_storage->setTableInfo(table_info);
}

// Test load from restart.
TEST_F(RegionKVStoreTestFAP, RestoreFromRestart1)
try
Expand All @@ -426,6 +438,13 @@ try
auto fap_context = global_context.getSharedContextDisagg()->fap_context;
uint64_t region_id = 1;

{
auto storage = global_context.getTMTContext().getStorages().get(NullspaceID, table_id);
auto dm_storage = std::dynamic_pointer_cast<StorageDeltaMerge>(storage);
ASSERT_TRUE(dm_storage != nullptr);
setTiFlashReplicaSyncInfo(dm_storage);
}

std::mutex exe_mut;
std::unique_lock exe_lock(exe_mut);
fap_context->tasks_trace->addTask(region_id, [&]() {
Expand All @@ -437,6 +456,10 @@ try
exe_lock.unlock();
fap_context->tasks_trace->fetchResult(region_id);

auto region_to_ingest
= fap_context
->getOrRestoreCheckpointIngestInfo(global_context.getTMTContext(), proxy_helper.get(), region_id, 2333)
->getRegion();
// Remove the checkpoint ingest info and region from memory.
// Testing whether FAP can be handled properly after restart.
fap_context->debugRemoveCheckpointIngestInfo(region_id);
Expand All @@ -445,8 +468,18 @@ try
SCOPE_EXIT({ FailPointHelper::disableFailPoint("force_not_clean_fap_on_destroy"); });
kvstore->handleDestroy(region_id, global_context.getTMTContext());

auto prev_ru = TiFlashMetrics::instance().debugQueryReplicaSyncRU(NullspaceID);
// After restart, continue the FAP from persisted checkpoint ingest info.
ApplyFapSnapshotImpl(global_context.getTMTContext(), proxy_helper.get(), region_id, 2333);
ApplyFapSnapshotImpl(
global_context.getTMTContext(),
proxy_helper.get(),
region_id,
2333,
true,
region_to_ingest->appliedIndex(),
region_to_ingest->appliedIndexTerm());
auto current_ru = TiFlashMetrics::instance().debugQueryReplicaSyncRU(NullspaceID);
ASSERT_GT(current_ru, prev_ru);

{
auto keyspace_id = kv_region->getKeyspaceID();
Expand Down Expand Up @@ -814,12 +847,25 @@ try
exe_lock.unlock();
fap_context->tasks_trace->fetchResult(region_id);

auto region_to_ingest
= fap_context
->getOrRestoreCheckpointIngestInfo(global_context.getTMTContext(), proxy_helper.get(), region_id, 2333)
->getRegion();
// Make sure prehandling will not clean fap snapshot.
std::vector<SSTView> ssts;
SSTViewVec snaps{ssts.data(), ssts.size()};
kvs.preHandleSnapshotToFiles(kv_region, snaps, 100, 100, std::nullopt, global_context.getTMTContext());

EXPECT_THROW(ApplyFapSnapshotImpl(global_context.getTMTContext(), proxy_helper.get(), region_id, 2333), Exception);
EXPECT_THROW(
ApplyFapSnapshotImpl(
global_context.getTMTContext(),
proxy_helper.get(),
region_id,
2333,
false,
region_to_ingest->appliedIndex(),
region_to_ingest->appliedIndexTerm()),
Exception);
}
CATCH

Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Storages/StorageDeltaMerge.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1186,7 +1186,7 @@ DM::Segments StorageDeltaMerge::buildSegmentsFromCheckpointInfo(
return getAndMaybeInitStore()->buildSegmentsFromCheckpointInfo(global_context, settings, range, checkpoint_info);
}

void StorageDeltaMerge::ingestSegmentsFromCheckpointInfo(
UInt64 StorageDeltaMerge::ingestSegmentsFromCheckpointInfo(
const DM::RowKeyRange & range,
const CheckpointIngestInfoPtr & checkpoint_info,
const Settings & settings)
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Storages/StorageDeltaMerge.h
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ class StorageDeltaMerge
CheckpointInfoPtr checkpoint_info,
const Settings & settings);

void ingestSegmentsFromCheckpointInfo(
UInt64 ingestSegmentsFromCheckpointInfo(
const DM::RowKeyRange & range,
const CheckpointIngestInfoPtr & checkpoint_info,
const Settings & settings);
Expand Down