diff --git a/dbms/src/Common/ErrorCodes.cpp b/dbms/src/Common/ErrorCodes.cpp index 920bc66257d..a5f373defa4 100644 --- a/dbms/src/Common/ErrorCodes.cpp +++ b/dbms/src/Common/ErrorCodes.cpp @@ -392,6 +392,7 @@ namespace ErrorCodes extern const int REGION_MISS = 10002; extern const int SCHEMA_SYNC_ERROR = 10003; extern const int SCHEMA_VERSION_ERROR = 10004; + extern const int COP_BAD_DAG_REQUEST = 10005; } } diff --git a/dbms/src/Interpreters/DAGExpressionAnalyzer.cpp b/dbms/src/Interpreters/DAGExpressionAnalyzer.cpp index ea824eb2a4d..7ebf23309a4 100644 --- a/dbms/src/Interpreters/DAGExpressionAnalyzer.cpp +++ b/dbms/src/Interpreters/DAGExpressionAnalyzer.cpp @@ -11,6 +11,12 @@ namespace DB { + +namespace ErrorCodes +{ + extern const int COP_BAD_DAG_REQUEST; +} // namespace ErrorCodes + static String genCastString(const String & org_name, const String & target_type_name) { return "cast(" + org_name + ", " + target_type_name + ") "; @@ -44,13 +50,13 @@ DAGExpressionAnalyzer::DAGExpressionAnalyzer(const NamesAndTypesList & source_co after_agg = false; } -bool DAGExpressionAnalyzer::appendAggregation( +void DAGExpressionAnalyzer::appendAggregation( ExpressionActionsChain & chain, const tipb::Aggregation & agg, Names & aggregation_keys, AggregateDescriptions & aggregate_descriptions) { if (agg.group_by_size() == 0 && agg.agg_func_size() == 0) { //should not reach here - return false; + throw Exception("Aggregation executor without group by/agg exprs", ErrorCodes::COP_BAD_DAG_REQUEST); } initChain(chain, getCurrentInputColumns()); ExpressionActionsChain::Step & step = chain.steps.back(); @@ -94,14 +100,13 @@ bool DAGExpressionAnalyzer::appendAggregation( aggregation_keys.push_back(name); } after_agg = true; - return true; } -bool DAGExpressionAnalyzer::appendWhere(ExpressionActionsChain & chain, const tipb::Selection & sel, String & filter_column_name) +void DAGExpressionAnalyzer::appendWhere(ExpressionActionsChain & chain, const tipb::Selection & sel, String & filter_column_name) { if (sel.conditions_size() == 0) { - return false; + throw Exception("Selection executor without condition exprs", ErrorCodes::COP_BAD_DAG_REQUEST); } tipb::Expr final_condition; if (sel.conditions_size() > 1) @@ -120,14 +125,13 @@ bool DAGExpressionAnalyzer::appendWhere(ExpressionActionsChain & chain, const ti initChain(chain, getCurrentInputColumns()); filter_column_name = getActions(filter, chain.steps.back().actions); chain.steps.back().required_output.push_back(filter_column_name); - return true; } -bool DAGExpressionAnalyzer::appendOrderBy(ExpressionActionsChain & chain, const tipb::TopN & topN, Strings & order_column_names) +void DAGExpressionAnalyzer::appendOrderBy(ExpressionActionsChain & chain, const tipb::TopN & topN, Strings & order_column_names) { if (topN.order_by_size() == 0) { - return false; + throw Exception("TopN executor without order by exprs", ErrorCodes::COP_BAD_DAG_REQUEST); } initChain(chain, getCurrentInputColumns()); ExpressionActionsChain::Step & step = chain.steps.back(); @@ -137,12 +141,11 @@ bool DAGExpressionAnalyzer::appendOrderBy(ExpressionActionsChain & chain, const step.required_output.push_back(name); order_column_names.push_back(name); } - return true; } const NamesAndTypesList & DAGExpressionAnalyzer::getCurrentInputColumns() { return after_agg ? aggregated_columns : source_columns; } -bool DAGExpressionAnalyzer::appendAggSelect(ExpressionActionsChain & chain, const tipb::Aggregation & aggregation) +void DAGExpressionAnalyzer::appendAggSelect(ExpressionActionsChain & chain, const tipb::Aggregation & aggregation) { initChain(chain, getCurrentInputColumns()); bool need_update_aggregated_columns = false; @@ -191,7 +194,6 @@ bool DAGExpressionAnalyzer::appendAggSelect(ExpressionActionsChain & chain, cons aggregated_columns.emplace_back(updated_aggregated_columns.getNames()[i], updated_aggregated_columns.getTypes()[i]); } } - return true; } String DAGExpressionAnalyzer::appendCastIfNeeded(const tipb::Expr & expr, ExpressionActionsPtr & actions, const String expr_name) diff --git a/dbms/src/Interpreters/DAGExpressionAnalyzer.h b/dbms/src/Interpreters/DAGExpressionAnalyzer.h index 960bdcd4bd4..c3fd9eca460 100644 --- a/dbms/src/Interpreters/DAGExpressionAnalyzer.h +++ b/dbms/src/Interpreters/DAGExpressionAnalyzer.h @@ -29,11 +29,11 @@ class DAGExpressionAnalyzer : private boost::noncopyable public: DAGExpressionAnalyzer(const NamesAndTypesList & source_columns_, const Context & context_); - bool appendWhere(ExpressionActionsChain & chain, const tipb::Selection & sel, String & filter_column_name); - bool appendOrderBy(ExpressionActionsChain & chain, const tipb::TopN & topN, Strings & order_column_names); - bool appendAggregation(ExpressionActionsChain & chain, const tipb::Aggregation & agg, Names & aggregate_keys, + void appendWhere(ExpressionActionsChain & chain, const tipb::Selection & sel, String & filter_column_name); + void appendOrderBy(ExpressionActionsChain & chain, const tipb::TopN & topN, Strings & order_column_names); + void appendAggregation(ExpressionActionsChain & chain, const tipb::Aggregation & agg, Names & aggregate_keys, AggregateDescriptions & aggregate_descriptions); - bool appendAggSelect(ExpressionActionsChain & chain, const tipb::Aggregation & agg); + void appendAggSelect(ExpressionActionsChain & chain, const tipb::Aggregation & agg); String appendCastIfNeeded(const tipb::Expr & expr, ExpressionActionsPtr & actions, const String expr_name); void initChain(ExpressionActionsChain & chain, const NamesAndTypesList & columns) const { diff --git a/dbms/src/Interpreters/DAGStringConverter.cpp b/dbms/src/Interpreters/DAGStringConverter.cpp index a66eeb927a6..3cdc1b97692 100644 --- a/dbms/src/Interpreters/DAGStringConverter.cpp +++ b/dbms/src/Interpreters/DAGStringConverter.cpp @@ -11,34 +11,41 @@ namespace DB { -bool DAGStringConverter::buildTSString(const tipb::TableScan & ts, std::stringstream & ss) +namespace ErrorCodes { - TableID id; +extern const int UNKNOWN_TABLE; +extern const int COP_BAD_DAG_REQUEST; +extern const int NOT_IMPLEMENTED; +} // namespace ErrorCodes + +void DAGStringConverter::buildTSString(const tipb::TableScan & ts, std::stringstream & ss) +{ + TableID table_id; if (ts.has_table_id()) { - id = ts.table_id(); + table_id = ts.table_id(); } else { // do not have table id - return false; + throw Exception("Table id not specified in table scan executor", ErrorCodes::COP_BAD_DAG_REQUEST); } auto & tmt_ctx = context.getTMTContext(); - auto storage = tmt_ctx.getStorages().get(id); + auto storage = tmt_ctx.getStorages().get(table_id); if (storage == nullptr) { - return false; + throw Exception("Table " + std::to_string(table_id) + " doesn't exist.", ErrorCodes::UNKNOWN_TABLE); } const auto * merge_tree = dynamic_cast(storage.get()); if (!merge_tree) { - return false; + throw Exception("Only MergeTree table is supported in DAG request", ErrorCodes::COP_BAD_DAG_REQUEST); } if (ts.columns_size() == 0) { // no column selected, must be something wrong - return false; + throw Exception("No column is selected in table scan executor", ErrorCodes::COP_BAD_DAG_REQUEST); } columns_from_ts = storage->getColumns().getAllPhysical(); for (const tipb::ColumnInfo & ci : ts.columns()) @@ -46,16 +53,15 @@ bool DAGStringConverter::buildTSString(const tipb::TableScan & ts, std::stringst ColumnID cid = ci.column_id(); if (cid <= 0 || cid > (ColumnID)columns_from_ts.size()) { - throw Exception("column id out of bound"); + throw Exception("column id out of bound", ErrorCodes::COP_BAD_DAG_REQUEST); } String name = merge_tree->getTableInfo().columns[cid - 1].name; output_from_ts.push_back(std::move(name)); } ss << "FROM " << merge_tree->getTableInfo().db_name << "." << merge_tree->getTableInfo().name << " "; - return true; } -bool DAGStringConverter::buildSelString(const tipb::Selection & sel, std::stringstream & ss) +void DAGStringConverter::buildSelString(const tipb::Selection & sel, std::stringstream & ss) { bool first = true; for (const tipb::Expr & expr : sel.conditions()) @@ -72,17 +78,12 @@ bool DAGStringConverter::buildSelString(const tipb::Selection & sel, std::string } ss << s << " "; } - return true; } -bool DAGStringConverter::buildLimitString(const tipb::Limit & limit, std::stringstream & ss) -{ - ss << "LIMIT " << limit.limit() << " "; - return true; -} +void DAGStringConverter::buildLimitString(const tipb::Limit & limit, std::stringstream & ss) { ss << "LIMIT " << limit.limit() << " "; } //todo return the error message -bool DAGStringConverter::buildString(const tipb::Executor & executor, std::stringstream & ss) +void DAGStringConverter::buildString(const tipb::Executor & executor, std::stringstream & ss) { switch (executor.tp()) { @@ -90,22 +91,20 @@ bool DAGStringConverter::buildString(const tipb::Executor & executor, std::strin return buildTSString(executor.tbl_scan(), ss); case tipb::ExecType::TypeIndexScan: // index scan not supported - return false; + throw Exception("IndexScan is not supported", ErrorCodes::NOT_IMPLEMENTED); case tipb::ExecType::TypeSelection: return buildSelString(executor.selection(), ss); case tipb::ExecType::TypeAggregation: // stream agg is not supported, treated as normal agg case tipb::ExecType::TypeStreamAgg: //todo support agg - return false; + throw Exception("Aggregation is not supported", ErrorCodes::NOT_IMPLEMENTED); case tipb::ExecType::TypeTopN: // todo support top n - return false; + throw Exception("TopN is not supported", ErrorCodes::NOT_IMPLEMENTED); case tipb::ExecType::TypeLimit: return buildLimitString(executor.limit(), ss); } - - return false; } bool isProject(const tipb::Executor &) @@ -125,10 +124,7 @@ String DAGStringConverter::buildSqlString() std::stringstream project; for (const tipb::Executor & executor : dag_request.executors()) { - if (!buildString(executor, query_buf)) - { - return ""; - } + buildString(executor, query_buf); } if (!isProject(dag_request.executors(dag_request.executors_size() - 1))) { diff --git a/dbms/src/Interpreters/DAGStringConverter.h b/dbms/src/Interpreters/DAGStringConverter.h index fc8006f8096..fa91a72c7b7 100644 --- a/dbms/src/Interpreters/DAGStringConverter.h +++ b/dbms/src/Interpreters/DAGStringConverter.h @@ -41,10 +41,10 @@ class DAGStringConverter } protected: - bool buildTSString(const tipb::TableScan & ts, std::stringstream & ss); - bool buildSelString(const tipb::Selection & sel, std::stringstream & ss); - bool buildLimitString(const tipb::Limit & limit, std::stringstream & ss); - bool buildString(const tipb::Executor & executor, std::stringstream & ss); + void buildTSString(const tipb::TableScan & ts, std::stringstream & ss); + void buildSelString(const tipb::Selection & sel, std::stringstream & ss); + void buildLimitString(const tipb::Limit & limit, std::stringstream & ss); + void buildString(const tipb::Executor & executor, std::stringstream & ss); protected: Context & context; diff --git a/dbms/src/Interpreters/InterpreterDAG.cpp b/dbms/src/Interpreters/InterpreterDAG.cpp index 23f6b24e04d..52226c03de6 100644 --- a/dbms/src/Interpreters/InterpreterDAG.cpp +++ b/dbms/src/Interpreters/InterpreterDAG.cpp @@ -17,6 +17,7 @@ #include #include #include +#include #include #include #include @@ -30,6 +31,7 @@ extern const int UNKNOWN_TABLE; extern const int TOO_MANY_COLUMNS; extern const int SCHEMA_VERSION_ERROR; extern const int UNKNOWN_EXCEPTION; +extern const int COP_BAD_DAG_REQUEST; } // namespace ErrorCodes InterpreterDAG::InterpreterDAG(Context & context_, const DAGQuerySource & dag_) @@ -37,12 +39,12 @@ InterpreterDAG::InterpreterDAG(Context & context_, const DAGQuerySource & dag_) {} // the flow is the same as executeFetchcolumns -bool InterpreterDAG::executeTS(const tipb::TableScan & ts, Pipeline & pipeline) +void InterpreterDAG::executeTS(const tipb::TableScan & ts, Pipeline & pipeline) { if (!ts.has_table_id()) { // do not have table id - return false; + throw Exception("Table id not specified in table scan executor", ErrorCodes::COP_BAD_DAG_REQUEST); } TableID table_id = ts.table_id(); // TODO: Get schema version from DAG request. @@ -67,7 +69,7 @@ bool InterpreterDAG::executeTS(const tipb::TableScan & ts, Pipeline & pipeline) if (cid < 1 || cid > (Int64)storage->getTableInfo().columns.size()) { // cid out of bound - return false; + throw Exception("column id out of bound", ErrorCodes::COP_BAD_DAG_REQUEST); } String name = storage->getTableInfo().columns[cid - 1].name; required_columns.push_back(name); @@ -75,7 +77,7 @@ bool InterpreterDAG::executeTS(const tipb::TableScan & ts, Pipeline & pipeline) if (required_columns.empty()) { // no column selected, must be something wrong - return false; + throw Exception("No column is selected in table scan executor", ErrorCodes::COP_BAD_DAG_REQUEST); } if (!dag.hasAggregation()) @@ -87,7 +89,7 @@ bool InterpreterDAG::executeTS(const tipb::TableScan & ts, Pipeline & pipeline) if (i >= required_columns.size()) { // array index out of bound - return false; + throw Exception("Output offset index is out of bound", ErrorCodes::COP_BAD_DAG_REQUEST); } // do not have alias final_project.emplace_back(required_columns[i], ""); @@ -125,7 +127,10 @@ bool InterpreterDAG::executeTS(const tipb::TableScan & ts, Pipeline & pipeline) auto current_region = context.getTMTContext().getRegionTable().getRegionByTableAndID(table_id, info.region_id); if (!current_region) { - return false; + //todo add more region error info in RegionException + std::vector region_ids; + region_ids.push_back(info.region_id); + throw RegionException(region_ids); } info.range_in_table = current_region->getHandleRangeByTable(table_id); query_info.mvcc_query_info->regions_query_info.push_back(info); @@ -164,7 +169,6 @@ bool InterpreterDAG::executeTS(const tipb::TableScan & ts, Pipeline & pipeline) } ColumnsWithTypeAndName columnsWithTypeAndName = pipeline.firstStream()->getHeader().getColumnsWithTypeAndName(); source_columns = storage->getColumns().getAllPhysical(); - return true; } InterpreterDAG::AnalysisResult InterpreterDAG::analyzeExpressions() @@ -175,17 +179,15 @@ InterpreterDAG::AnalysisResult InterpreterDAG::analyzeExpressions() DAGExpressionAnalyzer analyzer(source_columns, context); if (dag.hasSelection()) { - if (analyzer.appendWhere(chain, dag.getSelection(), res.filter_column_name)) - { - res.has_where = true; - res.before_where = chain.getLastActions(); - res.filter_column_name = chain.steps.back().required_output[0]; - chain.addStep(); - } + analyzer.appendWhere(chain, dag.getSelection(), res.filter_column_name); + res.has_where = true; + res.before_where = chain.getLastActions(); + res.filter_column_name = chain.steps.back().required_output[0]; + chain.addStep(); } if (res.need_aggregate) { - res.need_aggregate = analyzer.appendAggregation(chain, dag.getAggregation(), res.aggregation_keys, res.aggregate_descriptions); + analyzer.appendAggregation(chain, dag.getAggregation(), res.aggregation_keys, res.aggregate_descriptions); res.before_aggregation = chain.getLastActions(); chain.finalize(); @@ -201,7 +203,8 @@ InterpreterDAG::AnalysisResult InterpreterDAG::analyzeExpressions() } if (dag.hasTopN()) { - res.has_order_by = analyzer.appendOrderBy(chain, dag.getTopN(), res.order_column_names); + res.has_order_by = true; + analyzer.appendOrderBy(chain, dag.getTopN(), res.order_column_names); } // append final project results for (auto & name : final_project) @@ -423,13 +426,9 @@ void InterpreterDAG::executeOrder(Pipeline & pipeline, Strings & order_column_na limit, settings.max_bytes_before_external_sort, context.getTemporaryPath()); } -//todo return the error message -bool InterpreterDAG::executeImpl(Pipeline & pipeline) +void InterpreterDAG::executeImpl(Pipeline & pipeline) { - if (!executeTS(dag.getTS(), pipeline)) - { - return false; - } + executeTS(dag.getTS(), pipeline); auto res = analyzeExpressions(); // execute selection @@ -458,7 +457,6 @@ bool InterpreterDAG::executeImpl(Pipeline & pipeline) { executeLimit(pipeline); } - return true; } void InterpreterDAG::executeFinalProject(Pipeline & pipeline) diff --git a/dbms/src/Interpreters/InterpreterDAG.h b/dbms/src/Interpreters/InterpreterDAG.h index 4ffc0b0067f..28c9a784108 100644 --- a/dbms/src/Interpreters/InterpreterDAG.h +++ b/dbms/src/Interpreters/InterpreterDAG.h @@ -68,8 +68,8 @@ class InterpreterDAG : public IInterpreter AggregateDescriptions aggregate_descriptions; }; - bool executeImpl(Pipeline & pipeline); - bool executeTS(const tipb::TableScan & ts, Pipeline & pipeline); + void executeImpl(Pipeline & pipeline); + void executeTS(const tipb::TableScan & ts, Pipeline & pipeline); void executeWhere(Pipeline & pipeline, const ExpressionActionsPtr & expressionActionsPtr, String & filter_column); void executeExpression(Pipeline & pipeline, const ExpressionActionsPtr & expressionActionsPtr); void executeOrder(Pipeline & pipeline, Strings & order_column_names);