Skip to content

Commit

Permalink
support udf in (#175)
Browse files Browse the repository at this point in the history
* fix cop test regression

* address comments

* format code

* fix npe for dag execute

* format code

* address comment

* add some comments

* throw exception when meeting an error during cop request handling

* address comments

* add error code

* throw exception when meeting an error during cop request handling

* address comments

* add DAGContext so InterpreterDAG can exchange information with DAGDriver

* fix bug

* 1. refine code, 2. address comments

* update comments

* columnref index is based on executor output schema

* handle error in coprocessor request

* refine code

* use Clear to clear a protobuf message completely

* refine code

* code refine && several minor bug fix

* address comments

* address comments

* support udf in

* refine code

* address comments

* address comments
  • Loading branch information
windtalker authored and zanmato1984 committed Aug 14, 2019
1 parent c8cd3d7 commit 0492af6
Show file tree
Hide file tree
Showing 10 changed files with 224 additions and 48 deletions.
60 changes: 52 additions & 8 deletions dbms/src/Flash/Coprocessor/DAGExpressionAnalyzer.cpp
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
#include <Flash/Coprocessor/DAGExpressionAnalyzer.h>

#include <AggregateFunctions/AggregateFunctionFactory.h>
#include <Columns/ColumnSet.h>
#include <DataTypes/DataTypeSet.h>
#include <DataTypes/FieldToDataType.h>
#include <Flash/Coprocessor/DAGUtils.h>
#include <Functions/FunctionFactory.h>
#include <Interpreters/Context.h>
#include <Interpreters/Set.h>
#include <Interpreters/convertFieldToType.h>
#include <Storages/Transaction/Codec.h>
#include <Storages/Transaction/TypeMapping.h>
Expand Down Expand Up @@ -251,6 +254,32 @@ String DAGExpressionAnalyzer::appendCastIfNeeded(const tipb::Expr & expr, Expres
return expr_name;
}

/// Builds the Set for `expr` and stores it in `prepared_sets` under the key `&expr`.
/// Does nothing when a set is already stored for that key.
///
/// expr               - expression handed to Set::createFromDAGExpr; its address is the cache key.
/// sample_block       - block that must contain the column named `left_arg_name`.
/// create_ordered_set - forwarded unchanged to Set::createFromDAGExpr.
/// left_arg_name      - name of the column whose type becomes the (single) set element type.
void DAGExpressionAnalyzer::makeExplicitSet(
    const tipb::Expr & expr, const Block & sample_block, bool create_ordered_set, const String & left_arg_name)
{
    // A set was already prepared for this exact expr object; reuse it.
    if (prepared_sets.count(&expr) != 0)
        return;

    // todo: support tuple IN, i.e. (a,b) in ((1,2), (3,4)). Currently TiDB converts a tuple IN into a series of
    // or/and/eq exprs, which means a tuple IN is never pushed down to the coprocessor; that is quite inefficient.
    // Until then the set has exactly one element type: the type of the left-hand argument.
    DataTypes set_element_types;
    set_element_types.push_back(sample_block.getByName(left_arg_name).type);

    // todo: if this is an IN with a single value, convert it to an equality expr instead.
    const SizeLimits set_limits(settings.max_rows_in_set, settings.max_bytes_in_set, settings.set_overflow_mode);
    SetPtr new_set = std::make_shared<Set>(set_limits);
    new_set->createFromDAGExpr(set_element_types, expr, create_ordered_set);

    // The key is known to be absent (checked above), so emplace always inserts.
    prepared_sets.emplace(&expr, std::move(new_set));
}

/// Returns `prefix` followed by the smallest integer suffix (starting at 1) for which
/// `block` has no column with that name.
static String getUniqueName(const Block & block, const String & prefix)
{
    for (int suffix = 1;; ++suffix)
    {
        String candidate = prefix + toString(suffix);
        // The first name not present in the block wins.
        if (!block.has(candidate))
            return candidate;
    }
}

String DAGExpressionAnalyzer::getActions(const tipb::Expr & expr, ExpressionActionsPtr & actions)
{
String expr_name = getName(expr, getCurrentInputColumns());
Expand Down Expand Up @@ -288,20 +317,35 @@ String DAGExpressionAnalyzer::getActions(const tipb::Expr & expr, ExpressionActi
throw Exception("agg function is not supported yet", ErrorCodes::UNSUPPORTED_METHOD);
}
const String & func_name = getFunctionName(expr);
if (func_name == "in" || func_name == "notIn" || func_name == "globalIn" || func_name == "globalNotIn")
{
// todo support in
throw Exception(func_name + " is not supported yet", ErrorCodes::UNSUPPORTED_METHOD);
}

const FunctionBuilderPtr & function_builder = FunctionFactory::instance().get(func_name, context);
Names argument_names;
DataTypes argument_types;
for (auto & child : expr.children())

if (isInOrGlobalInOperator(func_name))
{
String name = getActions(child, actions);
String name = getActions(expr.children(0), actions);
argument_names.push_back(name);
argument_types.push_back(actions->getSampleBlock().getByName(name).type);
makeExplicitSet(expr, actions->getSampleBlock(), false, name);
ColumnWithTypeAndName column;
column.type = std::make_shared<DataTypeSet>();

const SetPtr & set = prepared_sets[&expr];

column.name = getUniqueName(actions->getSampleBlock(), "___set");
column.column = ColumnSet::create(1, set);
actions->add(ExpressionAction::addColumn(column));
argument_names.push_back(column.name);
argument_types.push_back(column.type);
}
else
{
for (auto & child : expr.children())
{
String name = getActions(child, actions);
argument_names.push_back(name);
argument_types.push_back(actions->getSampleBlock().getByName(name).type);
}
}

// re-construct expr_name, because expr_name generated previously is based on expr tree,
Expand Down
6 changes: 6 additions & 0 deletions dbms/src/Flash/Coprocessor/DAGExpressionAnalyzer.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,10 @@
namespace DB
{

// Forward declaration of Set; this header only refers to it through SetPtr.
class Set;
using SetPtr = std::shared_ptr<Set>;
// Maps a DAG expression, identified by the address of its tipb::Expr, to the Set stored for it.
using DAGPreparedSets = std::unordered_map<const tipb::Expr *, SetPtr>;

/** Transforms an expression from DAG expression into a sequence of actions to execute it.
*
*/
Expand All @@ -24,6 +28,7 @@ class DAGExpressionAnalyzer : private boost::noncopyable
NamesAndTypesList source_columns;
// all columns after aggregation
NamesAndTypesList aggregated_columns;
DAGPreparedSets prepared_sets;
Settings settings;
const Context & context;
bool after_agg;
Expand All @@ -47,6 +52,7 @@ class DAGExpressionAnalyzer : private boost::noncopyable
void appendFinalProject(ExpressionActionsChain & chain, const NamesWithAliases & final_project);
String getActions(const tipb::Expr & expr, ExpressionActionsPtr & actions);
const NamesAndTypesList & getCurrentInputColumns();
void makeExplicitSet(const tipb::Expr & expr, const Block & sample_block, bool create_ordered_set, const String & left_arg_name);
};

} // namespace DB
2 changes: 1 addition & 1 deletion dbms/src/Flash/Coprocessor/DAGStringConverter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ void DAGStringConverter::buildTSString(const tipb::TableScan & ts, std::stringst
String name = merge_tree->getTableInfo().columns[cid - 1].name;
output_from_ts.push_back(std::move(name));
}
ss << "FROM " << merge_tree->getTableInfo().db_name << "." << merge_tree->getTableInfo().name << " ";
ss << "FROM " << storage->getDatabaseName() << "." << storage->getTableName() << " ";
}

