Skip to content

Commit

Permalink
FLASH-475: Support BATCH COMMANDS in flash service (#232)
Browse files Browse the repository at this point in the history
* Initial batch command support

* Add config to control thread pool size

* Address comments
  • Loading branch information
zanmato1984 authored and windtalker committed Sep 12, 2019
1 parent fce3676 commit a9f9b48
Show file tree
Hide file tree
Showing 7 changed files with 201 additions and 17 deletions.
94 changes: 94 additions & 0 deletions dbms/src/Flash/BatchCommandsHandler.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
#include <Flash/BatchCommandsHandler.h>
#include <Flash/CoprocessorHandler.h>
#include <common/ThreadPool.h>

namespace DB
{

BatchCommandsHandler::BatchCommandsHandler(BatchCommandsContext & batch_commands_context_, const tikvpb::BatchCommandsRequest & request_,
tikvpb::BatchCommandsResponse & response_)
: batch_commands_context(batch_commands_context_), request(request_), response(response_), log(&Logger::get("BatchCommandsHandler"))
{}

grpc::Status BatchCommandsHandler::execute()
{
if (request.requests_size() == 0)
return grpc::Status::OK;

// TODO: Fill transport_layer_load into BatchCommandsResponse.

auto command_handler_func
= [](BatchCommandsContext::DBContextCreationFunc db_context_creation_func, grpc::ServerContext * grpc_server_context,
const tikvpb::BatchCommandsRequest::Request & req, tikvpb::BatchCommandsResponse::Response & resp, grpc::Status & ret) {
if (!req.has_coprocessor())
{
ret = grpc::Status(::grpc::StatusCode::UNIMPLEMENTED, "");
return;
}

const auto & cop_req = req.coprocessor();
auto cop_resp = resp.mutable_coprocessor();

auto [context, status] = db_context_creation_func(grpc_server_context);
if (!status.ok())
{
ret = status;
return;
}

CoprocessorContext cop_context(context, cop_req.context(), *grpc_server_context);
CoprocessorHandler cop_handler(cop_context, &cop_req, cop_resp);

ret = cop_handler.execute();
};

/// Shortcut for only one request by not going to thread pool.
if (request.requests_size() == 1)
{
LOG_DEBUG(log, __PRETTY_FUNCTION__ << ": Handling the only batch command in place.");

const auto & req = request.requests(0);
auto resp = response.add_responses();
response.add_request_ids(request.request_ids(0));
auto ret = grpc::Status::OK;
command_handler_func(batch_commands_context.db_context_creation_func, &batch_commands_context.grpc_server_context, req, *resp, ret);
return ret;
}

/// Use thread pool to handle requests concurrently.
const Settings & settings = batch_commands_context.db_context.getSettingsRef();
size_t max_threads = settings.batch_commands_threads ? static_cast<size_t>(settings.batch_commands_threads)
: static_cast<size_t>(settings.max_threads);

LOG_DEBUG(
log, __PRETTY_FUNCTION__ << ": Handling " << request.requests_size() << " batch commands using " << max_threads << " threads.");

ThreadPool thread_pool(max_threads);

std::vector<grpc::Status> rets;
size_t i = 0;

for (const auto & req : request.requests())
{
auto resp = response.add_responses();
response.add_request_ids(request.request_ids(i++));
rets.emplace_back(grpc::Status::OK);
thread_pool.schedule([&]() {
command_handler_func(
batch_commands_context.db_context_creation_func, &batch_commands_context.grpc_server_context, req, *resp, rets.back());
});
}

thread_pool.wait();

// Iterate all return values of each individual commands, returns the first non-OK one if any.
for (const auto & ret : rets)
{
if (!ret.ok())
return ret;
}

return grpc::Status::OK;
}

} // namespace DB
50 changes: 50 additions & 0 deletions dbms/src/Flash/BatchCommandsHandler.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
#pragma once

#include <Interpreters/Context.h>
#include <common/logger_useful.h>
#include <grpcpp/server_context.h>
#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wunused-parameter"
#include <kvproto/tikvpb.pb.h>
#pragma GCC diagnostic pop

namespace DB
{

struct BatchCommandsContext
{
/// Context for this batch commands.
Context & db_context;

/// Context creation function for each individual command - they should be handled isolated,
/// given that context is being used to pass arguments regarding queries.
using DBContextCreationFunc = std::function<std::tuple<Context, grpc::Status>(grpc::ServerContext *)>;
DBContextCreationFunc db_context_creation_func;

grpc::ServerContext & grpc_server_context;

BatchCommandsContext(
Context & db_context_, DBContextCreationFunc && db_context_creation_func_, grpc::ServerContext & grpc_server_context_)
: db_context(db_context_), db_context_creation_func(std::move(db_context_creation_func_)), grpc_server_context(grpc_server_context_)
{}
};

class BatchCommandsHandler
{
public:
BatchCommandsHandler(BatchCommandsContext & batch_commands_context_, const tikvpb::BatchCommandsRequest & request_,
tikvpb::BatchCommandsResponse & response_);

~BatchCommandsHandler() = default;

grpc::Status execute();

protected:
BatchCommandsContext & batch_commands_context;
const tikvpb::BatchCommandsRequest & request;
tikvpb::BatchCommandsResponse & response;

Logger * log;
};

} // namespace DB
Original file line number Diff line number Diff line change
@@ -1,11 +1,9 @@
#include <Flash/Coprocessor/CoprocessorHandler.h>
#include <Flash/CoprocessorHandler.h>

#include <DataStreams/BlockIO.h>
#include <Flash/Coprocessor/DAGDriver.h>
#include <Flash/Coprocessor/InterpreterDAG.h>
#include <Storages/IStorage.h>
#include <Storages/StorageMergeTree.h>
#include <Storages/Transaction/Codec.h>
#include <Storages/Transaction/LockException.h>
#include <Storages/Transaction/RegionException.h>
#include <Storages/Transaction/SchemaSyncer.h>
Expand All @@ -24,8 +22,7 @@ CoprocessorHandler::CoprocessorHandler(
: cop_context(cop_context_), cop_request(cop_request_), cop_response(cop_response_), log(&Logger::get("CoprocessorHandler"))
{}

grpc::Status CoprocessorHandler::execute()
try
grpc::Status CoprocessorHandler::execute() try
{
switch (cop_request->tp())
{
Expand All @@ -48,7 +45,7 @@ try
throw Exception(
"Coprocessor request type " + std::to_string(cop_request->tp()) + " is not implemented", ErrorCodes::NOT_IMPLEMENTED);
}
return ::grpc::Status(::grpc::StatusCode::OK, "");
return grpc::Status::OK;
}
catch (const LockException & e)
{
Expand All @@ -60,7 +57,7 @@ catch (const LockException & e)
lock_info->set_lock_ttl(e.lock_infos[0]->lock_ttl);
lock_info->set_lock_version(e.lock_infos[0]->lock_version);
// return ok so TiDB has the chance to see the LockException
return ::grpc::Status(::grpc::StatusCode::OK, "");
return grpc::Status::OK;
}
catch (const RegionException & e)
{
Expand All @@ -83,7 +80,7 @@ catch (const RegionException & e)
break;
}
// return ok so TiDB has the chance to see the LockException
return ::grpc::Status(::grpc::StatusCode::OK, "");
return grpc::Status::OK;
}
catch (const Exception & e)
{
Expand All @@ -96,14 +93,14 @@ catch (const Exception & e)

// TODO: Map other DB error codes to grpc codes.

return ::grpc::Status(::grpc::StatusCode::INTERNAL, e.message());
return grpc::Status(grpc::StatusCode::INTERNAL, e.message());
}
catch (const std::exception & e)
{
LOG_ERROR(log, __PRETTY_FUNCTION__ << ": Exception: " << e.what());
cop_response->Clear();
cop_response->set_other_error(e.what());
return ::grpc::Status(::grpc::StatusCode::INTERNAL, e.what());
return grpc::Status(grpc::StatusCode::INTERNAL, e.what());
}

} // namespace DB
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,12 @@

