From a9f9b488fc275c49a5b7674c65a32d0a1ddd74e8 Mon Sep 17 00:00:00 2001 From: ruoxi Date: Thu, 12 Sep 2019 13:53:02 +0800 Subject: [PATCH] FLASH-475: Support BATCH COMMANDS in flash service (#232) * Initial batch command support * Add config to control thread pool size * Address comments --- dbms/src/Flash/BatchCommandsHandler.cpp | 94 +++++++++++++++++++ dbms/src/Flash/BatchCommandsHandler.h | 50 ++++++++++ .../{Coprocessor => }/CoprocessorHandler.cpp | 17 ++-- .../{Coprocessor => }/CoprocessorHandler.h | 3 +- dbms/src/Flash/FlashService.cpp | 47 +++++++++- dbms/src/Flash/FlashService.h | 6 +- dbms/src/Interpreters/Settings.h | 1 + 7 files changed, 201 insertions(+), 17 deletions(-) create mode 100644 dbms/src/Flash/BatchCommandsHandler.cpp create mode 100644 dbms/src/Flash/BatchCommandsHandler.h rename dbms/src/Flash/{Coprocessor => }/CoprocessorHandler.cpp (88%) rename dbms/src/Flash/{Coprocessor => }/CoprocessorHandler.h (99%) diff --git a/dbms/src/Flash/BatchCommandsHandler.cpp b/dbms/src/Flash/BatchCommandsHandler.cpp new file mode 100644 index 00000000000..e6768f03c13 --- /dev/null +++ b/dbms/src/Flash/BatchCommandsHandler.cpp @@ -0,0 +1,94 @@ +#include +#include +#include + +namespace DB +{ + +BatchCommandsHandler::BatchCommandsHandler(BatchCommandsContext & batch_commands_context_, const tikvpb::BatchCommandsRequest & request_, + tikvpb::BatchCommandsResponse & response_) + : batch_commands_context(batch_commands_context_), request(request_), response(response_), log(&Logger::get("BatchCommandsHandler")) +{} + +grpc::Status BatchCommandsHandler::execute() +{ + if (request.requests_size() == 0) + return grpc::Status::OK; + + // TODO: Fill transport_layer_load into BatchCommandsResponse. + + auto command_handler_func + = [](BatchCommandsContext::DBContextCreationFunc db_context_creation_func, grpc::ServerContext * grpc_server_context, + const tikvpb::BatchCommandsRequest::Request & req, tikvpb::BatchCommandsResponse::Response & resp, grpc::Status & ret) { + if (!req.has_coprocessor()) + { + ret = grpc::Status(::grpc::StatusCode::UNIMPLEMENTED, ""); + return; + } + + const auto & cop_req = req.coprocessor(); + auto cop_resp = resp.mutable_coprocessor(); + + auto [context, status] = db_context_creation_func(grpc_server_context); + if (!status.ok()) + { + ret = status; + return; + } + + CoprocessorContext cop_context(context, cop_req.context(), *grpc_server_context); + CoprocessorHandler cop_handler(cop_context, &cop_req, cop_resp); + + ret = cop_handler.execute(); + }; + + /// Shortcut for only one request by not going to thread pool. + if (request.requests_size() == 1) + { + LOG_DEBUG(log, __PRETTY_FUNCTION__ << ": Handling the only batch command in place."); + + const auto & req = request.requests(0); + auto resp = response.add_responses(); + response.add_request_ids(request.request_ids(0)); + auto ret = grpc::Status::OK; + command_handler_func(batch_commands_context.db_context_creation_func, &batch_commands_context.grpc_server_context, req, *resp, ret); + return ret; + } + + /// Use thread pool to handle requests concurrently. + const Settings & settings = batch_commands_context.db_context.getSettingsRef(); + size_t max_threads = settings.batch_commands_threads ? static_cast(settings.batch_commands_threads) + : static_cast(settings.max_threads); + + LOG_DEBUG( + log, __PRETTY_FUNCTION__ << ": Handling " << request.requests_size() << " batch commands using " << max_threads << " threads."); + + ThreadPool thread_pool(max_threads); + + std::vector rets; + size_t i = 0; + + for (const auto & req : request.requests()) + { + auto resp = response.add_responses(); + response.add_request_ids(request.request_ids(i++)); + rets.emplace_back(grpc::Status::OK); + thread_pool.schedule([&]() { + command_handler_func( + batch_commands_context.db_context_creation_func, &batch_commands_context.grpc_server_context, req, *resp, rets.back()); + }); + } + + thread_pool.wait(); + + // Iterate all return values of each individual commands, returns the first non-OK one if any. + for (const auto & ret : rets) + { + if (!ret.ok()) + return ret; + } + + return grpc::Status::OK; +} + +} // namespace DB diff --git a/dbms/src/Flash/BatchCommandsHandler.h b/dbms/src/Flash/BatchCommandsHandler.h new file mode 100644 index 00000000000..800318be39b --- /dev/null +++ b/dbms/src/Flash/BatchCommandsHandler.h @@ -0,0 +1,50 @@ +#pragma once + +#include +#include +#include +#pragma GCC diagnostic push +#pragma GCC diagnostic ignored "-Wunused-parameter" +#include +#pragma GCC diagnostic pop + +namespace DB +{ + +struct BatchCommandsContext +{ + /// Context for this batch commands. + Context & db_context; + + /// Context creation function for each individual command - they should be handled isolated, + /// given that context is being used to pass arguments regarding queries. + using DBContextCreationFunc = std::function(grpc::ServerContext *)>; + DBContextCreationFunc db_context_creation_func; + + grpc::ServerContext & grpc_server_context; + + BatchCommandsContext( + Context & db_context_, DBContextCreationFunc && db_context_creation_func_, grpc::ServerContext & grpc_server_context_) + : db_context(db_context_), db_context_creation_func(std::move(db_context_creation_func_)), grpc_server_context(grpc_server_context_) + {} +}; + +class BatchCommandsHandler +{ +public: + BatchCommandsHandler(BatchCommandsContext & batch_commands_context_, const tikvpb::BatchCommandsRequest & request_, + tikvpb::BatchCommandsResponse & response_); + + ~BatchCommandsHandler() = default; + + grpc::Status execute(); + +protected: + BatchCommandsContext & batch_commands_context; + const tikvpb::BatchCommandsRequest & request; + tikvpb::BatchCommandsResponse & response; + + Logger * log; +}; + +} // namespace DB diff --git a/dbms/src/Flash/Coprocessor/CoprocessorHandler.cpp b/dbms/src/Flash/CoprocessorHandler.cpp similarity index 88% rename from dbms/src/Flash/Coprocessor/CoprocessorHandler.cpp rename to dbms/src/Flash/CoprocessorHandler.cpp index a92f98b2945..2d72823f3af 100644 --- a/dbms/src/Flash/Coprocessor/CoprocessorHandler.cpp +++ b/dbms/src/Flash/CoprocessorHandler.cpp @@ -1,11 +1,9 @@ -#include +#include -#include #include #include #include #include -#include #include #include #include @@ -24,8 +22,7 @@ CoprocessorHandler::CoprocessorHandler( : cop_context(cop_context_), cop_request(cop_request_), cop_response(cop_response_), log(&Logger::get("CoprocessorHandler")) {} -grpc::Status CoprocessorHandler::execute() -try +grpc::Status CoprocessorHandler::execute() try { switch (cop_request->tp()) { @@ -48,7 +45,7 @@ try throw Exception( "Coprocessor request type " + std::to_string(cop_request->tp()) + " is not implemented", ErrorCodes::NOT_IMPLEMENTED); } - return ::grpc::Status(::grpc::StatusCode::OK, ""); + return grpc::Status::OK; } catch (const LockException & e) { @@ -60,7 +57,7 @@ catch (const LockException & e) lock_info->set_lock_ttl(e.lock_infos[0]->lock_ttl); lock_info->set_lock_version(e.lock_infos[0]->lock_version); // return ok so TiDB has the chance to see the LockException - return ::grpc::Status(::grpc::StatusCode::OK, ""); + return grpc::Status::OK; } catch (const RegionException & e) { @@ -83,7 +80,7 @@ catch (const RegionException & e) break; } // return ok so TiDB has the chance to see the LockException - return ::grpc::Status(::grpc::StatusCode::OK, ""); + return grpc::Status::OK; } catch (const Exception & e) { @@ -96,14 +93,14 @@ catch (const Exception & e) // TODO: Map other DB error codes to grpc codes. - return ::grpc::Status(::grpc::StatusCode::INTERNAL, e.message()); + return grpc::Status(grpc::StatusCode::INTERNAL, e.message()); } catch (const std::exception & e) { LOG_ERROR(log, __PRETTY_FUNCTION__ << ": Exception: " << e.what()); cop_response->Clear(); cop_response->set_other_error(e.what()); - return ::grpc::Status(::grpc::StatusCode::INTERNAL, e.what()); + return grpc::Status(grpc::StatusCode::INTERNAL, e.what()); } } // namespace DB diff --git a/dbms/src/Flash/Coprocessor/CoprocessorHandler.h b/dbms/src/Flash/CoprocessorHandler.h similarity index 99% rename from dbms/src/Flash/Coprocessor/CoprocessorHandler.h rename to dbms/src/Flash/CoprocessorHandler.h index 517875e9ace..477daeeb636 100644 --- a/dbms/src/Flash/Coprocessor/CoprocessorHandler.h +++ b/dbms/src/Flash/CoprocessorHandler.h @@ -2,13 +2,12 @@ #include +#include #pragma GCC diagnostic push #pragma GCC diagnostic ignored "-Wunused-parameter" #include #include #pragma GCC diagnostic pop - -#include #include namespace DB diff --git a/dbms/src/Flash/FlashService.cpp b/dbms/src/Flash/FlashService.cpp index 0489b6b8777..b2a473d083a 100644 --- a/dbms/src/Flash/FlashService.cpp +++ b/dbms/src/Flash/FlashService.cpp @@ -1,7 +1,8 @@ #include #include -#include +#include +#include #include #include @@ -54,10 +55,48 @@ grpc::Status FlashService::Coprocessor( auto ret = cop_handler.execute(); - LOG_DEBUG(log, __PRETTY_FUNCTION__ << ": Handle coprocessor request done"); + LOG_DEBUG(log, __PRETTY_FUNCTION__ << ": Handle coprocessor request done: " << ret.error_code() << ", " << ret.error_message()); return ret; } +grpc::Status FlashService::BatchCommands( + grpc::ServerContext * grpc_context, grpc::ServerReaderWriter<::tikvpb::BatchCommandsResponse, tikvpb::BatchCommandsRequest> * stream) +{ + auto [context, status] = createDBContext(grpc_context); + if (!status.ok()) + { + return status; + } + + tikvpb::BatchCommandsRequest request; + while (stream->Read(&request)) + { + LOG_DEBUG(log, __PRETTY_FUNCTION__ << ": Handling batch commands: " << request.DebugString()); + + tikvpb::BatchCommandsResponse response; + BatchCommandsContext batch_commands_context( + context, [this](grpc::ServerContext * grpc_server_context) { return createDBContext(grpc_server_context); }, *grpc_context); + BatchCommandsHandler batch_commands_handler(batch_commands_context, request, response); + auto ret = batch_commands_handler.execute(); + if (!ret.ok()) + { + LOG_DEBUG( + log, __PRETTY_FUNCTION__ << ": Handle batch commands request done: " << ret.error_code() << ", " << ret.error_message()); + return ret; + } + + if (!stream->Write(response)) + { + LOG_DEBUG(log, __PRETTY_FUNCTION__ << ": Write response failed for unknown reason."); + return grpc::Status(grpc::StatusCode::UNKNOWN, "Write response failed for unknown reason."); + } + + LOG_DEBUG(log, __PRETTY_FUNCTION__ << ": Handle batch commands request done: " << ret.error_code() << ", " << ret.error_message()); + } + + return grpc::Status::OK; +} + String getClientMetaVarWithDefault(grpc::ServerContext * grpc_context, const String & name, const String & default_val) { if (grpc_context->client_metadata().count(name) != 1) @@ -66,7 +105,7 @@ String getClientMetaVarWithDefault(grpc::ServerContext * grpc_context, const Str return String(grpc_context->client_metadata().find(name)->second.data()); } -std::tuple FlashService::createDBContext(grpc::ServerContext * grpc_context) +std::tuple FlashService::createDBContext(grpc::ServerContext * grpc_context) { /// Create DB context. Context context = server.context(); @@ -101,7 +140,7 @@ std::tuple FlashService::createDBContext(grpc::ServerCo std::string expr_field_type_check = getClientMetaVarWithDefault(grpc_context, "dag_expr_field_type_strict_check", "1"); context.setSetting("dag_expr_field_type_strict_check", expr_field_type_check); - return std::make_tuple(context, ::grpc::Status::OK); + return std::make_tuple(context, grpc::Status::OK); } } // namespace DB diff --git a/dbms/src/Flash/FlashService.h b/dbms/src/Flash/FlashService.h index 8ab123cc1fb..f208ec04db0 100644 --- a/dbms/src/Flash/FlashService.h +++ b/dbms/src/Flash/FlashService.h @@ -24,7 +24,11 @@ class FlashService final : public tikvpb::Tikv::Service, public std::enable_shar ~FlashService() final; - grpc::Status Coprocessor(grpc::ServerContext * context, const coprocessor::Request * request, coprocessor::Response * response); + grpc::Status Coprocessor( + grpc::ServerContext * grpc_context, const coprocessor::Request * request, coprocessor::Response * response) override; + + grpc::Status BatchCommands(grpc::ServerContext * grpc_context, + grpc::ServerReaderWriter * stream) override; private: std::tuple createDBContext(grpc::ServerContext * grpc_contex); diff --git a/dbms/src/Interpreters/Settings.h b/dbms/src/Interpreters/Settings.h index 86b42cf9ce2..6a2e619c50d 100644 --- a/dbms/src/Interpreters/Settings.h +++ b/dbms/src/Interpreters/Settings.h @@ -33,6 +33,7 @@ struct Settings M(SettingString, dag_planner, "sql", "planner for DAG query, sql builds the SQL string, optree builds the internal operator(stream) tree.") \ M(SettingBool, dag_expr_field_type_strict_check, true, "when set to true, every expr in the dag request must provide field type, otherwise only the result expr will be checked.") \ M(SettingInt64, schema_version, DEFAULT_UNSPECIFIED_SCHEMA_VERSION, "tmt schema version.") \ + M(SettingUInt64, batch_commands_threads, 0, "Number of threads to use for handling batch commands concurrently. 0 means - same as 'max_threads'.") \ M(SettingUInt64, min_compress_block_size, DEFAULT_MIN_COMPRESS_BLOCK_SIZE, "The actual size of the block to compress, if the uncompressed data less than max_compress_block_size is no less than this value and no less than the volume of data for one mark.") \ M(SettingUInt64, max_compress_block_size, DEFAULT_MAX_COMPRESS_BLOCK_SIZE, "The maximum size of blocks of uncompressed data before compressing for writing to a table.") \ M(SettingUInt64, max_block_size, DEFAULT_BLOCK_SIZE, "Maximum block size for reading") \