void DAGStringConverter::buildSelString(const tipb::Selection & sel, std::stringstream & ss)
Expand Down
61 changes: 33 additions & 28 deletions dbms/src/Flash/Coprocessor/DAGUtils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,12 @@
namespace DB
{

// Error codes passed to Exception in this file. They are only declared here (extern);
// the definitions are not part of this file.
namespace ErrorCodes
{
extern const int COP_BAD_DAG_REQUEST;
extern const int UNSUPPORTED_METHOD;
} // namespace ErrorCodes

bool isFunctionExpr(const tipb::Expr & expr)
{
switch (expr.tp())
Expand Down Expand Up @@ -43,7 +49,7 @@ const String & getAggFunctionName(const tipb::Expr & expr)
{
if (!aggFunMap.count(expr.tp()))
{
throw Exception(tipb::ExprType_Name(expr.tp()) + " is not supported.");
throw Exception(tipb::ExprType_Name(expr.tp()) + " is not supported.", ErrorCodes::UNSUPPORTED_METHOD);
}
return aggFunMap[expr.tp()];
}
Expand All @@ -54,21 +60,21 @@ const String & getFunctionName(const tipb::Expr & expr)
{
if (!aggFunMap.count(expr.tp()))
{
throw Exception(tipb::ExprType_Name(expr.tp()) + " is not supported.");
throw Exception(tipb::ExprType_Name(expr.tp()) + " is not supported.", ErrorCodes::UNSUPPORTED_METHOD);
}
return aggFunMap[expr.tp()];
}
else
{
if (!scalarFunMap.count(expr.sig()))
{
throw Exception(tipb::ScalarFuncSig_Name(expr.sig()) + " is not supported.");
throw Exception(tipb::ScalarFuncSig_Name(expr.sig()) + " is not supported.", ErrorCodes::UNSUPPORTED_METHOD);
}
return scalarFunMap[expr.sig()];
}
}

String exprToString(const tipb::Expr & expr, const NamesAndTypesList & input_col)
String exprToString(const tipb::Expr & expr, const NamesAndTypesList & input_col, bool for_parser)
{
std::stringstream ss;
size_t cursor = 1;
Expand All @@ -94,7 +100,7 @@ String exprToString(const tipb::Expr & expr, const NamesAndTypesList & input_col
columnId = DecodeInt<Int64>(cursor, expr.val());
if (columnId < 0 || columnId >= (ColumnID)input_col.size())
{
throw Exception("out of bound");
throw Exception("Column id out of bound", ErrorCodes::COP_BAD_DAG_REQUEST);
}
return input_col.getNames()[columnId];
case tipb::ExprType::Count:
Expand All @@ -105,53 +111,50 @@ String exprToString(const tipb::Expr & expr, const NamesAndTypesList & input_col
case tipb::ExprType::First:
if (!aggFunMap.count(expr.tp()))
{
throw Exception("not supported");
throw Exception(tipb::ExprType_Name(expr.tp()) + "not supported", ErrorCodes::UNSUPPORTED_METHOD);
}
func_name = aggFunMap.find(expr.tp())->second;
break;
case tipb::ExprType::ScalarFunc:
if (!scalarFunMap.count(expr.sig()))
{
throw Exception("not supported");
throw Exception(tipb::ScalarFuncSig_Name(expr.sig()) + "not supported", ErrorCodes::UNSUPPORTED_METHOD);
}
func_name = scalarFunMap.find(expr.sig())->second;
break;
default:
throw Exception("not supported");
throw Exception(tipb::ExprType_Name(expr.tp()) + "not supported", ErrorCodes::UNSUPPORTED_METHOD);
}
// build function expr
if (func_name == "in")
if (isInOrGlobalInOperator(func_name) && for_parser)
{
// for in, we could not represent the function expr using func_name(param1, param2, ...)
throw Exception("not supported");
throw Exception("Function " + func_name + " not supported", ErrorCodes::UNSUPPORTED_METHOD);
}
else
ss << func_name << "(";
bool first = true;
for (const tipb::Expr & child : expr.children())
{
ss << func_name << "(";
bool first = true;
for (const tipb::Expr & child : expr.children())
String s = exprToString(child, input_col, for_parser);
if (first)
{
String s = exprToString(child, input_col);
if (first)
{
first = false;
}
else
{
ss << ", ";
}
ss << s;
first = false;
}
ss << ") ";
return ss.str();
else
{
ss << ", ";
}
ss << s;
}
ss << ") ";
return ss.str();
}

/// Returns the name that tipb::ExprType_Name gives for the expression's type (expr.tp()).
const String & getTypeName(const tipb::Expr & expr)
{
    const auto expr_type = expr.tp();
    return tipb::ExprType_Name(expr_type);
}

String getName(const tipb::Expr & expr, const NamesAndTypesList & current_input_columns)
{
return exprToString(expr, current_input_columns);
return exprToString(expr, current_input_columns, false);
}

bool isAggFunctionExpr(const tipb::Expr & expr)
Expand Down Expand Up @@ -225,7 +228,7 @@ Field decodeLiteral(const tipb::Expr & expr)
case tipb::ExprType::MysqlTime:
case tipb::ExprType::MysqlJson:
case tipb::ExprType::ValueList:
throw Exception("mysql type literal is not supported yet");
throw Exception(tipb::ExprType_Name(expr.tp()) + "is not supported yet", ErrorCodes::UNSUPPORTED_METHOD);
default:
return DecodeDatum(cursor, expr.val());
}
Expand All @@ -237,6 +240,8 @@ ColumnID getColumnID(const tipb::Expr & expr)
return DecodeInt<Int64>(cursor, expr.val());
}