#include <common/logger_useful.h>

#include <DataStreams/BlockIO.h>
#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wunused-parameter"
#include <kvproto/coprocessor.pb.h>
#include <tipb/select.pb.h>
#pragma GCC diagnostic pop

#include <DataStreams/BlockIO.h>
#include <grpcpp/server_context.h>

namespace DB
Expand Down
47 changes: 43 additions & 4 deletions dbms/src/Flash/FlashService.cpp
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
#include <Flash/FlashService.h>

#include <Core/Types.h>
#include <Flash/Coprocessor/CoprocessorHandler.h>
#include <Flash/BatchCommandsHandler.h>
#include <Flash/CoprocessorHandler.h>
#include <grpcpp/security/server_credentials.h>
#include <grpcpp/server_builder.h>

Expand Down Expand Up @@ -54,10 +55,48 @@ grpc::Status FlashService::Coprocessor(

auto ret = cop_handler.execute();

LOG_DEBUG(log, __PRETTY_FUNCTION__ << ": Handle coprocessor request done");
LOG_DEBUG(log, __PRETTY_FUNCTION__ << ": Handle coprocessor request done: " << ret.error_code() << ", " << ret.error_message());
return ret;
}

grpc::Status FlashService::BatchCommands(
grpc::ServerContext * grpc_context, grpc::ServerReaderWriter<::tikvpb::BatchCommandsResponse, tikvpb::BatchCommandsRequest> * stream)
{
auto [context, status] = createDBContext(grpc_context);
if (!status.ok())
{
return status;
}

tikvpb::BatchCommandsRequest request;
while (stream->Read(&request))
{
LOG_DEBUG(log, __PRETTY_FUNCTION__ << ": Handling batch commands: " << request.DebugString());

tikvpb::BatchCommandsResponse response;
BatchCommandsContext batch_commands_context(
context, [this](grpc::ServerContext * grpc_server_context) { return createDBContext(grpc_server_context); }, *grpc_context);
BatchCommandsHandler batch_commands_handler(batch_commands_context, request, response);
auto ret = batch_commands_handler.execute();
if (!ret.ok())
{
LOG_DEBUG(
log, __PRETTY_FUNCTION__ << ": Handle batch commands request done: " << ret.error_code() << ", " << ret.error_message());
return ret;
}

if (!stream->Write(response))
{
LOG_DEBUG(log, __PRETTY_FUNCTION__ << ": Write response failed for unknown reason.");
return grpc::Status(grpc::StatusCode::UNKNOWN, "Write response failed for unknown reason.");
}

LOG_DEBUG(log, __PRETTY_FUNCTION__ << ": Handle batch commands request done: " << ret.error_code() << ", " << ret.error_message());
}

return grpc::Status::OK;
}

