Skip to content
This repository has been archived by the owner on Aug 2, 2022. It is now read-only.

Commit

Permalink
Merge branch 'develop' into issue-1019
Browse files Browse the repository at this point in the history
  • Loading branch information
penghuo committed Jan 30, 2021
2 parents 345bc2c + 1dbf244 commit 8daa4a2
Show file tree
Hide file tree
Showing 17 changed files with 320 additions and 101 deletions.
4 changes: 3 additions & 1 deletion .github/workflows/sql-cli-release-workflow.yml
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,11 @@ jobs:
python setup.py sdist bdist_wheel
artifact=`ls ./dist/*.tar.gz`
wheel_artifact=`ls ./dist/*.whl`
renamed_wheel_artifact=`echo $wheel_artifact | sed 's/_/-/g'`
mv "$wheel_artifact" "$renamed_wheel_artifact"
aws s3 cp $artifact s3://artifacts.opendistroforelasticsearch.amazon.com/downloads/elasticsearch-clients/opendistro-sql-cli/
aws s3 cp $wheel_artifact s3://artifacts.opendistroforelasticsearch.amazon.com/downloads/elasticsearch-clients/opendistro-sql-cli/
aws s3 cp $renamed_wheel_artifact s3://artifacts.opendistroforelasticsearch.amazon.com/downloads/elasticsearch-clients/opendistro-sql-cli/
# aws cloudfront create-invalidation --distribution-id ${{ secrets.DISTRIBUTION_ID }} --paths "/downloads/*"

Expand Down
8 changes: 8 additions & 0 deletions .github/workflows/sql-odbc-release-workflow.yml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ env:
ODBC_BIN_PATH: "./build/odbc/bin"
ODBC_BUILD_PATH: "./build/odbc/build"
AWS_SDK_INSTALL_PATH: "./build/aws-sdk/install"
PLUGIN_NAME: opendistro-sql-odbc
OD_VERSION: 1.12.0.0

jobs:
build-mac:
Expand Down Expand Up @@ -80,6 +82,8 @@ jobs:
run: |
cd installer
mac_installer=`ls -1t *.pkg | grep "Open Distro for Elasticsearch SQL ODBC Driver" | head -1`
mv "$mac_installer" "${{ env.PLUGIN_NAME }}-${{ env.OD_VERSION }}-macos-x64.pkg"
mac_installer=`ls -1t *.pkg | grep "${{ env.PLUGIN_NAME }}-${{ env.OD_VERSION }}-macos-x64.pkg" | head -1`
echo $mac_installer
aws s3 cp "$mac_installer" s3://artifacts.opendistroforelasticsearch.amazon.com/downloads/elasticsearch-clients/opendistro-sql-odbc/mac/
build-windows32:
Expand Down Expand Up @@ -128,6 +132,8 @@ jobs:
run: |
cd ci-output/installer
windows_installer=`ls -1t *.msi | grep "Open Distro for Elasticsearch SQL ODBC Driver" | head -1`
mv "$windows_installer" "${{ env.PLUGIN_NAME }}-${{ env.OD_VERSION }}-windows-x86.msi"
windows_installer=`ls -1t *.msi | grep "${{ env.PLUGIN_NAME }}-${{ env.OD_VERSION }}-windows-x86.msi" | head -1`
echo $windows_installer
aws s3 cp "$windows_installer" s3://artifacts.opendistroforelasticsearch.amazon.com/downloads/elasticsearch-clients/opendistro-sql-odbc/windows/
build-windows64:
Expand Down Expand Up @@ -176,5 +182,7 @@ jobs:
run: |
cd ci-output/installer
windows_installer=`ls -1t *.msi | grep "Open Distro for Elasticsearch SQL ODBC Driver" | head -1`
mv "$windows_installer" "${{ env.PLUGIN_NAME }}-${{ env.OD_VERSION }}-windows-x64.msi"
windows_installer=`ls -1t *.msi | grep "${{ env.PLUGIN_NAME }}-${{ env.OD_VERSION }}-windows-x64.msi" | head -1`
echo $windows_installer
aws s3 cp "$windows_installer" s3://artifacts.opendistroforelasticsearch.amazon.com/downloads/elasticsearch-clients/opendistro-sql-odbc/windows/
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ public String toString() {
}

