Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve pushdown optimization and logical to physical transformation #1091

Merged
merged 13 commits into from
Dec 7, 2022
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.opensearch.sql.planner.physical.SortOperator;
import org.opensearch.sql.planner.physical.ValuesOperator;
import org.opensearch.sql.planner.physical.WindowOperator;
import org.opensearch.sql.storage.read.TableScanBuilder;

/**
* Default implementor for implementing logical to physical translation. "Default" here means all
Expand Down Expand Up @@ -123,6 +124,11 @@ public PhysicalPlan visitLimit(LogicalLimit node, C context) {
return new LimitOperator(visitChild(node, context), node.getLimit(), node.getOffset());
}

/**
 * Implement a {@link TableScanBuilder} node by asking the builder to build itself.
 * The visiting context is not consulted.
 */
@Override
public PhysicalPlan visitTableScanBuilder(TableScanBuilder plan, C context) {
  PhysicalPlan builtScan = plan.build();
  return builtScan;
}

@Override
public PhysicalPlan visitRelation(LogicalRelation node, C context) {
throw new UnsupportedOperationException("Storage engine is responsible for "
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@

package org.opensearch.sql.planner.logical;

import org.opensearch.sql.storage.read.TableScanBuilder;

/**
* The visitor of {@link LogicalPlan}.
*
Expand All @@ -22,6 +24,10 @@ public R visitRelation(LogicalRelation plan, C context) {
return visitNode(plan, context);
}

/**
 * Visit a {@link TableScanBuilder} node; this default implementation defers to
 * {@code visitNode}.
 */
public R visitTableScanBuilder(TableScanBuilder plan, C context) {
  R fallbackResult = visitNode(plan, context);
  return fallbackResult;
}

/**
 * Visit a {@link LogicalFilter} node; this default implementation defers to
 * {@code visitNode}.
 */
public R visitFilter(LogicalFilter plan, C context) {
  R fallbackResult = visitNode(plan, context);
  return fallbackResult;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
import org.opensearch.sql.planner.logical.LogicalPlan;
import org.opensearch.sql.planner.optimizer.rule.MergeFilterAndFilter;
import org.opensearch.sql.planner.optimizer.rule.PushFilterUnderSort;
import org.opensearch.sql.planner.optimizer.rule.read.CreateTableScanBuilder;
import org.opensearch.sql.planner.optimizer.rule.read.TableScanPushDown;

/**
* {@link LogicalPlan} Optimizer.
Expand All @@ -39,8 +41,21 @@ public LogicalPlanOptimizer(List<Rule<?>> rules) {
*/
public static LogicalPlanOptimizer create() {
return new LogicalPlanOptimizer(Arrays.asList(
/*
* Phase 1: Transformations that rely on relational algebra equivalence
*/
new MergeFilterAndFilter(),
new PushFilterUnderSort()));
new PushFilterUnderSort(),
/*
* Phase 2: Transformations that rely on data source push down capability
*/
new CreateTableScanBuilder(),
dai-chen marked this conversation as resolved.
Show resolved Hide resolved
TableScanPushDown.PUSH_DOWN_FILTER,
TableScanPushDown.PUSH_DOWN_AGGREGATION,
TableScanPushDown.PUSH_DOWN_SORT,
TableScanPushDown.PUSH_DOWN_LIMIT,
TableScanPushDown.PUSH_DOWN_HIGHLIGHT,
TableScanPushDown.PUSH_DOWN_PROJECT));
}

/**
Expand All @@ -63,7 +78,14 @@ private LogicalPlan internalOptimize(LogicalPlan plan) {
Match match = DEFAULT_MATCHER.match(rule.pattern(), node);
if (match.isPresent()) {
node = rule.apply(match.value(), match.captures());
done = false;

// For new TableScanPushDown impl, pattern match doesn't necessarily cause
// push down to happen. So reiterate all rules against the node only if the node
// is actually replaced by any rule.
// TODO: may need to introduce fixed point or maximum iteration limit in future
if (node != match.value()) {
done = false;
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,17 +6,78 @@

package org.opensearch.sql.planner.optimizer.pattern;

import com.facebook.presto.matching.Capture;
import com.facebook.presto.matching.Pattern;
import com.facebook.presto.matching.Property;
import com.facebook.presto.matching.PropertyPattern;
import java.util.Optional;
import lombok.experimental.UtilityClass;
import org.opensearch.sql.planner.logical.LogicalAggregation;
import org.opensearch.sql.planner.logical.LogicalFilter;
import org.opensearch.sql.planner.logical.LogicalHighlight;
import org.opensearch.sql.planner.logical.LogicalLimit;
import org.opensearch.sql.planner.logical.LogicalPlan;
import org.opensearch.sql.planner.logical.LogicalProject;
import org.opensearch.sql.planner.logical.LogicalRelation;
import org.opensearch.sql.planner.logical.LogicalSort;
import org.opensearch.sql.storage.Table;
import org.opensearch.sql.storage.read.TableScanBuilder;

/**
* Pattern helper class.
*/
@UtilityClass
public class Patterns {

/**
 * Pattern for a {@link LogicalFilter} whose source (single child) matches the given pattern.
 */
public static <T extends LogicalPlan> Pattern<LogicalFilter> filter(Pattern<T> pattern) {
  PropertyPattern<LogicalPlan, T> childMatches = source(pattern);
  return Pattern.typeOf(LogicalFilter.class).with(childMatches);
}

/**
 * Pattern for a {@link LogicalAggregation} whose source (single child) matches the given
 * pattern.
 */
public static <T extends LogicalPlan> Pattern<LogicalAggregation> aggregate(Pattern<T> pattern) {
  PropertyPattern<LogicalPlan, T> childMatches = source(pattern);
  return Pattern.typeOf(LogicalAggregation.class).with(childMatches);
}

/**
 * Pattern for a {@link LogicalSort} whose source (single child) matches the given pattern.
 */
public static <T extends LogicalPlan> Pattern<LogicalSort> sort(Pattern<T> pattern) {
  PropertyPattern<LogicalPlan, T> childMatches = source(pattern);
  return Pattern.typeOf(LogicalSort.class).with(childMatches);
}

/**
 * Pattern for a {@link LogicalLimit} whose source (single child) matches the given pattern.
 */
public static <T extends LogicalPlan> Pattern<LogicalLimit> limit(Pattern<T> pattern) {
  PropertyPattern<LogicalPlan, T> childMatches = source(pattern);
  return Pattern.typeOf(LogicalLimit.class).with(childMatches);
}

/**
 * Pattern for a {@link LogicalHighlight} whose source (single child) matches the given pattern.
 */
public static <T extends LogicalPlan> Pattern<LogicalHighlight> highlight(Pattern<T> pattern) {
  PropertyPattern<LogicalPlan, T> childMatches = source(pattern);
  return Pattern.typeOf(LogicalHighlight.class).with(childMatches);
}

/**
 * Pattern for a {@link LogicalProject} whose source (single child) matches the given pattern.
 */
public static <T extends LogicalPlan> Pattern<LogicalProject> project(Pattern<T> pattern) {
  PropertyPattern<LogicalPlan, T> childMatches = source(pattern);
  return Pattern.typeOf(LogicalProject.class).with(childMatches);
}

/**
 * Pattern that matches a {@link TableScanBuilder} and captures it under a newly created
 * capture.
 */
public static Pattern<TableScanBuilder> scanBuilder() {
  Capture<TableScanBuilder> builderCapture = Capture.newCapture();
  return Pattern.typeOf(TableScanBuilder.class).capturedAs(builderCapture);
}

/**
* LogicalPlan source {@link Property}.
*/
Expand All @@ -25,4 +86,28 @@ public static Property<LogicalPlan, LogicalPlan> source() {
? Optional.of(plan.getChild().get(0))
: Optional.empty());
}

/**
 * Source property combined with a pattern: the property is present only when the plan has
 * exactly one child, and that child must match the given pattern.
 */
@SuppressWarnings("unchecked")
public static <T extends LogicalPlan>
    PropertyPattern<LogicalPlan, T> source(Pattern<T> pattern) {
  Property<LogicalPlan, T> soleChild = Property.optionalProperty("source", plan -> {
    if (plan.getChild().size() != 1) {
      return Optional.empty();
    }
    // Unchecked cast: whether the child really is a T is left to the pattern match below.
    return Optional.of((T) plan.getChild().get(0));
  });
  return soleChild.matching(pattern);
}

/**
 * Table property of a plan node: present only for a {@link LogicalRelation}, in which case
 * it holds the table of that relation.
 */
public static Property<LogicalPlan, Table> table() {
  return Property.optionalProperty("table", plan -> {
    if (!(plan instanceof LogicalRelation)) {
      return Optional.empty();
    }
    LogicalRelation relation = (LogicalRelation) plan;
    return Optional.of(relation.getTable());
  });
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.planner.optimizer.rule.read;

import static org.opensearch.sql.planner.optimizer.pattern.Patterns.table;

import com.facebook.presto.matching.Capture;
import com.facebook.presto.matching.Captures;
import com.facebook.presto.matching.Pattern;
import lombok.Getter;
import lombok.experimental.Accessors;
import org.opensearch.sql.planner.logical.LogicalPlan;
import org.opensearch.sql.planner.logical.LogicalRelation;
import org.opensearch.sql.planner.optimizer.Rule;
import org.opensearch.sql.storage.Table;
import org.opensearch.sql.storage.read.TableScanBuilder;

/**
 * Rule that replaces a logical relation operator with a {@link TableScanBuilder} so that
 * later push down optimizations have a scan builder to work on. Every push down rule that
 * depends on the table scan builder needs to run after this rule.
 */
public class CreateTableScanBuilder implements Rule<LogicalRelation> {

  /** Capture of the table held by the matched logical relation operator. */
  private final Capture<Table> capture;

  /** Pattern matching a logical relation operator; Lombok exposes it as a fluent getter. */
  @Accessors(fluent = true)
  @Getter
  private final Pattern<LogicalRelation> pattern;

  /**
   * Construct create table scan builder rule.
   */
  public CreateTableScanBuilder() {
    Capture<Table> tableCapture = Capture.newCapture();
    this.capture = tableCapture;
    this.pattern = Pattern.typeOf(LogicalRelation.class)
        .with(table().capturedAs(tableCapture));
  }

  /**
   * Ask the captured table for a scan builder and use it in place of the relation.
   *
   * @param plan matched logical relation
   * @param captures captures produced by the pattern match, holding the relation's table
   * @return the scan builder created by the table, or the original relation if the table
   *     returned {@code null}
   */
  @Override
  public LogicalPlan apply(LogicalRelation plan, Captures captures) {
    Table matchedTable = captures.get(capture);
    TableScanBuilder scanBuilder = matchedTable.createScanBuilder();

    // TODO: Remove this after Prometheus refactored to new table scan builder too
    if (scanBuilder == null) {
      return plan;
    }
    return scanBuilder;
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.planner.optimizer.rule.read;

import static org.opensearch.sql.planner.optimizer.pattern.Patterns.aggregate;
import static org.opensearch.sql.planner.optimizer.pattern.Patterns.filter;
import static org.opensearch.sql.planner.optimizer.pattern.Patterns.highlight;
import static org.opensearch.sql.planner.optimizer.pattern.Patterns.limit;
import static org.opensearch.sql.planner.optimizer.pattern.Patterns.project;
import static org.opensearch.sql.planner.optimizer.pattern.Patterns.scanBuilder;
import static org.opensearch.sql.planner.optimizer.pattern.Patterns.sort;
import static org.opensearch.sql.planner.optimizer.rule.read.TableScanPushDown.TableScanPushDownBuilder.match;

import com.facebook.presto.matching.Capture;
import com.facebook.presto.matching.Captures;
import com.facebook.presto.matching.Pattern;
import com.facebook.presto.matching.pattern.CapturePattern;
import com.facebook.presto.matching.pattern.WithPattern;
import java.util.function.BiFunction;
import org.opensearch.sql.planner.logical.LogicalPlan;
import org.opensearch.sql.planner.optimizer.Rule;
import org.opensearch.sql.storage.read.TableScanBuilder;

/**
 * Rule template shared by all table scan push down rules. Each rule pairs a pattern that
 * matches an operator sitting directly on top of a table scan builder with a push down
 * function; when that function reports success, the operator is eliminated and the scan
 * builder takes its place. Keeping this workflow in one class removes redundant code and
 * improves readability.
 *
 * @param <T> logical plan node type
 */
public class TableScanPushDown<T extends LogicalPlan> implements Rule<T> {

  /** Push down optimize rule for filtering condition. */
  public static final Rule<?> PUSH_DOWN_FILTER =
      match(filter(scanBuilder()))
          .apply((node, scan) -> scan.pushDownFilter(node));

  /** Push down optimize rule for aggregate operator. */
  public static final Rule<?> PUSH_DOWN_AGGREGATION =
      match(aggregate(scanBuilder()))
          .apply((node, scan) -> scan.pushDownAggregation(node));

  /** Push down optimize rule for sort operator. */
  public static final Rule<?> PUSH_DOWN_SORT =
      match(sort(scanBuilder()))
          .apply((node, scan) -> scan.pushDownSort(node));

  /** Push down optimize rule for limit operator. */
  public static final Rule<?> PUSH_DOWN_LIMIT =
      match(limit(scanBuilder()))
          .apply((node, scan) -> scan.pushDownLimit(node));

  /** Push down optimize rule for project operator. */
  public static final Rule<?> PUSH_DOWN_PROJECT =
      match(project(scanBuilder()))
          .apply((node, scan) -> scan.pushDownProject(node));

  /** Push down optimize rule for highlight operator. */
  public static final Rule<?> PUSH_DOWN_HIGHLIGHT =
      match(highlight(scanBuilder()))
          .apply((node, scan) -> scan.pushDownHighlight(node));

  /** Pattern that matches a plan node. */
  private final WithPattern<T> pattern;

  /** Capture table scan builder inside a plan node. */
  private final Capture<TableScanBuilder> capture;

  /** Push down function applied to the plan node and captured table scan builder. */
  private final BiFunction<T, TableScanBuilder, Boolean> pushDownFunction;

  /**
   * Create a push down rule. The pattern nested inside the given pattern is cast to a
   * capture pattern for the table scan builder, and its capture is kept for {@code apply}.
   */
  @SuppressWarnings("unchecked")
  private TableScanPushDown(WithPattern<T> pattern,
                            BiFunction<T, TableScanBuilder, Boolean> pushDownFunction) {
    CapturePattern<TableScanBuilder> scanPattern =
        (CapturePattern<TableScanBuilder>) pattern.getPattern();

    this.pattern = pattern;
    this.capture = scanPattern.capture();
    this.pushDownFunction = pushDownFunction;
  }

  @Override
  public Pattern<T> pattern() {
    return pattern;
  }

  /**
   * Run the push down function against the matched node and its captured scan builder.
   *
   * @param plan matched plan node
   * @param captures captures produced by the pattern match
   * @return the scan builder if the function returned true, otherwise the unchanged plan node
   */
  @Override
  public LogicalPlan apply(T plan, Captures captures) {
    TableScanBuilder scanBuilder = captures.get(capture);
    return pushDownFunction.apply(plan, scanBuilder) ? scanBuilder : plan;
  }

  /**
   * Custom builder class other than generated by Lombok to provide more readable code.
   */
  static class TableScanPushDownBuilder<T extends LogicalPlan> {

    private WithPattern<T> pattern;

    /** Start building a rule from the given pattern, which is cast to a with-pattern. */
    public static <T extends LogicalPlan>
        TableScanPushDownBuilder<T> match(Pattern<T> pattern) {
      TableScanPushDownBuilder<T> created = new TableScanPushDownBuilder<>();
      created.pattern = (WithPattern<T>) pattern;
      return created;
    }

    /** Finish the rule by attaching the push down function to the stored pattern. */
    public TableScanPushDown<T> apply(
        BiFunction<T, TableScanBuilder, Boolean> pushDownFunction) {
      return new TableScanPushDown<>(pattern, pushDownFunction);
    }
  }
}
Loading