String getClientMetaVarWithDefault(grpc::ServerContext * grpc_context, const String & name, const String & default_val)
{
if (grpc_context->client_metadata().count(name) != 1)
Expand All @@ -66,7 +105,7 @@ String getClientMetaVarWithDefault(grpc::ServerContext * grpc_context, const Str
return String(grpc_context->client_metadata().find(name)->second.data());
}

std::tuple<Context, ::grpc::Status> FlashService::createDBContext(grpc::ServerContext * grpc_context)
std::tuple<Context, grpc::Status> FlashService::createDBContext(grpc::ServerContext * grpc_context)
{
/// Create DB context.
Context context = server.context();
Expand Down Expand Up @@ -101,7 +140,7 @@ std::tuple<Context, ::grpc::Status> FlashService::createDBContext(grpc::ServerCo
std::string expr_field_type_check = getClientMetaVarWithDefault(grpc_context, "dag_expr_field_type_strict_check", "1");
context.setSetting("dag_expr_field_type_strict_check", expr_field_type_check);

return std::make_tuple(context, ::grpc::Status::OK);
return std::make_tuple(context, grpc::Status::OK);
}

} // namespace DB
6 changes: 5 additions & 1 deletion dbms/src/Flash/FlashService.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,11 @@ class FlashService final : public tikvpb::Tikv::Service, public std::enable_shar

~FlashService() final;

grpc::Status Coprocessor(grpc::ServerContext * context, const coprocessor::Request * request, coprocessor::Response * response);
grpc::Status Coprocessor(
grpc::ServerContext * grpc_context, const coprocessor::Request * request, coprocessor::Response * response) override;

grpc::Status BatchCommands(grpc::ServerContext * grpc_context,
grpc::ServerReaderWriter<tikvpb::BatchCommandsResponse, tikvpb::BatchCommandsRequest> * stream) override;

private:
std::tuple<Context, ::grpc::Status> createDBContext(grpc::ServerContext * grpc_contex);
Expand Down
1 change: 1 addition & 0 deletions dbms/src/Interpreters/Settings.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ struct Settings
M(SettingString, dag_planner, "sql", "planner for DAG query, sql builds the SQL string, optree builds the internal operator(stream) tree.") \
M(SettingBool, dag_expr_field_type_strict_check, true, "when set to true, every expr in the dag request must provide field type, otherwise only the result expr will be checked.") \
M(SettingInt64, schema_version, DEFAULT_UNSPECIFIED_SCHEMA_VERSION, "tmt schema version.") \
M(SettingUInt64, batch_commands_threads, 0, "Number of threads to use for handling batch commands concurrently. 0 means - same as 'max_threads'.") \
M(SettingUInt64, min_compress_block_size, DEFAULT_MIN_COMPRESS_BLOCK_SIZE, "The actual size of the block to compress, if the uncompressed data less than max_compress_block_size is no less than this value and no less than the volume of data for one mark.") \
M(SettingUInt64, max_compress_block_size, DEFAULT_MAX_COMPRESS_BLOCK_SIZE, "The maximum size of blocks of uncompressed data before compressing for writing to a table.") \
M(SettingUInt64, max_block_size, DEFAULT_BLOCK_SIZE, "Maximum block size for reading") \
Expand Down

0 comments on commit a9f9b48

Please sign in to comment.