Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

handle error in cop request #171

Merged
merged 28 commits into from
Aug 12, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
71c09fb
fix cop test regression
windtalker Aug 6, 2019
6b8a054
address comments
windtalker Aug 6, 2019
6f32efd
format code
windtalker Aug 6, 2019
11b3e09
fix npe for dag execute
windtalker Aug 6, 2019
64fef5c
Merge branch 'cop' of https://github.com/pingcap/tics into cop
windtalker Aug 6, 2019
f96fcf4
format code
windtalker Aug 6, 2019
324b64d
address comment
windtalker Aug 6, 2019
6b06122
add some comments
windtalker Aug 6, 2019
2327e9f
throw exception when meet error duing cop request handling
windtalker Aug 6, 2019
72d11ad
Merge branch 'cop' of https://github.com/pingcap/tics into cop
windtalker Aug 6, 2019
428459a
Merge branch 'cop' of https://github.com/pingcap/tics into cop
windtalker Aug 7, 2019
f3eb6e5
address comments
windtalker Aug 7, 2019
d8bb7d9
add error code
windtalker Aug 7, 2019
b6eaa3b
throw exception when meet error duing cop request handling
windtalker Aug 7, 2019
fe7916e
address comments
windtalker Aug 7, 2019
f1d0bfe
add DAGContext so InterpreterDAG can exchange information with DAGDriver
windtalker Aug 8, 2019
dde6dab
merge pingcap/tics cop
windtalker Aug 8, 2019
3c29365
fix bug
windtalker Aug 8, 2019
b984cb6
1. refine code, 2. address comments
windtalker Aug 8, 2019
ddf64e6
update comments
windtalker Aug 8, 2019
d9c4a0d
columnref index is based on executor output schema
windtalker Aug 8, 2019
947606a
merge pingcap/tics cop
windtalker Aug 8, 2019
85ae8b9
Merge branch 'cop' of https://github.com/pingcap/tics into cop
windtalker Aug 9, 2019
aea80d6
handle error in coprocessor request
windtalker Aug 9, 2019
2ed69ef
merge pingcap/tics cop
windtalker Aug 9, 2019
353a2b1
refine code
windtalker Aug 9, 2019
f406893
use Clear to clear a protobuf message completely
windtalker Aug 9, 2019
021e4c3
refine code
windtalker Aug 12, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 60 additions & 1 deletion dbms/src/Flash/Coprocessor/CoprocessorHandler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
#include <Storages/IStorage.h>
#include <Storages/StorageMergeTree.h>
#include <Storages/Transaction/Codec.h>
#include <Storages/Transaction/LockException.h>
#include <Storages/Transaction/RegionException.h>
#include <Storages/Transaction/SchemaSyncer.h>
#include <Storages/Transaction/TMTContext.h>

