Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

throw exception when meet error duing cop request handling #162

Merged
merged 15 commits into from
Aug 7, 2019
Merged
1 change: 1 addition & 0 deletions dbms/src/Common/ErrorCodes.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -392,6 +392,7 @@ namespace ErrorCodes
extern const int REGION_MISS = 10002;
extern const int SCHEMA_SYNC_ERROR = 10003;
extern const int SCHEMA_VERSION_ERROR = 10004;
extern const int COP_BAD_DAG_REQUEST = 10005;
}

}
47 changes: 22 additions & 25 deletions dbms/src/Interpreters/DAGStringConverter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,51 +11,56 @@
namespace DB
{

bool DAGStringConverter::buildTSString(const tipb::TableScan & ts, std::stringstream & ss)
namespace ErrorCodes
{
TableID id;
extern const int UNKNOWN_TABLE;
extern const int COP_BAD_DAG_REQUEST;
} // namespace ErrorCodes

void DAGStringConverter::buildTSString(const tipb::TableScan & ts, std::stringstream & ss)
{
TableID table_id;
if (ts.has_table_id())
{
id = ts.table_id();
table_id = ts.table_id();
}
else
{
// do not have table id
return false;
throw Exception("Table id not specified in table scan executor", ErrorCodes::COP_BAD_DAG_REQUEST);
}
auto & tmt_ctx = context.getTMTContext();
auto storage = tmt_ctx.getStorages().get(id);
auto storage = tmt_ctx.getStorages().get(table_id);
if (storage == nullptr)
{
return false;
throw Exception("Table " + std::to_string(table_id) + " doesn't exist.", ErrorCodes::UNKNOWN_TABLE);
}
const auto * merge_tree = dynamic_cast<const StorageMergeTree *>(storage.get());
if (!merge_tree)
{
return false;
throw Exception("Only MergeTree table is supported in DAG request", ErrorCodes::COP_BAD_DAG_REQUEST);
}

if (ts.columns_size() == 0)
{
// no column selected, must be something wrong
return false;
throw Exception("No column is selected in table scan executor", ErrorCodes::COP_BAD_DAG_REQUEST);
}
columns_from_ts = storage->getColumns().getAllPhysical();
for (const tipb::ColumnInfo & ci : ts.columns())
{
ColumnID cid = ci.column_id();
if (cid <= 0 || cid > (ColumnID)columns_from_ts.size())
{
throw Exception("column id out of bound");
throw Exception("column id out of bound", ErrorCodes::COP_BAD_DAG_REQUEST);
}
String name = merge_tree->getTableInfo().columns[cid - 1].name;
output_from_ts.push_back(std::move(name));
}
ss << "FROM " << merge_tree->getTableInfo().db_name << "." << merge_tree->getTableInfo().name << " ";
return true;
}

bool DAGStringConverter::buildSelString(const tipb::Selection & sel, std::stringstream & ss)
void DAGStringConverter::buildSelString(const tipb::Selection & sel, std::stringstream & ss)
{
bool first = true;
for (const tipb::Expr & expr : sel.conditions())
Expand All @@ -72,35 +77,30 @@ bool DAGStringConverter::buildSelString(const tipb::Selection & sel, std::string
}
ss << s << " ";
}
return true;
}

bool DAGStringConverter::buildLimitString(const tipb::Limit & limit, std::stringstream & ss)
{
ss << "LIMIT " << limit.limit() << " ";
return true;
}
void DAGStringConverter::buildLimitString(const tipb::Limit & limit, std::stringstream & ss) { ss << "LIMIT " << limit.limit() << " "; }

//todo return the error message
bool DAGStringConverter::buildString(const tipb::Executor & executor, std::stringstream & ss)
void DAGStringConverter::buildString(const tipb::Executor & executor, std::stringstream & ss)
{
switch (executor.tp())
{
case tipb::ExecType::TypeTableScan:
return buildTSString(executor.tbl_scan(), ss);
case tipb::ExecType::TypeIndexScan:
// index scan not supported
return false;
throw Exception("IndexScan is not supported");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Error code?

case tipb::ExecType::TypeSelection:
return buildSelString(executor.selection(), ss);
case tipb::ExecType::TypeAggregation:
// stream agg is not supported, treated as normal agg
case tipb::ExecType::TypeStreamAgg:
//todo support agg
return false;
throw Exception("Aggregation is not supported");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ditto.

case tipb::ExecType::TypeTopN:
// todo support top n
return false;
throw Exception("TopN is not supported");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ditto.

case tipb::ExecType::TypeLimit:
return buildLimitString(executor.limit(), ss);
}
Expand All @@ -125,10 +125,7 @@ String DAGStringConverter::buildSqlString()
std::stringstream project;
for (const tipb::Executor & executor : dag_request.executors())
{
if (!buildString(executor, query_buf))
{
return "";
}
buildString(executor, query_buf);
}
if (!isProject(dag_request.executors(dag_request.executors_size() - 1)))
{
Expand Down
8 changes: 4 additions & 4 deletions dbms/src/Interpreters/DAGStringConverter.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,10 @@ class DAGStringConverter
}

protected:
bool buildTSString(const tipb::TableScan & ts, std::stringstream & ss);
bool buildSelString(const tipb::Selection & sel, std::stringstream & ss);
bool buildLimitString(const tipb::Limit & limit, std::stringstream & ss);
bool buildString(const tipb::Executor & executor, std::stringstream & ss);
void buildTSString(const tipb::TableScan & ts, std::stringstream & ss);
void buildSelString(const tipb::Selection & sel, std::stringstream & ss);
void buildLimitString(const tipb::Limit & limit, std::stringstream & ss);
void buildString(const tipb::Executor & executor, std::stringstream & ss);

protected:
Context & context;
Expand Down
27 changes: 13 additions & 14 deletions dbms/src/Interpreters/InterpreterDAG.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#include <Storages/RegionQueryInfo.h>
#include <Storages/StorageMergeTree.h>
#include <Storages/Transaction/Region.h>
#include <Storages/Transaction/RegionException.h>
#include <Storages/Transaction/SchemaSyncer.h>
#include <Storages/Transaction/TMTContext.h>
#include <Storages/Transaction/Types.h>
Expand All @@ -30,19 +31,20 @@ extern const int UNKNOWN_TABLE;
extern const int TOO_MANY_COLUMNS;
extern const int SCHEMA_VERSION_ERROR;
extern const int UNKNOWN_EXCEPTION;
extern const int COP_BAD_DAG_REQUEST;
} // namespace ErrorCodes

