Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add OpenSearch index write operator and builder #1139

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@
import org.opensearch.sql.planner.physical.SortOperator;
import org.opensearch.sql.planner.physical.ValuesOperator;
import org.opensearch.sql.planner.physical.WindowOperator;
import org.opensearch.sql.storage.read.TableScanBuilder;
import org.opensearch.sql.storage.write.TableWriteBuilder;

/**
* Default implementor for implementing logical to physical translation. "Default" here means all
Expand Down Expand Up @@ -123,6 +125,16 @@ public PhysicalPlan visitLimit(LogicalLimit node, C context) {
return new LimitOperator(visitChild(node, context), node.getLimit(), node.getOffset());
}

@Override
public PhysicalPlan visitTableScanBuilder(TableScanBuilder plan, C context) {
return plan.build();
}

@Override
public PhysicalPlan visitTableWriteBuilder(TableWriteBuilder plan, C context) {
return plan.build(visitChild(plan, context));
}

@Override
public PhysicalPlan visitRelation(LogicalRelation node, C context) {
throw new UnsupportedOperationException("Storage engine is responsible for "
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,10 @@
@UtilityClass
public class LogicalPlanDSL {

public static LogicalPlan write(LogicalPlan input, Table table, List<String> columns) {
return new LogicalWrite(input, table, columns);
}

public static LogicalPlan aggregation(
LogicalPlan input, List<NamedAggregator> aggregatorList, List<NamedExpression> groupByList) {
return new LogicalAggregation(input, aggregatorList, groupByList);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@

package org.opensearch.sql.planner.logical;

import org.opensearch.sql.storage.read.TableScanBuilder;
import org.opensearch.sql.storage.write.TableWriteBuilder;

/**
* The visitor of {@link LogicalPlan}.
*
Expand All @@ -22,6 +25,18 @@ public R visitRelation(LogicalRelation plan, C context) {
return visitNode(plan, context);
}

public R visitTableScanBuilder(TableScanBuilder plan, C context) {
return visitNode(plan, context);
}

public R visitWrite(LogicalWrite plan, C context) {
return visitNode(plan, context);
}

public R visitTableWriteBuilder(TableWriteBuilder plan, C context) {
return visitNode(plan, context);
}

public R visitFilter(LogicalFilter plan, C context) {
return visitNode(plan, context);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.planner.logical;

import java.util.Collections;
import java.util.List;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.ToString;
import org.opensearch.sql.storage.Table;

/**
* Logical operator for insert statement.
*/
@EqualsAndHashCode(callSuper = true)
@Getter
@ToString
public class LogicalWrite extends LogicalPlan {

/** Table that handles the write operation. */
private final Table table;

/** Optional column name list specified in insert statement. */
private final List<String> columns;

/**
* Construct a logical write with given child node, table and column name list.
*/
public LogicalWrite(LogicalPlan child, Table table, List<String> columns) {
super(Collections.singletonList(child));
this.table = table;
this.columns = columns;
}

@Override
public <R, C> R accept(LogicalPlanNodeVisitor<R, C> visitor, C context) {
return visitor.visitWrite(this, context);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@
import org.opensearch.sql.planner.logical.LogicalPlan;
import org.opensearch.sql.planner.optimizer.rule.MergeFilterAndFilter;
import org.opensearch.sql.planner.optimizer.rule.PushFilterUnderSort;
import org.opensearch.sql.planner.optimizer.rule.read.CreateTableScanBuilder;
import org.opensearch.sql.planner.optimizer.rule.read.TableScanPushDown;
import org.opensearch.sql.planner.optimizer.rule.write.CreateTableWriteBuilder;

/**
* {@link LogicalPlan} Optimizer.
Expand All @@ -39,8 +42,22 @@ public LogicalPlanOptimizer(List<Rule<?>> rules) {
*/
public static LogicalPlanOptimizer create() {
return new LogicalPlanOptimizer(Arrays.asList(
/*
* Phase 1: Transformations that rely on relational algebra equivalence
*/
new MergeFilterAndFilter(),
new PushFilterUnderSort()));
new PushFilterUnderSort(),
/*
* Phase 2: Transformations that rely on data source push down capability
*/
new CreateTableScanBuilder(),
TableScanPushDown.PUSH_DOWN_FILTER,
TableScanPushDown.PUSH_DOWN_AGGREGATION,
TableScanPushDown.PUSH_DOWN_SORT,
TableScanPushDown.PUSH_DOWN_LIMIT,
TableScanPushDown.PUSH_DOWN_HIGHLIGHT,
TableScanPushDown.PUSH_DOWN_PROJECT,
new CreateTableWriteBuilder()));
}

