From 4a76e912af09e1e48e3bc1ddc0a21991771dde6f Mon Sep 17 00:00:00 2001 From: xufei Date: Mon, 12 Aug 2019 15:04:42 +0800 Subject: [PATCH] handle error in cop request (#171) * fix cop test regression * address comments * format code * fix npe for dag execute * format code * address comment * add some comments * throw exception when meet error duing cop request handling * address comments * add error code * throw exception when meet error duing cop request handling * address comments * add DAGContext so InterpreterDAG can exchange information with DAGDriver * fix bug * 1. refine code, 2. address comments * update comments * columnref index is based on executor output schema * handle error in coprocessor request * refine code * use Clear to clear a protobuf message completely * refine code --- .../Flash/Coprocessor/CoprocessorHandler.cpp | 61 ++++++++++++++++++- .../Flash/Coprocessor/CoprocessorHandler.h | 2 +- dbms/src/Flash/Coprocessor/DAGDriver.cpp | 37 ++++++++++- dbms/src/Flash/Coprocessor/DAGDriver.h | 2 + dbms/src/Flash/Coprocessor/InterpreterDAG.cpp | 3 +- dbms/src/Flash/FlashService.cpp | 47 ++------------ .../MergeTree/MergeTreeDataSelectExecutor.cpp | 18 +++--- .../Storages/Transaction/RegionException.h | 6 +- 8 files changed, 117 insertions(+), 59 deletions(-) diff --git a/dbms/src/Flash/Coprocessor/CoprocessorHandler.cpp b/dbms/src/Flash/Coprocessor/CoprocessorHandler.cpp index aed26c39fb2..a92f98b2945 100644 --- a/dbms/src/Flash/Coprocessor/CoprocessorHandler.cpp +++ b/dbms/src/Flash/Coprocessor/CoprocessorHandler.cpp @@ -6,6 +6,8 @@ #include #include #include +#include +#include #include #include @@ -22,7 +24,8 @@ CoprocessorHandler::CoprocessorHandler( : cop_context(cop_context_), cop_request(cop_request_), cop_response(cop_response_), log(&Logger::get("CoprocessorHandler")) {} -void CoprocessorHandler::execute() +grpc::Status CoprocessorHandler::execute() +try { switch (cop_request->tp()) { @@ -45,6 +48,62 @@ void CoprocessorHandler::execute() throw Exception( "Coprocessor request type " + std::to_string(cop_request->tp()) + " is not implemented", ErrorCodes::NOT_IMPLEMENTED); } + return ::grpc::Status(::grpc::StatusCode::OK, ""); +} +catch (const LockException & e) +{ + LOG_ERROR(log, __PRETTY_FUNCTION__ << ": LockException: " << e.displayText()); + cop_response->Clear(); + kvrpcpb::LockInfo * lock_info = cop_response->mutable_locked(); + lock_info->set_key(e.lock_infos[0]->key); + lock_info->set_primary_lock(e.lock_infos[0]->primary_lock); + lock_info->set_lock_ttl(e.lock_infos[0]->lock_ttl); + lock_info->set_lock_version(e.lock_infos[0]->lock_version); + // return ok so TiDB has the chance to see the LockException + return ::grpc::Status(::grpc::StatusCode::OK, ""); +} +catch (const RegionException & e) +{ + LOG_ERROR(log, __PRETTY_FUNCTION__ << ": RegionException: " << e.displayText()); + cop_response->Clear(); + errorpb::Error * region_err; + switch (e.status) + { + case RegionTable::RegionReadStatus::NOT_FOUND: + case RegionTable::RegionReadStatus::PENDING_REMOVE: + region_err = cop_response->mutable_region_error(); + region_err->mutable_region_not_found()->set_region_id(cop_request->context().region_id()); + break; + case RegionTable::RegionReadStatus::VERSION_ERROR: + region_err = cop_response->mutable_region_error(); + region_err->mutable_epoch_not_match(); + break; + default: + // should not happen + break; + } + // return ok so TiDB has the chance to see the LockException + return ::grpc::Status(::grpc::StatusCode::OK, ""); +} +catch (const Exception & e) +{ + LOG_ERROR(log, __PRETTY_FUNCTION__ << ": Exception: " << e.displayText()); + cop_response->Clear(); + cop_response->set_other_error(e.message()); + + if (e.code() == ErrorCodes::NOT_IMPLEMENTED) + return ::grpc::Status(::grpc::StatusCode::UNIMPLEMENTED, e.message()); + + // TODO: Map other DB error codes to grpc codes. + + return ::grpc::Status(::grpc::StatusCode::INTERNAL, e.message()); +} +catch (const std::exception & e) +{ + LOG_ERROR(log, __PRETTY_FUNCTION__ << ": Exception: " << e.what()); + cop_response->Clear(); + cop_response->set_other_error(e.what()); + return ::grpc::Status(::grpc::StatusCode::INTERNAL, e.what()); } } // namespace DB diff --git a/dbms/src/Flash/Coprocessor/CoprocessorHandler.h b/dbms/src/Flash/Coprocessor/CoprocessorHandler.h index 2aafa8a71ff..517875e9ace 100644 --- a/dbms/src/Flash/Coprocessor/CoprocessorHandler.h +++ b/dbms/src/Flash/Coprocessor/CoprocessorHandler.h @@ -36,7 +36,7 @@ class CoprocessorHandler ~CoprocessorHandler() = default; - void execute(); + grpc::Status execute(); protected: enum diff --git a/dbms/src/Flash/Coprocessor/DAGDriver.cpp b/dbms/src/Flash/Coprocessor/DAGDriver.cpp index ed051b22119..6e25308f5ba 100644 --- a/dbms/src/Flash/Coprocessor/DAGDriver.cpp +++ b/dbms/src/Flash/Coprocessor/DAGDriver.cpp @@ -9,13 +9,16 @@ #include #include #include +#include +#include namespace DB { namespace ErrorCodes { extern const int LOGICAL_ERROR; -} +extern const int UNKNOWN_EXCEPTION; +} // namespace ErrorCodes DAGDriver::DAGDriver(Context & context_, const tipb::DAGRequest & dag_request_, RegionID region_id_, UInt64 region_version_, UInt64 region_conf_version_, tipb::SelectResponse & dag_response_, bool internal_) @@ -29,6 +32,7 @@ DAGDriver::DAGDriver(Context & context_, const tipb::DAGRequest & dag_request_, {} void DAGDriver::execute() +try { context.setSetting("read_tso", UInt64(dag_request.start_ts())); @@ -57,8 +61,11 @@ void DAGDriver::execute() // Only query is allowed, so streams.in must not be null and streams.out must be null throw Exception("DAG is not query.", ErrorCodes::LOGICAL_ERROR); - BlockOutputStreamPtr outputStreamPtr = std::make_shared(dag_response, context.getSettings().dag_records_per_chunk, - dag_request.encode_type(), dag.getResultFieldTypes(), streams.in->getHeader()); + BlockOutputStreamPtr outputStreamPtr = std::make_shared(dag_response, + context.getSettings().dag_records_per_chunk, + dag_request.encode_type(), + dag.getResultFieldTypes(), + streams.in->getHeader()); copyData(*streams.in, *outputStreamPtr); // add ExecutorExecutionSummary info for (auto & p_streams : dag_context.profile_streams_list) @@ -81,5 +88,29 @@ void DAGDriver::execute() executeSummary->set_num_iterations(num_iterations); } } +catch (const RegionException & e) +{ + e.rethrow(); +} +catch (const LockException & e) +{ + e.rethrow(); +} +catch (const Exception & e) +{ + recordError(e.code(), e.message()); +} +catch (const std::exception & e) +{ + recordError(ErrorCodes::UNKNOWN_EXCEPTION, e.what()); +} + +void DAGDriver::recordError(Int32 err_code, const String & err_msg) +{ + dag_response.Clear(); + tipb::Error * error = dag_response.mutable_error(); + error->set_code(err_code); + error->set_msg(err_msg); +} } // namespace DB diff --git a/dbms/src/Flash/Coprocessor/DAGDriver.h b/dbms/src/Flash/Coprocessor/DAGDriver.h index a9eda48b025..4c84cfcb020 100644 --- a/dbms/src/Flash/Coprocessor/DAGDriver.h +++ b/dbms/src/Flash/Coprocessor/DAGDriver.h @@ -31,5 +31,7 @@ class DAGDriver tipb::SelectResponse & dag_response; bool internal; + + void recordError(Int32 err_code, const String & err_msg); }; } // namespace DB diff --git a/dbms/src/Flash/Coprocessor/InterpreterDAG.cpp b/dbms/src/Flash/Coprocessor/InterpreterDAG.cpp index be5ad0df950..beb8c0a3bd0 100644 --- a/dbms/src/Flash/Coprocessor/InterpreterDAG.cpp +++ b/dbms/src/Flash/Coprocessor/InterpreterDAG.cpp @@ -129,10 +129,9 @@ void InterpreterDAG::executeTS(const tipb::TableScan & ts, Pipeline & pipeline) auto current_region = context.getTMTContext().getRegionTable().getRegionByTableAndID(table_id, info.region_id); if (!current_region) { - //todo add more region error info in RegionException std::vector region_ids; region_ids.push_back(info.region_id); - throw RegionException(region_ids); + throw RegionException(std::move(region_ids), RegionTable::RegionReadStatus::NOT_FOUND); } info.range_in_table = current_region->getHandleRangeByTable(table_id); query_info.mvcc_query_info->regions_query_info.push_back(info); diff --git a/dbms/src/Flash/FlashService.cpp b/dbms/src/Flash/FlashService.cpp index 1abb6189fa9..2b8941f0472 100644 --- a/dbms/src/Flash/FlashService.cpp +++ b/dbms/src/Flash/FlashService.cpp @@ -2,8 +2,6 @@ #include #include -#include -#include #include #include @@ -51,48 +49,13 @@ grpc::Status FlashService::Coprocessor( return status; } - try - { - CoprocessorContext cop_context(context, request->context(), *grpc_context); - CoprocessorHandler cop_handler(cop_context, request, response); + CoprocessorContext cop_context(context, request->context(), *grpc_context); + CoprocessorHandler cop_handler(cop_context, request, response); - cop_handler.execute(); + auto ret = cop_handler.execute(); - LOG_DEBUG(log, __PRETTY_FUNCTION__ << ": Handle coprocessor request done"); - return ::grpc::Status(::grpc::StatusCode::OK, ""); - } - catch (const LockException & e) - { - // TODO: handle lock error properly. - LOG_ERROR(log, __PRETTY_FUNCTION__ << ": LockException: " << e.displayText()); - response->set_data(""); - return ::grpc::Status(::grpc::StatusCode::UNIMPLEMENTED, e.message()); - } - catch (const RegionException & e) - { - // TODO: handle region error properly. - LOG_ERROR(log, __PRETTY_FUNCTION__ << ": RegionException: " << e.displayText()); - response->set_data(""); - return ::grpc::Status(::grpc::StatusCode::UNIMPLEMENTED, e.message()); - } - catch (const Exception & e) - { - LOG_ERROR(log, __PRETTY_FUNCTION__ << ": Exception: " << e.displayText()); - response->set_data(""); - - if (e.code() == ErrorCodes::NOT_IMPLEMENTED) - return ::grpc::Status(::grpc::StatusCode::UNIMPLEMENTED, e.message()); - - // TODO: Map other DB error codes to grpc codes. - - return ::grpc::Status(::grpc::StatusCode::INTERNAL, e.message()); - } - catch (const std::exception & e) - { - LOG_ERROR(log, __PRETTY_FUNCTION__ << ": Exception: " << e.what()); - response->set_data(""); - return ::grpc::Status(::grpc::StatusCode::INTERNAL, e.what()); - } + LOG_DEBUG(log, __PRETTY_FUNCTION__ << ": Handle coprocessor request done"); + return ret; } String getClientMetaVarWithDefault(grpc::ServerContext * grpc_context, const String & name, const String & default_val) diff --git a/dbms/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp b/dbms/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp index b9d6773207a..8b1e15a95f3 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp +++ b/dbms/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp @@ -222,12 +222,12 @@ BlockInputStreams MergeTreeDataSelectExecutor::read(const Names & column_names_t // the index of column is constant after MergeTreeBlockInputStream is constructed. exception will be thrown if not found. const size_t handle_column_index = 0, version_column_index = 1, delmark_column_index = 2; - const auto func_throw_retry_region = [&]() { + const auto func_throw_retry_region = [&](RegionTable::RegionReadStatus status) { std::vector region_ids; region_ids.reserve(regions_executor_data.size()); for (const auto & query_info : regions_executor_data) region_ids.push_back(query_info.info.region_id); - throw RegionException(region_ids); + throw RegionException(std::move(region_ids), status); }; /// If query contains restrictions on the virtual column `_part` or `_part_index`, select only parts suitable for it. @@ -314,7 +314,7 @@ BlockInputStreams MergeTreeDataSelectExecutor::read(const Names & column_names_t if (region == nullptr) { LOG_WARNING(log, "[region " << query_info.info.region_id << "] is not found in KVStore, try again"); - func_throw_retry_region(); + func_throw_retry_region(RegionTable::RegionReadStatus::NOT_FOUND); } kvstore_region.emplace(query_info.info.region_id, std::move(region)); } @@ -331,13 +331,13 @@ BlockInputStreams MergeTreeDataSelectExecutor::read(const Names & column_names_t auto start_time = Clock::now(); const size_t mem_region_num = regions_executor_data.size(); const size_t batch_size = mem_region_num / concurrent_num; - std::atomic_bool need_retry = false; + std::atomic_uint8_t region_status = RegionTable::RegionReadStatus::OK; const auto func_run_learner_read = [&](const size_t region_begin) { const size_t region_end = std::min(region_begin + batch_size, mem_region_num); for (size_t region_index = region_begin; region_index < region_end; ++region_index) { - if (need_retry) + if (region_status != RegionTable::RegionReadStatus::OK) return; RegionQueryInfo & region_query_info = regions_executor_data[region_index].info; @@ -359,7 +359,7 @@ BlockInputStreams MergeTreeDataSelectExecutor::read(const Names & column_names_t << ", handle range [" << region_query_info.range_in_table.first.toString() << ", " << region_query_info.range_in_table.second.toString() << ") , status " << RegionTable::RegionReadStatusString(status)); - need_retry = true; + region_status = status; } else if (block) regions_executor_data[region_index].block = std::move(block); @@ -379,8 +379,8 @@ BlockInputStreams MergeTreeDataSelectExecutor::read(const Names & column_names_t func_run_learner_read(0); } - if (need_retry) - func_throw_retry_region(); + if (region_status != RegionTable::RegionReadStatus::OK) + func_throw_retry_region(static_cast(region_status.load())); auto end_time = Clock::now(); LOG_DEBUG(log, @@ -862,7 +862,7 @@ BlockInputStreams MergeTreeDataSelectExecutor::read(const Names & column_names_t << region_query_info.range_in_table.second.toString() << ") , status " << RegionTable::RegionReadStatusString(status)); // throw exception and exit. - func_throw_retry_region(); + func_throw_retry_region(status); } } } diff --git a/dbms/src/Storages/Transaction/RegionException.h b/dbms/src/Storages/Transaction/RegionException.h index 9e661d9eeb8..dfa20262dd4 100644 --- a/dbms/src/Storages/Transaction/RegionException.h +++ b/dbms/src/Storages/Transaction/RegionException.h @@ -1,6 +1,7 @@ #pragma once #include +#include #include namespace DB @@ -9,9 +10,12 @@ namespace DB class RegionException : public Exception { public: - explicit RegionException(std::vector region_ids_) : region_ids(region_ids_) {} + RegionException(std::vector && region_ids_, RegionTable::RegionReadStatus status_) + : Exception(RegionTable::RegionReadStatusString(status_)), region_ids(region_ids_), status(status_) + {} std::vector region_ids; + RegionTable::RegionReadStatus status; }; } // namespace DB