Expand All @@ -22,7 +24,8 @@ CoprocessorHandler::CoprocessorHandler(
: cop_context(cop_context_), cop_request(cop_request_), cop_response(cop_response_), log(&Logger::get("CoprocessorHandler"))
{}

void CoprocessorHandler::execute()
grpc::Status CoprocessorHandler::execute()
try
{
switch (cop_request->tp())
{
Expand All @@ -45,6 +48,62 @@ void CoprocessorHandler::execute()
throw Exception(
"Coprocessor request type " + std::to_string(cop_request->tp()) + " is not implemented", ErrorCodes::NOT_IMPLEMENTED);
}
return ::grpc::Status(::grpc::StatusCode::OK, "");
}
catch (const LockException & e)
{
LOG_ERROR(log, __PRETTY_FUNCTION__ << ": LockException: " << e.displayText());
cop_response->Clear();
kvrpcpb::LockInfo * lock_info = cop_response->mutable_locked();
lock_info->set_key(e.lock_infos[0]->key);
lock_info->set_primary_lock(e.lock_infos[0]->primary_lock);
lock_info->set_lock_ttl(e.lock_infos[0]->lock_ttl);
lock_info->set_lock_version(e.lock_infos[0]->lock_version);
// return ok so TiDB has the chance to see the LockException
return ::grpc::Status(::grpc::StatusCode::OK, "");
}
catch (const RegionException & e)
{
LOG_ERROR(log, __PRETTY_FUNCTION__ << ": RegionException: " << e.displayText());
cop_response->Clear();
errorpb::Error * region_err;
switch (e.status)
{
case RegionTable::RegionReadStatus::NOT_FOUND:
case RegionTable::RegionReadStatus::PENDING_REMOVE:
region_err = cop_response->mutable_region_error();
region_err->mutable_region_not_found()->set_region_id(cop_request->context().region_id());
break;
case RegionTable::RegionReadStatus::VERSION_ERROR:
region_err = cop_response->mutable_region_error();
region_err->mutable_epoch_not_match();
break;
default:
// should not happen
break;
}
// return ok so TiDB has the chance to see the LockException
return ::grpc::Status(::grpc::StatusCode::OK, "");
}
catch (const Exception & e)
{
LOG_ERROR(log, __PRETTY_FUNCTION__ << ": Exception: " << e.displayText());
cop_response->Clear();
cop_response->set_other_error(e.message());

if (e.code() == ErrorCodes::NOT_IMPLEMENTED)
return ::grpc::Status(::grpc::StatusCode::UNIMPLEMENTED, e.message());

// TODO: Map other DB error codes to grpc codes.

return ::grpc::Status(::grpc::StatusCode::INTERNAL, e.message());
}
catch (const std::exception & e)
{
LOG_ERROR(log, __PRETTY_FUNCTION__ << ": Exception: " << e.what());
cop_response->Clear();
cop_response->set_other_error(e.what());
return ::grpc::Status(::grpc::StatusCode::INTERNAL, e.what());
}

} // namespace DB
2 changes: 1 addition & 1 deletion dbms/src/Flash/Coprocessor/CoprocessorHandler.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ class CoprocessorHandler

~CoprocessorHandler() = default;

void execute();
grpc::Status execute();

protected:
enum
Expand Down
37 changes: 34 additions & 3 deletions dbms/src/Flash/Coprocessor/DAGDriver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,16 @@
#include <Flash/Coprocessor/DAGStringConverter.h>
#include <Interpreters/Context.h>
#include <Interpreters/executeQuery.h>
#include <Storages/Transaction/LockException.h>
#include <Storages/Transaction/RegionException.h>

namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
}
extern const int UNKNOWN_EXCEPTION;
} // namespace ErrorCodes

DAGDriver::DAGDriver(Context & context_, const tipb::DAGRequest & dag_request_, RegionID region_id_, UInt64 region_version_,
UInt64 region_conf_version_, tipb::SelectResponse & dag_response_, bool internal_)
Expand All @@ -29,6 +32,7 @@ DAGDriver::DAGDriver(Context & context_, const tipb::DAGRequest & dag_request_,
{}

void DAGDriver::execute()
try
{
context.setSetting("read_tso", UInt64(dag_request.start_ts()));

Expand Down Expand Up @@ -57,8 +61,11 @@ void DAGDriver::execute()
// Only query is allowed, so streams.in must not be null and streams.out must be null
throw Exception("DAG is not query.", ErrorCodes::LOGICAL_ERROR);

BlockOutputStreamPtr outputStreamPtr = std::make_shared<DAGBlockOutputStream>(dag_response, context.getSettings().dag_records_per_chunk,
dag_request.encode_type(), dag.getResultFieldTypes(), streams.in->getHeader());
BlockOutputStreamPtr outputStreamPtr = std::make_shared<DAGBlockOutputStream>(dag_response,
context.getSettings().dag_records_per_chunk,
dag_request.encode_type(),
dag.getResultFieldTypes(),
streams.in->getHeader());
copyData(*streams.in, *outputStreamPtr);
// add ExecutorExecutionSummary info
for (auto & p_streams : dag_context.profile_streams_list)
Expand All @@ -81,5 +88,29 @@ void DAGDriver::execute()
executeSummary->set_num_iterations(num_iterations);
}
}
catch (const RegionException & e)
{
e.rethrow();
}
catch (const LockException & e)
{
e.rethrow();
}
catch (const Exception & e)
{
recordError(e.code(), e.message());
}
catch (const std::exception & e)
{
recordError(ErrorCodes::UNKNOWN_EXCEPTION, e.what());
}

