Skip to content

Commit

Permalink
chore: pass pseudocolumn feature flag as boolean rather than config (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
vcrfxia authored Oct 11, 2021
1 parent 3c97060 commit 60c1bb2
Show file tree
Hide file tree
Showing 34 changed files with 251 additions and 234 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,29 @@ public LogicalSchema withPseudoAndKeyColsInValue(
) {
return withPseudoAndKeyColsInValue(
windowed,
ksqlConfig,
ksqlConfig.getBoolean(KsqlConfig.KSQL_ROWPARTITION_ROWOFFSET_ENABLED),
false
);
}

/**
* Copies pseudo and key columns to the value schema with the current pseudocolumn version number
*
* <p>Similar to the above implementation, but determines the version to use by using the
* provided boolean
*
* @param windowed indicates that the source is windowed
* @param rowpartitionRowoffsetEnabled whether KsqlConfig.KSQL_ROWPARTITION_ROWOFFSET_ENABLED
* is enabled
* @return the new schema.
*/
public LogicalSchema withPseudoAndKeyColsInValue(
final boolean windowed,
final boolean rowpartitionRowoffsetEnabled
) {
return withPseudoAndKeyColsInValue(
windowed,
rowpartitionRowoffsetEnabled,
false
);
}
Expand All @@ -186,18 +208,19 @@ public LogicalSchema withPseudoAndKeyColsInValue(
* config and whether or not the calling context is a pull or scalable push query.
*
* @param windowed indicates that the source is windowed
* @param ksqlConfig the config to utilize for finding the version number
* @param rowpartitionRowoffsetEnabled whether KsqlConfig.KSQL_ROWPARTITION_ROWOFFSET_ENABLED
* is enabled
* @param forPullOrScalablePushQuery whether this is a pull or scalable push query schema
* @return the new schema.
*/
public LogicalSchema withPseudoAndKeyColsInValue(
final boolean windowed,
final KsqlConfig ksqlConfig,
final boolean rowpartitionRowoffsetEnabled,
final boolean forPullOrScalablePushQuery
) {
return rebuildWithPseudoAndKeyColsInValue(
windowed,
SystemColumns.getPseudoColumnVersionFromConfig(ksqlConfig),
SystemColumns.getPseudoColumnVersionFromConfig(rowpartitionRowoffsetEnabled),
forPullOrScalablePushQuery
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,14 @@ public static boolean isPseudoColumn(
return isPseudoColumn(columnName, getPseudoColumnVersionFromConfig(ksqlConfig));
}

public static boolean isPseudoColumn(
final ColumnName columnName,
final boolean rowpartitionRowoffsetEnabled
) {
return isPseudoColumn(columnName,
getPseudoColumnVersionFromConfig(rowpartitionRowoffsetEnabled));
}

public static Set<ColumnName> pseudoColumnNames(final int pseudoColumnVersion) {

validatePseudoColumnVersion(pseudoColumnVersion);
Expand All @@ -135,6 +143,10 @@ public static Set<ColumnName> pseudoColumnNames(final KsqlConfig ksqlConfig) {
return pseudoColumnNames(getPseudoColumnVersionFromConfig(ksqlConfig));
}

public static Set<ColumnName> pseudoColumnNames(final boolean rowpartitionRowoffsetEnabled) {
return pseudoColumnNames(getPseudoColumnVersionFromConfig(rowpartitionRowoffsetEnabled));
}

public static boolean isSystemColumn(final ColumnName columnName, final int pseudoColumnVersion) {
return systemColumnNames(pseudoColumnVersion).contains(columnName);
}
Expand Down Expand Up @@ -190,16 +202,22 @@ public static boolean isDisallowedInPullOrScalablePushQueries(
.anyMatch(col -> col.name.equals(columnName));
}

public static int getPseudoColumnVersionFromConfig(final boolean rowpartitionRowoffsetEnabled) {
return getPseudoColumnVersionFromConfig(rowpartitionRowoffsetEnabled, false);
}

public static int getPseudoColumnVersionFromConfig(final KsqlConfig ksqlConfig) {
return getPseudoColumnVersionFromConfig(ksqlConfig, false);
return getPseudoColumnVersionFromConfig(
ksqlConfig.getBoolean(KsqlConfig.KSQL_ROWPARTITION_ROWOFFSET_ENABLED),
false
);
}

public static int getPseudoColumnVersionFromConfig(
final KsqlConfig ksqlConfig,
final boolean rowpartitionRowoffsetEnabled,
final boolean forPullOrScalablePushQuery
) {
return ksqlConfig.getBoolean(KsqlConfig.KSQL_ROWPARTITION_ROWOFFSET_ENABLED)
&& !forPullOrScalablePushQuery
return rowpartitionRowoffsetEnabled && !forPullOrScalablePushQuery
? CURRENT_PSEUDOCOLUMN_VERSION_NUMBER
: LEGACY_PSEUDOCOLUMN_VERSION_NUMBER;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import com.google.errorprone.annotations.Immutable;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.confluent.ksql.execution.ddl.commands.KsqlTopic;
import io.confluent.ksql.execution.expression.tree.Expression;
import io.confluent.ksql.execution.expression.tree.FunctionCall;
Expand All @@ -43,7 +42,6 @@
import io.confluent.ksql.serde.FormatInfo;
import io.confluent.ksql.serde.RefinementInfo;
import io.confluent.ksql.serde.WindowInfo;
import io.confluent.ksql.util.KsqlConfig;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
Expand All @@ -60,7 +58,7 @@
public class Analysis implements ImmutableAnalysis {

private final Optional<RefinementInfo> refinementInfo;
private final BiFunction<Map<SourceName, LogicalSchema>, KsqlConfig, SourceSchemas>
private final BiFunction<Map<SourceName, LogicalSchema>, Boolean, SourceSchemas>
sourceSchemasFactory;
private Optional<Into> into = Optional.empty();
private final List<AliasedDataSource> allDataSources = new ArrayList<>();
Expand All @@ -76,22 +74,25 @@ public class Analysis implements ImmutableAnalysis {
private CreateSourceAsProperties withProperties = CreateSourceAsProperties.none();
private final List<FunctionCall> tableFunctions = new ArrayList<>();
private boolean orReplace = false;
private final KsqlConfig ksqlConfig;
private final boolean rowpartitionRowoffsetEnabled;

public Analysis(final Optional<RefinementInfo> refinementInfo, final KsqlConfig ksqlConfig) {
this(refinementInfo, SourceSchemas::new, ksqlConfig);
public Analysis(
final Optional<RefinementInfo> refinementInfo,
final boolean rowpartitionRowoffsetEnabled
) {
this(refinementInfo, SourceSchemas::new, rowpartitionRowoffsetEnabled);
}

@VisibleForTesting
Analysis(
final Optional<RefinementInfo> refinementInfo,
final BiFunction<Map<SourceName, LogicalSchema>, KsqlConfig, SourceSchemas>
final BiFunction<Map<SourceName, LogicalSchema>, Boolean, SourceSchemas>
sourceSchemasFactory,
final KsqlConfig ksqlConfig
final boolean rowpartitionRowoffsetEnabled
) {
this.refinementInfo = requireNonNull(refinementInfo, "refinementInfo");
this.sourceSchemasFactory = requireNonNull(sourceSchemasFactory, "sourceSchemasFactory");
this.ksqlConfig = requireNonNull(ksqlConfig, "ksqlConfig");
this.rowpartitionRowoffsetEnabled = rowpartitionRowoffsetEnabled;
}

void addSelectItem(final SelectItem selectItem) {
Expand Down Expand Up @@ -205,7 +206,7 @@ public SourceSchemas getFromSourceSchemas(final boolean postAggregate) {
ads -> buildStreamsSchema(ads, postAggregate)
));

return sourceSchemasFactory.apply(schemaBySource, ksqlConfig);
return sourceSchemasFactory.apply(schemaBySource, rowpartitionRowoffsetEnabled);
}

Optional<AliasedDataSource> getSourceByAlias(final SourceName name) {
Expand Down Expand Up @@ -268,10 +269,8 @@ public List<FunctionCall> getTableFunctions() {
return Collections.unmodifiableList(tableFunctions);
}

@SuppressFBWarnings(value = "EI_EXPOSE_REP",
justification = "KsqlConfig is Effectively Immutable")
public KsqlConfig getKsqlConfig() {
return ksqlConfig;
public boolean getRowpartitionRowoffsetEnabled() {
return rowpartitionRowoffsetEnabled;
}

private LogicalSchema buildStreamsSchema(
Expand All @@ -289,7 +288,9 @@ private LogicalSchema buildStreamsSchema(

return ds.getDataSource()
.getSchema()
.withPseudoAndKeyColsInValue(windowedSource || windowedGroupBy, ksqlConfig);
.withPseudoAndKeyColsInValue(
windowedSource || windowedGroupBy,
rowpartitionRowoffsetEnabled);
}

@Immutable
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,6 @@
import io.confluent.ksql.serde.WindowInfo;
import io.confluent.ksql.serde.kafka.KafkaFormat;
import io.confluent.ksql.serde.none.NoneFormat;
import io.confluent.ksql.util.KsqlConfig;
import io.confluent.ksql.util.KsqlException;
import io.confluent.ksql.util.UnknownSourceException;
import java.util.HashMap;
Expand Down Expand Up @@ -98,21 +97,22 @@ class Analyzer {

private final MetaStore metaStore;
private final String topicPrefix;
private final KsqlConfig ksqlConfig;
private final boolean rowpartitionRowoffsetEnabled;

/**
* @param metaStore the metastore to use.
* @param topicPrefix the prefix to use for topic names where an explicit name is not specified.
* @param ksqlConfig the config with which to identify the correct pseudocolumn version to use.
* @param rowpartitionRowoffsetEnabled whether KsqlConfig.KSQL_ROWPARTITION_ROWOFFSET_ENABLED
* is true
*/
Analyzer(
final MetaStore metaStore,
final String topicPrefix,
final KsqlConfig ksqlConfig
final boolean rowpartitionRowoffsetEnabled
) {
this.metaStore = requireNonNull(metaStore, "metaStore");
this.topicPrefix = requireNonNull(topicPrefix, "topicPrefix");
this.ksqlConfig = requireNonNull(ksqlConfig, "ksqlConfig");
this.rowpartitionRowoffsetEnabled = rowpartitionRowoffsetEnabled;
}

/**
Expand Down Expand Up @@ -147,7 +147,7 @@ private final class Visitor extends DefaultTraversalVisitor<AstNode, Void> {
private boolean isGroupBy = false;

Visitor(final Query query, final boolean persistent) {
this.analysis = new Analysis(query.getRefinement(), ksqlConfig);
this.analysis = new Analysis(query.getRefinement(), rowpartitionRowoffsetEnabled);
this.persistent = persistent;
}

Expand Down Expand Up @@ -595,7 +595,8 @@ private void analyzeHaving(final Expression node) {

private void validateSelect(final SingleColumn column) {

final int pseudoColumnVersion = SystemColumns.getPseudoColumnVersionFromConfig(ksqlConfig);
final int pseudoColumnVersion = SystemColumns
.getPseudoColumnVersionFromConfig(rowpartitionRowoffsetEnabled);

SystemColumns.systemColumnNames(pseudoColumnVersion)
.forEach(col -> checkForReservedToken(column, col));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import io.confluent.ksql.name.Name;
import io.confluent.ksql.parser.tree.SingleColumn;
import io.confluent.ksql.schema.ksql.SystemColumns;
import io.confluent.ksql.util.KsqlConfig;
import io.confluent.ksql.util.KsqlException;
import java.util.Collection;
import java.util.List;
Expand Down Expand Up @@ -103,7 +102,7 @@ private static Optional<String> disallowedColumnNameInSelectClause(final Analysi
.map(ColumnExtractor::extractColumns)
.flatMap(Collection::stream)
.map(ColumnReferenceExp::getColumnName)
.filter(name -> disallowedInPullQueries(name, analysis.getKsqlConfig()))
.filter(name -> disallowedInPullQueries(name, analysis.getRowpartitionRowoffsetEnabled()))
.map(Name::toString)
.collect(Collectors.joining(", "));

Expand All @@ -126,7 +125,7 @@ private static Optional<String> disallowedColumnNameInWhereClause(final Analysis
final String disallowedColumns = ColumnExtractor.extractColumns(expression.get())
.stream()
.map(ColumnReferenceExp::getColumnName)
.filter(name -> disallowedInPullQueries(name, analysis.getKsqlConfig()))
.filter(name -> disallowedInPullQueries(name, analysis.getRowpartitionRowoffsetEnabled()))
.map(Name::toString)
.collect(Collectors.joining(", "));

Expand All @@ -141,9 +140,10 @@ private static Optional<String> disallowedColumnNameInWhereClause(final Analysis

private static boolean disallowedInPullQueries(
final ColumnName columnName,
final KsqlConfig ksqlConfig
final boolean rowpartitionRowoffsetEnabled
) {
final int pseudoColumnVersion = SystemColumns.getPseudoColumnVersionFromConfig(ksqlConfig);
final int pseudoColumnVersion = SystemColumns
.getPseudoColumnVersionFromConfig(rowpartitionRowoffsetEnabled);
return SystemColumns.isDisallowedInPullOrScalablePushQueries(columnName, pseudoColumnVersion);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import io.confluent.ksql.metastore.model.DataSource.DataSourceType;
import io.confluent.ksql.parser.tree.Query;
import io.confluent.ksql.parser.tree.Sink;
import io.confluent.ksql.util.KsqlConfig;
import io.confluent.ksql.util.KsqlException;
import java.util.Optional;

Expand All @@ -36,10 +35,10 @@ public class QueryAnalyzer {
public QueryAnalyzer(
final MetaStore metaStore,
final String outputTopicPrefix,
final KsqlConfig ksqlConfig
final boolean rowpartitionRowoffsetEnabled
) {
this(
new Analyzer(metaStore, outputTopicPrefix, ksqlConfig),
new Analyzer(metaStore, outputTopicPrefix, rowpartitionRowoffsetEnabled),
new PullQueryValidator(),
new PushQueryValidator()
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import io.confluent.ksql.schema.ksql.Column;
import io.confluent.ksql.schema.ksql.LogicalSchema;
import io.confluent.ksql.schema.ksql.SystemColumns;
import io.confluent.ksql.util.KsqlConfig;
import io.confluent.ksql.util.KsqlException;
import java.util.Collection;
import java.util.stream.Collectors;
Expand All @@ -41,7 +40,7 @@ private QueryValidatorUtil() {
*/
static void validateNoUserColumnsWithSameNameAsPseudoColumns(final Analysis analysis) {

final KsqlConfig ksqlConfig = analysis.getKsqlConfig();
final boolean rowpartitionRowoffsetEnabled = analysis.getRowpartitionRowoffsetEnabled();

final String disallowedNames = analysis.getAllDataSources()
.stream()
Expand All @@ -50,7 +49,7 @@ static void validateNoUserColumnsWithSameNameAsPseudoColumns(final Analysis anal
.map(LogicalSchema::value)
.flatMap(Collection::stream)
.map(Column::name)
.filter(name -> SystemColumns.isPseudoColumn(name, ksqlConfig))
.filter(name -> SystemColumns.isPseudoColumn(name, rowpartitionRowoffsetEnabled))
.map(ColumnName::toString)
.collect(Collectors.joining(", "));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import io.confluent.ksql.name.SourceName;
import io.confluent.ksql.schema.ksql.LogicalSchema;
import io.confluent.ksql.schema.ksql.SystemColumns;
import io.confluent.ksql.util.KsqlConfig;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Optional;
Expand All @@ -36,13 +35,13 @@
public final class SourceSchemas {

private final ImmutableMap<SourceName, LogicalSchema> sourceSchemas;
private final KsqlConfig ksqlConfig;
private final boolean rowpartitionRowoffsetEnabled;

SourceSchemas(
final Map<SourceName, LogicalSchema> sourceSchemas,
final KsqlConfig ksqlConfig) {
final boolean rowpartitionRowoffsetEnabled) {
this.sourceSchemas = ImmutableMap.copyOf(requireNonNull(sourceSchemas, "sourceSchemas"));
this.ksqlConfig = requireNonNull(ksqlConfig, "ksqlConfig");
this.rowpartitionRowoffsetEnabled = rowpartitionRowoffsetEnabled;

// This will fail
if (sourceSchemas.isEmpty()) {
Expand Down Expand Up @@ -101,7 +100,8 @@ boolean matchesNonValueField(final Optional<SourceName> source, final ColumnName
if (!source.isPresent()) {
return sourceSchemas.values().stream()
.anyMatch(schema ->
SystemColumns.isPseudoColumn(column, ksqlConfig) || schema.isKeyColumn(column));
SystemColumns.isPseudoColumn(column, rowpartitionRowoffsetEnabled)
|| schema.isKeyColumn(column));
}

final SourceName sourceName = source.get();
Expand All @@ -110,6 +110,7 @@ boolean matchesNonValueField(final Optional<SourceName> source, final ColumnName
throw new IllegalArgumentException("Unknown source: " + sourceName);
}

return sourceSchema.isKeyColumn(column) || SystemColumns.isPseudoColumn(column, ksqlConfig);
return sourceSchema.isKeyColumn(column)
|| SystemColumns.isPseudoColumn(column, rowpartitionRowoffsetEnabled);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ PreparedStatement<?> prepare(final ParsedStatement stmt, final Map<String, Strin
preparedStatement.getStatement(),
metaStore,
ksqlConfig.getBoolean(KsqlConfig.KSQL_LAMBDAS_ENABLED),
ksqlConfig
ksqlConfig.getBoolean(KsqlConfig.KSQL_ROWPARTITION_ROWOFFSET_ENABLED)
));
} catch (final KsqlStatementException e) {
throw e;
Expand Down
Loading

0 comments on commit 60c1bb2

Please sign in to comment.