diff --git a/dbms/src/Common/FailPoint.cpp b/dbms/src/Common/FailPoint.cpp index 50019e4d2b1..ae5eeaf0d75 100644 --- a/dbms/src/Common/FailPoint.cpp +++ b/dbms/src/Common/FailPoint.cpp @@ -70,7 +70,8 @@ namespace DB M(force_fail_to_create_etcd_session) \ M(force_remote_read_for_batch_cop_once) \ M(exception_new_dynamic_thread) \ - M(force_wait_index_timeout) + M(force_wait_index_timeout) \ + M(sync_schema_request_failure) #define APPLY_FOR_FAILPOINTS(M) \ M(skip_check_segment_update) \ diff --git a/dbms/src/Storages/KVStore/FFI/ProxyFFIStatusService.cpp b/dbms/src/Storages/KVStore/FFI/ProxyFFIStatusService.cpp index ff561b89310..41b72968164 100644 --- a/dbms/src/Storages/KVStore/FFI/ProxyFFIStatusService.cpp +++ b/dbms/src/Storages/KVStore/FFI/ProxyFFIStatusService.cpp @@ -12,7 +12,9 @@ // See the License for the specific language governing permissions and // limitations under the License. +#include #include +#include #include #include #include @@ -23,6 +25,9 @@ #include #include #include +#include +#include +#include #include #include @@ -30,6 +35,11 @@ namespace DB { +namespace FailPoints +{ +extern const char sync_schema_request_failure[]; +} // namespace FailPoints + HttpRequestRes HandleHttpRequestSyncStatus( EngineStoreServerWrap * server, std::string_view path, @@ -277,6 +287,101 @@ HttpRequestRes HandleHttpRequestRemoteGC( }; } +// Acquiring load schema to sync schema from TiKV in this TiFlash node with given keyspace id. +HttpRequestRes HandleHttpRequestSyncSchema( + EngineStoreServerWrap * server, + std::string_view path, + const std::string & api_name, + std::string_view, + std::string_view) +{ + pingcap::pd::KeyspaceID keyspace_id = NullspaceID; + TableID table_id = InvalidTableID; + HttpRequestStatus status = HttpRequestStatus::Ok; + auto log = Logger::get("HandleHttpRequestSyncSchema"); + + auto & global_context = server->tmt->getContext(); + // For compute node, simply return OK + if (global_context.getSharedContextDisagg()->isDisaggregatedComputeMode()) + { + return HttpRequestRes{ + .status = status, + .res = CppStrWithView{.inner = GenRawCppPtr(), .view = BaseBuffView{nullptr, 0}}, + }; + } + + { + LOG_TRACE(log, "handling sync schema request, path: {}, api_name: {}", path, api_name); + + // schema: /keyspace/{keyspace_id}/table/{table_id} + auto query = path.substr(api_name.size()); + std::vector query_parts; + boost::split(query_parts, query, boost::is_any_of("/")); + if (query_parts.size() != 4 || query_parts[0] != "keyspace" || query_parts[2] != "table") + { + LOG_ERROR(log, "invalid SyncSchema request: {}", query); + status = HttpRequestStatus::ErrorParam; + return HttpRequestRes{ + .status = HttpRequestStatus::ErrorParam, + .res = CppStrWithView{.inner = GenRawCppPtr(), .view = BaseBuffView{nullptr, 0}}}; + } + + try + { + keyspace_id = std::stoll(query_parts[1]); + table_id = std::stoll(query_parts[3]); + } + catch (...) + { + status = HttpRequestStatus::ErrorParam; + } + + if (status != HttpRequestStatus::Ok) + return HttpRequestRes{ + .status = status, + .res = CppStrWithView{.inner = GenRawCppPtr(), .view = BaseBuffView{nullptr, 0}}}; + } + + std::string err_msg; + try + { + auto & tmt_ctx = *server->tmt; + bool done = tmt_ctx.getSchemaSyncerManager()->syncTableSchema(global_context, keyspace_id, table_id); + if (!done) + { + err_msg = "sync schema failed"; + } + FAIL_POINT_TRIGGER_EXCEPTION(FailPoints::sync_schema_request_failure); + } + catch (const DB::Exception & e) + { + err_msg = e.message(); + } + catch (...) + { + err_msg = "sync schema failed, unknown exception"; + } + + if (!err_msg.empty()) + { + Poco::JSON::Object::Ptr json = new Poco::JSON::Object(); + json->set("errMsg", err_msg); + std::stringstream ss; + json->stringify(ss); + + auto * s = RawCppString::New(ss.str()); + return HttpRequestRes{ + .status = HttpRequestStatus::ErrorParam, + .res = CppStrWithView{ + .inner = GenRawCppPtr(s, RawCppPtrTypeImpl::String), + .view = BaseBuffView{s->data(), s->size()}}}; + } + + return HttpRequestRes{ + .status = status, + .res = CppStrWithView{.inner = GenRawCppPtr(), .view = BaseBuffView{nullptr, 0}}}; +} + using HANDLE_HTTP_URI_METHOD = HttpRequestRes (*)( EngineStoreServerWrap *, std::string_view, @@ -286,6 +391,7 @@ using HANDLE_HTTP_URI_METHOD = HttpRequestRes (*)( static const std::map AVAILABLE_HTTP_URI = { {"/tiflash/sync-status/", HandleHttpRequestSyncStatus}, + {"/tiflash/sync-schema/", HandleHttpRequestSyncSchema}, {"/tiflash/store-status", HandleHttpRequestStoreStatus}, {"/tiflash/remote/owner/info", HandleHttpRequestRemoteOwnerInfo}, {"/tiflash/remote/owner/resign", HandleHttpRequestRemoteOwnerResign}, diff --git a/dbms/src/Storages/KVStore/tests/gtest_sync_schema.cpp b/dbms/src/Storages/KVStore/tests/gtest_sync_schema.cpp new file mode 100644 index 00000000000..d473b91eb48 --- /dev/null +++ b/dbms/src/Storages/KVStore/tests/gtest_sync_schema.cpp @@ -0,0 +1,177 @@ +// Copyright 2024 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +namespace DB +{ +namespace ErrorCodes +{ +extern const int SYNTAX_ERROR; +} // namespace ErrorCodes + +namespace FailPoints +{ +extern const char sync_schema_request_failure[]; +} // namespace FailPoints + +namespace tests +{ +class SyncSchemaTest : public ::testing::Test +{ +public: + SyncSchemaTest() = default; + static void SetUpTestCase() + { + try + { + registerStorages(); + } + catch (DB::Exception &) + { + // Maybe another test has already registed, ignore exception here. + } + } + void SetUp() override { recreateMetadataPath(); } + + void TearDown() override + { + // Clean all database from context. + auto ctx = TiFlashTestEnv::getContext(); + for (const auto & [name, db] : ctx->getDatabases()) + { + ctx->detachDatabase(name); + db->shutdown(); + } + } + static void recreateMetadataPath() + { + String path = TiFlashTestEnv::getContext()->getPath(); + auto p = path + "/metadata/"; + TiFlashTestEnv::tryRemovePath(p, /*recreate=*/true); + p = path + "/data/"; + TiFlashTestEnv::tryRemovePath(p, /*recreate=*/true); + } +}; + +TEST_F(SyncSchemaTest, TestNormal) +try +{ + auto ctx = TiFlashTestEnv::getContext(); + auto pd_client = ctx->getGlobalContext().getTMTContext().getPDClient(); + + MockTiDB::instance().newDataBase("db_1"); + auto cols = ColumnsDescription({ + {"col1", typeFromString("Int64")}, + }); + auto table_id = MockTiDB::instance().newTable("db_1", "t_1", cols, pd_client->getTS(), ""); + auto schema_syncer = ctx->getTMTContext().getSchemaSyncerManager(); + KeyspaceID keyspace_id = NullspaceID; + schema_syncer->syncSchemas(ctx->getGlobalContext(), keyspace_id); + + EngineStoreServerWrap store_server_wrap{}; + store_server_wrap.tmt = &ctx->getTMTContext(); + auto helper = GetEngineStoreServerHelper(&store_server_wrap); + String path = fmt::format("/tiflash/sync-schema/keyspace/{}/table/{}", keyspace_id, table_id); + auto res = helper.fn_handle_http_request( + &store_server_wrap, + BaseBuffView{path.data(), path.length()}, + BaseBuffView{path.data(), path.length()}, + BaseBuffView{"", 0}); + EXPECT_EQ(res.status, HttpRequestStatus::Ok); + { + // normal errmsg is nil. + EXPECT_EQ(res.res.view.len, 0); + } + delete (static_cast(res.res.inner.ptr)); + + // do sync table schema twice + { + path = fmt::format("/tiflash/sync-schema/keyspace/{}/table/{}", keyspace_id, table_id); + auto res = helper.fn_handle_http_request( + &store_server_wrap, + BaseBuffView{path.data(), path.length()}, + BaseBuffView{path.data(), path.length()}, + BaseBuffView{"", 0}); + EXPECT_EQ(res.status, HttpRequestStatus::Ok); + { + // normal errmsg is nil. + EXPECT_EQ(res.res.view.len, 0); + } + delete (static_cast(res.res.inner.ptr)); + } + + // test wrong table ID + { + TableID wrong_table_id = table_id + 1; + path = fmt::format("/tiflash/sync-schema/keyspace/{}/table/{}", keyspace_id, wrong_table_id); + auto res_err = helper.fn_handle_http_request( + &store_server_wrap, + BaseBuffView{path.data(), path.length()}, + BaseBuffView{path.data(), path.length()}, + BaseBuffView{"", 0}); + EXPECT_EQ(res_err.status, HttpRequestStatus::ErrorParam); + StringRef sr(res_err.res.view.data, res_err.res.view.len); + { + EXPECT_EQ(sr.toString(), "{\"errMsg\":\"sync schema failed\"}"); + } + delete (static_cast(res_err.res.inner.ptr)); + } + + // test sync schema failed + { + path = fmt::format("/tiflash/sync-schema/keyspace/{}/table/{}", keyspace_id, table_id); + FailPointHelper::enableFailPoint(FailPoints::sync_schema_request_failure); + auto res_err1 = helper.fn_handle_http_request( + &store_server_wrap, + BaseBuffView{path.data(), path.length()}, + BaseBuffView{path.data(), path.length()}, + BaseBuffView{"", 0}); + EXPECT_EQ(res_err1.status, HttpRequestStatus::ErrorParam); + StringRef sr(res_err1.res.view.data, res_err1.res.view.len); + { + EXPECT_EQ( + sr.toString(), + "{\"errMsg\":\"Fail point FailPoints::sync_schema_request_failure is triggered.\"}"); + } + delete (static_cast(res_err1.res.inner.ptr)); + } + + dropDataBase("db_1"); +} +CATCH + +} // namespace tests +} // namespace DB diff --git a/dbms/src/Storages/KVStore/tests/region_kvstore_test.h b/dbms/src/Storages/KVStore/tests/region_kvstore_test.h index 248c74ff0f5..5f007a24fc8 100644 --- a/dbms/src/Storages/KVStore/tests/region_kvstore_test.h +++ b/dbms/src/Storages/KVStore/tests/region_kvstore_test.h @@ -88,4 +88,8 @@ inline void validateSSTGeneration( ASSERT_EQ(counter, key_count); } +ASTPtr parseCreateStatement(const String & statement); +TableID createDBAndTable(String db_name, String table_name); +void dropDataBase(String db_name); + } // namespace DB::tests