Skip to content

Commit

Permalink
Add table write operator and builder (#1094) (#1171)
Browse files Browse the repository at this point in the history
* Add table write operator and builder

Signed-off-by: Chen Dai <[email protected]>

* Add UT for all new classes

Signed-off-by: Chen Dai <[email protected]>

* Rename child field

Signed-off-by: Chen Dai <[email protected]>

* Add columns field

Signed-off-by: Chen Dai <[email protected]>

* Update javadoc to prepare PR

Signed-off-by: Chen Dai <[email protected]>

Signed-off-by: Chen Dai <[email protected]>
(cherry picked from commit c494d92)

Co-authored-by: Chen Dai <[email protected]>
  • Loading branch information
opensearch-trigger-bot[bot] and dai-chen authored Dec 22, 2022
1 parent d03c176 commit 18661b4
Show file tree
Hide file tree
Showing 16 changed files with 360 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.opensearch.sql.planner.physical.ValuesOperator;
import org.opensearch.sql.planner.physical.WindowOperator;
import org.opensearch.sql.storage.read.TableScanBuilder;
import org.opensearch.sql.storage.write.TableWriteBuilder;

/**
* Default implementor for implementing logical to physical translation. "Default" here means all
Expand Down Expand Up @@ -129,6 +130,11 @@ public PhysicalPlan visitTableScanBuilder(TableScanBuilder plan, C context) {
return plan.build();
}

@Override
public PhysicalPlan visitTableWriteBuilder(TableWriteBuilder plan, C context) {
return plan.build(visitChild(plan, context));
}

@Override
public PhysicalPlan visitRelation(LogicalRelation node, C context) {
throw new UnsupportedOperationException("Storage engine is responsible for "
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,10 @@
@UtilityClass
public class LogicalPlanDSL {

public static LogicalPlan write(LogicalPlan input, Table table, List<String> columns) {
return new LogicalWrite(input, table, columns);
}

public static LogicalPlan aggregation(
LogicalPlan input, List<NamedAggregator> aggregatorList, List<NamedExpression> groupByList) {
return new LogicalAggregation(input, aggregatorList, groupByList);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
package org.opensearch.sql.planner.logical;

import org.opensearch.sql.storage.read.TableScanBuilder;
import org.opensearch.sql.storage.write.TableWriteBuilder;

/**
* The visitor of {@link LogicalPlan}.
Expand All @@ -28,6 +29,14 @@ public R visitTableScanBuilder(TableScanBuilder plan, C context) {
return visitNode(plan, context);
}

public R visitWrite(LogicalWrite plan, C context) {
return visitNode(plan, context);
}

public R visitTableWriteBuilder(TableWriteBuilder plan, C context) {
return visitNode(plan, context);
}

public R visitFilter(LogicalFilter plan, C context) {
return visitNode(plan, context);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.planner.logical;

import java.util.Collections;
import java.util.List;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.ToString;
import org.opensearch.sql.storage.Table;

/**
* Logical operator for insert statement.
*/
@EqualsAndHashCode(callSuper = true)
@Getter
@ToString
public class LogicalWrite extends LogicalPlan {

/** Table that handles the write operation. */
private final Table table;

/** Optional column name list specified in insert statement. */
private final List<String> columns;

/**
* Construct a logical write with given child node, table and column name list.
*/
public LogicalWrite(LogicalPlan child, Table table, List<String> columns) {
super(Collections.singletonList(child));
this.table = table;
this.columns = columns;
}

@Override
public <R, C> R accept(LogicalPlanNodeVisitor<R, C> visitor, C context) {
return visitor.visitWrite(this, context);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import org.opensearch.sql.planner.optimizer.rule.PushFilterUnderSort;
import org.opensearch.sql.planner.optimizer.rule.read.CreateTableScanBuilder;
import org.opensearch.sql.planner.optimizer.rule.read.TableScanPushDown;
import org.opensearch.sql.planner.optimizer.rule.write.CreateTableWriteBuilder;

/**
* {@link LogicalPlan} Optimizer.
Expand Down Expand Up @@ -55,7 +56,8 @@ public static LogicalPlanOptimizer create() {
TableScanPushDown.PUSH_DOWN_SORT,
TableScanPushDown.PUSH_DOWN_LIMIT,
TableScanPushDown.PUSH_DOWN_HIGHLIGHT,
TableScanPushDown.PUSH_DOWN_PROJECT));
TableScanPushDown.PUSH_DOWN_PROJECT,
new CreateTableWriteBuilder()));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.opensearch.sql.planner.logical.LogicalProject;
import org.opensearch.sql.planner.logical.LogicalRelation;
import org.opensearch.sql.planner.logical.LogicalSort;
import org.opensearch.sql.planner.logical.LogicalWrite;
import org.opensearch.sql.storage.Table;
import org.opensearch.sql.storage.read.TableScanBuilder;

Expand Down Expand Up @@ -110,4 +111,14 @@ public static Property<LogicalPlan, Table> table() {
? Optional.of(((LogicalRelation) plan).getTable())
: Optional.empty());
}

/**
* Logical write with table field.
*/
public static Property<LogicalPlan, Table> writeTable() {
return Property.optionalProperty("table",
plan -> plan instanceof LogicalWrite
? Optional.of(((LogicalWrite) plan).getTable())
: Optional.empty());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.planner.optimizer.rule.write;

import static org.opensearch.sql.planner.optimizer.pattern.Patterns.writeTable;

import com.facebook.presto.matching.Capture;
import com.facebook.presto.matching.Captures;
import com.facebook.presto.matching.Pattern;
import lombok.Getter;
import lombok.experimental.Accessors;
import org.opensearch.sql.planner.logical.LogicalPlan;
import org.opensearch.sql.planner.logical.LogicalWrite;
import org.opensearch.sql.planner.optimizer.Rule;
import org.opensearch.sql.storage.Table;
import org.opensearch.sql.storage.write.TableWriteBuilder;

/**
* Rule that replaces logical write operator with {@link TableWriteBuilder} for later optimization
* and transforming to physical operator.
*/
public class CreateTableWriteBuilder implements Rule<LogicalWrite> {

/** Capture the table inside matched logical relation operator. */
private final Capture<Table> capture;

/** Pattern that matches logical relation operator. */
@Accessors(fluent = true)
@Getter
private final Pattern<LogicalWrite> pattern;

/**
* Construct create table write builder rule.
*/
public CreateTableWriteBuilder() {
this.capture = Capture.newCapture();
this.pattern = Pattern.typeOf(LogicalWrite.class)
.with(writeTable().capturedAs(capture));
}

@Override
public LogicalPlan apply(LogicalWrite plan, Captures captures) {
return captures.get(capture).createWriteBuilder(plan);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
package org.opensearch.sql.planner.physical;

import org.opensearch.sql.storage.TableScanOperator;
import org.opensearch.sql.storage.write.TableWriteOperator;

/**
* The visitor of {@link PhysicalPlan}.
Expand Down Expand Up @@ -36,6 +37,10 @@ public R visitTableScan(TableScanOperator node, C context) {
return visitNode(node, context);
}

public R visitTableWrite(TableWriteOperator node, C context) {
return visitNode(node, context);
}

public R visitProject(ProjectOperator node, C context) {
return visitNode(node, context);
}
Expand Down
13 changes: 13 additions & 0 deletions core/src/main/java/org/opensearch/sql/storage/Table.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,10 @@
import org.opensearch.sql.data.type.ExprType;
import org.opensearch.sql.executor.streaming.StreamingSource;
import org.opensearch.sql.planner.logical.LogicalPlan;
import org.opensearch.sql.planner.logical.LogicalWrite;
import org.opensearch.sql.planner.physical.PhysicalPlan;
import org.opensearch.sql.storage.read.TableScanBuilder;
import org.opensearch.sql.storage.write.TableWriteBuilder;

/**
* Table.
Expand Down Expand Up @@ -73,6 +75,17 @@ default TableScanBuilder createScanBuilder() {
return null; // TODO: Enforce all subclasses to implement this later
}

/*
* Create table write builder for logical to physical transformation.
*
* @param plan logical write plan
* @return table write builder
*/
default TableWriteBuilder createWriteBuilder(LogicalWrite plan) {
throw new UnsupportedOperationException(
"Write operation is not supported on current table");
}

/**
* Translate {@link Table} to {@link StreamingSource} if possible.
*/
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.storage.write;

import java.util.Collections;
import org.opensearch.sql.planner.logical.LogicalPlan;
import org.opensearch.sql.planner.logical.LogicalPlanNodeVisitor;
import org.opensearch.sql.planner.physical.PhysicalPlan;

/**
* A {@link TableWriteBuilder} represents transition state between logical planning and physical
* planning for table write operator. The concrete implementation class gets involved in the logical
* optimization through this abstraction and thus transform to specific {@link TableWriteOperator}
* in a certain storage engine.
*/
public abstract class TableWriteBuilder extends LogicalPlan {

/**
* Construct table write builder with child node.
*/
public TableWriteBuilder(LogicalPlan child) {
super(Collections.singletonList(child));
}

/**
* Build table write operator with given child node.
*
* @param child child operator node
* @return table write operator
*/
public abstract TableWriteOperator build(PhysicalPlan child);

@Override
public <R, C> R accept(LogicalPlanNodeVisitor<R, C> visitor, C context) {
return visitor.visitTableWriteBuilder(this, context);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.storage.write;

import java.util.Collections;
import java.util.List;
import lombok.RequiredArgsConstructor;
import org.opensearch.sql.planner.physical.PhysicalPlan;
import org.opensearch.sql.planner.physical.PhysicalPlanNodeVisitor;

/**
* {@link TableWriteOperator} is the abstraction for data source to implement different physical
* write operator on a data source. This is also to avoid "polluting" physical plan visitor by
* concrete table scan implementation.
*/
@RequiredArgsConstructor
public abstract class TableWriteOperator extends PhysicalPlan {

/** Input physical node. */
protected final PhysicalPlan input;

@Override
public <R, C> R accept(PhysicalPlanNodeVisitor<R, C> visitor, C context) {
return visitor.visitTableWrite(this, context);
}

@Override
public List<PhysicalPlan> getChild() {
return Collections.singletonList(input);
}

/**
* Explain the execution plan.
*
* @return explain output
*/
public abstract String explain();
}
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@
import org.opensearch.sql.storage.Table;
import org.opensearch.sql.storage.TableScanOperator;
import org.opensearch.sql.storage.read.TableScanBuilder;
import org.opensearch.sql.storage.write.TableWriteBuilder;
import org.opensearch.sql.storage.write.TableWriteOperator;

@ExtendWith(MockitoExtension.class)
class DefaultImplementorTest {
Expand Down Expand Up @@ -212,4 +214,17 @@ public TableScanOperator build() {
};
assertEquals(tableScanOperator, tableScanBuilder.accept(implementor, null));
}

@Test
public void visitTableWriteBuilderShouldBuildTableWriteOperator() {
LogicalPlan child = values();
TableWriteOperator tableWriteOperator = Mockito.mock(TableWriteOperator.class);
TableWriteBuilder logicalPlan = new TableWriteBuilder(child) {
@Override
public TableWriteOperator build(PhysicalPlan child) {
return tableWriteOperator;
}
};
assertEquals(tableWriteOperator, logicalPlan.accept(implementor, null));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.stream.Collectors;
Expand All @@ -31,9 +32,12 @@
import org.opensearch.sql.expression.ReferenceExpression;
import org.opensearch.sql.expression.aggregation.Aggregator;
import org.opensearch.sql.expression.window.WindowDefinition;
import org.opensearch.sql.planner.physical.PhysicalPlan;
import org.opensearch.sql.storage.Table;
import org.opensearch.sql.storage.TableScanOperator;
import org.opensearch.sql.storage.read.TableScanBuilder;
import org.opensearch.sql.storage.write.TableWriteBuilder;
import org.opensearch.sql.storage.write.TableWriteOperator;

/**
* Todo. Temporary added for UT coverage, Will be removed.
Expand Down Expand Up @@ -83,6 +87,19 @@ public TableScanOperator build() {
assertNull(tableScanBuilder.accept(new LogicalPlanNodeVisitor<Integer, Object>() {
}, null));

LogicalPlan write = LogicalPlanDSL.write(null, table, Collections.emptyList());
assertNull(write.accept(new LogicalPlanNodeVisitor<Integer, Object>() {
}, null));

TableWriteBuilder tableWriteBuilder = new TableWriteBuilder(null) {
@Override
public TableWriteOperator build(PhysicalPlan child) {
return null;
}
};
assertNull(tableWriteBuilder.accept(new LogicalPlanNodeVisitor<Integer, Object>() {
}, null));

LogicalPlan filter = LogicalPlanDSL.filter(relation, expression);
assertNull(filter.accept(new LogicalPlanNodeVisitor<Integer, Object>() {
}, null));
Expand Down
Loading

0 comments on commit 18661b4

Please sign in to comment.