void DAGDriver::recordError(Int32 err_code, const String & err_msg)
{
dag_response.Clear();
tipb::Error * error = dag_response.mutable_error();
error->set_code(err_code);
error->set_msg(err_msg);
}

} // namespace DB
2 changes: 2 additions & 0 deletions dbms/src/Flash/Coprocessor/DAGDriver.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,5 +31,7 @@ class DAGDriver
tipb::SelectResponse & dag_response;

bool internal;

void recordError(Int32 err_code, const String & err_msg);
};
} // namespace DB
3 changes: 1 addition & 2 deletions dbms/src/Flash/Coprocessor/InterpreterDAG.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -129,10 +129,9 @@ void InterpreterDAG::executeTS(const tipb::TableScan & ts, Pipeline & pipeline)
auto current_region = context.getTMTContext().getRegionTable().getRegionByTableAndID(table_id, info.region_id);
if (!current_region)
{
//todo add more region error info in RegionException
std::vector<RegionID> region_ids;
region_ids.push_back(info.region_id);
throw RegionException(region_ids);
throw RegionException(std::move(region_ids), RegionTable::RegionReadStatus::NOT_FOUND);
}
info.range_in_table = current_region->getHandleRangeByTable(table_id);
query_info.mvcc_query_info->regions_query_info.push_back(info);
Expand Down
47 changes: 5 additions & 42 deletions dbms/src/Flash/FlashService.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,6 @@

#include <Core/Types.h>
#include <Flash/Coprocessor/CoprocessorHandler.h>
#include <Storages/Transaction/LockException.h>
#include <Storages/Transaction/RegionException.h>
#include <grpcpp/security/server_credentials.h>
#include <grpcpp/server_builder.h>

Expand Down Expand Up @@ -51,48 +49,13 @@ grpc::Status FlashService::Coprocessor(
return status;
}

try
{
CoprocessorContext cop_context(context, request->context(), *grpc_context);
CoprocessorHandler cop_handler(cop_context, request, response);
CoprocessorContext cop_context(context, request->context(), *grpc_context);
CoprocessorHandler cop_handler(cop_context, request, response);

cop_handler.execute();
auto ret = cop_handler.execute();

LOG_DEBUG(log, __PRETTY_FUNCTION__ << ": Handle coprocessor request done");
return ::grpc::Status(::grpc::StatusCode::OK, "");
}
catch (const LockException & e)
{
// TODO: handle lock error properly.
LOG_ERROR(log, __PRETTY_FUNCTION__ << ": LockException: " << e.displayText());
response->set_data("");
return ::grpc::Status(::grpc::StatusCode::UNIMPLEMENTED, e.message());
}
catch (const RegionException & e)
{
// TODO: handle region error properly.
LOG_ERROR(log, __PRETTY_FUNCTION__ << ": RegionException: " << e.displayText());
response->set_data("");
return ::grpc::Status(::grpc::StatusCode::UNIMPLEMENTED, e.message());
}
catch (const Exception & e)
{
LOG_ERROR(log, __PRETTY_FUNCTION__ << ": Exception: " << e.displayText());
response->set_data("");

if (e.code() == ErrorCodes::NOT_IMPLEMENTED)
return ::grpc::Status(::grpc::StatusCode::UNIMPLEMENTED, e.message());

// TODO: Map other DB error codes to grpc codes.

return ::grpc::Status(::grpc::StatusCode::INTERNAL, e.message());
}
catch (const std::exception & e)
{
LOG_ERROR(log, __PRETTY_FUNCTION__ << ": Exception: " << e.what());
response->set_data("");
return ::grpc::Status(::grpc::StatusCode::INTERNAL, e.what());
}
LOG_DEBUG(log, __PRETTY_FUNCTION__ << ": Handle coprocessor request done");
return ret;
}

