add DAGContext so InterpreterDAG can exchange information with DAGDriver #166

Merged · 20 commits · Aug 8, 2019
9 changes: 5 additions & 4 deletions dbms/src/DataStreams/DAGBlockOutputStream.cpp
@@ -2,6 +2,7 @@

#include <DataTypes/DataTypeNullable.h>
#include <Storages/Transaction/Codec.h>
#include <Storages/Transaction/TypeMapping.h>

namespace DB
{
@@ -13,11 +14,11 @@ extern const int LOGICAL_ERROR;
} // namespace ErrorCodes

DAGBlockOutputStream::DAGBlockOutputStream(tipb::SelectResponse & dag_response_, Int64 records_per_chunk_, tipb::EncodeType encodeType_,
FieldTpAndFlags && field_tp_and_flags_, Block header_)
std::vector<tipb::FieldType> && result_field_types_, Block header_)
: dag_response(dag_response_),
records_per_chunk(records_per_chunk_),
encodeType(encodeType_),
field_tp_and_flags(field_tp_and_flags_),
result_field_types(result_field_types_),
header(header_)
{
if (encodeType == tipb::EncodeType::TypeArrow)
@@ -46,7 +47,7 @@ void DAGBlockOutputStream::writeSuffix()

void DAGBlockOutputStream::write(const Block & block)
{
if (block.columns() != field_tp_and_flags.size())
if (block.columns() != result_field_types.size())
throw Exception("Output column size mismatch with field type size", ErrorCodes::LOGICAL_ERROR);

// TODO: Check compatibility between field_tp_and_flags and block column types.
@@ -69,7 +70,7 @@ void DAGBlockOutputStream::write(const Block & block)
for (size_t j = 0; j < block.columns(); j++)
{
auto field = (*block.getByPosition(j).column.get())[i];
EncodeDatum(field, field_tp_and_flags[j].getCodecFlag(), current_ss);
EncodeDatum(field, getCodecFlagByFieldType(result_field_types[j]), current_ss);
}
// Encode current row
records_per_chunk++;
4 changes: 2 additions & 2 deletions dbms/src/DataStreams/DAGBlockOutputStream.h
@@ -19,7 +19,7 @@ class DAGBlockOutputStream : public IBlockOutputStream
{
public:
DAGBlockOutputStream(tipb::SelectResponse & response_, Int64 records_per_chunk_, tipb::EncodeType encodeType_,
FieldTpAndFlags && field_tp_and_flags_, Block header_);
std::vector<tipb::FieldType> && result_field_types, Block header_);

Block getHeader() const override { return header; }
void write(const Block & block) override;
@@ -31,7 +31,7 @@

Int64 records_per_chunk;
tipb::EncodeType encodeType;
FieldTpAndFlags field_tp_and_flags;
std::vector<tipb::FieldType> result_field_types;

Block header;

17 changes: 17 additions & 0 deletions dbms/src/Flash/Coprocessor/DAGContext.h
@@ -0,0 +1,17 @@
#pragma once

#include <DataStreams/IBlockInputStream.h>
#include <Storages/Transaction/TiDB.h>

namespace DB
{

class Context;

class DAGContext
Review comment (Contributor): Put an empty line between.
{
public:
DAGContext(size_t profile_list_size) { profile_streams_list.resize(profile_list_size); };
std::vector<BlockInputStreams> profile_streams_list;
};
} // namespace DB
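
DAGContext itself is only a per-request container: profile_streams_list holds one BlockInputStreams vector per executor in the DAG request. A minimal sketch of the intended round trip is below; the InterpreterDAG side is not part of this diff, so the recording helper (and its name) is an assumption based on the PR description rather than code from this change:

```cpp
#include <Flash/Coprocessor/DAGContext.h>

namespace DB
{
// Hypothetical helper: illustrates how the interpreter side could record the
// streams it builds for one executor so DAGDriver can profile them later.
void recordStreamsForExecutor(DAGContext & dag_context, size_t executor_index, const BlockInputStreams & streams)
{
    // DAGDriver constructs the context with one slot per executor:
    //     DAGContext dag_context(dag_request.executors_size());
    // The interpreter appends each executor's input streams to its slot ...
    for (const auto & stream : streams)
        dag_context.profile_streams_list[executor_index].push_back(stream);
    // ... and DAGDriver later walks profile_streams_list to fill one
    // ExecutorExecutionSummary per slot (see DAGDriver.cpp below).
}
} // namespace DB
```
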
26 changes: 24 additions & 2 deletions dbms/src/Flash/Coprocessor/DAGDriver.cpp
@@ -3,6 +3,7 @@
#include <Core/QueryProcessingStage.h>
#include <DataStreams/BlockIO.h>
#include <DataStreams/DAGBlockOutputStream.h>
#include <DataStreams/IProfilingBlockInputStream.h>
#include <DataStreams/copyData.h>
#include <Interpreters/Context.h>
#include <Interpreters/DAGQuerySource.h>
@@ -30,7 +31,8 @@ void DAGDriver::execute()
{
context.setSetting("read_tso", UInt64(dag_request.start_ts()));

DAGQuerySource dag(context, region_id, region_version, region_conf_version, dag_request);
DAGContext dag_context(dag_request.executors_size());
DAGQuerySource dag(context, dag_context, region_id, region_version, region_conf_version, dag_request);
BlockIO streams;

String planner = context.getSettings().dag_planner;
@@ -55,8 +57,28 @@
throw Exception("DAG is not query.", ErrorCodes::LOGICAL_ERROR);

BlockOutputStreamPtr outputStreamPtr = std::make_shared<DAGBlockOutputStream>(dag_response, context.getSettings().dag_records_per_chunk,
dag_request.encode_type(), dag.getOutputFieldTpAndFlags(), streams.in->getHeader());
dag_request.encode_type(), dag.getResultFieldTypes(), streams.in->getHeader());
copyData(*streams.in, *outputStreamPtr);
// add ExecutorExecutionSummary info
for (auto & p_streams : dag_context.profile_streams_list)
{
auto * executeSummary = dag_response.add_execution_summaries();
UInt64 time_processed_ns = 0;
UInt64 num_produced_rows = 0;
UInt64 num_iterations = 0;
for (auto & streamPtr : p_streams)
{
if (auto * p_stream = dynamic_cast<IProfilingBlockInputStream *>(streamPtr.get()))
{
time_processed_ns += p_stream->getProfileInfo().total_stopwatch.elapsed();
num_produced_rows += p_stream->getProfileInfo().rows;
num_iterations += p_stream->getProfileInfo().blocks;
}
}
executeSummary->set_time_processed_ns(time_processed_ns);
executeSummary->set_num_produced_rows(num_produced_rows);
executeSummary->set_num_iterations(num_iterations);
}
}

} // namespace DB
2 changes: 1 addition & 1 deletion dbms/src/Flash/Coprocessor/tests/cop_test.cpp
@@ -99,7 +99,7 @@ grpc::Status rpcTest()
col->set_val(ss.str());
value->set_tp(tipb::ExprType::Int64);
ss.str("");
DB::EncodeNumber<Int64, TiDB::CodecFlagInt>(888, ss);
DB::EncodeNumber<Int64, TiDB::CodecFlagInt>(10, ss);
value->set_val(std::string(ss.str()));

// agg: count(s) group by i;
19 changes: 13 additions & 6 deletions dbms/src/Interpreters/DAGExpressionAnalyzer.cpp
@@ -14,7 +14,7 @@ namespace DB

namespace ErrorCodes
{
extern const int COP_BAD_DAG_REQUEST;
extern const int COP_BAD_DAG_REQUEST;
} // namespace ErrorCodes

static String genCastString(const String & org_name, const String & target_type_name)
@@ -151,9 +151,10 @@ void DAGExpressionAnalyzer::appendAggSelect(ExpressionActionsChain & chain, cons
bool need_update_aggregated_columns = false;
NamesAndTypesList updated_aggregated_columns;
ExpressionActionsChain::Step step = chain.steps.back();
auto agg_col_names = aggregated_columns.getNames();
for (Int32 i = 0; i < aggregation.agg_func_size(); i++)
{
String & name = aggregated_columns.getNames()[i];
String & name = agg_col_names[i];
String updated_name = appendCastIfNeeded(aggregation.agg_func(i), step.actions, name);
if (name != updated_name)
{
@@ -170,7 +171,7 @@ }
}
for (Int32 i = 0; i < aggregation.group_by_size(); i++)
{
String & name = aggregated_columns.getNames()[i + aggregation.agg_func_size()];
String & name = agg_col_names[i + aggregation.agg_func_size()];
String updated_name = appendCastIfNeeded(aggregation.group_by(i), step.actions, name);
if (name != updated_name)
{
@@ -188,17 +189,23 @@ void DAGExpressionAnalyzer::appendAggSelect(ExpressionActionsChain & chain, cons

if (need_update_aggregated_columns)
{
auto updated_agg_col_names = updated_aggregated_columns.getNames();
auto updated_agg_col_types = updated_aggregated_columns.getTypes();
aggregated_columns.clear();
for (size_t i = 0; i < updated_aggregated_columns.size(); i++)
{
aggregated_columns.emplace_back(updated_aggregated_columns.getNames()[i], updated_aggregated_columns.getTypes()[i]);
aggregated_columns.emplace_back(updated_agg_col_names[i], updated_agg_col_types[i]);
}
}
}

String DAGExpressionAnalyzer::appendCastIfNeeded(const tipb::Expr & expr, ExpressionActionsPtr & actions, const String expr_name)
String DAGExpressionAnalyzer::appendCastIfNeeded(const tipb::Expr & expr, ExpressionActionsPtr & actions, const String & expr_name)
{
if (expr.has_field_type() && isFunctionExpr(expr))
if (!expr.has_field_type())
{
throw Exception("Expression without field type", ErrorCodes::COP_BAD_DAG_REQUEST);
}
if (isFunctionExpr(expr))
{
DataTypePtr expected_type = getDataTypeByFieldType(expr.field_type());
DataTypePtr actual_type = actions->getSampleBlock().getByName(expr_name).type;
3 changes: 2 additions & 1 deletion dbms/src/Interpreters/DAGExpressionAnalyzer.h
@@ -5,6 +5,7 @@
#include <tipb/executor.pb.h>
#pragma GCC diagnostic pop

#include <Flash/Coprocessor/DAGContext.h>
#include <Interpreters/AggregateDescription.h>
#include <Interpreters/DAGUtils.h>
#include <Interpreters/ExpressionActions.h>
@@ -34,7 +35,7 @@ class DAGExpressionAnalyzer : private boost::noncopyable
void appendAggregation(ExpressionActionsChain & chain, const tipb::Aggregation & agg, Names & aggregate_keys,
AggregateDescriptions & aggregate_descriptions);
void appendAggSelect(ExpressionActionsChain & chain, const tipb::Aggregation & agg);
String appendCastIfNeeded(const tipb::Expr & expr, ExpressionActionsPtr & actions, const String expr_name);
String appendCastIfNeeded(const tipb::Expr & expr, ExpressionActionsPtr & actions, const String & expr_name);
void initChain(ExpressionActionsChain & chain, const NamesAndTypesList & columns) const
{
if (chain.steps.empty())
84 changes: 69 additions & 15 deletions dbms/src/Interpreters/DAGQuerySource.cpp
@@ -9,6 +9,11 @@
namespace DB
{

namespace ErrorCodes
{
extern const int COP_BAD_DAG_REQUEST;
} // namespace ErrorCodes

const String DAGQuerySource::TS_NAME("tablescan");
const String DAGQuerySource::SEL_NAME("selection");
const String DAGQuerySource::AGG_NAME("aggregation");
@@ -24,9 +29,10 @@ static void assignOrThrowException(Int32 & index, Int32 value, const String & na
index = value;
}

DAGQuerySource::DAGQuerySource(
Context & context_, RegionID region_id_, UInt64 region_version_, UInt64 region_conf_version_, const tipb::DAGRequest & dag_request_)
DAGQuerySource::DAGQuerySource(Context & context_, DAGContext & dag_context_, RegionID region_id_, UInt64 region_version_,
UInt64 region_conf_version_, const tipb::DAGRequest & dag_request_)
: context(context_),
dag_context(dag_context_),
region_id(region_id_),
region_version(region_version_),
region_conf_version(region_conf_version_),
@@ -48,6 +54,7 @@ DAGQuerySource::DAGQuerySource(
break;
case tipb::ExecType::TypeTopN:
assignOrThrowException(order_index, i, TOPN_NAME);
assignOrThrowException(limit_index, i, TOPN_NAME);
break;
case tipb::ExecType::TypeLimit:
assignOrThrowException(limit_index, i, LIMIT_NAME);
@@ -78,23 +85,70 @@ std::unique_ptr<IInterpreter> DAGQuerySource::interpreter(Context &, QueryProces
return std::make_unique<InterpreterDAG>(context, *this);
}

FieldTpAndFlags DAGQuerySource::getOutputFieldTpAndFlags() const
bool fillExecutorOutputFieldTypes(const tipb::Executor & executor, std::vector<tipb::FieldType> & output_field_types)
{
FieldTpAndFlags output;

const auto & ts = getTS();
const auto & column_infos = ts.columns();
for (auto i : dag_request.output_offsets())
tipb::FieldType field_type;
switch (executor.tp())
{
// TODO: Checking bound.
auto & column_info = column_infos[i];
output.emplace_back(FieldTpAndFlag{static_cast<TiDB::TP>(column_info.tp()), static_cast<UInt32>(column_info.flag())});
case tipb::ExecType::TypeTableScan:
for (auto ci : executor.tbl_scan().columns())
{
field_type.set_tp(ci.tp());
field_type.set_flag(ci.flag());
output_field_types.push_back(field_type);
}
return true;
case tipb::ExecType::TypeStreamAgg:
case tipb::ExecType::TypeAggregation:
for (auto & expr : executor.aggregation().agg_func())
{
if (!expr.has_field_type())
{
throw Exception("Agg expression without field type", ErrorCodes::COP_BAD_DAG_REQUEST);
}
output_field_types.push_back(expr.field_type());
}
for (auto & expr : executor.aggregation().group_by())
{
if (!expr.has_field_type())
{
throw Exception("Group by expression without field type", ErrorCodes::COP_BAD_DAG_REQUEST);
}
output_field_types.push_back(expr.field_type());
}
return true;
default:
return false;
}
}

// TODO: Add aggregation columns.
// We either write our own code to infer types that follows the convention between TiDB and TiKV, or ask TiDB to push down aggregation field types.

return output;
std::vector<tipb::FieldType> DAGQuerySource::getResultFieldTypes() const
{
std::vector<tipb::FieldType> executor_output;
for (int i = dag_request.executors_size() - 1; i >= 0; i--)
{
if (fillExecutorOutputFieldTypes(dag_request.executors(i), executor_output))
{
break;
}
}
if (executor_output.empty())
{
throw Exception("Do not found result field type for current dag request", ErrorCodes::COP_BAD_DAG_REQUEST);
}
// tispark assumes that if there is an agg, the output offsets are
// ignored and the request output is the same as the agg's output.
// TODO: should always use output offsets to re-construct the output field types
if (hasAggregation())
{
return executor_output;
}
std::vector<tipb::FieldType> ret;
for (int i : dag_request.output_offsets())
{
ret.push_back(executor_output[i]);
}
return ret;
}

} // namespace DB
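
To make getResultFieldTypes concrete, here is a rough sketch of a request shaped like the one built in cop_test.cpp (a table scan followed by count(s) group by i). The protobuf setters below are assumed from the accessors used elsewhere in this diff (executors, tbl_scan, aggregation, field_type, output_offsets) and from standard protobuf C++ codegen; treat them as illustrative rather than verified tipb API:

```cpp
#include <tipb/select.pb.h>  // assumed header for tipb::DAGRequest

// Sketch only: shows which field types getResultFieldTypes() would return.
tipb::DAGRequest buildExampleRequest()
{
    tipb::DAGRequest req;

    // Executor 0: table scan producing two columns (i, s).
    auto * scan = req.add_executors();
    scan->set_tp(tipb::ExecType::TypeTableScan);
    scan->mutable_tbl_scan()->add_columns();  // column i
    scan->mutable_tbl_scan()->add_columns();  // column s

    // Executor 1: count(s) group by i; each expression carries its field type.
    auto * agg = req.add_executors();
    agg->set_tp(tipb::ExecType::TypeAggregation);
    agg->mutable_aggregation()->add_agg_func()->mutable_field_type();  // count(s)
    agg->mutable_aggregation()->add_group_by()->mutable_field_type();  // i

    req.add_output_offsets(0);
    return req;
}

// getResultFieldTypes() walks the executors from the back, stops at the
// aggregation, and returns [field type of count(s), field type of i]; because
// the request has an aggregation, output_offsets is ignored (see the tispark
// note above). Without the aggregation it would stop at the table scan and
// return the scan column types selected by output_offsets, i.e. column i only.
```
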