InterpreterDAG::InterpreterDAG(Context & context_, const DAGQuerySource & dag_)
: context(context_), dag(dag_), log(&Logger::get("InterpreterDAG"))
{}

// the flow is the same as executeFetchcolumns
bool InterpreterDAG::executeTS(const tipb::TableScan & ts, Pipeline & pipeline)
void InterpreterDAG::executeTS(const tipb::TableScan & ts, Pipeline & pipeline)
{
if (!ts.has_table_id())
{
// do not have table id
return false;
throw Exception("Table id not specified in table scan executor", ErrorCodes::COP_BAD_DAG_REQUEST);
}
TableID table_id = ts.table_id();
// TODO: Get schema version from DAG request.
Expand All @@ -67,15 +69,15 @@ bool InterpreterDAG::executeTS(const tipb::TableScan & ts, Pipeline & pipeline)
if (cid < 1 || cid > (Int64)storage->getTableInfo().columns.size())
{
// cid out of bound
return false;
throw Exception("column id out of bound", ErrorCodes::COP_BAD_DAG_REQUEST);
}
String name = storage->getTableInfo().columns[cid - 1].name;
required_columns.push_back(name);
}
if (required_columns.empty())
{
// no column selected, must be something wrong
return false;
throw Exception("No column is selected in table scan executor", ErrorCodes::COP_BAD_DAG_REQUEST);
}

if (!dag.hasAggregation())
Expand All @@ -87,7 +89,7 @@ bool InterpreterDAG::executeTS(const tipb::TableScan & ts, Pipeline & pipeline)
if (i >= required_columns.size())
{
// array index out of bound
return false;
throw Exception("Output offset index is out of bound", ErrorCodes::COP_BAD_DAG_REQUEST);
}
// do not have alias
final_project.emplace_back(required_columns[i], "");
Expand Down Expand Up @@ -125,7 +127,10 @@ bool InterpreterDAG::executeTS(const tipb::TableScan & ts, Pipeline & pipeline)
auto current_region = context.getTMTContext().getRegionTable().getRegionByTableAndID(table_id, info.region_id);
if (!current_region)
{
return false;
//todo add more region error info in RegionException
std::vector<RegionID> region_ids;
region_ids.push_back(info.region_id);
throw RegionException(region_ids);
}
info.range_in_table = current_region->getHandleRangeByTable(table_id);
query_info.mvcc_query_info->regions_query_info.push_back(info);
Expand Down Expand Up @@ -164,7 +169,6 @@ bool InterpreterDAG::executeTS(const tipb::TableScan & ts, Pipeline & pipeline)
}
ColumnsWithTypeAndName columnsWithTypeAndName = pipeline.firstStream()->getHeader().getColumnsWithTypeAndName();
source_columns = storage->getColumns().getAllPhysical();
return true;
}

InterpreterDAG::AnalysisResult InterpreterDAG::analyzeExpressions()
Expand Down Expand Up @@ -423,13 +427,9 @@ void InterpreterDAG::executeOrder(Pipeline & pipeline, Strings & order_column_na
limit, settings.max_bytes_before_external_sort, context.getTemporaryPath());
}

//todo return the error message
bool InterpreterDAG::executeImpl(Pipeline & pipeline)
void InterpreterDAG::executeImpl(Pipeline & pipeline)
{
if (!executeTS(dag.getTS(), pipeline))
{
return false;
}
executeTS(dag.getTS(), pipeline);

auto res = analyzeExpressions();
// execute selection
Expand Down Expand Up @@ -458,7 +458,6 @@ bool InterpreterDAG::executeImpl(Pipeline & pipeline)
{
executeLimit(pipeline);
}
return true;
}

void InterpreterDAG::executeFinalProject(Pipeline & pipeline)
Expand Down
4 changes: 2 additions & 2 deletions dbms/src/Interpreters/InterpreterDAG.h
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,8 @@ class InterpreterDAG : public IInterpreter
AggregateDescriptions aggregate_descriptions;
};

bool executeImpl(Pipeline & pipeline);
bool executeTS(const tipb::TableScan & ts, Pipeline & pipeline);
void executeImpl(Pipeline & pipeline);
void executeTS(const tipb::TableScan & ts, Pipeline & pipeline);
void executeWhere(Pipeline & pipeline, const ExpressionActionsPtr & expressionActionsPtr, String & filter_column);
void executeExpression(Pipeline & pipeline, const ExpressionActionsPtr & expressionActionsPtr);
void executeOrder(Pipeline & pipeline, Strings & order_column_names);
Expand Down