FLASH-475: Support BATCH COMMANDS in flash service #232

Merged
merged 3 commits on Sep 12, 2019
Changes from 2 commits
93 changes: 93 additions & 0 deletions dbms/src/Flash/BatchCommandsHandler.cpp
@@ -0,0 +1,93 @@
#include <Flash/BatchCommandsHandler.h>
#include <Flash/CoprocessorHandler.h>
#include <common/ThreadPool.h>

namespace DB
{

BatchCommandsHandler::BatchCommandsHandler(BatchCommandsContext & batch_commands_context_, const tikvpb::BatchCommandsRequest & request_,
tikvpb::BatchCommandsResponse & response_)
: batch_commands_context(batch_commands_context_), request(request_), response(response_), log(&Logger::get("BatchCommandsHandler"))
{}

grpc::Status BatchCommandsHandler::execute()
{
if (request.requests_size() == 0)
return grpc::Status::OK;

auto command_handler_func
= [](BatchCommandsContext::DBContextCreationFunc db_context_creation_func, grpc::ServerContext * grpc_server_context,
const tikvpb::BatchCommandsRequest::Request & req, tikvpb::BatchCommandsResponse::Response & resp, grpc::Status & ret) {
if (!req.has_coprocessor())
{
ret = grpc::Status(::grpc::StatusCode::UNIMPLEMENTED, "");
return;
}

const auto & cop_req = req.coprocessor();
auto cop_resp = resp.mutable_coprocessor();

auto [context, status] = db_context_creation_func(grpc_server_context);
if (!status.ok())
{
ret = status;
return;
}

CoprocessorContext cop_context(context, cop_req.context(), *grpc_server_context);
CoprocessorHandler cop_handler(cop_context, &cop_req, cop_resp);

ret = cop_handler.execute();
};

/// Shortcut for the single-request case: handle it in place instead of going through the thread pool.
if (request.requests_size() == 1)
{
LOG_DEBUG(log, __PRETTY_FUNCTION__ << ": Handling the only batch command in place.");

const auto & req = request.requests(0);
auto resp = response.add_responses();
response.add_request_ids(request.request_ids(0));
auto ret = grpc::Status::OK;
command_handler_func(batch_commands_context.db_context_creation_func, &batch_commands_context.grpc_server_context, req, *resp, ret);
return ret;
}

/// Use thread pool to handle requests concurrently.
const Settings & settings = batch_commands_context.db_context.getSettingsRef();
size_t max_threads = settings.batch_commands_threads
? static_cast<size_t>(settings.batch_commands_threads)
: static_cast<size_t>(settings.max_threads);

LOG_DEBUG(
log, __PRETTY_FUNCTION__ << ": Handling " << request.requests_size() << " batch commands using " << max_threads << " threads.");

ThreadPool thread_pool(max_threads);
Contributor

How about using a system-level thread pool? As far as I know, TiKV uses a system-level thread pool.

Contributor Author

Right, TiKV did it that way. But 1) we are in two different scenarios, TP vs. AP: we tend to use as much resource as possible for a single query; 2) there is a similar case in ClickHouse's own code, concurrent aggregation, and I followed it, i.e. an individual thread pool plus a control config that defaults to 0 (meaning use max_threads); 3) there is no handy thread pool implementation designed to be a global one, which would have to deal with problems such as "are all threads done for this particular job?"


std::vector<grpc::Status> rets;
// Reserve up front so references into rets stay valid while further tasks are being scheduled.
rets.reserve(request.requests_size());
size_t i = 0;

for (const auto & req : request.requests())
{
auto resp = response.add_responses();
response.add_request_ids(request.request_ids(i++));
rets.emplace_back(grpc::Status::OK);
// Capture the response pointer and this request's status slot by value, so the task does not
// read loop-local variables (or evaluate rets.back()) after this iteration has ended.
auto * ret = &rets.back();
thread_pool.schedule([&, resp, ret]() {
command_handler_func(
batch_commands_context.db_context_creation_func, &batch_commands_context.grpc_server_context, req, *resp, *ret);
});
}

thread_pool.wait();

// Iterate over the return values of the individual commands, and return the first non-OK one, if any.
for (const auto & ret : rets)
{
if (!ret.ok())
return ret;
}

return grpc::Status::OK;
}

} // namespace DB
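
The review thread above asks why this PR spins up its own pool instead of reusing a system-level one. One practical reason (the author's point 3) is that a pool owned by a single batch gives a per-batch completion barrier for free: wait() returns exactly when every sub-command of this batch is done. Below is a minimal sketch of that pattern, assuming only the ThreadPool(schedule/wait) interface already used above; collect_squares and its int payload are made-up stand-ins for the real per-command work.

#include <common/ThreadPool.h>
#include <cstddef>
#include <vector>

// Each task writes its result into a pre-sized slot, mirroring how the handler above
// pre-creates one response / status slot per sub-request before scheduling.
std::vector<int> collect_squares(size_t n, size_t threads)
{
    std::vector<int> results(n, 0);
    ThreadPool pool(threads); // owned by this batch only
    for (size_t i = 0; i < n; ++i)
        pool.schedule([i, &results] { results[i] = static_cast<int>(i * i); });
    // With a dedicated pool, wait() answers "are all threads done for this particular job?"
    // directly; a shared, process-wide pool would need extra bookkeeping to do the same.
    pool.wait();
    return results;
}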
50 changes: 50 additions & 0 deletions dbms/src/Flash/BatchCommandsHandler.h
@@ -0,0 +1,50 @@
#pragma once

#include <Interpreters/Context.h>
#include <common/logger_useful.h>
#include <grpcpp/server_context.h>
#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wunused-parameter"
#include <kvproto/tikvpb.pb.h>
#pragma GCC diagnostic pop

namespace DB
{

struct BatchCommandsContext
{
/// Context for this batch of commands.
Context & db_context;

/// Context creation function for each individual command - commands should be handled in isolation,
/// given that the context is used to pass per-query arguments.
using DBContextCreationFunc = std::function<std::tuple<Context, grpc::Status>(grpc::ServerContext *)>;
DBContextCreationFunc db_context_creation_func;

grpc::ServerContext & grpc_server_context;

BatchCommandsContext(
Context & db_context_, DBContextCreationFunc && db_context_creation_func_, grpc::ServerContext & grpc_server_context_)
: db_context(db_context_), db_context_creation_func(std::move(db_context_creation_func_)), grpc_server_context(grpc_server_context_)
{}
};

class BatchCommandsHandler
{
public:
BatchCommandsHandler(BatchCommandsContext & batch_commands_context_, const tikvpb::BatchCommandsRequest & request_,
tikvpb::BatchCommandsResponse & response_);

~BatchCommandsHandler() = default;

grpc::Status execute();

protected:
BatchCommandsContext & batch_commands_context;
const tikvpb::BatchCommandsRequest & request;
tikvpb::BatchCommandsResponse & response;

Logger * log;
};

} // namespace DB
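
As a usage note for this header, here is a minimal sketch of how a caller wires it up: supply a DBContextCreationFunc that builds a fresh Context per command, then run the handler over one decoded request. It mirrors what FlashService::BatchCommands does later in this PR; make_db_context is a hypothetical stand-in for FlashService::createDBContext.

#include <Flash/BatchCommandsHandler.h>

namespace DB
{
// Hypothetical helper standing in for FlashService::createDBContext.
std::tuple<Context, grpc::Status> make_db_context(grpc::ServerContext * grpc_context);

grpc::Status handleOneBatch(Context & db_context, grpc::ServerContext & grpc_server_context,
    const tikvpb::BatchCommandsRequest & request, tikvpb::BatchCommandsResponse & response)
{
    // Each sub-command gets its own Context from the creation function, keeping
    // per-query settings passed via gRPC metadata isolated between commands.
    BatchCommandsContext batch_commands_context(
        db_context, [](grpc::ServerContext * ctx) { return make_db_context(ctx); }, grpc_server_context);
    BatchCommandsHandler handler(batch_commands_context, request, response);
    return handler.execute();
}
} // namespace DB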
@@ -1,11 +1,9 @@
#include <Flash/Coprocessor/CoprocessorHandler.h>
#include <Flash/CoprocessorHandler.h>

#include <DataStreams/BlockIO.h>
#include <Flash/Coprocessor/DAGDriver.h>
#include <Flash/Coprocessor/InterpreterDAG.h>
#include <Storages/IStorage.h>
#include <Storages/StorageMergeTree.h>
#include <Storages/Transaction/Codec.h>
#include <Storages/Transaction/LockException.h>
#include <Storages/Transaction/RegionException.h>
#include <Storages/Transaction/SchemaSyncer.h>
@@ -24,8 +22,7 @@ CoprocessorHandler::CoprocessorHandler(
: cop_context(cop_context_), cop_request(cop_request_), cop_response(cop_response_), log(&Logger::get("CoprocessorHandler"))
{}

grpc::Status CoprocessorHandler::execute()
try
grpc::Status CoprocessorHandler::execute() try
{
switch (cop_request->tp())
{
@@ -48,7 +45,7 @@ try
throw Exception(
"Coprocessor request type " + std::to_string(cop_request->tp()) + " is not implemented", ErrorCodes::NOT_IMPLEMENTED);
}
return ::grpc::Status(::grpc::StatusCode::OK, "");
return grpc::Status::OK;
}
catch (const LockException & e)
{
@@ -60,7 +57,7 @@ catch (const LockException & e)
lock_info->set_lock_ttl(e.lock_infos[0]->lock_ttl);
lock_info->set_lock_version(e.lock_infos[0]->lock_version);
// return ok so TiDB has the chance to see the LockException
return ::grpc::Status(::grpc::StatusCode::OK, "");
return grpc::Status::OK;
}
catch (const RegionException & e)
{
@@ -83,7 +80,7 @@ catch (const RegionException & e)
break;
}
// return ok so TiDB has the chance to see the LockException
return ::grpc::Status(::grpc::StatusCode::OK, "");
return grpc::Status::OK;
}
catch (const Exception & e)
{
@@ -96,14 +93,14 @@ catch (const Exception & e)

// TODO: Map other DB error codes to grpc codes.

return ::grpc::Status(::grpc::StatusCode::INTERNAL, e.message());
return grpc::Status(grpc::StatusCode::INTERNAL, e.message());
}
catch (const std::exception & e)
{
LOG_ERROR(log, __PRETTY_FUNCTION__ << ": Exception: " << e.what());
cop_response->Clear();
cop_response->set_other_error(e.what());
return ::grpc::Status(::grpc::StatusCode::INTERNAL, e.what());
return grpc::Status(grpc::StatusCode::INTERNAL, e.what());
}

} // namespace DB
@@ -2,13 +2,12 @@

#include <common/logger_useful.h>

#include <DataStreams/BlockIO.h>
#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wunused-parameter"
#include <kvproto/coprocessor.pb.h>
#include <tipb/select.pb.h>
#pragma GCC diagnostic pop

#include <DataStreams/BlockIO.h>
#include <grpcpp/server_context.h>

namespace DB
47 changes: 43 additions & 4 deletions dbms/src/Flash/FlashService.cpp
@@ -1,7 +1,8 @@
#include <Flash/FlashService.h>

#include <Core/Types.h>
#include <Flash/Coprocessor/CoprocessorHandler.h>
#include <Flash/BatchCommandsHandler.h>
#include <Flash/CoprocessorHandler.h>
#include <grpcpp/security/server_credentials.h>
#include <grpcpp/server_builder.h>

@@ -54,10 +55,48 @@ grpc::Status FlashService::Coprocessor(

auto ret = cop_handler.execute();

LOG_DEBUG(log, __PRETTY_FUNCTION__ << ": Handle coprocessor request done");
LOG_DEBUG(log, __PRETTY_FUNCTION__ << ": Handle coprocessor request done: " << ret.error_code() << ", " << ret.error_message());
return ret;
}

grpc::Status FlashService::BatchCommands(
grpc::ServerContext * grpc_context, grpc::ServerReaderWriter<::tikvpb::BatchCommandsResponse, tikvpb::BatchCommandsRequest> * stream)
{
auto [context, status] = createDBContext(grpc_context);
if (!status.ok())
{
return status;
}

tikvpb::BatchCommandsRequest request;
while (stream->Read(&request))
{
LOG_DEBUG(log, __PRETTY_FUNCTION__ << ": Handling batch commands: " << request.DebugString());

tikvpb::BatchCommandsResponse response;
BatchCommandsContext batch_commands_context(
context, [this](grpc::ServerContext * grpc_server_context) { return createDBContext(grpc_server_context); }, *grpc_context);
BatchCommandsHandler batch_commands_handler(batch_commands_context, request, response);
auto ret = batch_commands_handler.execute();
if (!ret.ok())
{
LOG_DEBUG(
log, __PRETTY_FUNCTION__ << ": Handle batch commands request done: " << ret.error_code() << ", " << ret.error_message());
return ret;
}

if (!stream->Write(response))
{
LOG_DEBUG(log, __PRETTY_FUNCTION__ << ": Write response failed for unknown reason.");
return grpc::Status(grpc::StatusCode::UNKNOWN, "Write response failed for unknown reason.");
}

LOG_DEBUG(log, __PRETTY_FUNCTION__ << ": Handle batch commands request done: " << ret.error_code() << ", " << ret.error_message());
}

return grpc::Status::OK;
}

String getClientMetaVarWithDefault(grpc::ServerContext * grpc_context, const String & name, const String & default_val)
{
if (grpc_context->client_metadata().count(name) != 1)
@@ -66,7 +105,7 @@ String getClientMetaVarWithDefault(grpc::ServerContext * grpc_context, const Str
return String(grpc_context->client_metadata().find(name)->second.data());
}

std::tuple<Context, ::grpc::Status> FlashService::createDBContext(grpc::ServerContext * grpc_context)
std::tuple<Context, grpc::Status> FlashService::createDBContext(grpc::ServerContext * grpc_context)
{
/// Create DB context.
Context context = server.context();
@@ -101,7 +140,7 @@ std::tuple<Context, ::grpc::Status> FlashService::createDBContext(grpc::ServerCo
std::string expr_field_type_check = getClientMetaVarWithDefault(grpc_context, "dag_expr_field_type_strict_check", "1");
context.setSetting("dag_expr_field_type_strict_check", expr_field_type_check);

return std::make_tuple(context, ::grpc::Status::OK);
return std::make_tuple(context, grpc::Status::OK);
}

} // namespace DB
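
To make the wire contract of the new endpoint concrete, here is a rough client-side sketch of driving the BatchCommands bidirectional stream with the generated gRPC C++ stub: send one BatchCommandsRequest carrying a coprocessor sub-request and a request id, then read back the matching BatchCommandsResponse. This is an illustration only; the channel address, the generated header name, and the way the coprocessor payload is filled in are placeholders, not part of this PR.

#include <grpcpp/create_channel.h>
#include <grpcpp/security/credentials.h>
#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wunused-parameter"
#include <kvproto/tikvpb.grpc.pb.h>
#pragma GCC diagnostic pop
#include <iostream>

int main()
{
    // Placeholder address; the flash service listens on whatever flash.service_addr is configured to.
    auto channel = grpc::CreateChannel("127.0.0.1:3930", grpc::InsecureChannelCredentials());
    auto stub = tikvpb::Tikv::NewStub(channel);

    grpc::ClientContext context;
    auto stream = stub->BatchCommands(&context);

    tikvpb::BatchCommandsRequest request;
    request.add_request_ids(1);
    auto * cmd = request.add_requests();
    cmd->mutable_coprocessor(); // fill in the coprocessor::Request fields (context, tp, data) here

    tikvpb::BatchCommandsResponse response;
    if (stream->Write(request) && stream->Read(&response))
    {
        // Responses line up with requests by position and by request_ids.
        std::cout << "responses: " << response.responses_size()
                  << ", request ids: " << response.request_ids_size() << std::endl;
    }

    stream->WritesDone();
    grpc::Status status = stream->Finish();
    return status.ok() ? 0 : 1;
}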
6 changes: 5 additions & 1 deletion dbms/src/Flash/FlashService.h
@@ -24,7 +24,11 @@ class FlashService final : public tikvpb::Tikv::Service, public std::enable_shar

~FlashService() final;

grpc::Status Coprocessor(grpc::ServerContext * context, const coprocessor::Request * request, coprocessor::Response * response);
grpc::Status Coprocessor(
grpc::ServerContext * grpc_context, const coprocessor::Request * request, coprocessor::Response * response) override;

grpc::Status BatchCommands(grpc::ServerContext * grpc_context,
grpc::ServerReaderWriter<tikvpb::BatchCommandsResponse, tikvpb::BatchCommandsRequest> * stream) override;

private:
std::tuple<Context, ::grpc::Status> createDBContext(grpc::ServerContext * grpc_contex);
1 change: 1 addition & 0 deletions dbms/src/Interpreters/Settings.h
@@ -33,6 +33,7 @@ struct Settings
M(SettingString, dag_planner, "sql", "planner for DAG query, sql builds the SQL string, optree builds the internal operator(stream) tree.") \
M(SettingBool, dag_expr_field_type_strict_check, true, "when set to true, every expr in the dag request must provide field type, otherwise only the result expr will be checked.") \
M(SettingInt64, schema_version, DEFAULT_UNSPECIFIED_SCHEMA_VERSION, "tmt schema version.") \
M(SettingUInt64, batch_commands_threads, 0, "Number of threads to use for handling batch commands concurrently. 0 means - same as 'max_threads'.") \
M(SettingUInt64, min_compress_block_size, DEFAULT_MIN_COMPRESS_BLOCK_SIZE, "The actual size of the block to compress, if the uncompressed data less than max_compress_block_size is no less than this value and no less than the volume of data for one mark.") \
M(SettingUInt64, max_compress_block_size, DEFAULT_MAX_COMPRESS_BLOCK_SIZE, "The maximum size of blocks of uncompressed data before compressing for writing to a table.") \
M(SettingUInt64, max_block_size, DEFAULT_BLOCK_SIZE, "Maximum block size for reading") \