Skip to content

Commit

Permalink
fix: apply filter before flat mapping in logical planner (#3730)
Browse files Browse the repository at this point in the history
  • Loading branch information
purplefox authored and Tim Fox committed Nov 5, 2019
1 parent b0bbea4 commit f4bd083
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -77,14 +77,14 @@ public LogicalPlanner(
public OutputNode buildPlan() {
PlanNode currentNode = buildSourceNode();

if (!analysis.getTableFunctions().isEmpty()) {
currentNode = buildFlatMapNode(currentNode);
}

if (analysis.getWhereExpression().isPresent()) {
currentNode = buildFilterNode(currentNode, analysis.getWhereExpression().get());
}

if (!analysis.getTableFunctions().isEmpty()) {
currentNode = buildFlatMapNode(currentNode);
}

if (analysis.getGroupByExpressions().isEmpty()) {
currentNode = buildProjectNode(currentNode);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,26 @@
}
]
}
},
{
"name": "table function with where clause",
"statements": [
"CREATE STREAM TEST (F0 INT, F1 ARRAY<INT>) WITH (kafka_topic='test_topic', value_format='JSON');",
"CREATE STREAM OUTPUT AS SELECT F0, EXPLODE(F1) VAL FROM TEST WHERE F0 <> 1;"
],
"inputs": [
{"topic": "test_topic", "key": 0, "value": {"F0": 0, "F1": [1, 2, 3]}},
{"topic": "test_topic", "key": 0, "value": {"F0": 1, "F1": [4, 5, 6]}},
{"topic": "test_topic", "key": 0, "value": {"F0": 2, "F1": [7, 8, 9]}}
],
"outputs": [
{"topic": "OUTPUT", "key": "0", "value": {"F0": 0, "VAL": 1}},
{"topic": "OUTPUT", "key": "0", "value": {"F0": 0, "VAL": 2}},
{"topic": "OUTPUT", "key": "0", "value": {"F0": 0, "VAL": 3}},
{"topic": "OUTPUT", "key": "0", "value": {"F0": 2, "VAL": 7}},
{"topic": "OUTPUT", "key": "0", "value": {"F0": 2, "VAL": 8}},
{"topic": "OUTPUT", "key": "0", "value": {"F0": 2, "VAL": 9}}
]
}
]
}

0 comments on commit f4bd083

Please sign in to comment.