diff --git a/contrib/tiflash-proxy b/contrib/tiflash-proxy index a3ee1bd47c9..cc3d064525a 160000 --- a/contrib/tiflash-proxy +++ b/contrib/tiflash-proxy @@ -1 +1 @@ -Subproject commit a3ee1bd47c9620297aeecc927224a9113cd95ecb +Subproject commit cc3d064525a7921eedffbf63e6f32fcd7afbf18b diff --git a/dbms/src/Flash/DiagnosticsService.cpp b/dbms/src/Flash/DiagnosticsService.cpp index 62daa1099da..d119be2aac0 100644 --- a/dbms/src/Flash/DiagnosticsService.cpp +++ b/dbms/src/Flash/DiagnosticsService.cpp @@ -1181,5 +1181,4 @@ ::grpc::Status DiagnosticsService::search_log( return ::grpc::Status::OK; } - } // namespace DB diff --git a/dbms/src/Storages/Transaction/ApplySnapshot.cpp b/dbms/src/Storages/Transaction/ApplySnapshot.cpp index 4c6088f2dc8..6f9dfe71298 100644 --- a/dbms/src/Storages/Transaction/ApplySnapshot.cpp +++ b/dbms/src/Storages/Transaction/ApplySnapshot.cpp @@ -79,14 +79,26 @@ void KVStore::checkAndApplySnapshot(const RegionPtrWrap & new_region, TMTContext { const auto & new_range = new_region->getRange(); - handleRegionsByRangeOverlap(new_range->comparableKeys(), [&](RegionMap region_map, const KVStoreTaskLock &) { - for (const auto & region : region_map) + handleRegionsByRangeOverlap(new_range->comparableKeys(), [&](RegionMap region_map, const KVStoreTaskLock & task_lock) { + for (const auto & overlapped_region : region_map) { - if (region.first != region_id) + if (overlapped_region.first != region_id) { - throw Exception(std::string(__PRETTY_FUNCTION__) + ": range of region " + std::to_string(region_id) - + " is overlapped with region " + std::to_string(region.first) + ", should not happen", - ErrorCodes::LOGICAL_ERROR); + auto state = getProxyHelper()->getRegionLocalState(overlapped_region.first); + if (state.state() != raft_serverpb::PeerState::Tombstone) + { + throw Exception(fmt::format( + "range of region {} is overlapped with {}, state: {}", + region_id, + overlapped_region.first, + state.ShortDebugString()), + ErrorCodes::LOGICAL_ERROR); + } + else + { + LOG_INFO(log, "range of region " << region_id << " is overlapped with `Tombstone` region " << overlapped_region.first); + handleDestroy(overlapped_region.first, tmt, task_lock); + } } } }); diff --git a/dbms/src/Storages/Transaction/KVStore.cpp b/dbms/src/Storages/Transaction/KVStore.cpp index 2f60f8647a7..8d8ca8c2f6c 100644 --- a/dbms/src/Storages/Transaction/KVStore.cpp +++ b/dbms/src/Storages/Transaction/KVStore.cpp @@ -266,7 +266,11 @@ EngineStoreApplyRes KVStore::handleWriteRaftCmd(const WriteCmdsView & cmds, UInt void KVStore::handleDestroy(UInt64 region_id, TMTContext & tmt) { - auto task_lock = genTaskLock(); + handleDestroy(region_id, tmt, genTaskLock()); +} + +void KVStore::handleDestroy(UInt64 region_id, TMTContext & tmt, const KVStoreTaskLock & task_lock) +{ const auto region = getRegion(region_id); if (region == nullptr) { @@ -596,6 +600,7 @@ void WaitCheckRegionReady(const TMTContext & tmt, const std::atomic_size_t & ter if (!need_retry) { // if region is able to get latest commit-index from TiKV, we should make it available only after it has caught up. + assert(resp.read_index() != 0); regions_to_check.emplace(region_id, resp.read_index()); remain_regions.erase(region_id); } diff --git a/dbms/src/Storages/Transaction/KVStore.h b/dbms/src/Storages/Transaction/KVStore.h index 57dd5536004..f8e7f411543 100644 --- a/dbms/src/Storages/Transaction/KVStore.h +++ b/dbms/src/Storages/Transaction/KVStore.h @@ -166,6 +166,7 @@ class KVStore final : private boost::noncopyable TMTContext & tmt); void persistRegion(const Region & region, const RegionTaskLock & region_task_lock, const char * caller); + void handleDestroy(UInt64 region_id, TMTContext & tmt, const KVStoreTaskLock &); private: RegionManager region_manager; diff --git a/dbms/src/Storages/Transaction/ProxyFFI.cpp b/dbms/src/Storages/Transaction/ProxyFFI.cpp index c4799c28f3f..f18f4442bcb 100644 --- a/dbms/src/Storages/Transaction/ProxyFFI.cpp +++ b/dbms/src/Storages/Transaction/ProxyFFI.cpp @@ -6,7 +6,18 @@ #include #include #include -#include +#include + +#include + +#define CHECK_PARSE_PB_BUFF_IMPL(n, a, b, c) \ + do \ + { \ + [[maybe_unused]] bool parse_res_##n = (a).ParseFromArray(b, static_cast(c)); \ + assert(parse_res_##n); \ + } while (false) +#define CHECK_PARSE_PB_BUFF_FWD(n, ...) CHECK_PARSE_PB_BUFF_IMPL(n, __VA_ARGS__) +#define CHECK_PARSE_PB_BUFF(...) CHECK_PARSE_PB_BUFF_FWD(__LINE__, __VA_ARGS__) namespace CurrentMetrics { @@ -83,9 +94,8 @@ EngineStoreApplyRes HandleAdminRaftCmd( { raft_cmdpb::AdminRequest request; raft_cmdpb::AdminResponse response; - request.ParseFromArray(req_buff.data, static_cast(req_buff.len)); - response.ParseFromArray(resp_buff.data, static_cast(resp_buff.len)); - + CHECK_PARSE_PB_BUFF(request, req_buff.data, req_buff.len); + CHECK_PARSE_PB_BUFF(response, resp_buff.data, resp_buff.len); auto & kvstore = server->tmt->getKVStore(); return kvstore->handleAdminRaftCmd( std::move(request), @@ -227,6 +237,13 @@ kvrpcpb::ReadIndexResponse TiFlashRaftProxyHelper::readIndex(const kvrpcpb::Read return std::move(res.at(0).first); } +void InsertBatchReadIndexResp(RawVoidPtr resp, BaseBuffView view, uint64_t region_id) +{ + kvrpcpb::ReadIndexResponse res; + CHECK_PARSE_PB_BUFF(res, view.data, view.len); + reinterpret_cast(resp)->emplace_back(std::move(res), region_id); +} + BatchReadIndexRes TiFlashRaftProxyHelper::batchReadIndex(const std::vector & req, uint64_t timeout_ms) const { std::vector req_strs; @@ -239,7 +256,7 @@ BatchReadIndexRes TiFlashRaftProxyHelper::batchReadIndex(const std::vector(region_buff.len)); + CHECK_PARSE_PB_BUFF(region, region_buff.data, region_buff.len); auto & tmt = *server->tmt; auto & kvstore = tmt.getKVStore(); auto new_region = kvstore->genRegionPtr(std::move(region), peer_id, index, term); + +#ifndef NDEBUG + { + auto & kvstore = server->tmt->getKVStore(); + auto state = kvstore->getProxyHelper()->getRegionLocalState(new_region->id()); + assert(state.state() == raft_serverpb::PeerState::Applying); + } +#endif + switch (kvstore->applyMethod()) { case TiDB::SnapshotApplyMethod::Block: @@ -394,24 +420,11 @@ const char * IntoEncryptionMethodName(EncryptionMethod method) return encryption_method_name[static_cast(method)]; } -void InsertBatchReadIndexResp(RawVoidPtr resp, BaseBuffView view, uint64_t region_id) -{ - kvrpcpb::ReadIndexResponse res; - res.ParseFromArray(view.data, view.len); - reinterpret_cast(resp)->emplace_back(std::move(res), region_id); -} - RawCppPtr GenRawCppPtr(RawVoidPtr ptr_, RawCppPtrTypeImpl type_) { return RawCppPtr{ptr_, static_cast(type_)}; } -void SetServerInfoResp(BaseBuffView view, RawVoidPtr ptr) -{ - using diagnosticspb::ServerInfoResponse; - reinterpret_cast(ptr)->ParseFromArray(view.data, view.len); -} - CppStrWithView GetConfig(EngineStoreServerWrap * server, [[maybe_unused]] uint8_t full) { std::string config_file_path; @@ -438,4 +451,45 @@ CppStrWithView GetConfig(EngineStoreServerWrap * server, [[maybe_unused]] uint8_ } } +void SetPBMsByBytes(MsgPBType type, RawVoidPtr ptr, BaseBuffView view) +{ + switch (type) + { + case MsgPBType::ReadIndexResponse: + CHECK_PARSE_PB_BUFF(*reinterpret_cast(ptr), view.data, view.len); + break; + case MsgPBType::RegionLocalState: + CHECK_PARSE_PB_BUFF(*reinterpret_cast(ptr), view.data, view.len); + break; + case MsgPBType::ServerInfoResponse: + CHECK_PARSE_PB_BUFF(*reinterpret_cast(ptr), view.data, view.len); + break; + } +} + +raft_serverpb::RegionLocalState TiFlashRaftProxyHelper::getRegionLocalState(uint64_t region_id) const +{ + assert(this->fn_get_region_local_state); + + raft_serverpb::RegionLocalState state; + RawCppStringPtr error_msg_ptr{}; + SCOPE_EXIT({ + delete error_msg_ptr; + }); + auto res = this->fn_get_region_local_state(this->proxy_ptr, region_id, &state, &error_msg_ptr); + switch (res) + { + case KVGetStatus::Ok: + break; + case KVGetStatus::Error: + { + throw Exception(fmt::format("{} meet internal error: {}", __FUNCTION__, *error_msg_ptr), ErrorCodes::LOGICAL_ERROR); + } + case KVGetStatus::NotFound: + // make not found as `Tombstone` + state.set_state(raft_serverpb::PeerState::Tombstone); + break; + } + return state; +} } // namespace DB diff --git a/dbms/src/Storages/Transaction/ProxyFFI.h b/dbms/src/Storages/Transaction/ProxyFFI.h index 78b805bb343..cf99748ea4a 100644 --- a/dbms/src/Storages/Transaction/ProxyFFI.h +++ b/dbms/src/Storages/Transaction/ProxyFFI.h @@ -14,6 +14,10 @@ namespace kvrpcpb class ReadIndexResponse; class ReadIndexRequest; } // namespace kvrpcpb +namespace raft_serverpb +{ +class RegionLocalState; +} namespace DB { @@ -53,6 +57,7 @@ struct TiFlashRaftProxyHelper : RaftStoreProxyFFIHelper FileEncryptionInfo linkFile(const std::string &, const std::string &) const; kvrpcpb::ReadIndexResponse readIndex(const kvrpcpb::ReadIndexRequest &) const; BatchReadIndexRes batchReadIndex(const std::vector &, uint64_t) const; + raft_serverpb::RegionLocalState getRegionLocalState(uint64_t region_id) const; }; extern "C" { @@ -81,10 +86,9 @@ void ApplyPreHandledSnapshot(EngineStoreServerWrap * server, void * res, RawCppP HttpRequestRes HandleHttpRequest(EngineStoreServerWrap *, BaseBuffView path, BaseBuffView query, BaseBuffView body); uint8_t CheckHttpUriAvailable(BaseBuffView); void GcRawCppPtr(void * ptr, RawCppPtrType type); -void InsertBatchReadIndexResp(RawVoidPtr, BaseBuffView, uint64_t); -void SetServerInfoResp(BaseBuffView, RawVoidPtr); BaseBuffView strIntoView(const std::string * str_ptr); CppStrWithView GetConfig(EngineStoreServerWrap *, uint8_t full); +void SetPBMsByBytes(MsgPBType type, RawVoidPtr ptr, BaseBuffView view); } inline EngineStoreServerHelper GetEngineStoreServerHelper( @@ -108,9 +112,8 @@ inline EngineStoreServerHelper GetEngineStoreServerHelper( .fn_handle_http_request = HandleHttpRequest, .fn_check_http_uri_available = CheckHttpUriAvailable, .fn_gc_raw_cpp_ptr = GcRawCppPtr, - .fn_insert_batch_read_index_resp = InsertBatchReadIndexResp, - .fn_set_server_info_resp = SetServerInfoResp, .fn_get_config = GetConfig, + .fn_set_pb_msg_by_bytes = SetPBMsByBytes, }; } } // namespace DB diff --git a/tests/fullstack-test/sample.test b/tests/fullstack-test/sample.test index bff1d2f326d..2e575888816 100644 --- a/tests/fullstack-test/sample.test +++ b/tests/fullstack-test/sample.test @@ -1,10 +1,11 @@ mysql> drop table if exists test.t mysql> create table if not exists test.t(s varchar(256), i int) -mysql> alter table test.t set tiflash replica 1 location labels 'rack', 'host', 'abc'; mysql> delete from test.t mysql> insert into test.t values('Hello world', 666) +mysql> alter table test.t set tiflash replica 1 location labels 'rack', 'host', 'abc'; + func> wait_table test t mysql> set session tidb_isolation_read_engines='tiflash'; select s, i from test.t @@ -14,6 +15,16 @@ mysql> set session tidb_isolation_read_engines='tiflash'; select s, i from test. | Hello world | 666 | +-------------+------+ +mysql> insert into test.t values('test', -1) + +mysql> set session tidb_isolation_read_engines='tiflash'; select s, i from test.t ++-------------+------+ +| s | i | ++-------------+------+ +| Hello world | 666 | +| test | -1 | ++-------------+------+ + mysql> delete from test.t mysql> drop table if exists test.t diff --git a/tests/run-gtest.sh b/tests/run-gtest.sh index 8fdbda7df17..d4135b90286 100755 --- a/tests/run-gtest.sh +++ b/tests/run-gtest.sh @@ -1,67 +1,35 @@ #!/usr/bin/env bash -set -x - -SRC_TESTS_PATH="$( - cd "$(dirname "$0")" - pwd -P -)" - -NPROC=${NPROC:-$(nproc || grep -c ^processor /proc/cpuinfo)} -OUTPUT_XML=${OUTPUT_XML:-false} ENV_VARS_PATH=${ENV_VARS_PATH:-./_env.sh} -SERIALIZE_TEST_CASES=${SERIALIZE_TEST_CASES:-false} -RUN_TESTS_PARALLEL=${RUN_TESTS_PARALLEL:-false} -function run_test() { +function run_test() +{ local name="$1" local bin_path=$(find . -name "$name") - local args="" - if [[ "$continue_on_error" -eq 1 ]]; then - args="--gtest_catch_exceptions=1" - else - args="--gtest_break_on_failure --gtest_catch_exceptions=0" - fi - if [[ "${OUTPUT_XML}" == "true" ]]; then - args="${args} --gtest_output=xml" - fi - ${bin_path} ${args} + if [[ "$continue_on_error" -eq 1 ]]; then + ${bin_path} --gtest_catch_exceptions=1 + else + ${bin_path} --gtest_break_on_failure --gtest_catch_exceptions=0 + fi } source ${ENV_VARS_PATH} continue_on_error="${1:-1}" # default 1 +set -ex -function run_test_parallel() { - test_bins="$1" - local args="" - if [[ ${SERIALIZE_TEST_CASES} == "true" ]]; then args="${args} --serialize_test_cases"; fi - if [[ "${continue_on_error}" == "1" ]]; then - args="${args} --gtest_catch_exceptions=1" - else - args="--gtest_break_on_failure --gtest_catch_exceptions=0" - fi - - python ${SRC_TESTS_PATH}/gtest_parallel.py ${test_bins} --workers=${NPROC} ${args} -} - -set -e +cd "$build_dir" -cd "${build_dir}" +tests=( + "gtests_dbms" + "gtests_libcommon" + #"gtests_tmt" # included in gtests_dbms +) # Set env variable to run some special test cases. export ALSO_RUN_WITH_TEST_DATA=1 -if [[ ${RUN_TESTS_PARALLEL} != "true" ]]; then - tests=( - "gtests_dbms" - "gtests_libcommon" - "gtests_libdaemon" - #"gtests_tmt" # included in gtests_dbms - ) - for test in ${tests[@]}; do - run_test "$test" - done -else - run_test_parallel "${build_dir}/gtests_dbms ${build_dir}/gtests_libcommon ${build_dir}/gtests_libdaemon" -fi +for test in ${tests[@]}; do + run_test "$test" +done +