/**
Expand All @@ -63,7 +80,14 @@ private LogicalPlan internalOptimize(LogicalPlan plan) {
Match match = DEFAULT_MATCHER.match(rule.pattern(), node);
if (match.isPresent()) {
node = rule.apply(match.value(), match.captures());
done = false;

// For new TableScanPushDown impl, pattern match doesn't necessarily cause
// push down to happen. So reiterate all rules against the node only if the node
// is actually replaced by any rule.
// TODO: may need to introduce fixed point or maximum iteration limit in future
if (node != match.value()) {
done = false;
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,17 +6,79 @@

package org.opensearch.sql.planner.optimizer.pattern;

import com.facebook.presto.matching.Capture;
import com.facebook.presto.matching.Pattern;
import com.facebook.presto.matching.Property;
import com.facebook.presto.matching.PropertyPattern;
import java.util.Optional;
import lombok.experimental.UtilityClass;
import org.opensearch.sql.planner.logical.LogicalAggregation;
import org.opensearch.sql.planner.logical.LogicalFilter;
import org.opensearch.sql.planner.logical.LogicalHighlight;
import org.opensearch.sql.planner.logical.LogicalLimit;
import org.opensearch.sql.planner.logical.LogicalPlan;
import org.opensearch.sql.planner.logical.LogicalProject;
import org.opensearch.sql.planner.logical.LogicalRelation;
import org.opensearch.sql.planner.logical.LogicalSort;
import org.opensearch.sql.planner.logical.LogicalWrite;
import org.opensearch.sql.storage.Table;
import org.opensearch.sql.storage.read.TableScanBuilder;

/**
* Pattern helper class.
*/
@UtilityClass
public class Patterns {

/**
* Logical filter with a given pattern on inner field.
*/
public static <T extends LogicalPlan> Pattern<LogicalFilter> filter(Pattern<T> pattern) {
return Pattern.typeOf(LogicalFilter.class).with(source(pattern));
}

/**
* Logical aggregate operator with a given pattern on inner field.
*/
public static <T extends LogicalPlan> Pattern<LogicalAggregation> aggregate(Pattern<T> pattern) {
return Pattern.typeOf(LogicalAggregation.class).with(source(pattern));
}

/**
* Logical sort operator with a given pattern on inner field.
*/
public static <T extends LogicalPlan> Pattern<LogicalSort> sort(Pattern<T> pattern) {
return Pattern.typeOf(LogicalSort.class).with(source(pattern));
}

/**
* Logical limit operator with a given pattern on inner field.
*/
public static <T extends LogicalPlan> Pattern<LogicalLimit> limit(Pattern<T> pattern) {
return Pattern.typeOf(LogicalLimit.class).with(source(pattern));
}

/**
* Logical highlight operator with a given pattern on inner field.
*/
public static <T extends LogicalPlan> Pattern<LogicalHighlight> highlight(Pattern<T> pattern) {
return Pattern.typeOf(LogicalHighlight.class).with(source(pattern));
}

/**
* Logical project operator with a given pattern on inner field.
*/
public static <T extends LogicalPlan> Pattern<LogicalProject> project(Pattern<T> pattern) {
return Pattern.typeOf(LogicalProject.class).with(source(pattern));
}

/**
* Pattern for {@link TableScanBuilder} and capture it meanwhile.
*/
public static Pattern<TableScanBuilder> scanBuilder() {
return Pattern.typeOf(TableScanBuilder.class).capturedAs(Capture.newCapture());
}

/**
* LogicalPlan source {@link Property}.
*/
Expand All @@ -25,4 +87,38 @@ public static Property<LogicalPlan, LogicalPlan> source() {
? Optional.of(plan.getChild().get(0))
: Optional.empty());
}

/**
* Source (children field) with a given pattern.
*/
@SuppressWarnings("unchecked")
public static <T extends LogicalPlan>
PropertyPattern<LogicalPlan, T> source(Pattern<T> pattern) {
Property<LogicalPlan, T> property = Property.optionalProperty("source",
plan -> plan.getChild().size() == 1
? Optional.of((T) plan.getChild().get(0))
: Optional.empty());

return property.matching(pattern);
}

/**
* Logical relation with table field.
*/
public static Property<LogicalPlan, Table> table() {
return Property.optionalProperty("table",
plan -> plan instanceof LogicalRelation
? Optional.of(((LogicalRelation) plan).getTable())
: Optional.empty());
}

/**
* Logical relation with table field.
*/
public static Property<LogicalPlan, Table> writeTable() {
return Property.optionalProperty("table",
plan -> plan instanceof LogicalWrite
? Optional.of(((LogicalWrite) plan).getTable())
: Optional.empty());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.planner.optimizer.rule.read;

import static org.opensearch.sql.planner.optimizer.pattern.Patterns.table;

import com.facebook.presto.matching.Capture;
import com.facebook.presto.matching.Captures;
import com.facebook.presto.matching.Pattern;
import lombok.Getter;
import lombok.experimental.Accessors;
import org.opensearch.sql.planner.logical.LogicalPlan;
import org.opensearch.sql.planner.logical.LogicalRelation;
import org.opensearch.sql.planner.optimizer.Rule;
import org.opensearch.sql.storage.Table;
import org.opensearch.sql.storage.read.TableScanBuilder;

/**
* Rule that replace logical relation operator to {@link TableScanBuilder} for later
* push down optimization. All push down optimization rules that depends on table scan
* builder needs to run after this.
*/
public class CreateTableScanBuilder implements Rule<LogicalRelation> {

/** Capture the table inside matched logical relation operator. */
private final Capture<Table> capture;

/** Pattern that matches logical relation operator. */
@Accessors(fluent = true)
@Getter
private final Pattern<LogicalRelation> pattern;

/**
* Construct create table scan builder rule.
*/
public CreateTableScanBuilder() {
this.capture = Capture.newCapture();
this.pattern = Pattern.typeOf(LogicalRelation.class)
.with(table().capturedAs(capture));
}

@Override
public LogicalPlan apply(LogicalRelation plan, Captures captures) {
TableScanBuilder scanBuilder = captures.get(capture).createScanBuilder();
// TODO: Remove this after Prometheus refactored to new table scan builder too
return (scanBuilder == null) ? plan : scanBuilder;
}
}
Loading