/**
* Resolve the ExprValue from {@link ExprTupleValue} using paths.*
* Resolve the ExprValue from {@link ExprTupleValue} using paths.
* Considering the following sample data.
* {
* "name": "bob smith"
Expand All @@ -81,16 +81,23 @@ public String toString() {
* }
* "address": {
* "state": "WA",
* "city": "seattle"
* "city": "seattle",
* "project.year": 1990
* }
* "address.local": {
* "state": "WA",
* }
* }
* The paths could be
* 1. top level, e.g. "name", which will be resolved as "bob smith"
* 2. multiple paths, e.g. "name.address.state", which will be resolved as "WA"
* 3. special case, the "." is the path separator, but it is possible that the path include
* ".", for handling this use case, we define the resolve rule as bellow, e.g. "project.year" is
* resolved as 1990 instead of 2020.
* Resolved Rule
* resolved as 1990 instead of 2020. Note. This logic only applied top level none object field.
* e.g. "address.local.state" been resolved to Missing. but "address.project.year" could been
* resolved as 1990.
*
* <p>Resolve Rule
* 1. Resolve the full name by combine the paths("x"."y"."z") as whole ("x.y.z").
* 2. Resolve the path recursively through ExprValue.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import static com.amazon.opendistroforelasticsearch.sql.data.type.ExprCoreType.STRING;
import static com.amazon.opendistroforelasticsearch.sql.expression.function.FunctionDSL.define;
import static com.amazon.opendistroforelasticsearch.sql.expression.function.FunctionDSL.impl;
import static com.amazon.opendistroforelasticsearch.sql.expression.function.FunctionDSL.nullMissingHandling;

import com.amazon.opendistroforelasticsearch.sql.data.model.ExprIntervalValue;
import com.amazon.opendistroforelasticsearch.sql.data.model.ExprValue;
Expand Down Expand Up @@ -54,8 +55,8 @@ public void register(BuiltinFunctionRepository repository) {

private FunctionResolver interval() {
return define(BuiltinFunctionName.INTERVAL.getName(),
impl(IntervalClause::interval, INTERVAL, INTEGER, STRING),
impl(IntervalClause::interval, INTERVAL, LONG, STRING));
impl(nullMissingHandling(IntervalClause::interval), INTERVAL, INTEGER, STRING),
impl(nullMissingHandling(IntervalClause::interval), INTERVAL, LONG, STRING));
}

private ExprValue interval(ExprValue value, ExprValue unit) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,23 @@ public void not_exist_path() {
assertTrue(actualValue.isMissing());
}

@Test
public void object_field_contain_dot() {
ReferenceExpression expr = new ReferenceExpression("address.local.state", STRING);
ExprValue actualValue = expr.resolve(tuple());

assertTrue(actualValue.isMissing());
}

@Test
public void innner_none_object_field_contain_dot() {
ReferenceExpression expr = new ReferenceExpression("address.project.year", INTEGER);
ExprValue actualValue = expr.resolve(tuple());

assertEquals(INTEGER, actualValue.type());
assertEquals(1990, actualValue.integerValue());
}

/**
* {
* "name": "bob smith"
Expand All @@ -128,19 +145,28 @@ public void not_exist_path() {
* "address": {
* "state": "WA",
* "city": "seattle"
* "project.year": 1990
* },
* "address.local": {
* "state": "WA",
* }
* }
*/
private ExprTupleValue tuple() {
ExprValue address =
ExprValueUtils.tupleValue(ImmutableMap.of("state", "WA", "city", "seattle"));
ExprValueUtils.tupleValue(ImmutableMap.of("state", "WA", "city", "seattle", "project"
+ ".year", 1990));
ExprValue project =
ExprValueUtils.tupleValue(ImmutableMap.of("year", 2020));
ExprValue addressLocal =
ExprValueUtils.tupleValue(ImmutableMap.of("state", "WA"));
ExprTupleValue tuple = ExprTupleValue.fromExprValueMap(ImmutableMap.of(
"name", new ExprStringValue("bob smith"),
"project.year", new ExprIntegerValue(1990),
"project", project,
"address", address));
"address", address,
"address.local", addressLocal
));
return tuple;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,13 @@
package com.amazon.opendistroforelasticsearch.sql.expression.datetime;

