Skip to content

Commit

Permalink
Add SMJ support (oap-project#97)
Browse files Browse the repository at this point in the history
  • Loading branch information
JkSelf authored and zhejiangxiaomai committed Dec 15, 2022
1 parent ae420fb commit b0b0e62
Show file tree
Hide file tree
Showing 4 changed files with 75 additions and 44 deletions.
31 changes: 22 additions & 9 deletions velox/substrait/SubstraitParser.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -144,14 +144,12 @@ std::shared_ptr<SubstraitParser::SubstraitType> SubstraitParser::parseType(
return std::make_shared<SubstraitType>(type);
}

std::string SubstraitParser::parseType(
const std::string& substraitType) {
auto it = typeMap_.find(substraitType);
if (it == typeMap_.end()) {
VELOX_NYI(
"Substrait parsing for type {} not supported.", substraitType);
}
return it->second;
/// Translate a Substrait type name into the string registered for it in
/// typeMap_. A name with no entry in typeMap_ is reported through VELOX_NYI.
/// @param substraitType the Substrait type name to look up.
/// @return the value stored in typeMap_ for substraitType.
std::string SubstraitParser::parseType(const std::string& substraitType) {
  const auto entry = typeMap_.find(substraitType);
  const bool isKnownType = (entry != typeMap_.end());
  if (!isKnownType) {
    VELOX_NYI("Substrait parsing for type {} not supported.", substraitType);
  }
  return entry->second;
}

std::vector<std::shared_ptr<SubstraitParser::SubstraitType>>
Expand Down Expand Up @@ -286,7 +284,7 @@ void SubstraitParser::getSubFunctionTypes(
std::string delimiter = "_";
while ((pos = funcTypes.find(delimiter)) != std::string::npos) {
auto type = funcTypes.substr(0, pos);
if (type != "opt" && type !="req") {
if (type != "opt" && type != "req") {
types.emplace_back(type);
}
funcTypes.erase(0, pos + delimiter.length());
Expand Down Expand Up @@ -314,4 +312,19 @@ std::string SubstraitParser::mapToVeloxFunction(
return subFunc;
}

/// Check whether `config` is switched on in the optimization payload of a
/// Substrait AdvancedExtension.
///
/// The payload is unpacked as a google::protobuf::StringValue. The config
/// counts as set only when the single character right after the first
/// occurrence of `config` in that string is "1".
///
/// NOTE(review): `config` is located with a plain substring search, so a key
/// that is also the tail of a longer key would match as well -- confirm that
/// callers only pass unambiguous keys (e.g. "isSMJ=").
///
/// @param extension Substrait advanced extension.
/// @param config the key string of a config, as it appears in the payload.
/// @return true only when the payload unpacks successfully and the key is
/// immediately followed by "1"; false in every other case.
bool SubstraitParser::configSetInOptimization(
    const ::substrait::extensions::AdvancedExtension& extension,
    const std::string& config) const {
  if (!extension.has_optimization()) {
    return false;
  }

  google::protobuf::StringValue msg;
  // UnpackTo signals failure through its return value; never inspect a
  // message that was not unpacked successfully.
  if (!extension.optimization().UnpackTo(&msg)) {
    return false;
  }

  const std::string& value = msg.value();
  const std::size_t pos = value.find(config);
  if (pos == std::string::npos) {
    return false;
  }

  // After a successful find, pos + config.size() is at most value.size(), so
  // compare() cannot throw; at the very end it compares an empty range,
  // which is not equal to "1".
  return value.compare(pos + config.size(), 1, "1") == 0;
}

} // namespace facebook::velox::substrait
11 changes: 11 additions & 0 deletions velox/substrait/SubstraitParser.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
#include "velox/substrait/proto/substrait/type.pb.h"
#include "velox/substrait/proto/substrait/type_expressions.pb.h"

#include <google/protobuf/wrappers.pb.h>

namespace facebook::velox::substrait {

/// This class contains some common functions used to parse Substrait
Expand Down Expand Up @@ -94,6 +96,15 @@ class SubstraitParser {
/// Map the Substrait function keyword into Velox function keyword.
std::string mapToVeloxFunction(const std::string& substraitFunction) const;

/// @brief Return whether a config is set as true in AdvancedExtension
/// optimization.
/// @details The optimization payload is read as a
/// google::protobuf::StringValue. The config is treated as true when the
/// character immediately following the first occurrence of the key in that
/// string is "1".
/// @param extension Substrait advanced extension.
/// @param config the key string of a config, including whatever separator
/// precedes the value (callers pass keys such as "isSMJ=").
/// @return Whether the config is set as true. Returns false when the
/// extension carries no optimization or the key is not found.
bool configSetInOptimization(
const ::substrait::extensions::AdvancedExtension& extension,
const std::string& config) const;

private:
/// A map used for mapping Substrait function keywords into Velox functions'
/// keywords. Key: the Substrait function keyword, Value: the Velox function
Expand Down
64 changes: 29 additions & 35 deletions velox/substrait/SubstraitToVeloxPlan.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@

#include "velox/substrait/SubstraitToVeloxPlan.h"

#include <google/protobuf/wrappers.pb.h>

#include "velox/substrait/TypeUtils.h"
#include "velox/substrait/VariantToVectorConverter.h"
#include "velox/type/Type.h"
Expand Down Expand Up @@ -63,26 +61,6 @@ const std::string sNot = "not";
const std::string sI32 = "i32";
const std::string sI64 = "i64";

/// @brief Return whether a config is set as true in AdvancedExtension
/// optimization.
/// @details The optimization payload is unpacked as a
/// google::protobuf::StringValue. The config is considered true only when the
/// one character directly after the first occurrence of the key is "1".
/// NOTE(review): the key is matched as a plain substring of the payload --
/// confirm that keys such as "isExistenceJoin=" cannot occur inside another
/// key.
/// @param extension Substrait advanced extension.
/// @param config the key string of a config.
/// @return Whether the config is set as true. False when there is no
/// optimization, when unpacking fails, or when the key is absent.
bool configSetInOptimization(
    const ::substrait::extensions::AdvancedExtension& extension,
    const std::string& config) {
  if (!extension.has_optimization()) {
    return false;
  }

  google::protobuf::StringValue msg;
  // A failed unpack is reported via the return value; treat it as "not set"
  // instead of searching a message that was not populated correctly.
  if (!extension.optimization().UnpackTo(&msg)) {
    return false;
  }

  const std::string& value = msg.value();
  const std::size_t pos = value.find(config);
  if (pos == std::string::npos) {
    return false;
  }

  // pos + config.size() <= value.size() holds after a successful find, so
  // this bounded compare is safe and yields "not equal" on an empty tail.
  return value.compare(pos + config.size(), 1, "1") == 0;
}

/// @brief Get the input type from both sides of join.
/// @param leftNode the plan node of left side.
/// @param rightNode the plan node of right side.
Expand Down Expand Up @@ -219,7 +197,7 @@ core::PlanNodePtr SubstraitVeloxPlanConverter::toVeloxPlan(
case ::substrait::JoinRel_JoinType::JoinRel_JoinType_JOIN_TYPE_LEFT_SEMI:
// Determine the semi join type based on extracted information.
if (sJoin.has_advanced_extension() &&
configSetInOptimization(
subParser_->configSetInOptimization(
sJoin.advanced_extension(), "isExistenceJoin=")) {
joinType = core::JoinType::kLeftSemiProject;
} else {
Expand All @@ -229,7 +207,7 @@ core::PlanNodePtr SubstraitVeloxPlanConverter::toVeloxPlan(
case ::substrait::JoinRel_JoinType::JoinRel_JoinType_JOIN_TYPE_RIGHT_SEMI:
// Determine the semi join type based on extracted information.
if (sJoin.has_advanced_extension() &&
configSetInOptimization(
subParser_->configSetInOptimization(
sJoin.advanced_extension(), "isExistenceJoin=")) {
joinType = core::JoinType::kRightSemiProject;
} else {
Expand All @@ -239,7 +217,7 @@ core::PlanNodePtr SubstraitVeloxPlanConverter::toVeloxPlan(
case ::substrait::JoinRel_JoinType::JoinRel_JoinType_JOIN_TYPE_ANTI: {
// Determine the anti join type based on extracted information.
if (sJoin.has_advanced_extension() &&
configSetInOptimization(
subParser_->configSetInOptimization(
sJoin.advanced_extension(), "isNullAwareAntiJoin=")) {
joinType = core::JoinType::kNullAwareAnti;
} else {
Expand Down Expand Up @@ -276,16 +254,32 @@ core::PlanNodePtr SubstraitVeloxPlanConverter::toVeloxPlan(
exprConverter_->toVeloxExpr(sJoin.post_join_filter(), inputRowType);
}

// Create join node
return std::make_shared<core::HashJoinNode>(
nextPlanNodeId(),
joinType,
leftKeys,
rightKeys,
filter,
leftNode,
rightNode,
getJoinOutputType(leftNode, rightNode, joinType));
if (sJoin.has_advanced_extension() &&
subParser_->configSetInOptimization(
sJoin.advanced_extension(), "isSMJ=")) {
// Create MergeJoinNode node
return std::make_shared<core::MergeJoinNode>(
nextPlanNodeId(),
joinType,
leftKeys,
rightKeys,
filter,
leftNode,
rightNode,
getJoinOutputType(leftNode, rightNode, joinType));

} else {
// Create HashJoinNode node
return std::make_shared<core::HashJoinNode>(
nextPlanNodeId(),
joinType,
leftKeys,
rightKeys,
filter,
leftNode,
rightNode,
getJoinOutputType(leftNode, rightNode, joinType));
}
}

core::PlanNodePtr SubstraitVeloxPlanConverter::toVeloxPlan(
Expand Down
13 changes: 13 additions & 0 deletions velox/substrait/SubstraitToVeloxPlanValidator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -429,6 +429,19 @@ bool SubstraitToVeloxPlanValidator::validate(
return false;
}

if (sJoin.has_advanced_extension() &&
subParser_->configSetInOptimization(
sJoin.advanced_extension(), "isSMJ=")) {
switch (sJoin.type()) {
case ::substrait::JoinRel_JoinType_JOIN_TYPE_INNER:
case ::substrait::JoinRel_JoinType_JOIN_TYPE_LEFT:
break;
default:
std::cout << "Sort merge join only support inner and left join"
<< std::endl;
return false;
}
}
switch (sJoin.type()) {
case ::substrait::JoinRel_JoinType_JOIN_TYPE_INNER:
case ::substrait::JoinRel_JoinType_JOIN_TYPE_OUTER:
Expand Down

0 comments on commit b0b0e62

Please sign in to comment.