Skip to content

Commit

Permalink
throw exception when meet error duing cop request handling (#162)
Browse files Browse the repository at this point in the history
* fix cop test regression

* address comments

* format code

* fix npe for dag execute

* format code

* address comment

* add some comments

* throw exception when meet error duing cop request handling

* address comments

* add error code

* throw exception when meet error duing cop request handling

* address comments
  • Loading branch information
windtalker authored and zanmato1984 committed Aug 7, 2019
1 parent 3870d93 commit 7cb9e71
Show file tree
Hide file tree
Showing 7 changed files with 68 additions and 71 deletions.
1 change: 1 addition & 0 deletions dbms/src/Common/ErrorCodes.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -392,6 +392,7 @@ namespace ErrorCodes
extern const int REGION_MISS = 10002;
extern const int SCHEMA_SYNC_ERROR = 10003;
extern const int SCHEMA_VERSION_ERROR = 10004;
extern const int COP_BAD_DAG_REQUEST = 10005;
}

}
24 changes: 13 additions & 11 deletions dbms/src/Interpreters/DAGExpressionAnalyzer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,12 @@

namespace DB
{

namespace ErrorCodes
{
extern const int COP_BAD_DAG_REQUEST;
} // namespace ErrorCodes

static String genCastString(const String & org_name, const String & target_type_name)
{
return "cast(" + org_name + ", " + target_type_name + ") ";
Expand Down Expand Up @@ -44,13 +50,13 @@ DAGExpressionAnalyzer::DAGExpressionAnalyzer(const NamesAndTypesList & source_co
after_agg = false;
}

bool DAGExpressionAnalyzer::appendAggregation(
void DAGExpressionAnalyzer::appendAggregation(
ExpressionActionsChain & chain, const tipb::Aggregation & agg, Names & aggregation_keys, AggregateDescriptions & aggregate_descriptions)
{
if (agg.group_by_size() == 0 && agg.agg_func_size() == 0)
{
//should not reach here
return false;
throw Exception("Aggregation executor without group by/agg exprs", ErrorCodes::COP_BAD_DAG_REQUEST);
}
initChain(chain, getCurrentInputColumns());
ExpressionActionsChain::Step & step = chain.steps.back();
Expand Down Expand Up @@ -94,14 +100,13 @@ bool DAGExpressionAnalyzer::appendAggregation(
aggregation_keys.push_back(name);
}
after_agg = true;
return true;
}

bool DAGExpressionAnalyzer::appendWhere(ExpressionActionsChain & chain, const tipb::Selection & sel, String & filter_column_name)
void DAGExpressionAnalyzer::appendWhere(ExpressionActionsChain & chain, const tipb::Selection & sel, String & filter_column_name)
{
if (sel.conditions_size() == 0)
{
return false;
throw Exception("Selection executor without condition exprs", ErrorCodes::COP_BAD_DAG_REQUEST);
}
tipb::Expr final_condition;
if (sel.conditions_size() > 1)
Expand All @@ -120,14 +125,13 @@ bool DAGExpressionAnalyzer::appendWhere(ExpressionActionsChain & chain, const ti
initChain(chain, getCurrentInputColumns());
filter_column_name = getActions(filter, chain.steps.back().actions);
chain.steps.back().required_output.push_back(filter_column_name);
return true;
}

bool DAGExpressionAnalyzer::appendOrderBy(ExpressionActionsChain & chain, const tipb::TopN & topN, Strings & order_column_names)
void DAGExpressionAnalyzer::appendOrderBy(ExpressionActionsChain & chain, const tipb::TopN & topN, Strings & order_column_names)
{
if (topN.order_by_size() == 0)
{
return false;
throw Exception("TopN executor without order by exprs", ErrorCodes::COP_BAD_DAG_REQUEST);
}
initChain(chain, getCurrentInputColumns());
ExpressionActionsChain::Step & step = chain.steps.back();
Expand All @@ -137,12 +141,11 @@ bool DAGExpressionAnalyzer::appendOrderBy(ExpressionActionsChain & chain, const
step.required_output.push_back(name);
order_column_names.push_back(name);
}
return true;
}

const NamesAndTypesList & DAGExpressionAnalyzer::getCurrentInputColumns() { return after_agg ? aggregated_columns : source_columns; }

bool DAGExpressionAnalyzer::appendAggSelect(ExpressionActionsChain & chain, const tipb::Aggregation & aggregation)
void DAGExpressionAnalyzer::appendAggSelect(ExpressionActionsChain & chain, const tipb::Aggregation & aggregation)
{
initChain(chain, getCurrentInputColumns());
bool need_update_aggregated_columns = false;
Expand Down Expand Up @@ -191,7 +194,6 @@ bool DAGExpressionAnalyzer::appendAggSelect(ExpressionActionsChain & chain, cons
aggregated_columns.emplace_back(updated_aggregated_columns.getNames()[i], updated_aggregated_columns.getTypes()[i]);
}
}
return true;
}

String DAGExpressionAnalyzer::appendCastIfNeeded(const tipb::Expr & expr, ExpressionActionsPtr & actions, const String expr_name)
Expand Down
8 changes: 4 additions & 4 deletions dbms/src/Interpreters/DAGExpressionAnalyzer.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,11 @@ class DAGExpressionAnalyzer : private boost::noncopyable

public:
DAGExpressionAnalyzer(const NamesAndTypesList & source_columns_, const Context & context_);
bool appendWhere(ExpressionActionsChain & chain, const tipb::Selection & sel, String & filter_column_name);
bool appendOrderBy(ExpressionActionsChain & chain, const tipb::TopN & topN, Strings & order_column_names);
bool appendAggregation(ExpressionActionsChain & chain, const tipb::Aggregation & agg, Names & aggregate_keys,
void appendWhere(ExpressionActionsChain & chain, const tipb::Selection & sel, String & filter_column_name);
void appendOrderBy(ExpressionActionsChain & chain, const tipb::TopN & topN, Strings & order_column_names);
void appendAggregation(ExpressionActionsChain & chain, const tipb::Aggregation & agg, Names & aggregate_keys,
AggregateDescriptions & aggregate_descriptions);
bool appendAggSelect(ExpressionActionsChain & chain, const tipb::Aggregation & agg);
void appendAggSelect(ExpressionActionsChain & chain, const tipb::Aggregation & agg);
String appendCastIfNeeded(const tipb::Expr & expr, ExpressionActionsPtr & actions, const String expr_name);
void initChain(ExpressionActionsChain & chain, const NamesAndTypesList & columns) const
{
Expand Down
50 changes: 23 additions & 27 deletions dbms/src/Interpreters/DAGStringConverter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,51 +11,57 @@
namespace DB
{

bool DAGStringConverter::buildTSString(const tipb::TableScan & ts, std::stringstream & ss)
namespace ErrorCodes
{
TableID id;
extern const int UNKNOWN_TABLE;
extern const int COP_BAD_DAG_REQUEST;
extern const int NOT_IMPLEMENTED;
} // namespace ErrorCodes

void DAGStringConverter::buildTSString(const tipb::TableScan & ts, std::stringstream & ss)
{
TableID table_id;
if (ts.has_table_id())
{
id = ts.table_id();
table_id = ts.table_id();
}
else
{
// do not have table id
return false;
throw Exception("Table id not specified in table scan executor", ErrorCodes::COP_BAD_DAG_REQUEST);
}
auto & tmt_ctx = context.getTMTContext();
auto storage = tmt_ctx.getStorages().get(id);
auto storage = tmt_ctx.getStorages().get(table_id);
if (storage == nullptr)
{
return false;
throw Exception("Table " + std::to_string(table_id) + " doesn't exist.", ErrorCodes::UNKNOWN_TABLE);
}
const auto * merge_tree = dynamic_cast<const StorageMergeTree *>(storage.get());
if (!merge_tree)
{
return false;
throw Exception("Only MergeTree table is supported in DAG request", ErrorCodes::COP_BAD_DAG_REQUEST);
}

if (ts.columns_size() == 0)
{
// no column selected, must be something wrong
return false;
throw Exception("No column is selected in table scan executor", ErrorCodes::COP_BAD_DAG_REQUEST);
}
columns_from_ts = storage->getColumns().getAllPhysical();
for (const tipb::ColumnInfo & ci : ts.columns())
{
ColumnID cid = ci.column_id();
if (cid <= 0 || cid > (ColumnID)columns_from_ts.size())
{
throw Exception("column id out of bound");
throw Exception("column id out of bound", ErrorCodes::COP_BAD_DAG_REQUEST);
}
String name = merge_tree->getTableInfo().columns[cid - 1].name;
output_from_ts.push_back(std::move(name));
}
ss << "FROM " << merge_tree->getTableInfo().db_name << "." << merge_tree->getTableInfo().name << " ";
return true;
}

bool DAGStringConverter::buildSelString(const tipb::Selection & sel, std::stringstream & ss)
void DAGStringConverter::buildSelString(const tipb::Selection & sel, std::stringstream & ss)
{
bool first = true;
for (const tipb::Expr & expr : sel.conditions())
Expand All @@ -72,40 +78,33 @@ bool DAGStringConverter::buildSelString(const tipb::Selection & sel, std::string
}
ss << s << " ";
}
return true;
}

bool DAGStringConverter::buildLimitString(const tipb::Limit & limit, std::stringstream & ss)
{
ss << "LIMIT " << limit.limit() << " ";
return true;
}
void DAGStringConverter::buildLimitString(const tipb::Limit & limit, std::stringstream & ss) { ss << "LIMIT " << limit.limit() << " "; }

//todo return the error message
bool DAGStringConverter::buildString(const tipb::Executor & executor, std::stringstream & ss)
void DAGStringConverter::buildString(const tipb::Executor & executor, std::stringstream & ss)
{
switch (executor.tp())
{
case tipb::ExecType::TypeTableScan:
return buildTSString(executor.tbl_scan(), ss);
case tipb::ExecType::TypeIndexScan:
// index scan not supported
return false;
throw Exception("IndexScan is not supported", ErrorCodes::NOT_IMPLEMENTED);
case tipb::ExecType::TypeSelection:
return buildSelString(executor.selection(), ss);
case tipb::ExecType::TypeAggregation:
// stream agg is not supported, treated as normal agg
case tipb::ExecType::TypeStreamAgg:
//todo support agg
return false;
throw Exception("Aggregation is not supported", ErrorCodes::NOT_IMPLEMENTED);
case tipb::ExecType::TypeTopN:
// todo support top n
return false;
throw Exception("TopN is not supported", ErrorCodes::NOT_IMPLEMENTED);
case tipb::ExecType::TypeLimit:
return buildLimitString(executor.limit(), ss);
}

return false;
}

bool isProject(const tipb::Executor &)
Expand All @@ -125,10 +124,7 @@ String DAGStringConverter::buildSqlString()
std::stringstream project;
for (const tipb::Executor & executor : dag_request.executors())
{
if (!buildString(executor, query_buf))
{
return "";
}
buildString(executor, query_buf);
}
if (!isProject(dag_request.executors(dag_request.executors_size() - 1)))
{
Expand Down
8 changes: 4 additions & 4 deletions dbms/src/Interpreters/DAGStringConverter.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,10 @@ class DAGStringConverter
}

protected:
bool buildTSString(const tipb::TableScan & ts, std::stringstream & ss);
bool buildSelString(const tipb::Selection & sel, std::stringstream & ss);
bool buildLimitString(const tipb::Limit & limit, std::stringstream & ss);
bool buildString(const tipb::Executor & executor, std::stringstream & ss);
void buildTSString(const tipb::TableScan & ts, std::stringstream & ss);
void buildSelString(const tipb::Selection & sel, std::stringstream & ss);
void buildLimitString(const tipb::Limit & limit, std::stringstream & ss);
void buildString(const tipb::Executor & executor, std::stringstream & ss);

protected:
Context & context;
Expand Down
44 changes: 21 additions & 23 deletions dbms/src/Interpreters/InterpreterDAG.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#include <Storages/RegionQueryInfo.h>
#include <Storages/StorageMergeTree.h>
#include <Storages/Transaction/Region.h>
#include <Storages/Transaction/RegionException.h>
#include <Storages/Transaction/SchemaSyncer.h>
#include <Storages/Transaction/TMTContext.h>
#include <Storages/Transaction/Types.h>
Expand All @@ -30,19 +31,20 @@ extern const int UNKNOWN_TABLE;
extern const int TOO_MANY_COLUMNS;
extern const int SCHEMA_VERSION_ERROR;
extern const int UNKNOWN_EXCEPTION;
extern const int COP_BAD_DAG_REQUEST;
} // namespace ErrorCodes

InterpreterDAG::InterpreterDAG(Context & context_, const DAGQuerySource & dag_)
: context(context_), dag(dag_), log(&Logger::get("InterpreterDAG"))
{}

// the flow is the same as executeFetchcolumns
bool InterpreterDAG::executeTS(const tipb::TableScan & ts, Pipeline & pipeline)
void InterpreterDAG::executeTS(const tipb::TableScan & ts, Pipeline & pipeline)
{
if (!ts.has_table_id())
{
// do not have table id
return false;
throw Exception("Table id not specified in table scan executor", ErrorCodes::COP_BAD_DAG_REQUEST);
}
TableID table_id = ts.table_id();
// TODO: Get schema version from DAG request.
Expand All @@ -67,15 +69,15 @@ bool InterpreterDAG::executeTS(const tipb::TableScan & ts, Pipeline & pipeline)
if (cid < 1 || cid > (Int64)storage->getTableInfo().columns.size())
{
// cid out of bound
return false;
throw Exception("column id out of bound", ErrorCodes::COP_BAD_DAG_REQUEST);
}
String name = storage->getTableInfo().columns[cid - 1].name;
required_columns.push_back(name);
}
if (required_columns.empty())
{
// no column selected, must be something wrong
return false;
throw Exception("No column is selected in table scan executor", ErrorCodes::COP_BAD_DAG_REQUEST);
}

if (!dag.hasAggregation())
Expand All @@ -87,7 +89,7 @@ bool InterpreterDAG::executeTS(const tipb::TableScan & ts, Pipeline & pipeline)
if (i >= required_columns.size())
{
// array index out of bound
return false;
throw Exception("Output offset index is out of bound", ErrorCodes::COP_BAD_DAG_REQUEST);
}
// do not have alias
final_project.emplace_back(required_columns[i], "");
Expand Down Expand Up @@ -125,7 +127,10 @@ bool InterpreterDAG::executeTS(const tipb::TableScan & ts, Pipeline & pipeline)
auto current_region = context.getTMTContext().getRegionTable().getRegionByTableAndID(table_id, info.region_id);
if (!current_region)
{
return false;
//todo add more region error info in RegionException
std::vector<RegionID> region_ids;
region_ids.push_back(info.region_id);
throw RegionException(region_ids);
}
info.range_in_table = current_region->getHandleRangeByTable(table_id);
query_info.mvcc_query_info->regions_query_info.push_back(info);
Expand Down Expand Up @@ -164,7 +169,6 @@ bool InterpreterDAG::executeTS(const tipb::TableScan & ts, Pipeline & pipeline)
}
ColumnsWithTypeAndName columnsWithTypeAndName = pipeline.firstStream()->getHeader().getColumnsWithTypeAndName();
source_columns = storage->getColumns().getAllPhysical();
return true;
}

InterpreterDAG::AnalysisResult InterpreterDAG::analyzeExpressions()
Expand All @@ -175,17 +179,15 @@ InterpreterDAG::AnalysisResult InterpreterDAG::analyzeExpressions()
DAGExpressionAnalyzer analyzer(source_columns, context);
if (dag.hasSelection())
{
if (analyzer.appendWhere(chain, dag.getSelection(), res.filter_column_name))
{
res.has_where = true;
res.before_where = chain.getLastActions();
res.filter_column_name = chain.steps.back().required_output[0];
chain.addStep();
}
analyzer.appendWhere(chain, dag.getSelection(), res.filter_column_name);
res.has_where = true;
res.before_where = chain.getLastActions();
res.filter_column_name = chain.steps.back().required_output[0];
chain.addStep();
}
if (res.need_aggregate)
{
res.need_aggregate = analyzer.appendAggregation(chain, dag.getAggregation(), res.aggregation_keys, res.aggregate_descriptions);
analyzer.appendAggregation(chain, dag.getAggregation(), res.aggregation_keys, res.aggregate_descriptions);
res.before_aggregation = chain.getLastActions();

chain.finalize();
Expand All @@ -201,7 +203,8 @@ InterpreterDAG::AnalysisResult InterpreterDAG::analyzeExpressions()
}
if (dag.hasTopN())
{
res.has_order_by = analyzer.appendOrderBy(chain, dag.getTopN(), res.order_column_names);
res.has_order_by = true;
analyzer.appendOrderBy(chain, dag.getTopN(), res.order_column_names);
}
// append final project results
for (auto & name : final_project)
Expand Down Expand Up @@ -423,13 +426,9 @@ void InterpreterDAG::executeOrder(Pipeline & pipeline, Strings & order_column_na
limit, settings.max_bytes_before_external_sort, context.getTemporaryPath());
}

//todo return the error message
bool InterpreterDAG::executeImpl(Pipeline & pipeline)
void InterpreterDAG::executeImpl(Pipeline & pipeline)
{
if (!executeTS(dag.getTS(), pipeline))
{
return false;
}
executeTS(dag.getTS(), pipeline);

auto res = analyzeExpressions();
// execute selection
Expand Down Expand Up @@ -458,7 +457,6 @@ bool InterpreterDAG::executeImpl(Pipeline & pipeline)
{
executeLimit(pipeline);
}
return true;
}

void InterpreterDAG::executeFinalProject(Pipeline & pipeline)
Expand Down
4 changes: 2 additions & 2 deletions dbms/src/Interpreters/InterpreterDAG.h
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,8 @@ class InterpreterDAG : public IInterpreter
AggregateDescriptions aggregate_descriptions;
};

bool executeImpl(Pipeline & pipeline);
bool executeTS(const tipb::TableScan & ts, Pipeline & pipeline);
void executeImpl(Pipeline & pipeline);
void executeTS(const tipb::TableScan & ts, Pipeline & pipeline);
void executeWhere(Pipeline & pipeline, const ExpressionActionsPtr & expressionActionsPtr, String & filter_column);
void executeExpression(Pipeline & pipeline, const ExpressionActionsPtr & expressionActionsPtr);
void executeOrder(Pipeline & pipeline, Strings & order_column_names);
Expand Down

0 comments on commit 7cb9e71

Please sign in to comment.