import static com.amazon.opendistroforelasticsearch.sql.data.model.ExprValueUtils.intervalValue;
import static com.amazon.opendistroforelasticsearch.sql.data.model.ExprValueUtils.missingValue;
import static com.amazon.opendistroforelasticsearch.sql.data.model.ExprValueUtils.nullValue;
import static com.amazon.opendistroforelasticsearch.sql.data.type.ExprCoreType.INTEGER;
import static com.amazon.opendistroforelasticsearch.sql.data.type.ExprCoreType.INTERVAL;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.mockito.Mockito.when;

import com.amazon.opendistroforelasticsearch.sql.data.model.ExprValue;
import com.amazon.opendistroforelasticsearch.sql.exception.ExpressionEvaluationException;
Expand All @@ -39,6 +43,12 @@ public class IntervalClauseTest extends ExpressionTestBase {
@Mock
Environment<Expression, ExprValue> env;

@Mock
Expression nullRef;

@Mock
Expression missingRef;

@Test
public void microsecond() {
FunctionExpression expr = dsl.interval(DSL.literal(1), DSL.literal("microsecond"));
Expand Down Expand Up @@ -114,4 +124,22 @@ public void to_string() {
FunctionExpression expr = dsl.interval(DSL.literal(1), DSL.literal("day"));
assertEquals("interval(1, \"day\")", expr.toString());
}

@Test
public void null_value() {
when(nullRef.type()).thenReturn(INTEGER);
when(nullRef.valueOf(env)).thenReturn(nullValue());
FunctionExpression expr = dsl.interval(nullRef, DSL.literal("day"));
assertEquals(INTERVAL, expr.type());
assertEquals(nullValue(), expr.valueOf(env));
}

@Test
public void missing_value() {
when(missingRef.type()).thenReturn(INTEGER);
when(missingRef.valueOf(env)).thenReturn(missingValue());
FunctionExpression expr = dsl.interval(missingRef, DSL.literal("day"));
assertEquals(INTERVAL, expr.type());
assertEquals(missingValue(), expr.valueOf(env));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ public PhysicalPlan visitRename(RenameOperator node, Object context) {
*/
@Override
public PhysicalPlan visitTableScan(TableScanOperator node, Object context) {
return new ResourceMonitorPlan(node, resourceMonitor);
return doProtect(node);
}

@Override
Expand Down Expand Up @@ -111,10 +111,14 @@ public PhysicalPlan visitHead(HeadOperator node, Object context) {
);
}

/**
* Decorate input node with {@link ResourceMonitorPlan} to avoid aggregate
* window function pre-loads too many data into memory in worst case.
*/
@Override
public PhysicalPlan visitWindow(WindowOperator node, Object context) {
return new WindowOperator(
visitInput(node.getInput(), context),
doProtect(visitInput(node.getInput(), context)),
node.getWindowFunction(),
node.getWindowDefinition());
}
Expand All @@ -124,11 +128,10 @@ public PhysicalPlan visitWindow(WindowOperator node, Object context) {
*/
@Override
public PhysicalPlan visitSort(SortOperator node, Object context) {
return new ResourceMonitorPlan(
return doProtect(
new SortOperator(
visitInput(node.getInput(), context),
node.getSortList()),
resourceMonitor);
node.getSortList()));
}

/**
Expand All @@ -155,4 +158,16 @@ PhysicalPlan visitInput(PhysicalPlan node, Object context) {
return node.accept(this, context);
}
}

private PhysicalPlan doProtect(PhysicalPlan node) {
if (isProtected(node)) {
return node;
}
return new ResourceMonitorPlan(node, resourceMonitor);
}

private boolean isProtected(PhysicalPlan node) {
return (node instanceof ResourceMonitorPlan);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,12 @@
@RequiredArgsConstructor
@EqualsAndHashCode
public class ResourceMonitorPlan extends PhysicalPlan {

/**
* How many method calls to delegate's next() to perform resource check once.
*/
public static final long NUMBER_OF_NEXT_CALL_TO_CHECK = 1000;

/**
* Delegated PhysicalPlan.
*/
Expand All @@ -44,6 +50,13 @@ public class ResourceMonitorPlan extends PhysicalPlan {
@ToString.Exclude
private final ResourceMonitor monitor;

/**
* Count how many calls to delegate's next() already.
*/
@EqualsAndHashCode.Exclude
private long nextCallCount = 0L;


@Override
public <R, C> R accept(PhysicalPlanNodeVisitor<R, C> visitor, C context) {
return delegate.accept(visitor, context);
Expand Down Expand Up @@ -74,6 +87,10 @@ public boolean hasNext() {

@Override
public ExprValue next() {
boolean shouldCheck = (++nextCallCount % NUMBER_OF_NEXT_CALL_TO_CHECK == 0);
if (shouldCheck && !this.monitor.isHealthy()) {
throw new IllegalStateException("resource is not enough to load next row, quit.");
}
return delegate.next();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import com.amazon.opendistroforelasticsearch.sql.expression.aggregation.AvgAggregator;
import com.amazon.opendistroforelasticsearch.sql.expression.aggregation.NamedAggregator;
import com.amazon.opendistroforelasticsearch.sql.expression.window.WindowDefinition;
import com.amazon.opendistroforelasticsearch.sql.expression.window.aggregation.AggregateWindowFunction;
import com.amazon.opendistroforelasticsearch.sql.expression.window.ranking.RankFunction;
import com.amazon.opendistroforelasticsearch.sql.monitor.ResourceMonitor;
import com.amazon.opendistroforelasticsearch.sql.planner.physical.PhysicalPlan;
Expand Down Expand Up @@ -213,6 +214,50 @@ public void testProtectSortForWindowOperator() {
windowDefinition)));
}

@Test
public void testProtectWindowOperatorInput() {
NamedExpression avg = named(mock(AggregateWindowFunction.class));
WindowDefinition windowDefinition = mock(WindowDefinition.class);

assertEquals(
window(
resourceMonitor(
values()),
avg,
windowDefinition),
executionProtector.protect(
window(
values(),
avg,
windowDefinition)));
}

@SuppressWarnings("unchecked")
@Test
public void testNotProtectWindowOperatorInputIfAlreadyProtected() {
NamedExpression avg = named(mock(AggregateWindowFunction.class));
Pair<Sort.SortOption, Expression> sortItem =
ImmutablePair.of(DEFAULT_ASC, DSL.ref("age", INTEGER));
WindowDefinition windowDefinition =
new WindowDefinition(emptyList(), ImmutableList.of(sortItem));

assertEquals(
window(
resourceMonitor(
sort(
values(emptyList()),
sortItem)),
avg,
windowDefinition),
executionProtector.protect(
window(
sort(
values(emptyList()),
sortItem),
avg,
windowDefinition)));
}

@Test
public void testWithoutProtection() {
Expression filterExpr = literal(ExprBooleanValue.of(true));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,26 @@ void openSuccess() {

@Test
void nextSuccess() {
monitorPlan.next();
verify(plan, times(1)).next();
when(resourceMonitor.isHealthy()).thenReturn(true);

for (int i = 1; i <= 1000; i++) {
monitorPlan.next();
}
verify(resourceMonitor, times(1)).isHealthy();
verify(plan, times(1000)).next();
}

@Test
void nextExceedResourceLimit() {
when(resourceMonitor.isHealthy()).thenReturn(false);

for (int i = 1; i < 1000; i++) {
monitorPlan.next();
}

IllegalStateException exception =
assertThrows(IllegalStateException.class, () -> monitorPlan.next());
assertEquals("resource is not enough to load next row, quit.", exception.getMessage());
}

@Test
Expand Down
Loading

0 comments on commit 8daa4a2

Please sign in to comment.