Skip to content

Commit

Permalink
feat: enable new emit-final implementation (#9141)
Browse files Browse the repository at this point in the history
Kafka Streams added a new implementation to get only final results for windowed aggregations. This PR moves off using `suppress()` to implement `EMIT FINAL`, in favor of the new implementation of Kafka Streams. This PR also enabled `EMIT FINAL` by default now.
  • Loading branch information
mjsax authored Jul 7, 2022
1 parent 908aeda commit 2af201f
Show file tree
Hide file tree
Showing 88 changed files with 8,241 additions and 184 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigDef.ConfigKey;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.StreamsConfig.InternalConfig;

/**
* Resolves Ksql and streams property name to a ConfigItem.
Expand Down Expand Up @@ -132,11 +133,21 @@ private static Optional<ConfigItem> resolveConfig(

final String keyNoPrefix = stripPrefix(propertyName, prefix);
final ConfigKey configKey = def.configKeys().get(keyNoPrefix);
if (configKey == null) {
return Optional.empty();
if (configKey != null) {
return Optional.of(ConfigItem.resolved(configKey));
}

if (isInternalStreamsConfig(keyNoPrefix)) {
return Optional.of(ConfigItem.unresolved(keyNoPrefix));
}

return Optional.of(ConfigItem.resolved(configKey));
return Optional.empty();
}

private static boolean isInternalStreamsConfig(final String key) {
// add more internal configs on demand
return key.equals(InternalConfig.EMIT_INTERVAL_MS_KSTREAMS_OUTER_JOIN_SPURIOUS_RESULTS_FIX)
|| key.equals(InternalConfig.EMIT_INTERVAL_MS_KSTREAMS_WINDOWED_AGGREGATION);
}

private static String stripPrefix(final String maybePrefixedKey, final String prefix) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import static java.util.Objects.requireNonNull;

import com.fasterxml.jackson.annotation.JsonIgnore;
import com.google.errorprone.annotations.Immutable;
import java.util.Optional;

Expand All @@ -28,6 +29,7 @@ protected Node(final Optional<NodeLocation> location) {
this.location = requireNonNull(location, "location");
}

@JsonIgnore
public Optional<NodeLocation> getLocation() {
return location;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ public enum OutputRefinement {
*
* <p>For a stream, all events are final, so all are output.
*/
FINAL;
FINAL
}


Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.errorprone.annotations.Immutable;
import io.confluent.ksql.model.WindowType;
import io.confluent.ksql.parser.OutputRefinement;
import java.time.Duration;
import java.util.Objects;
import java.util.Optional;
Expand All @@ -31,17 +32,26 @@ public final class WindowInfo {

private final WindowType type;
private final Optional<Duration> size;
private final OutputRefinement emitStrategy;

@JsonCreator
public static WindowInfo of(
@JsonProperty(value = "type", required = true) final WindowType type,
@JsonProperty(value = "size") final Optional<Duration> size) {
return new WindowInfo(type, size);
@JsonProperty(value = "size") final Optional<Duration> size,
@JsonProperty(value = "emitStrategy") final Optional<OutputRefinement> emitStrategy) {
return new WindowInfo(type, size, emitStrategy);
}

private WindowInfo(final WindowType type, final Optional<Duration> size) {
private WindowInfo(
final WindowType type,
final Optional<Duration> size,
final Optional<OutputRefinement> emitStrategy
) {
this.type = Objects.requireNonNull(type, "type");
this.size = Objects.requireNonNull(size, "size");
this.emitStrategy = Objects.requireNonNull(
emitStrategy.orElse(OutputRefinement.CHANGES),
"emitStrategy");

if (type.requiresWindowSize() && !size.isPresent()) {
throw new IllegalArgumentException("Size required");
Expand All @@ -64,6 +74,10 @@ public Optional<Duration> getSize() {
return size;
}

public OutputRefinement getEmitStrategy() {
return emitStrategy;
}

@Override
public boolean equals(final Object o) {
if (this == o) {
Expand All @@ -73,12 +87,17 @@ public boolean equals(final Object o) {
return false;
}
final WindowInfo that = (WindowInfo) o;

// we omit `emitStrategy` because `WindowInfo` is used to determine the topic format,
// and the emit-strategy has no impact on the serialization format
return type == that.type
&& Objects.equals(size, that.size);
}

@Override
public int hashCode() {
// we omit `emitStrategy` because `WindowInfo` is used to determine the topic format,
// and the emit-strategy has no impact on the serialization format
return Objects.hash(type, size);
}

Expand All @@ -87,6 +106,7 @@ public String toString() {
return "WindowInfo{"
+ "type=" + type
+ ", size=" + size.map(Duration::toMillis)
+ ", emitStrategy=" + emitStrategy
+ '}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -520,7 +520,7 @@ public class KsqlConfig extends AbstractConfig {
+ "KSQL metastore backup files are located.";

public static final String KSQL_SUPPRESS_ENABLED = "ksql.suppress.enabled";
public static final Boolean KSQL_SUPPRESS_ENABLED_DEFAULT = false;
public static final Boolean KSQL_SUPPRESS_ENABLED_DEFAULT = true;
public static final String KSQL_SUPPRESS_ENABLED_DOC =
"Feature flag for suppression, specifically EMIT FINAL";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import com.google.common.testing.EqualsTester;
import com.google.common.testing.NullPointerTester;
import io.confluent.ksql.model.WindowType;
import io.confluent.ksql.parser.OutputRefinement;
import java.time.Duration;
import java.util.Optional;
import org.junit.Test;
Expand All @@ -40,27 +41,29 @@ public void shouldThrowNPEs() {
public void shouldImplementEquals() {
new EqualsTester()
.addEqualityGroup(
WindowInfo.of(SESSION, Optional.empty()),
WindowInfo.of(SESSION, Optional.empty())
WindowInfo.of(SESSION, Optional.empty(), Optional.empty()),
WindowInfo.of(SESSION, Optional.empty(), Optional.empty())
)
.addEqualityGroup(
WindowInfo.of(TUMBLING, Optional.of(Duration.ofMillis(19))),
WindowInfo.of(TUMBLING, Optional.of(Duration.ofMillis(19)))
WindowInfo.of(TUMBLING, Optional.of(Duration.ofMillis(19)), Optional.empty()),
WindowInfo.of(TUMBLING, Optional.of(Duration.ofMillis(19)), Optional.empty())
)
.addEqualityGroup(
WindowInfo.of(HOPPING, Optional.of(Duration.ofMillis(19))),
WindowInfo.of(HOPPING, Optional.of(Duration.ofMillis(19)))
WindowInfo.of(HOPPING, Optional.of(Duration.ofMillis(19)), Optional.empty()),
WindowInfo.of(HOPPING, Optional.of(Duration.ofMillis(19)), Optional.empty())
)
.addEqualityGroup(
WindowInfo.of(TUMBLING, Optional.of(Duration.ofMillis(1010)))
WindowInfo.of(TUMBLING, Optional.of(Duration.ofMillis(1010)), Optional.empty()),
WindowInfo.of(TUMBLING, Optional.of(Duration.ofMillis(1010)), Optional.of(OutputRefinement.CHANGES)),
WindowInfo.of(TUMBLING, Optional.of(Duration.ofMillis(1010)), Optional.of(OutputRefinement.FINAL))
)
.testEquals();
}

@Test
public void shouldImplementToString() {
// Given:
final WindowInfo windowInfo = WindowInfo.of(TUMBLING, Optional.of(Duration.ofMillis(19)));
final WindowInfo windowInfo = WindowInfo.of(TUMBLING, Optional.of(Duration.ofMillis(19)), Optional.empty());

// When:
final String result = windowInfo.toString();
Expand All @@ -73,7 +76,7 @@ public void shouldImplementToString() {
@Test
public void shouldGetType() {
// Given:
final WindowInfo windowInfo = WindowInfo.of(SESSION, Optional.empty());
final WindowInfo windowInfo = WindowInfo.of(SESSION, Optional.empty(), Optional.of(OutputRefinement.CHANGES));

// When:
final WindowType result = windowInfo.getType();
Expand All @@ -85,7 +88,7 @@ public void shouldGetType() {
@Test
public void shouldGetSize() {
// Given:
final WindowInfo windowInfo = WindowInfo.of(HOPPING, Optional.of(Duration.ofSeconds(10)));
final WindowInfo windowInfo = WindowInfo.of(HOPPING, Optional.of(Duration.ofSeconds(10)), Optional.of(OutputRefinement.CHANGES));

// When:
final Optional<Duration> result = windowInfo.getSize();
Expand All @@ -94,23 +97,35 @@ public void shouldGetSize() {
assertThat(result, is(Optional.of(Duration.ofSeconds(10))));
}

@Test
public void shouldGetEmitStrategy() {
// Given:
final WindowInfo windowInfo = WindowInfo.of(HOPPING, Optional.of(Duration.ofSeconds(10)), Optional.of(OutputRefinement.CHANGES));

// When:
final OutputRefinement result = windowInfo.getEmitStrategy();

// Then:
assertThat(result, is(OutputRefinement.CHANGES));
}

@Test(expected = IllegalArgumentException.class)
public void shouldThrowIfSizeProvidedButNotRequired() {
WindowInfo.of(SESSION, Optional.of(Duration.ofSeconds(10)));
WindowInfo.of(SESSION, Optional.of(Duration.ofSeconds(10)), Optional.empty());
}

@Test(expected = IllegalArgumentException.class)
public void shouldThrowIfSizeRequiredButNotProvided() {
WindowInfo.of(TUMBLING, Optional.empty());
WindowInfo.of(TUMBLING, Optional.empty(), Optional.empty());
}

@Test(expected = IllegalArgumentException.class)
public void shouldThrowIfSizeZero() {
WindowInfo.of(TUMBLING, Optional.of(Duration.ZERO));
WindowInfo.of(TUMBLING, Optional.of(Duration.ZERO), Optional.empty());
}

@Test(expected = IllegalArgumentException.class)
public void shouldThrowIfSizeNegative() {
WindowInfo.of(TUMBLING, Optional.of(Duration.ofSeconds(-1)));
WindowInfo.of(TUMBLING, Optional.of(Duration.ofSeconds(-1)), Optional.empty());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -139,18 +139,20 @@ public Optional<WindowExpression> getWindowExpression() {
final Optional<WindowExpression> windowExpression = original.getWindowExpression();
final Optional<RefinementInfo> refinementInfo = original.getRefinementInfo();

/* Return the original window expression, unless there is no grace period provided during a
suppression, in which case we rewrite the window expression to have a default grace period
of zero.
*/
if (!(windowExpression.isPresent()
&& !windowExpression.get().getKsqlWindowExpression().getGracePeriod().isPresent()
&& refinementInfo.isPresent()
&& refinementInfo.get().getOutputRefinement() == OutputRefinement.FINAL
)
) {
// we only need to rewrite if we have a window expression and if we use emit final
if (!windowExpression.isPresent()
|| !refinementInfo.isPresent()
|| refinementInfo.get().getOutputRefinement() == OutputRefinement.CHANGES) {
return original.getWindowExpression();
}

final Optional<WindowTimeClause> gracePeriod;
if (!windowExpression.get().getKsqlWindowExpression().getGracePeriod().isPresent()) {
gracePeriod = Optional.of(zeroGracePeriod);
} else {
gracePeriod = windowExpression.get().getKsqlWindowExpression().getGracePeriod();
}

final WindowExpression window = original.getWindowExpression().get();

final KsqlWindowExpression ksqlWindowNew;
Expand All @@ -165,21 +167,24 @@ public Optional<WindowExpression> getWindowExpression() {
((HoppingWindowExpression) ksqlWindowOld).getSize(),
((HoppingWindowExpression) ksqlWindowOld).getAdvanceBy(),
retention,
Optional.of(zeroGracePeriod)
gracePeriod,
Optional.of(OutputRefinement.FINAL)
);
} else if (ksqlWindowOld instanceof TumblingWindowExpression) {
ksqlWindowNew = new TumblingWindowExpression(
location,
((TumblingWindowExpression) ksqlWindowOld).getSize(),
retention,
Optional.of(zeroGracePeriod)
gracePeriod,
Optional.of(OutputRefinement.FINAL)
);
} else if (ksqlWindowOld instanceof SessionWindowExpression) {
ksqlWindowNew = new SessionWindowExpression(
location,
((SessionWindowExpression) ksqlWindowOld).getGap(),
retention,
Optional.of(zeroGracePeriod)
gracePeriod,
Optional.of(OutputRefinement.FINAL)
);
} else {
throw new KsqlException("WINDOW type must be HOPPING, TUMBLING, or SESSION");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,13 @@
import io.confluent.ksql.parser.tree.DropTable;
import io.confluent.ksql.parser.tree.RegisterType;
import io.confluent.ksql.planner.plan.KsqlStructuredDataOutputNode;
import io.confluent.ksql.serde.RefinementInfo;
import io.confluent.ksql.services.ServiceContext;
import io.confluent.ksql.util.HandlerMaps;
import io.confluent.ksql.util.HandlerMaps.ClassHandlerMapR2;
import io.confluent.ksql.util.KsqlException;
import java.util.Objects;
import java.util.Optional;

// CHECKSTYLE_RULES.OFF: ClassDataAbstractionCoupling
public class CommandFactories implements DdlCommandFactory {
Expand Down Expand Up @@ -114,11 +116,14 @@ public DdlCommand create(
}

@Override
public DdlCommand create(final KsqlStructuredDataOutputNode outputNode) {
public DdlCommand create(
final KsqlStructuredDataOutputNode outputNode,
final Optional<RefinementInfo> emitStrategy
) {
if (outputNode.getNodeOutputType() == DataSource.DataSourceType.KSTREAM) {
return createSourceFactory.createStreamCommand(outputNode);
} else {
return createSourceFactory.createTableCommand(outputNode);
return createSourceFactory.createTableCommand(outputNode, emitStrategy);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import io.confluent.ksql.serde.GenericKeySerDe;
import io.confluent.ksql.serde.GenericRowSerDe;
import io.confluent.ksql.serde.KeySerdeFactory;
import io.confluent.ksql.serde.RefinementInfo;
import io.confluent.ksql.serde.SerdeFeatures;
import io.confluent.ksql.serde.SerdeFeaturesFactory;
import io.confluent.ksql.serde.ValueSerdeFactory;
Expand Down Expand Up @@ -142,14 +143,29 @@ public CreateStreamCommand createStreamCommand(
}

// This method is called by CREATE_AS statements
public CreateTableCommand createTableCommand(final KsqlStructuredDataOutputNode outputNode) {
public CreateTableCommand createTableCommand(
final KsqlStructuredDataOutputNode outputNode,
final Optional<RefinementInfo> emitStrategy
) {
Optional<WindowInfo> windowInfo =
outputNode.getKsqlTopic().getKeyFormat().getWindowInfo();

if (windowInfo.isPresent() && emitStrategy.isPresent()) {
final WindowInfo info = windowInfo.get();
windowInfo = Optional.of(WindowInfo.of(
info.getType(),
info.getSize(),
Optional.of(emitStrategy.get().getOutputRefinement())
));
}

return new CreateTableCommand(
outputNode.getSinkName().get(),
outputNode.getSchema(),
outputNode.getTimestampColumn(),
outputNode.getKsqlTopic().getKafkaTopicName(),
Formats.from(outputNode.getKsqlTopic()),
outputNode.getKsqlTopic().getKeyFormat().getWindowInfo(),
windowInfo,
Optional.of(outputNode.getOrReplace()),
Optional.of(false)
);
Expand Down Expand Up @@ -269,7 +285,8 @@ private static LogicalSchema buildSchema(
}

private static Optional<WindowInfo> getWindowInfo(final CreateSourceProperties props) {
return props.getWindowType().map(type -> WindowInfo.of(type, props.getWindowSize()));
return props.getWindowType()
.map(type -> WindowInfo.of(type, props.getWindowSize(), Optional.empty()));
}

private static String ensureTopicExists(
Expand Down
Loading

0 comments on commit 2af201f

Please sign in to comment.