Skip to content

Commit

Permalink
feat: enable ROWPARTITION and ROWOFFSET pseudo columns (KLIP-50) (#8245)
Browse files Browse the repository at this point in the history
  • Loading branch information
vcrfxia authored Oct 12, 2021
1 parent 7c7ac07 commit 7bdc41d
Show file tree
Hide file tree
Showing 4,665 changed files with 614,635 additions and 409 deletions.
The diff you're trying to view is too large. We only load the first 3000 changed files.
Original file line number Diff line number Diff line change
Expand Up @@ -438,7 +438,7 @@ public class KsqlConfig extends AbstractConfig {

public static final String KSQL_ROWPARTITION_ROWOFFSET_ENABLED =
"ksql.rowpartition.rowoffset.enabled";
public static final Boolean KSQL_ROWPARTITION_ROWOFFSET_DEFAULT = false;
public static final Boolean KSQL_ROWPARTITION_ROWOFFSET_DEFAULT = true;
public static final String KSQL_ROWPARTITION_ROWOFFSET_DOC =
"Feature flag for ROWPARTITION and ROWOFFSET pseudocolumns. If enabled, new queries will be"
+ " built with ROWPARTITION and ROWOFFSET pseudocolumns. If off, they will not be.";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import static org.hamcrest.Matchers.not;
import static org.junit.Assert.assertThrows;

import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.testing.EqualsTester;
import io.confluent.ksql.name.ColumnName;
Expand All @@ -51,7 +52,6 @@
import org.apache.kafka.connect.data.Schema;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnitRunner;

@SuppressWarnings({"UnstableApiUsage", "unchecked"})
Expand All @@ -66,16 +66,14 @@ public class LogicalSchemaTest {
private static final ColumnName F0 = ColumnName.of("f0");
private static final ColumnName F1 = ColumnName.of("f1");
private static final ColumnName VALUE = ColumnName.of("value");
private static final KsqlConfig ksqlConfig = new KsqlConfig(ImmutableMap.of());

private static final LogicalSchema SOME_SCHEMA = LogicalSchema.builder()
.valueColumn(F0, STRING)
.keyColumn(K0, BIGINT)
.valueColumn(F1, BIGINT)
.build();

@Mock
private KsqlConfig ksqlConfig;

@SuppressWarnings("UnstableApiUsage")
@Test
public void shouldImplementEqualsProperly() {
Expand Down Expand Up @@ -413,6 +411,8 @@ public void shouldAddMetaAndKeyColumnsToValue() {
.valueColumn(F0, STRING)
.valueColumn(F1, BIGINT)
.valueColumn(ROWTIME_NAME, BIGINT)
.valueColumn(ROWPARTITION_NAME, INTEGER)
.valueColumn(ROWOFFSET_NAME, BIGINT)
.valueColumn(K0, INTEGER)
.valueColumn(K1, STRING)
.build()
Expand Down Expand Up @@ -468,6 +468,8 @@ public void shouldAddWindowedMetaAndKeyColumnsToValue() {
.valueColumn(F0, STRING)
.valueColumn(F1, BIGINT)
.valueColumn(ROWTIME_NAME, BIGINT)
.valueColumn(ROWPARTITION_NAME, INTEGER)
.valueColumn(ROWOFFSET_NAME, BIGINT)
.valueColumn(K0, INTEGER)
.valueColumn(K1, STRING)
.valueColumn(WINDOWSTART_NAME, BIGINT)
Expand Down Expand Up @@ -544,6 +546,8 @@ public void shouldRemoveOthersWhenAddingMetasAndKeyColumns() {
.valueColumn(F0, BIGINT)
.valueColumn(F1, BIGINT)
.valueColumn(ROWTIME_NAME, BIGINT)
.valueColumn(ROWPARTITION_NAME, INTEGER)
.valueColumn(ROWOFFSET_NAME, BIGINT)
.valueColumn(K0, INTEGER)
.build()
));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import static org.hamcrest.Matchers.is;
import static org.junit.Assert.assertThrows;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import com.google.common.collect.ImmutableList;
import io.confluent.ksql.execution.expression.tree.ArithmeticBinaryExpression;
Expand All @@ -47,6 +48,7 @@
import io.confluent.ksql.util.MetaStoreFixture;
import java.util.List;
import java.util.Optional;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.junit.MockitoJUnitRunner;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import static org.hamcrest.Matchers.hasSize;
import static org.junit.Assert.assertThrows;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import io.confluent.ksql.analyzer.Analysis.AliasedDataSource;
import io.confluent.ksql.function.FunctionRegistry;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,8 @@ public class ExpressionEvaluatorParityTest {
private static final String STREAM_NAME = "ORDERS";
private static final long ORDER_ID = 10;
private static final long ROW_TIME = 20000;
private static final long ROWPARTITION = 5;
private static final long ROWOFFSET = 100;
private static final long ORDER_TIME = 100;
private static final String ITEM_ID = "item_id_0";
private static final long ITEM_ITEM_ID = 890;
Expand Down Expand Up @@ -109,7 +111,7 @@ public void init() {
final Map<String, Double> map = ImmutableMap.of("abc", 6.75d, "def", 9.5d);
// Note key isn't included first since it's assumed that it's provided as a value
ordersRow = GenericRow.genericRow(ORDER_ID, ITEM_ID, itemInfo, ORDER_UNITS,
doubleArray, map, null, TIMESTAMP, TIME, DATE, BYTES, ROW_TIME, ORDER_TIME);
doubleArray, map, null, TIMESTAMP, TIME, DATE, BYTES, ROW_TIME, ROWPARTITION, ROWOFFSET, ORDER_TIME);
}

@After
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ public void shouldSelectChosenColumns() {
// When:
final GenericRow transformed = selectTransformer.transform(
NON_WINDOWED_KEY,
genericRow("hi", "bye", 2.0D, "blah", "dar", 1521834663L, 1L),
genericRow("hi", "bye", 2.0D, "blah", "dar", 1521834663L, 0, 0L, 1L),
ctx
);

Expand All @@ -89,7 +89,7 @@ public void shouldApplyUdfsToColumns() {
// When:
final GenericRow row = selectTransformer.transform(
NON_WINDOWED_KEY,
genericRow("foo", "whatever", 6.9D, "boo", "hoo", 1521834663L, 2L),
genericRow("foo", "whatever", 6.9D, "boo", "hoo", 0, 0L, 1521834663L, 2L),
ctx
);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ public void shouldCreateExecutionPlanForInsert() {
Assert.assertEquals(lines[1],
"\t\t > [ PROJECT ] | Schema: ROWKEY STRING KEY, COL0 BIGINT, COL1 STRING, COL2 DOUBLE | Logger: INSERTQUERY_1.Project");
Assert.assertEquals(lines[2],
"\t\t\t\t > [ SOURCE ] | Schema: ROWKEY STRING KEY, COL0 BIGINT, COL1 STRING, COL2 DOUBLE, ROWTIME BIGINT, ROWKEY STRING | Logger: INSERTQUERY_1.KsqlTopic.Source");
"\t\t\t\t > [ SOURCE ] | Schema: ROWKEY STRING KEY, COL0 BIGINT, COL1 STRING, COL2 DOUBLE, ROWTIME BIGINT, ROWPARTITION INTEGER, ROWOFFSET BIGINT, ROWKEY STRING | Logger: INSERTQUERY_1.KsqlTopic.Source");
assertThat(queryMetadataList.get(1), instanceOf(PersistentQueryMetadata.class));
final PersistentQueryMetadata persistentQuery = (PersistentQueryMetadata)
queryMetadataList.get(1);
Expand Down Expand Up @@ -227,7 +227,7 @@ public void shouldCreatePlanForInsertIntoStreamFromStream() {
"> [ PROJECT ] | Schema: ROWKEY STRING KEY, COL0 INTEGER"));

assertThat(lines[2], containsString(
"> [ SOURCE ] | Schema: ROWKEY STRING KEY, COL0 INTEGER, ROWTIME BIGINT, ROWKEY STRING"));
"> [ SOURCE ] | Schema: ROWKEY STRING KEY, COL0 INTEGER, ROWTIME BIGINT, ROWPARTITION INTEGER, ROWOFFSET BIGINT, ROWKEY STRING"));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,8 @@ public void shouldAddProjectionWithSourceAliasPrefixForJoinSources() {
selectCol("COL4", "T1_COL4"),
selectCol("COL5", "T1_COL5"),
selectCol("ROWTIME", "T1_ROWTIME"),
selectCol("ROWPARTITION", "T1_ROWPARTITION"),
selectCol("ROWOFFSET", "T1_ROWOFFSET"),
selectCol("COL0", "T1_COL0")
));
final ProjectNode right = (ProjectNode) joinNode.getSources().get(1);
Expand All @@ -188,6 +190,8 @@ public void shouldAddProjectionWithSourceAliasPrefixForJoinSources() {
selectCol("COL3", "T2_COL3"),
selectCol("COL4", "T2_COL4"),
selectCol("ROWTIME", "T2_ROWTIME"),
selectCol("ROWPARTITION", "T2_ROWPARTITION"),
selectCol("ROWOFFSET", "T2_ROWOFFSET"),
selectCol("COL0", "T2_COL0")
));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ public void shouldUseConsistentOrderInPreAggSelectMapper() {
final ValueTransformerWithKey preAggSelectMapper = valueTransformers.get(1).get();
preAggSelectMapper.init(ctx);
final GenericRow result = (GenericRow) preAggSelectMapper
.transform(null, genericRow("1", "2", 3.0D, null, null, "rowtime", 0L));
.transform(null, genericRow("1", "2", 3.0D, null, null, "rowtime", "rowpartition", "rowoffset", 0L));
assertThat("should select col0, col1, col2, col3", result.values(),
contains(0L, "1", "2", 3.0));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,7 @@ public void before() {

givenWindowedSource(false);

when(ksqlConfig.getBoolean(KsqlConfig.KSQL_ROWPARTITION_ROWOFFSET_ENABLED)).thenReturn(true);
node = new DataSourceNode(
PLAN_NODE_ID,
SOME_SOURCE,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ public void setUp() {

selects = ImmutableList.of(new SingleColumn(COL0_REF, Optional.of(ALIAS)));

when(ksqlConfig.getBoolean(KsqlConfig.KSQL_ROWPARTITION_ROWOFFSET_ENABLED)).thenReturn(true);
projectNode = new FinalProjectNode(
NODE_ID,
source,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,6 @@ public class JoinNodeTest {

static {
final Properties props = new Properties();
props.put(KsqlConfig.KSQL_ROWPARTITION_ROWOFFSET_ENABLED, false);
ksqlConfig = new KsqlConfig(props);
}

Expand Down Expand Up @@ -647,10 +646,14 @@ public void shouldHaveFullyQualifiedJoinSchemaWithNonSyntheticKey() {
.valueColumn(ColumnName.of(LEFT_ALIAS.text() + "_C0"), SqlTypes.BIGINT)
.valueColumn(ColumnName.of(LEFT_ALIAS.text() + "_L1"), SqlTypes.STRING)
.valueColumn(ColumnName.of(LEFT_ALIAS.text() + "_ROWTIME"), SqlTypes.BIGINT)
.valueColumn(ColumnName.of(LEFT_ALIAS.text() + "_ROWPARTITION"), SqlTypes.INTEGER)
.valueColumn(ColumnName.of(LEFT_ALIAS.text() + "_ROWOFFSET"), SqlTypes.BIGINT)
.valueColumn(ColumnName.of(LEFT_ALIAS.text() + "_leftKey"), SqlTypes.BIGINT)
.valueColumn(ColumnName.of(RIGHT_ALIAS.text() + "_C0"), SqlTypes.STRING)
.valueColumn(ColumnName.of(RIGHT_ALIAS.text() + "_R1"), SqlTypes.BIGINT)
.valueColumn(ColumnName.of(RIGHT_ALIAS.text() + "_ROWTIME"), SqlTypes.BIGINT)
.valueColumn(ColumnName.of(RIGHT_ALIAS.text() + "_ROWPARTITION"), SqlTypes.INTEGER)
.valueColumn(ColumnName.of(RIGHT_ALIAS.text() + "_ROWOFFSET"), SqlTypes.BIGINT)
.valueColumn(ColumnName.of(RIGHT_ALIAS.text() + "_rightKey"), SqlTypes.BIGINT)
.build()
));
Expand All @@ -671,10 +674,14 @@ public void shouldHaveFullyQualifiedJoinSchemaWithSyntheticKey() {
.valueColumn(ColumnName.of(LEFT_ALIAS.text() + "_C0"), SqlTypes.BIGINT)
.valueColumn(ColumnName.of(LEFT_ALIAS.text() + "_L1"), SqlTypes.STRING)
.valueColumn(ColumnName.of(LEFT_ALIAS.text() + "_ROWTIME"), SqlTypes.BIGINT)
.valueColumn(ColumnName.of(LEFT_ALIAS.text() + "_ROWPARTITION"), SqlTypes.INTEGER)
.valueColumn(ColumnName.of(LEFT_ALIAS.text() + "_ROWOFFSET"), SqlTypes.BIGINT)
.valueColumn(ColumnName.of(LEFT_ALIAS.text() + "_leftKey"), SqlTypes.BIGINT)
.valueColumn(ColumnName.of(RIGHT_ALIAS.text() + "_C0"), SqlTypes.STRING)
.valueColumn(ColumnName.of(RIGHT_ALIAS.text() + "_R1"), SqlTypes.BIGINT)
.valueColumn(ColumnName.of(RIGHT_ALIAS.text() + "_ROWTIME"), SqlTypes.BIGINT)
.valueColumn(ColumnName.of(RIGHT_ALIAS.text() + "_ROWPARTITION"), SqlTypes.INTEGER)
.valueColumn(ColumnName.of(RIGHT_ALIAS.text() + "_ROWOFFSET"), SqlTypes.BIGINT)
.valueColumn(ColumnName.of(RIGHT_ALIAS.text() + "_rightKey"), SqlTypes.BIGINT)
.valueColumn(SYNTH_KEY, SqlTypes.BIGINT)
.build()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import io.confluent.ksql.execution.expression.tree.Expression;
import io.confluent.ksql.function.FunctionRegistry;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import io.confluent.ksql.execution.expression.tree.Expression;
import io.confluent.ksql.function.FunctionRegistry;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ public void setUp() {
when(aliasedDataSource.getDataSource()).thenReturn(dataSource);
when(dataSource.getKsqlTopic()).thenReturn(ksqlTopic);
when(ksqlTopic.getKeyFormat()).thenReturn(keyFormat);
when(ksqlConfig.getBoolean(KsqlConfig.KSQL_ROWPARTITION_ROWOFFSET_ENABLED)).thenReturn(true);
}

@Test
Expand All @@ -126,7 +127,8 @@ public void shouldBuildPullQueryIntermediateSchemaSelectKeyNonWindowed() {
);

// Then:
final LogicalSchema expectedSchema = INPUT_SCHEMA.withPseudoAndKeyColsInValue(false, ksqlConfig);
final LogicalSchema expectedSchema = QueryLogicalPlanUtil.buildIntermediateSchema(
INPUT_SCHEMA, true, false, true);
assertThat(expectedSchema, is(projectNode.getIntermediateSchema()));
}

Expand All @@ -151,7 +153,8 @@ public void shouldBuildPullQueryIntermediateSchemaSelectKeyWindowed() {
);

// Then:
final LogicalSchema expectedSchema = INPUT_SCHEMA.withPseudoAndKeyColsInValue(true, ksqlConfig);
final LogicalSchema expectedSchema = QueryLogicalPlanUtil.buildIntermediateSchema(
        INPUT_SCHEMA, true, true, true);
assertThat(expectedSchema, is(projectNode.getIntermediateSchema()));
}

Expand Down Expand Up @@ -310,8 +313,9 @@ public void shouldBuildPullQueryOutputSchemaSelectStar() {

// Then:
final LogicalSchema expectedSchema = INPUT_SCHEMA;
assertThat(expectedSchema.withPseudoAndKeyColsInValue(false, ksqlConfig),
is(projectNode.getIntermediateSchema()));
final LogicalSchema expectedIntermediateSchema = QueryLogicalPlanUtil.buildIntermediateSchema(
INPUT_SCHEMA, true, false, true);
assertThat(expectedIntermediateSchema, is(projectNode.getIntermediateSchema()));
assertThat(expectedSchema.withoutPseudoAndKeyColsInValue(), is(projectNode.getSchema()));
assertThrows(
IllegalStateException.class,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ public class UserRepartitionNodeTest {

@Before
public void setUp() {
when(ksqlConfig.getBoolean(KsqlConfig.KSQL_ROWPARTITION_ROWOFFSET_ENABLED)).thenReturn(true);
when(parent.getNodeOutputType()).thenReturn(DataSourceType.KSTREAM);
when(parent.getSourceName()).thenReturn(Optional.of(SOURCE_NAME));
when(parent.getSourceNodes()).thenReturn(Stream.of(sourceNode));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import io.confluent.ksql.util.KsqlParserTestUtil;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.checkerframework.checker.units.qual.K;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
Expand All @@ -56,6 +57,8 @@ public class DefaultSchemaInjectorFunctionalTest {
private static final SqlSchemaFormatter FORMATTER =
new SqlSchemaFormatter(IdentifierUtil::needsQuotes);

private static final KsqlConfig ksqlConfig = new KsqlConfig(ImmutableMap.of());

private static final org.apache.avro.Schema DECIMAL_SCHEMA =
parseAvroSchema(
"{"
Expand All @@ -71,6 +74,7 @@ public class DefaultSchemaInjectorFunctionalTest {
private AvroSchema avroSchema;
@Mock
private MetaStore metaStore;

private DefaultSchemaInjector schemaInjector;

@Before
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -468,6 +468,8 @@ public void shouldBuildSchemaKTableWithCorrectSchemaForFilter() {
.valueColumn(ColumnName.of("COL3"), SqlTypes.DOUBLE)
.valueColumn(ColumnName.of("COL4"), SqlTypes.BOOLEAN)
.valueColumn(ColumnName.of("ROWTIME"), SqlTypes.BIGINT)
.valueColumn(ColumnName.of("ROWPARTITION"), SqlTypes.INTEGER)
.valueColumn(ColumnName.of("ROWOFFSET"), SqlTypes.BIGINT)
.valueColumn(ColumnName.of("COL0"), SqlTypes.BIGINT)
.build()
));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,13 @@
import io.confluent.ksql.test.tools.TestCase;
import io.confluent.ksql.test.tools.TestCaseBuilder;
import io.confluent.ksql.test.tools.TopologyAndConfigs;
import io.confluent.ksql.util.KsqlConfig;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;

public final class PlannedTestUtils {
Expand All @@ -49,8 +47,7 @@ public static boolean isPlannedTestCase(final TestCase testCase) {

public static boolean isNotExcluded(final TestCase testCase) {
// Place temporary logic here to exclude test cases based on feature flags, etc.
final Map<String, Object> props = testCase.properties();
return !(boolean) props.getOrDefault(KsqlConfig.KSQL_ROWPARTITION_ROWOFFSET_ENABLED, false);
return true;
}

public static boolean isSamePlan(
Expand Down
Loading

0 comments on commit 7bdc41d

Please sign in to comment.