diff --git a/contrib/tiflash-proxy b/contrib/tiflash-proxy index 7d4e73b43e8..0878b53d78a 160000 --- a/contrib/tiflash-proxy +++ b/contrib/tiflash-proxy @@ -1 +1 @@ -Subproject commit 7d4e73b43e8ddf47946378208b6db25df8d4e1e6 +Subproject commit 0878b53d78a787bfaa40b6bcac43bbb950b685c0 diff --git a/dbms/src/Server/Server.cpp b/dbms/src/Server/Server.cpp index 5011c07c1b8..834a058c69e 100644 --- a/dbms/src/Server/Server.cpp +++ b/dbms/src/Server/Server.cpp @@ -872,8 +872,7 @@ int Server::main(const std::vector & /*args*/) .fn_handle_http_request = HandleHttpRequest, .fn_check_http_uri_available = CheckHttpUriAvailable, .fn_gc_raw_cpp_ptr = GcRawCppPtr, - .fn_gen_batch_read_index_res = GenBatchReadIndexRes, - .fn_insert_batch_read_index_resp = InsertBatchReadIndexResp, + .fn_set_pb_msg_by_bytes = SetPBMsByBytes, }; RaftStoreProxyRunner proxy_runner(RaftStoreProxyRunner::RunRaftStoreProxyParms{&helper, proxy_conf}, log); diff --git a/dbms/src/Storages/Transaction/ApplySnapshot.cpp b/dbms/src/Storages/Transaction/ApplySnapshot.cpp index 9f3b752ebd5..c8bb9ed2739 100644 --- a/dbms/src/Storages/Transaction/ApplySnapshot.cpp +++ b/dbms/src/Storages/Transaction/ApplySnapshot.cpp @@ -80,14 +80,24 @@ void KVStore::checkAndApplySnapshot(const RegionPtrWrap & new_region, TMTContext { const auto & new_range = new_region->getRange(); - handleRegionsByRangeOverlap(new_range->comparableKeys(), [&](RegionMap region_map, const KVStoreTaskLock &) { - for (const auto & region : region_map) + handleRegionsByRangeOverlap(new_range->comparableKeys(), [&](RegionMap region_map, const KVStoreTaskLock & task_lock) { + for (const auto & overlapped_region : region_map) { - if (region.first != region_id) + if (overlapped_region.first != region_id) { - throw Exception(std::string(__PRETTY_FUNCTION__) + ": range of region " + std::to_string(region_id) - + " is overlapped with region " + std::to_string(region.first) + ", should not happen", - ErrorCodes::LOGICAL_ERROR); + auto state = getProxyHelper()->getRegionLocalState(overlapped_region.first); + if (state.state() != raft_serverpb::PeerState::Tombstone) + { + throw Exception(std::string(__PRETTY_FUNCTION__) + ": range of region " + std::to_string(region_id) + + " is overlapped with " + std::to_string(overlapped_region.first) + ", state: " + state.ShortDebugString(), + ErrorCodes::LOGICAL_ERROR); + } + else + { + LOG_INFO( + log, "range of region " << region_id << " is overlapped with `Tombstone` region " << overlapped_region.first); + handleDestroy(overlapped_region.first, tmt, task_lock); + } } } }); diff --git a/dbms/src/Storages/Transaction/KVStore.cpp b/dbms/src/Storages/Transaction/KVStore.cpp index 687a6137dfa..3774692c502 100644 --- a/dbms/src/Storages/Transaction/KVStore.cpp +++ b/dbms/src/Storages/Transaction/KVStore.cpp @@ -240,9 +240,10 @@ EngineStoreApplyRes KVStore::handleWriteRaftCmd(const WriteCmdsView & cmds, UInt return res; } -void KVStore::handleDestroy(UInt64 region_id, TMTContext & tmt) +void KVStore::handleDestroy(UInt64 region_id, TMTContext & tmt) { handleDestroy(region_id, tmt, genTaskLock()); } + +void KVStore::handleDestroy(UInt64 region_id, TMTContext & tmt, const KVStoreTaskLock & task_lock) { - auto task_lock = genTaskLock(); const auto region = getRegion(region_id); if (region == nullptr) { @@ -546,7 +547,7 @@ void WaitCheckRegionReady(const TMTContext & tmt, const std::atomic_size_t & ter } } auto read_index_res = tmt.getKVStore()->getProxyHelper()->batchReadIndex(batch_read_index_req, tmt.batchReadIndexTimeout()); - for (auto && [resp, region_id] : *read_index_res) + for (auto && [resp, region_id] : read_index_res) { bool need_retry = resp.read_index() == 0; if (resp.has_region_error()) @@ -558,6 +559,7 @@ void WaitCheckRegionReady(const TMTContext & tmt, const std::atomic_size_t & ter if (!need_retry) { // if region is able to get latest commit-index from TiKV, we should make it available only after it has caught up. + assert(resp.read_index() != 0); regions_to_check.emplace(region_id, resp.read_index()); remain_regions.erase(region_id); } diff --git a/dbms/src/Storages/Transaction/KVStore.h b/dbms/src/Storages/Transaction/KVStore.h index 743d1c7eb7d..d6842fc4aa7 100644 --- a/dbms/src/Storages/Transaction/KVStore.h +++ b/dbms/src/Storages/Transaction/KVStore.h @@ -145,6 +145,7 @@ class KVStore final : private boost::noncopyable raft_cmdpb::AdminCmdType cmd_type, UInt64 curr_region_id, UInt64 index, UInt64 term, TMTContext & tmt); void persistRegion(const Region & region, const RegionTaskLock & region_task_lock, const char * caller); + void handleDestroy(UInt64 region_id, TMTContext & tmt, const KVStoreTaskLock &); private: RegionManager region_manager; diff --git a/dbms/src/Storages/Transaction/LearnerRead.cpp b/dbms/src/Storages/Transaction/LearnerRead.cpp index 6b20c061962..0ed4721c3d5 100644 --- a/dbms/src/Storages/Transaction/LearnerRead.cpp +++ b/dbms/src/Storages/Transaction/LearnerRead.cpp @@ -231,7 +231,7 @@ LearnerReadSnapshot doLearnerRead(const TiDB::TableID table_id, // if (auto proxy_helper = kvstore->getProxyHelper(); proxy_helper) { auto res = proxy_helper->batchReadIndex(batch_read_index_req, tmt.batchReadIndexTimeout()); - for (auto && [resp, region_id] : *res) + for (auto && [resp, region_id] : res) { batch_read_index_result.emplace(region_id, std::move(resp)); } diff --git a/dbms/src/Storages/Transaction/ProxyFFI.cpp b/dbms/src/Storages/Transaction/ProxyFFI.cpp index b726e5d631b..b33b4a47c01 100644 --- a/dbms/src/Storages/Transaction/ProxyFFI.cpp +++ b/dbms/src/Storages/Transaction/ProxyFFI.cpp @@ -7,6 +7,17 @@ #include #include +#include + +#define CHECK_PARSE_PB_BUFF_IMPL(n, a, b, c) \ + do \ + { \ + [[maybe_unused]] bool parse_res_##n = (a).ParseFromArray(b, static_cast(c)); \ + assert(parse_res_##n); \ + } while (false) +#define CHECK_PARSE_PB_BUFF_FWD(n, ...) CHECK_PARSE_PB_BUFF_IMPL(n, __VA_ARGS__) +#define CHECK_PARSE_PB_BUFF(...) CHECK_PARSE_PB_BUFF_FWD(__LINE__, __VA_ARGS__) + namespace CurrentMetrics { extern const Metric RaftNumSnapshotsPendingApply; @@ -77,9 +88,8 @@ EngineStoreApplyRes HandleAdminRaftCmd( { raft_cmdpb::AdminRequest request; raft_cmdpb::AdminResponse response; - request.ParseFromArray(req_buff.data, (int)req_buff.len); - response.ParseFromArray(resp_buff.data, (int)resp_buff.len); - + CHECK_PARSE_PB_BUFF(request, req_buff.data, req_buff.len); + CHECK_PARSE_PB_BUFF(response, resp_buff.data, resp_buff.len); auto & kvstore = server->tmt->getKVStore(); return kvstore->handleAdminRaftCmd( std::move(request), std::move(response), header.region_id, header.index, header.term, *server->tmt); @@ -193,7 +203,14 @@ void CppStrVec::updateView() kvrpcpb::ReadIndexResponse TiFlashRaftProxyHelper::readIndex(const kvrpcpb::ReadIndexRequest & req) const { auto res = batchReadIndex({req}, DEFAULT_BATCH_READ_INDEX_TIMEOUT_MS); - return std::move(res->at(0).first); + return std::move(res.at(0).first); +} + +void InsertBatchReadIndexResp(RawVoidPtr resp, BaseBuffView view, uint64_t region_id) +{ + kvrpcpb::ReadIndexResponse res; + CHECK_PARSE_PB_BUFF(res, view.data, view.len); + reinterpret_cast(resp)->emplace_back(std::move(res), region_id); } BatchReadIndexRes TiFlashRaftProxyHelper::batchReadIndex(const std::vector & req, uint64_t timeout_ms) const @@ -207,7 +224,9 @@ BatchReadIndexRes TiFlashRaftProxyHelper::batchReadIndex(const std::vector(fn_handle_batch_read_index(proxy_ptr, outer_view, timeout_ms))); + BatchReadIndexRes res; + res.reserve(req.size()); + fn_handle_batch_read_index(proxy_ptr, outer_view, &res, timeout_ms, InsertBatchReadIndexResp); return res; } @@ -240,10 +259,19 @@ RawCppPtr PreHandleSnapshot( try { metapb::Region region; - region.ParseFromArray(region_buff.data, (int)region_buff.len); + CHECK_PARSE_PB_BUFF(region, region_buff.data, region_buff.len); auto & tmt = *server->tmt; auto & kvstore = tmt.getKVStore(); auto new_region = kvstore->genRegionPtr(std::move(region), peer_id, index, term); + +#ifndef NDEBUG + { + auto & kvstore = server->tmt->getKVStore(); + auto state = kvstore->getProxyHelper()->getRegionLocalState(new_region->id()); + assert(state.state() == raft_serverpb::PeerState::Applying); + } +#endif + switch (kvstore->applyMethod()) { case TiDB::SnapshotApplyMethod::Block: @@ -320,7 +348,7 @@ void ApplyPreHandledSnapshot(EngineStoreServerWrap * server, RawVoidPtr res, Raw } } -void GcRawCppPtr(EngineStoreServerWrap *, RawVoidPtr ptr, RawCppPtrType type) +void GcRawCppPtr(RawVoidPtr ptr, RawCppPtrType type) { if (ptr) { @@ -354,20 +382,45 @@ const char * IntoEncryptionMethodName(EncryptionMethod method) return EncryptionMethodName[static_cast(method)]; } -RawVoidPtr GenBatchReadIndexRes(uint64_t cap) -{ - auto res = new BatchReadIndexRes::element_type(); - res->reserve(cap); - return res; -} +RawCppPtr GenRawCppPtr(RawVoidPtr ptr_, RawCppPtrTypeImpl type_) { return RawCppPtr{ptr_, static_cast(type_)}; } -void InsertBatchReadIndexResp(RawVoidPtr resp, BaseBuffView view, uint64_t region_id) +void SetPBMsByBytes(MsgPBType type, RawVoidPtr ptr, BaseBuffView view) { - kvrpcpb::ReadIndexResponse res; - res.ParseFromArray(view.data, view.len); - reinterpret_cast(resp)->emplace_back(std::move(res), region_id); + switch (type) + { + case MsgPBType::ReadIndexResponse: + CHECK_PARSE_PB_BUFF(*reinterpret_cast(ptr), view.data, view.len); + break; + case MsgPBType::RegionLocalState: + CHECK_PARSE_PB_BUFF(*reinterpret_cast(ptr), view.data, view.len); + break; + default: + throw Exception( + std::string(__FUNCTION__) + ": meet unknown type " + std::to_string(static_cast(type)), ErrorCodes::LOGICAL_ERROR); + } } -RawCppPtr GenRawCppPtr(RawVoidPtr ptr_, RawCppPtrTypeImpl type_) { return RawCppPtr{ptr_, static_cast(type_)}; } +raft_serverpb::RegionLocalState TiFlashRaftProxyHelper::getRegionLocalState(uint64_t region_id) const +{ + assert(this->fn_get_region_local_state); + raft_serverpb::RegionLocalState state; + RawCppStringPtr error_msg_ptr{}; + SCOPE_EXIT({ delete error_msg_ptr; }); + auto res = this->fn_get_region_local_state(this->proxy_ptr, region_id, &state, &error_msg_ptr); + switch (res) + { + case KVGetStatus::Ok: + break; + case KVGetStatus::Error: + { + throw Exception(std::string(__FUNCTION__) + ": meet internal error: " + *error_msg_ptr, ErrorCodes::LOGICAL_ERROR); + } + case KVGetStatus::NotFound: + // make not found as `Tombstone` + state.set_state(raft_serverpb::PeerState::Tombstone); + break; + } + return state; +} } // namespace DB diff --git a/dbms/src/Storages/Transaction/ProxyFFI.h b/dbms/src/Storages/Transaction/ProxyFFI.h index 022b67e9b1e..ed50c35e9ce 100644 --- a/dbms/src/Storages/Transaction/ProxyFFI.h +++ b/dbms/src/Storages/Transaction/ProxyFFI.h @@ -13,6 +13,10 @@ namespace kvrpcpb class ReadIndexResponse; class ReadIndexRequest; } // namespace kvrpcpb +namespace raft_serverpb +{ +class RegionLocalState; +} namespace DB { @@ -28,8 +32,7 @@ struct EngineStoreServerWrap std::atomic status{EngineStoreServerStatus::Idle}; }; -using BatchReadIndexRes = std::unique_ptr>>; -static_assert(std::is_same_v); +using BatchReadIndexRes = std::vector>; struct FileEncryptionInfo; @@ -54,6 +57,7 @@ struct TiFlashRaftProxyHelper : RaftStoreProxyFFIHelper FileEncryptionInfo linkFile(const std::string &, const std::string &) const; kvrpcpb::ReadIndexResponse readIndex(const kvrpcpb::ReadIndexRequest &) const; BatchReadIndexRes batchReadIndex(const std::vector &, uint64_t) const; + raft_serverpb::RegionLocalState getRegionLocalState(uint64_t region_id) const; }; extern "C" { @@ -72,8 +76,8 @@ RawCppPtr PreHandleSnapshot( void ApplyPreHandledSnapshot(EngineStoreServerWrap * server, void * res, RawCppPtrType type); HttpRequestRes HandleHttpRequest(EngineStoreServerWrap *, BaseBuffView); uint8_t CheckHttpUriAvailable(BaseBuffView); -void GcRawCppPtr(EngineStoreServerWrap *, void * ptr, RawCppPtrType type); -RawVoidPtr GenBatchReadIndexRes(uint64_t cap); -void InsertBatchReadIndexResp(RawVoidPtr, BaseBuffView, uint64_t); +void GcRawCppPtr(void * ptr, RawCppPtrType type); +void SetPBMsByBytes(MsgPBType type, RawVoidPtr ptr, BaseBuffView view); } + } // namespace DB diff --git a/tests/fullstack-test/sample.test b/tests/fullstack-test/sample.test index bff1d2f326d..2e575888816 100644 --- a/tests/fullstack-test/sample.test +++ b/tests/fullstack-test/sample.test @@ -1,10 +1,11 @@ mysql> drop table if exists test.t mysql> create table if not exists test.t(s varchar(256), i int) -mysql> alter table test.t set tiflash replica 1 location labels 'rack', 'host', 'abc'; mysql> delete from test.t mysql> insert into test.t values('Hello world', 666) +mysql> alter table test.t set tiflash replica 1 location labels 'rack', 'host', 'abc'; + func> wait_table test t mysql> set session tidb_isolation_read_engines='tiflash'; select s, i from test.t @@ -14,6 +15,16 @@ mysql> set session tidb_isolation_read_engines='tiflash'; select s, i from test. | Hello world | 666 | +-------------+------+ +mysql> insert into test.t values('test', -1) + +mysql> set session tidb_isolation_read_engines='tiflash'; select s, i from test.t ++-------------+------+ +| s | i | ++-------------+------+ +| Hello world | 666 | +| test | -1 | ++-------------+------+ + mysql> delete from test.t mysql> drop table if exists test.t