/// Returns true when `name` is exactly one of "in", "notIn", "globalIn" or "globalNotIn".
bool isInOrGlobalInOperator(const String & name)
{
    static const char * const in_like_names[] = {"in", "notIn", "globalIn", "globalNotIn"};
    for (const char * candidate : in_like_names)
    {
        if (name == candidate)
            return true;
    }
    return false;
}

std::unordered_map<tipb::ExprType, String> aggFunMap({
{tipb::ExprType::Count, "count"}, {tipb::ExprType::Sum, "sum"}, {tipb::ExprType::Avg, "avg"}, {tipb::ExprType::Min, "min"},
{tipb::ExprType::Max, "max"}, {tipb::ExprType::First, "any"},
Expand Down
3 changes: 2 additions & 1 deletion dbms/src/Flash/Coprocessor/DAGUtils.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@ bool isColumnExpr(const tipb::Expr & expr);
ColumnID getColumnID(const tipb::Expr & expr);
String getName(const tipb::Expr & expr, const NamesAndTypesList & current_input_columns);
const String & getTypeName(const tipb::Expr & expr);
String exprToString(const tipb::Expr & expr, const NamesAndTypesList & input_col);
String exprToString(const tipb::Expr & expr, const NamesAndTypesList & input_col, bool for_parser = true);
bool isInOrGlobalInOperator(const String & name);
extern std::unordered_map<tipb::ExprType, String> aggFunMap;
extern std::unordered_map<tipb::ScalarFuncSig, String> scalarFunMap;

Expand Down
15 changes: 8 additions & 7 deletions dbms/src/Flash/Coprocessor/InterpreterDAG.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,8 @@ void InterpreterDAG::executeTS(const tipb::TableScan & ts, Pipeline & pipeline)
throw Exception("No column is selected in table scan executor", ErrorCodes::COP_BAD_DAG_REQUEST);
}

analyzer = std::make_unique<DAGExpressionAnalyzer>(source_columns, context);

if (!dag.hasAggregation())
{
// if the dag request does not contain agg, then the final output is
Expand Down Expand Up @@ -175,28 +177,27 @@ InterpreterDAG::AnalysisResult InterpreterDAG::analyzeExpressions()
{
AnalysisResult res;
ExpressionActionsChain chain;
DAGExpressionAnalyzer analyzer(source_columns, context);
if (dag.hasSelection())
{
analyzer.appendWhere(chain, dag.getSelection(), res.filter_column_name);
analyzer->appendWhere(chain, dag.getSelection(), res.filter_column_name);
res.has_where = true;
res.before_where = chain.getLastActions();
chain.addStep();
}
// There will be either Agg...
if (dag.hasAggregation())
{
analyzer.appendAggregation(chain, dag.getAggregation(), res.aggregation_keys, res.aggregate_descriptions);
analyzer->appendAggregation(chain, dag.getAggregation(), res.aggregation_keys, res.aggregate_descriptions);
res.need_aggregate = true;
res.before_aggregation = chain.getLastActions();

chain.finalize();
chain.clear();

// add cast if type is not match
analyzer.appendAggSelect(chain, dag.getAggregation());
analyzer->appendAggSelect(chain, dag.getAggregation());
//todo use output_offset to reconstruct the final project columns
for (auto element : analyzer.getCurrentInputColumns())
for (auto element : analyzer->getCurrentInputColumns())
{
final_project.emplace_back(element.name, "");
}
Expand All @@ -205,10 +206,10 @@ InterpreterDAG::AnalysisResult InterpreterDAG::analyzeExpressions()
if (dag.hasTopN())
{
res.has_order_by = true;
analyzer.appendOrderBy(chain, dag.getTopN(), res.order_column_names);
analyzer->appendOrderBy(chain, dag.getTopN(), res.order_column_names);
}
// Append final project results if needed.
analyzer.appendFinalProject(chain, final_project);
analyzer->appendFinalProject(chain, final_project);
res.before_order_and_select = chain.getLastActions();
chain.finalize();
chain.clear();
Expand Down
3 changes: 3 additions & 0 deletions dbms/src/Flash/Coprocessor/InterpreterDAG.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
#pragma GCC diagnostic pop

#include <DataStreams/BlockIO.h>
#include <Flash/Coprocessor/DAGExpressionAnalyzer.h>
#include <Flash/Coprocessor/DAGQuerySource.h>
#include <Flash/Coprocessor/DAGUtils.h>
#include <Interpreters/AggregateDescription.h>
Expand Down Expand Up @@ -98,6 +99,8 @@ class InterpreterDAG : public IInterpreter
TMTStoragePtr storage;
TableStructureReadLockPtr table_lock;

std::unique_ptr<DAGExpressionAnalyzer> analyzer;

Poco::Logger * log;
};
} // namespace DB
Loading

0 comments on commit 0492af6

Please sign in to comment.