Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix metadata corruption release-5.3 #4495

Merged
merged 11 commits into from
Apr 22, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion dbms/src/Flash/DiagnosticsService.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1181,5 +1181,4 @@ ::grpc::Status DiagnosticsService::search_log(

return ::grpc::Status::OK;
}

} // namespace DB
24 changes: 18 additions & 6 deletions dbms/src/Storages/Transaction/ApplySnapshot.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -79,14 +79,26 @@ void KVStore::checkAndApplySnapshot(const RegionPtrWrap & new_region, TMTContext

{
const auto & new_range = new_region->getRange();
handleRegionsByRangeOverlap(new_range->comparableKeys(), [&](RegionMap region_map, const KVStoreTaskLock &) {
for (const auto & region : region_map)
handleRegionsByRangeOverlap(new_range->comparableKeys(), [&](RegionMap region_map, const KVStoreTaskLock & task_lock) {
for (const auto & overlapped_region : region_map)
{
if (region.first != region_id)
if (overlapped_region.first != region_id)
{
throw Exception(std::string(__PRETTY_FUNCTION__) + ": range of region " + std::to_string(region_id)
+ " is overlapped with region " + std::to_string(region.first) + ", should not happen",
ErrorCodes::LOGICAL_ERROR);
auto state = getProxyHelper()->getRegionLocalState(overlapped_region.first);
if (state.state() != raft_serverpb::PeerState::Tombstone)
{
throw Exception(fmt::format(
"range of region {} is overlapped with {}, state: {}",
region_id,
overlapped_region.first,
state.ShortDebugString()),
ErrorCodes::LOGICAL_ERROR);
}
else
{
LOG_INFO(log, "range of region " << region_id << " is overlapped with `Tombstone` region " << overlapped_region.first);
handleDestroy(overlapped_region.first, tmt, task_lock);
}
}
}
});
Expand Down
7 changes: 6 additions & 1 deletion dbms/src/Storages/Transaction/KVStore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -266,7 +266,11 @@ EngineStoreApplyRes KVStore::handleWriteRaftCmd(const WriteCmdsView & cmds, UInt

void KVStore::handleDestroy(UInt64 region_id, TMTContext & tmt)
{
auto task_lock = genTaskLock();
handleDestroy(region_id, tmt, genTaskLock());
}

void KVStore::handleDestroy(UInt64 region_id, TMTContext & tmt, const KVStoreTaskLock & task_lock)
{
const auto region = getRegion(region_id);
if (region == nullptr)
{
Expand Down Expand Up @@ -596,6 +600,7 @@ void WaitCheckRegionReady(const TMTContext & tmt, const std::atomic_size_t & ter
if (!need_retry)
{
// if region is able to get latest commit-index from TiKV, we should make it available only after it has caught up.
assert(resp.read_index() != 0);
regions_to_check.emplace(region_id, resp.read_index());
remain_regions.erase(region_id);
}
Expand Down
1 change: 1 addition & 0 deletions dbms/src/Storages/Transaction/KVStore.h
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,7 @@ class KVStore final : private boost::noncopyable
TMTContext & tmt);

void persistRegion(const Region & region, const RegionTaskLock & region_task_lock, const char * caller);
void handleDestroy(UInt64 region_id, TMTContext & tmt, const KVStoreTaskLock &);

private:
RegionManager region_manager;
Expand Down
92 changes: 73 additions & 19 deletions dbms/src/Storages/Transaction/ProxyFFI.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,18 @@
#include <Storages/Transaction/ProxyFFI.h>
#include <Storages/Transaction/Region.h>
#include <Storages/Transaction/TMTContext.h>
#include <diagnosticspb.pb.h>
#include <kvproto/diagnosticspb.pb.h>

#include <ext/scope_guard.h>

#define CHECK_PARSE_PB_BUFF_IMPL(n, a, b, c) \
do \
{ \
[[maybe_unused]] bool parse_res_##n = (a).ParseFromArray(b, static_cast<int>(c)); \
assert(parse_res_##n); \
} while (false)
#define CHECK_PARSE_PB_BUFF_FWD(n, ...) CHECK_PARSE_PB_BUFF_IMPL(n, __VA_ARGS__)
#define CHECK_PARSE_PB_BUFF(...) CHECK_PARSE_PB_BUFF_FWD(__LINE__, __VA_ARGS__)

namespace CurrentMetrics
{
Expand Down Expand Up @@ -83,9 +94,8 @@ EngineStoreApplyRes HandleAdminRaftCmd(
{
raft_cmdpb::AdminRequest request;
raft_cmdpb::AdminResponse response;
request.ParseFromArray(req_buff.data, static_cast<int>(req_buff.len));
response.ParseFromArray(resp_buff.data, static_cast<int>(resp_buff.len));

CHECK_PARSE_PB_BUFF(request, req_buff.data, req_buff.len);
CHECK_PARSE_PB_BUFF(response, resp_buff.data, resp_buff.len);
auto & kvstore = server->tmt->getKVStore();
return kvstore->handleAdminRaftCmd(
std::move(request),
Expand Down Expand Up @@ -227,6 +237,13 @@ kvrpcpb::ReadIndexResponse TiFlashRaftProxyHelper::readIndex(const kvrpcpb::Read
return std::move(res.at(0).first);
}

void InsertBatchReadIndexResp(RawVoidPtr resp, BaseBuffView view, uint64_t region_id)
{
kvrpcpb::ReadIndexResponse res;
CHECK_PARSE_PB_BUFF(res, view.data, view.len);
reinterpret_cast<BatchReadIndexRes *>(resp)->emplace_back(std::move(res), region_id);
}

BatchReadIndexRes TiFlashRaftProxyHelper::batchReadIndex(const std::vector<kvrpcpb::ReadIndexRequest> & req, uint64_t timeout_ms) const
{
std::vector<std::string> req_strs;
Expand All @@ -239,7 +256,7 @@ BatchReadIndexRes TiFlashRaftProxyHelper::batchReadIndex(const std::vector<kvrpc
auto outer_view = data.intoOuterView();
BatchReadIndexRes res;
res.reserve(req.size());
fn_handle_batch_read_index(proxy_ptr, outer_view, &res, timeout_ms);
fn_handle_batch_read_index(proxy_ptr, outer_view, &res, timeout_ms, InsertBatchReadIndexResp);
return res;
}

Expand Down Expand Up @@ -280,10 +297,19 @@ RawCppPtr PreHandleSnapshot(
try
{
metapb::Region region;
region.ParseFromArray(region_buff.data, static_cast<int>(region_buff.len));
CHECK_PARSE_PB_BUFF(region, region_buff.data, region_buff.len);
auto & tmt = *server->tmt;
auto & kvstore = tmt.getKVStore();
auto new_region = kvstore->genRegionPtr(std::move(region), peer_id, index, term);

#ifndef NDEBUG
{
auto & kvstore = server->tmt->getKVStore();
auto state = kvstore->getProxyHelper()->getRegionLocalState(new_region->id());
assert(state.state() == raft_serverpb::PeerState::Applying);
}
#endif

switch (kvstore->applyMethod())
{
case TiDB::SnapshotApplyMethod::Block:
Expand Down Expand Up @@ -394,24 +420,11 @@ const char * IntoEncryptionMethodName(EncryptionMethod method)
return encryption_method_name[static_cast<uint8_t>(method)];
}

void InsertBatchReadIndexResp(RawVoidPtr resp, BaseBuffView view, uint64_t region_id)
{
kvrpcpb::ReadIndexResponse res;
res.ParseFromArray(view.data, view.len);
reinterpret_cast<BatchReadIndexRes *>(resp)->emplace_back(std::move(res), region_id);
}

RawCppPtr GenRawCppPtr(RawVoidPtr ptr_, RawCppPtrTypeImpl type_)
{
return RawCppPtr{ptr_, static_cast<RawCppPtrType>(type_)};
}

void SetServerInfoResp(BaseBuffView view, RawVoidPtr ptr)
{
using diagnosticspb::ServerInfoResponse;
reinterpret_cast<ServerInfoResponse *>(ptr)->ParseFromArray(view.data, view.len);
}

CppStrWithView GetConfig(EngineStoreServerWrap * server, [[maybe_unused]] uint8_t full)
{
std::string config_file_path;
Expand All @@ -438,4 +451,45 @@ CppStrWithView GetConfig(EngineStoreServerWrap * server, [[maybe_unused]] uint8_
}
}

void SetPBMsByBytes(MsgPBType type, RawVoidPtr ptr, BaseBuffView view)
{
switch (type)
{
case MsgPBType::ReadIndexResponse:
CHECK_PARSE_PB_BUFF(*reinterpret_cast<kvrpcpb::ReadIndexResponse *>(ptr), view.data, view.len);
break;
case MsgPBType::RegionLocalState:
CHECK_PARSE_PB_BUFF(*reinterpret_cast<raft_serverpb::RegionLocalState *>(ptr), view.data, view.len);
break;
case MsgPBType::ServerInfoResponse:
CHECK_PARSE_PB_BUFF(*reinterpret_cast<diagnosticspb::ServerInfoResponse *>(ptr), view.data, view.len);
break;
}
}

raft_serverpb::RegionLocalState TiFlashRaftProxyHelper::getRegionLocalState(uint64_t region_id) const
{
assert(this->fn_get_region_local_state);

raft_serverpb::RegionLocalState state;
RawCppStringPtr error_msg_ptr{};
SCOPE_EXIT({
delete error_msg_ptr;
});
auto res = this->fn_get_region_local_state(this->proxy_ptr, region_id, &state, &error_msg_ptr);
switch (res)
{
case KVGetStatus::Ok:
break;
case KVGetStatus::Error:
{
throw Exception(fmt::format("{} meet internal error: {}", __FUNCTION__, *error_msg_ptr), ErrorCodes::LOGICAL_ERROR);
}
case KVGetStatus::NotFound:
// make not found as `Tombstone`
state.set_state(raft_serverpb::PeerState::Tombstone);
break;
}
return state;
}
} // namespace DB
11 changes: 7 additions & 4 deletions dbms/src/Storages/Transaction/ProxyFFI.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,10 @@ namespace kvrpcpb
class ReadIndexResponse;
class ReadIndexRequest;
} // namespace kvrpcpb
namespace raft_serverpb
{
class RegionLocalState;
}

namespace DB
{
Expand Down Expand Up @@ -53,6 +57,7 @@ struct TiFlashRaftProxyHelper : RaftStoreProxyFFIHelper
FileEncryptionInfo linkFile(const std::string &, const std::string &) const;
kvrpcpb::ReadIndexResponse readIndex(const kvrpcpb::ReadIndexRequest &) const;
BatchReadIndexRes batchReadIndex(const std::vector<kvrpcpb::ReadIndexRequest> &, uint64_t) const;
raft_serverpb::RegionLocalState getRegionLocalState(uint64_t region_id) const;
};

extern "C" {
Expand Down Expand Up @@ -81,10 +86,9 @@ void ApplyPreHandledSnapshot(EngineStoreServerWrap * server, void * res, RawCppP
HttpRequestRes HandleHttpRequest(EngineStoreServerWrap *, BaseBuffView path, BaseBuffView query, BaseBuffView body);
uint8_t CheckHttpUriAvailable(BaseBuffView);
void GcRawCppPtr(void * ptr, RawCppPtrType type);
void InsertBatchReadIndexResp(RawVoidPtr, BaseBuffView, uint64_t);
void SetServerInfoResp(BaseBuffView, RawVoidPtr);
BaseBuffView strIntoView(const std::string * str_ptr);
CppStrWithView GetConfig(EngineStoreServerWrap *, uint8_t full);
void SetPBMsByBytes(MsgPBType type, RawVoidPtr ptr, BaseBuffView view);
}

inline EngineStoreServerHelper GetEngineStoreServerHelper(
Expand All @@ -108,9 +112,8 @@ inline EngineStoreServerHelper GetEngineStoreServerHelper(
.fn_handle_http_request = HandleHttpRequest,
.fn_check_http_uri_available = CheckHttpUriAvailable,
.fn_gc_raw_cpp_ptr = GcRawCppPtr,
.fn_insert_batch_read_index_resp = InsertBatchReadIndexResp,
.fn_set_server_info_resp = SetServerInfoResp,
.fn_get_config = GetConfig,
.fn_set_pb_msg_by_bytes = SetPBMsByBytes,
};
}
} // namespace DB
13 changes: 12 additions & 1 deletion tests/fullstack-test/sample.test
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
mysql> drop table if exists test.t

mysql> create table if not exists test.t(s varchar(256), i int)
mysql> alter table test.t set tiflash replica 1 location labels 'rack', 'host', 'abc';
mysql> delete from test.t
mysql> insert into test.t values('Hello world', 666)

mysql> alter table test.t set tiflash replica 1 location labels 'rack', 'host', 'abc';

func> wait_table test t

mysql> set session tidb_isolation_read_engines='tiflash'; select s, i from test.t
Expand All @@ -14,6 +15,16 @@ mysql> set session tidb_isolation_read_engines='tiflash'; select s, i from test.
| Hello world | 666 |
+-------------+------+

mysql> insert into test.t values('test', -1)

mysql> set session tidb_isolation_read_engines='tiflash'; select s, i from test.t
+-------------+------+
| s | i |
+-------------+------+
| Hello world | 666 |
| test | -1 |
+-------------+------+

mysql> delete from test.t

mysql> drop table if exists test.t
68 changes: 18 additions & 50 deletions tests/run-gtest.sh
Original file line number Diff line number Diff line change
@@ -1,67 +1,35 @@
#!/usr/bin/env bash

set -x

SRC_TESTS_PATH="$(
cd "$(dirname "$0")"
pwd -P
)"

NPROC=${NPROC:-$(nproc || grep -c ^processor /proc/cpuinfo)}
OUTPUT_XML=${OUTPUT_XML:-false}
ENV_VARS_PATH=${ENV_VARS_PATH:-./_env.sh}
SERIALIZE_TEST_CASES=${SERIALIZE_TEST_CASES:-false}
RUN_TESTS_PARALLEL=${RUN_TESTS_PARALLEL:-false}

function run_test() {
function run_test()
{
local name="$1"
local bin_path=$(find . -name "$name")
local args=""
if [[ "$continue_on_error" -eq 1 ]]; then
args="--gtest_catch_exceptions=1"
else
args="--gtest_break_on_failure --gtest_catch_exceptions=0"
fi
if [[ "${OUTPUT_XML}" == "true" ]]; then
args="${args} --gtest_output=xml"
fi
${bin_path} ${args}
if [[ "$continue_on_error" -eq 1 ]]; then
${bin_path} --gtest_catch_exceptions=1
else
${bin_path} --gtest_break_on_failure --gtest_catch_exceptions=0
fi
}

source ${ENV_VARS_PATH}

continue_on_error="${1:-1}" # default 1
set -ex

function run_test_parallel() {
test_bins="$1"
local args=""
if [[ ${SERIALIZE_TEST_CASES} == "true" ]]; then args="${args} --serialize_test_cases"; fi
if [[ "${continue_on_error}" == "1" ]]; then
args="${args} --gtest_catch_exceptions=1"
else
args="--gtest_break_on_failure --gtest_catch_exceptions=0"
fi

python ${SRC_TESTS_PATH}/gtest_parallel.py ${test_bins} --workers=${NPROC} ${args}
}

set -e
cd "$build_dir"

cd "${build_dir}"
tests=(
"gtests_dbms"
"gtests_libcommon"
#"gtests_tmt" # included in gtests_dbms
)

# Set env variable to run some special test cases.
export ALSO_RUN_WITH_TEST_DATA=1

if [[ ${RUN_TESTS_PARALLEL} != "true" ]]; then
tests=(
"gtests_dbms"
"gtests_libcommon"
"gtests_libdaemon"
#"gtests_tmt" # included in gtests_dbms
)
for test in ${tests[@]}; do
run_test "$test"
done
else
run_test_parallel "${build_dir}/gtests_dbms ${build_dir}/gtests_libcommon ${build_dir}/gtests_libdaemon"
fi
for test in ${tests[@]}; do
run_test "$test"
done