String getClientMetaVarWithDefault(grpc::ServerContext * grpc_context, const String & name, const String & default_val)
Expand Down
18 changes: 9 additions & 9 deletions dbms/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -222,12 +222,12 @@ BlockInputStreams MergeTreeDataSelectExecutor::read(const Names & column_names_t
// the index of column is constant after MergeTreeBlockInputStream is constructed. exception will be thrown if not found.
const size_t handle_column_index = 0, version_column_index = 1, delmark_column_index = 2;

const auto func_throw_retry_region = [&]() {
const auto func_throw_retry_region = [&](RegionTable::RegionReadStatus status) {
std::vector<RegionID> region_ids;
region_ids.reserve(regions_executor_data.size());
for (const auto & query_info : regions_executor_data)
region_ids.push_back(query_info.info.region_id);
throw RegionException(region_ids);
throw RegionException(std::move(region_ids), status);
};

/// If query contains restrictions on the virtual column `_part` or `_part_index`, select only parts suitable for it.
Expand Down Expand Up @@ -314,7 +314,7 @@ BlockInputStreams MergeTreeDataSelectExecutor::read(const Names & column_names_t
if (region == nullptr)
{
LOG_WARNING(log, "[region " << query_info.info.region_id << "] is not found in KVStore, try again");
func_throw_retry_region();
func_throw_retry_region(RegionTable::RegionReadStatus::NOT_FOUND);
}
kvstore_region.emplace(query_info.info.region_id, std::move(region));
}
Expand All @@ -331,13 +331,13 @@ BlockInputStreams MergeTreeDataSelectExecutor::read(const Names & column_names_t
auto start_time = Clock::now();
const size_t mem_region_num = regions_executor_data.size();
const size_t batch_size = mem_region_num / concurrent_num;
std::atomic_bool need_retry = false;
std::atomic_uint8_t region_status = RegionTable::RegionReadStatus::OK;

const auto func_run_learner_read = [&](const size_t region_begin) {
const size_t region_end = std::min(region_begin + batch_size, mem_region_num);
for (size_t region_index = region_begin; region_index < region_end; ++region_index)
{
if (need_retry)
if (region_status != RegionTable::RegionReadStatus::OK)
return;

RegionQueryInfo & region_query_info = regions_executor_data[region_index].info;
Expand All @@ -359,7 +359,7 @@ BlockInputStreams MergeTreeDataSelectExecutor::read(const Names & column_names_t
<< ", handle range [" << region_query_info.range_in_table.first.toString() << ", "
<< region_query_info.range_in_table.second.toString() << ") , status "
<< RegionTable::RegionReadStatusString(status));
need_retry = true;
region_status = status;
}
else if (block)
regions_executor_data[region_index].block = std::move(block);
Expand All @@ -379,8 +379,8 @@ BlockInputStreams MergeTreeDataSelectExecutor::read(const Names & column_names_t
func_run_learner_read(0);
}

if (need_retry)
func_throw_retry_region();
if (region_status != RegionTable::RegionReadStatus::OK)
func_throw_retry_region(static_cast<RegionTable::RegionReadStatus>(region_status.load()));

auto end_time = Clock::now();
LOG_DEBUG(log,
Expand Down Expand Up @@ -862,7 +862,7 @@ BlockInputStreams MergeTreeDataSelectExecutor::read(const Names & column_names_t
<< region_query_info.range_in_table.second.toString() << ") , status "
<< RegionTable::RegionReadStatusString(status));
// throw exception and exit.
func_throw_retry_region();
func_throw_retry_region(status);
}
}
}
Expand Down
6 changes: 5 additions & 1 deletion dbms/src/Storages/Transaction/RegionException.h
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#pragma once

#include <Common/Exception.h>
#include <Storages/Transaction/RegionTable.h>
#include <Storages/Transaction/Types.h>

namespace DB
Expand All @@ -9,9 +10,12 @@ namespace DB
class RegionException : public Exception
{
public:
explicit RegionException(std::vector<RegionID> region_ids_) : region_ids(region_ids_) {}
RegionException(std::vector<RegionID> && region_ids_, RegionTable::RegionReadStatus status_)
: Exception(RegionTable::RegionReadStatusString(status_)), region_ids(region_ids_), status(status_)
{}

std::vector<RegionID> region_ids;
RegionTable::RegionReadStatus status;
};

} // namespace DB