feat: enable new emit-final implementation #9141

Merged
merged 11 commits into from
Jul 7, 2022
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigDef.ConfigKey;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.StreamsConfig.InternalConfig;

/**
* Resolves Ksql and streams property name to a ConfigItem.
Expand Down Expand Up @@ -132,11 +133,21 @@ private static Optional<ConfigItem> resolveConfig(

final String keyNoPrefix = stripPrefix(propertyName, prefix);
final ConfigKey configKey = def.configKeys().get(keyNoPrefix);
if (configKey == null) {
return Optional.empty();
if (configKey != null) {
return Optional.of(ConfigItem.resolved(configKey));
}

if (isInternalStreamsConfig(keyNoPrefix)) {
return Optional.of(ConfigItem.unresolved(keyNoPrefix));
}

return Optional.of(ConfigItem.resolved(configKey));
return Optional.empty();
}

private static boolean isInternalStreamsConfig(final String key) {
// add more internal configs on demand
return key.equals(InternalConfig.EMIT_INTERVAL_MS_KSTREAMS_OUTER_JOIN_SPURIOUS_RESULTS_FIX)
|| key.equals(InternalConfig.EMIT_INTERVAL_MS_KSTREAMS_WINDOWED_AGGREGATION);
Member Author

Need this one to run QTTs...

Contributor

Is this only meant for QTT or are we expecting users to want/be able to set this for their own queries too? (Guessing the latter but want to check.)

Also, the first config (outer join spurious results fix emit interval) is unrelated to this PR, right? Is that another config we think users might want to set?

Member Author

@mjsax mjsax Jun 24, 2022

Both are internal KS configs -- they control how often we try to emit results -- the other config is for left/outer stream-stream joins (yes, it's unrelated to this PR, but serves the exact same purpose for a different operator).

We need them for QTT to ensure we always try to emit results (both configs work on wall-clock time); otherwise, in QTT, results might get "stuck" and not be emitted. I remember that for stream-stream joins back in the day, we did not add tests for all scenarios because we did not have access to this config.

Because both are internal, we don't really expect users to set them (except when there is a perf problem and we tell them to change it). But we should be able to set the configs IMHO -- otherwise, the guard they provide for KS does not work for ksqlDB.

There are a few more internal configs, and we could consider adding them here, too. Also happy to remove the second config from this PR and do a follow-up.

Contributor

That makes sense. Just to confirm: users will be able to set these internal streams configs on their ksql queries (and have the configs take effect) after this change, right?

I think it's fine to leave the second config in -- we're essentially saying it's safe for users to set these configs on their ksql queries if they want to.

Member Author

Technically, yes.

Of course, both configs are "internal", meaning they are not documented anywhere (on purpose), and users most likely don't know about them (and should not need to know about them in general).
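The fall-through order discussed in this thread (known config key first, then an allow-list of internal keys, then "not found") can be sketched in isolation. This is only an illustration under stated assumptions: `ResolverSketch`, the key strings, and the `"resolved:"`/`"unresolved:"` string results are hypothetical stand-ins, not the ksqlDB `ConfigItem` API and not the real Kafka Streams `InternalConfig` constant values.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Optional;
import java.util.Set;

// Hypothetical stand-in for the resolver in the diff; not the ksqlDB class.
final class ResolverSketch {

  // Stand-in for the keys a ConfigDef knows about.
  static final Set<String> KNOWN_KEYS =
      new HashSet<>(Arrays.asList("commit.interval.ms"));

  // Assumed placeholder names, NOT the real internal Kafka Streams config keys.
  static final Set<String> INTERNAL_KEYS = new HashSet<>(Arrays.asList(
      "__example.internal.emit.interval.join__",
      "__example.internal.emit.interval.agg__"));

  static Optional<String> resolve(final String propertyName, final String prefix) {
    // Strip the optional prefix before looking the key up.
    final String key = propertyName.startsWith(prefix)
        ? propertyName.substring(prefix.length())
        : propertyName;

    // 1. A documented key resolves normally.
    if (KNOWN_KEYS.contains(key)) {
      return Optional.of("resolved:" + key);
    }
    // 2. An allow-listed internal key is accepted, but marked unresolved.
    if (INTERNAL_KEYS.contains(key)) {
      return Optional.of("unresolved:" + key);
    }
    // 3. Anything else is unknown.
    return Optional.empty();
  }
}
```

With this ordering, an allow-listed internal key is passed through without needing a `ConfigDef` entry, while a mistyped key still resolves to nothing.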

}

private static String stripPrefix(final String maybePrefixedKey, final String prefix) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import static java.util.Objects.requireNonNull;

import com.fasterxml.jackson.annotation.JsonIgnore;
import com.google.errorprone.annotations.Immutable;
import java.util.Optional;

Expand All @@ -28,6 +29,7 @@ protected Node(final Optional<NodeLocation> location) {
this.location = requireNonNull(location, "location");
}

@JsonIgnore
public Optional<NodeLocation> getLocation() {
return location;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ public enum OutputRefinement {
*
* <p>For a stream, all events are final, so all are output.
*/
FINAL;
FINAL
}


Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.errorprone.annotations.Immutable;
import io.confluent.ksql.model.WindowType;
import io.confluent.ksql.parser.OutputRefinement;
import java.time.Duration;
import java.util.Objects;
import java.util.Optional;
Expand All @@ -31,17 +32,26 @@ public final class WindowInfo {

private final WindowType type;
private final Optional<Duration> size;
private final OutputRefinement emitStrategy;

@JsonCreator
public static WindowInfo of(
@JsonProperty(value = "type", required = true) final WindowType type,
@JsonProperty(value = "size") final Optional<Duration> size) {
return new WindowInfo(type, size);
@JsonProperty(value = "size") final Optional<Duration> size,
@JsonProperty(value = "emitStrategy") final Optional<OutputRefinement> emitStrategy) {
return new WindowInfo(type, size, emitStrategy);
}

private WindowInfo(final WindowType type, final Optional<Duration> size) {
private WindowInfo(
final WindowType type,
final Optional<Duration> size,
final Optional<OutputRefinement> emitStrategy
) {
this.type = Objects.requireNonNull(type, "type");
this.size = Objects.requireNonNull(size, "size");
this.emitStrategy = Objects.requireNonNull(
emitStrategy.orElse(OutputRefinement.CHANGES),
"emitStrategy");

if (type.requiresWindowSize() && !size.isPresent()) {
throw new IllegalArgumentException("Size required");
Expand All @@ -64,6 +74,10 @@ public Optional<Duration> getSize() {
return size;
}

public OutputRefinement getEmitStrategy() {
Contributor

Does the serialize/deserialize to/from JSON loop work for this, now that this getter is updated to return OutputRefinement while the constructor still accepts Optional<OutputRefinement>? (Hopefully yes and I assume yes if QTTs are passing, just curious since I haven't encountered the situation before.)

Member Author

🤔 -- Interesting question... To me it just did not make sense to return an Optional if we know that it's always set... Did not think about JSON (de)serialization. But I agree, it seems to work, so we should be good?
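One way to reason about the question in this thread is to model the round trip without any JSON library: the factory takes an `Optional` and applies a default, the getter returns the concrete value, and reading that value back through the factory yields the same strategy. The sketch below only demonstrates that logical property. `WindowInfoSketch`, `EmitSketch`, and `roundTrip` are hypothetical names, and the sketch does not exercise Jackson; how Jackson binds an absent or present property to an `Optional` creator parameter depends on the mapper configuration and is not verified here.

```java
import java.util.Objects;
import java.util.Optional;

// Hypothetical stand-in for OutputRefinement.
enum EmitSketch { CHANGES, FINAL }

// Hypothetical, reduced model of the factory/getter asymmetry in the diff.
final class WindowInfoSketch {
  private final EmitSketch emitStrategy;

  // The factory accepts an optional value ...
  static WindowInfoSketch of(final Optional<EmitSketch> emitStrategy) {
    return new WindowInfoSketch(emitStrategy);
  }

  private WindowInfoSketch(final Optional<EmitSketch> emitStrategy) {
    // ... and falls back to CHANGES when none is given.
    this.emitStrategy = Objects.requireNonNull(emitStrategy, "emitStrategy")
        .orElse(EmitSketch.CHANGES);
  }

  // The getter always returns a concrete value, never an Optional.
  EmitSketch getEmitStrategy() {
    return emitStrategy;
  }

  // Simulates "write the getter value out, then read it back via the factory".
  static WindowInfoSketch roundTrip(final WindowInfoSketch in) {
    return of(Optional.of(in.getEmitStrategy()));
  }
}
```

Because the default is applied once at construction, a value that started out absent is written back as `CHANGES` and read back as `CHANGES`, so the round trip is stable in this model.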

return emitStrategy;
}

@Override
public boolean equals(final Object o) {
if (this == o) {
Expand All @@ -73,12 +87,17 @@ public boolean equals(final Object o) {
return false;
}
final WindowInfo that = (WindowInfo) o;

// we omit `emitStrategy` because `WindowInfo` is used to determine the topic format,
// and the emit-strategy has no impact on the serialization format
return type == that.type
&& Objects.equals(size, that.size);
}

@Override
public int hashCode() {
// we omit `emitStrategy` because `WindowInfo` is used to determine the topic format,
// and the emit-strategy has no impact on the serialization format
return Objects.hash(type, size);
}

Expand All @@ -87,6 +106,7 @@ public String toString() {
return "WindowInfo{"
+ "type=" + type
+ ", size=" + size.map(Duration::toMillis)
+ ", emitStrategy=" + emitStrategy
+ '}';
}
}
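The effect of leaving `emitStrategy` out of `equals` and `hashCode` can be seen in a reduced model: two values that differ only in emit strategy compare equal and collapse to one entry in a hash-based collection. This is a sketch with hypothetical names (`FormatKeySketch`, `Emit`, `distinctCount`), not the ksqlDB class, and it uses a plain `String` and `long` in place of `WindowType` and `Optional<Duration>`.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Objects;

// Hypothetical reduced model of a value whose identity ignores one field.
final class FormatKeySketch {
  enum Emit { CHANGES, FINAL }

  private final String type;
  private final long sizeMs;
  private final Emit emit;

  FormatKeySketch(final String type, final long sizeMs, final Emit emit) {
    this.type = Objects.requireNonNull(type, "type");
    this.sizeMs = sizeMs;
    this.emit = Objects.requireNonNull(emit, "emit");
  }

  Emit getEmit() {
    return emit;
  }

  @Override
  public boolean equals(final Object o) {
    if (this == o) {
      return true;
    }
    if (!(o instanceof FormatKeySketch)) {
      return false;
    }
    final FormatKeySketch that = (FormatKeySketch) o;
    // `emit` is deliberately left out: only type and size define identity.
    return type.equals(that.type) && sizeMs == that.sizeMs;
  }

  @Override
  public int hashCode() {
    // Must use the same fields as equals to keep the contract consistent.
    return Objects.hash(type, sizeMs);
  }

  // Counts how many distinct values remain after de-duplication in a HashSet.
  static int distinctCount(final FormatKeySketch... items) {
    return new HashSet<>(Arrays.asList(items)).size();
  }
}
```

A consequence worth keeping in mind with this design: any code that uses such a value as a map key or set element cannot distinguish the two emit strategies, which matches the intent stated in the code comments (the serialization format is unaffected by the strategy).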
Original file line number Diff line number Diff line change
Expand Up @@ -520,7 +520,7 @@ public class KsqlConfig extends AbstractConfig {
+ "KSQL metastore backup files are located.";

public static final String KSQL_SUPPRESS_ENABLED = "ksql.suppress.enabled";
public static final Boolean KSQL_SUPPRESS_ENABLED_DEFAULT = false;
public static final Boolean KSQL_SUPPRESS_ENABLED_DEFAULT = true;
public static final String KSQL_SUPPRESS_ENABLED_DOC =
"Feature flag for suppression, specifically EMIT FINAL";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import com.google.common.testing.EqualsTester;
import com.google.common.testing.NullPointerTester;
import io.confluent.ksql.model.WindowType;
import io.confluent.ksql.parser.OutputRefinement;
import java.time.Duration;
import java.util.Optional;
import org.junit.Test;
Expand All @@ -40,27 +41,29 @@ public void shouldThrowNPEs() {
public void shouldImplementEquals() {
new EqualsTester()
.addEqualityGroup(
WindowInfo.of(SESSION, Optional.empty()),
WindowInfo.of(SESSION, Optional.empty())
WindowInfo.of(SESSION, Optional.empty(), Optional.empty()),
WindowInfo.of(SESSION, Optional.empty(), Optional.empty())
)
.addEqualityGroup(
WindowInfo.of(TUMBLING, Optional.of(Duration.ofMillis(19))),
WindowInfo.of(TUMBLING, Optional.of(Duration.ofMillis(19)))
WindowInfo.of(TUMBLING, Optional.of(Duration.ofMillis(19)), Optional.empty()),
WindowInfo.of(TUMBLING, Optional.of(Duration.ofMillis(19)), Optional.empty())
)
.addEqualityGroup(
WindowInfo.of(HOPPING, Optional.of(Duration.ofMillis(19))),
WindowInfo.of(HOPPING, Optional.of(Duration.ofMillis(19)))
WindowInfo.of(HOPPING, Optional.of(Duration.ofMillis(19)), Optional.empty()),
WindowInfo.of(HOPPING, Optional.of(Duration.ofMillis(19)), Optional.empty())
)
.addEqualityGroup(
WindowInfo.of(TUMBLING, Optional.of(Duration.ofMillis(1010)))
WindowInfo.of(TUMBLING, Optional.of(Duration.ofMillis(1010)), Optional.empty()),
WindowInfo.of(TUMBLING, Optional.of(Duration.ofMillis(1010)), Optional.of(OutputRefinement.CHANGES)),
WindowInfo.of(TUMBLING, Optional.of(Duration.ofMillis(1010)), Optional.of(OutputRefinement.FINAL))
)
.testEquals();
}

@Test
public void shouldImplementToString() {
// Given:
final WindowInfo windowInfo = WindowInfo.of(TUMBLING, Optional.of(Duration.ofMillis(19)));
final WindowInfo windowInfo = WindowInfo.of(TUMBLING, Optional.of(Duration.ofMillis(19)), Optional.empty());

// When:
final String result = windowInfo.toString();
Expand All @@ -73,7 +76,7 @@ public void shouldImplementToString() {
@Test
public void shouldGetType() {
// Given:
final WindowInfo windowInfo = WindowInfo.of(SESSION, Optional.empty());
final WindowInfo windowInfo = WindowInfo.of(SESSION, Optional.empty(), Optional.of(OutputRefinement.CHANGES));

// When:
final WindowType result = windowInfo.getType();
Expand All @@ -85,7 +88,7 @@ public void shouldGetType() {
@Test
public void shouldGetSize() {
// Given:
final WindowInfo windowInfo = WindowInfo.of(HOPPING, Optional.of(Duration.ofSeconds(10)));
final WindowInfo windowInfo = WindowInfo.of(HOPPING, Optional.of(Duration.ofSeconds(10)), Optional.of(OutputRefinement.CHANGES));

// When:
final Optional<Duration> result = windowInfo.getSize();
Expand All @@ -94,23 +97,35 @@ public void shouldGetSize() {
assertThat(result, is(Optional.of(Duration.ofSeconds(10))));
}

@Test
public void shouldGetEmitStrategy() {
// Given:
final WindowInfo windowInfo = WindowInfo.of(HOPPING, Optional.of(Duration.ofSeconds(10)), Optional.of(OutputRefinement.CHANGES));

// When:
final OutputRefinement result = windowInfo.getEmitStrategy();

// Then:
assertThat(result, is(OutputRefinement.CHANGES));
}

@Test(expected = IllegalArgumentException.class)
public void shouldThrowIfSizeProvidedButNotRequired() {
WindowInfo.of(SESSION, Optional.of(Duration.ofSeconds(10)));
WindowInfo.of(SESSION, Optional.of(Duration.ofSeconds(10)), Optional.empty());
}

@Test(expected = IllegalArgumentException.class)
public void shouldThrowIfSizeRequiredButNotProvided() {
WindowInfo.of(TUMBLING, Optional.empty());
WindowInfo.of(TUMBLING, Optional.empty(), Optional.empty());
}

@Test(expected = IllegalArgumentException.class)
public void shouldThrowIfSizeZero() {
WindowInfo.of(TUMBLING, Optional.of(Duration.ZERO));
WindowInfo.of(TUMBLING, Optional.of(Duration.ZERO), Optional.empty());
}

@Test(expected = IllegalArgumentException.class)
public void shouldThrowIfSizeNegative() {
WindowInfo.of(TUMBLING, Optional.of(Duration.ofSeconds(-1)));
WindowInfo.of(TUMBLING, Optional.of(Duration.ofSeconds(-1)), Optional.empty());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -139,18 +139,20 @@ public Optional<WindowExpression> getWindowExpression() {
final Optional<WindowExpression> windowExpression = original.getWindowExpression();
final Optional<RefinementInfo> refinementInfo = original.getRefinementInfo();

/* Return the original window expression, unless there is no grace period provided during a
suppression, in which case we rewrite the window expression to have a default grace period
of zero.
*/
if (!(windowExpression.isPresent()
&& !windowExpression.get().getKsqlWindowExpression().getGracePeriod().isPresent()
&& refinementInfo.isPresent()
&& refinementInfo.get().getOutputRefinement() == OutputRefinement.FINAL
)
) {
// we only need to rewrite if we have a window expression and if we use emit final
if (!windowExpression.isPresent()
|| !refinementInfo.isPresent()
|| refinementInfo.get().getOutputRefinement() == OutputRefinement.CHANGES) {
return original.getWindowExpression();
}

final Optional<WindowTimeClause> gracePeriod;
if (!windowExpression.get().getKsqlWindowExpression().getGracePeriod().isPresent()) {
gracePeriod = Optional.of(zeroGracePeriod);
} else {
gracePeriod = windowExpression.get().getKsqlWindowExpression().getGracePeriod();
}

final WindowExpression window = original.getWindowExpression().get();

final KsqlWindowExpression ksqlWindowNew;
Expand All @@ -165,21 +167,24 @@ public Optional<WindowExpression> getWindowExpression() {
((HoppingWindowExpression) ksqlWindowOld).getSize(),
((HoppingWindowExpression) ksqlWindowOld).getAdvanceBy(),
retention,
Optional.of(zeroGracePeriod)
gracePeriod,
Optional.of(OutputRefinement.FINAL)
);
} else if (ksqlWindowOld instanceof TumblingWindowExpression) {
ksqlWindowNew = new TumblingWindowExpression(
location,
((TumblingWindowExpression) ksqlWindowOld).getSize(),
retention,
Optional.of(zeroGracePeriod)
gracePeriod,
Optional.of(OutputRefinement.FINAL)
);
} else if (ksqlWindowOld instanceof SessionWindowExpression) {
ksqlWindowNew = new SessionWindowExpression(
location,
((SessionWindowExpression) ksqlWindowOld).getGap(),
retention,
Optional.of(zeroGracePeriod)
gracePeriod,
Optional.of(OutputRefinement.FINAL)
);
} else {
throw new KsqlException("WINDOW type must be HOPPING, TUMBLING, or SESSION");
Expand Down
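The rewrite rule in `getWindowExpression()` above boils down to two decisions: whether to rewrite at all (only with a window and EMIT FINAL), and which grace period to use (the declared one, else zero). A minimal sketch of just that decision logic follows. `GraceRewriteSketch`, `needsRewrite`, and `effectiveGrace` are hypothetical names, a `boolean` stands in for the presence of a window expression, and `Duration` stands in for `WindowTimeClause`. Treating "not CHANGES" as "FINAL" assumes the enum has exactly those two values, as in the diff.

```java
import java.time.Duration;
import java.util.Optional;

// Hypothetical reduction of the rewrite decision; not the ksqlDB rewriter.
final class GraceRewriteSketch {
  enum Emit { CHANGES, FINAL }

  // Rewrite only when there is a window AND the query uses EMIT FINAL.
  static boolean needsRewrite(final boolean hasWindow, final Optional<Emit> refinement) {
    return hasWindow
        && refinement.isPresent()
        && refinement.get() == Emit.FINAL;
  }

  // Returns the grace period the (possibly rewritten) window would carry.
  static Optional<Duration> effectiveGrace(
      final boolean hasWindow,
      final Optional<Emit> refinement,
      final Optional<Duration> declaredGrace
  ) {
    if (!needsRewrite(hasWindow, refinement)) {
      // No rewrite: whatever was declared (possibly nothing) is kept as-is.
      return declaredGrace;
    }
    // Rewrite: keep an explicit grace period, otherwise default to zero.
    return Optional.of(declaredGrace.orElse(Duration.ZERO));
  }
}
```

Compared with the earlier condition in the diff, which only rewrote when the grace period was missing, this shape also rewrites windows that already declare a grace period, so that the emit strategy can be attached to them.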
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,13 @@
import io.confluent.ksql.parser.tree.DropTable;
import io.confluent.ksql.parser.tree.RegisterType;
import io.confluent.ksql.planner.plan.KsqlStructuredDataOutputNode;
import io.confluent.ksql.serde.RefinementInfo;
import io.confluent.ksql.services.ServiceContext;
import io.confluent.ksql.util.HandlerMaps;
import io.confluent.ksql.util.HandlerMaps.ClassHandlerMapR2;
import io.confluent.ksql.util.KsqlException;
import java.util.Objects;
import java.util.Optional;

// CHECKSTYLE_RULES.OFF: ClassDataAbstractionCoupling
public class CommandFactories implements DdlCommandFactory {
Expand Down Expand Up @@ -114,11 +116,14 @@ public DdlCommand create(
}

@Override
public DdlCommand create(final KsqlStructuredDataOutputNode outputNode) {
public DdlCommand create(
final KsqlStructuredDataOutputNode outputNode,
final Optional<RefinementInfo> emitStrategy
vcrfxia marked this conversation as resolved.
) {
if (outputNode.getNodeOutputType() == DataSource.DataSourceType.KSTREAM) {
return createSourceFactory.createStreamCommand(outputNode);
} else {
return createSourceFactory.createTableCommand(outputNode);
return createSourceFactory.createTableCommand(outputNode, emitStrategy);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import io.confluent.ksql.serde.GenericKeySerDe;
import io.confluent.ksql.serde.GenericRowSerDe;
import io.confluent.ksql.serde.KeySerdeFactory;
import io.confluent.ksql.serde.RefinementInfo;
import io.confluent.ksql.serde.SerdeFeatures;
import io.confluent.ksql.serde.SerdeFeaturesFactory;
import io.confluent.ksql.serde.ValueSerdeFactory;
Expand Down Expand Up @@ -142,14 +143,29 @@ public CreateStreamCommand createStreamCommand(
}

// This method is called by CREATE_AS statements
public CreateTableCommand createTableCommand(final KsqlStructuredDataOutputNode outputNode) {
public CreateTableCommand createTableCommand(
final KsqlStructuredDataOutputNode outputNode,
final Optional<RefinementInfo> emitStrategy
vcrfxia marked this conversation as resolved.
) {
Optional<WindowInfo> windowInfo =
outputNode.getKsqlTopic().getKeyFormat().getWindowInfo();

if (windowInfo.isPresent() && emitStrategy.isPresent()) {
final WindowInfo info = windowInfo.get();
windowInfo = Optional.of(WindowInfo.of(
info.getType(),
info.getSize(),
Optional.of(emitStrategy.get().getOutputRefinement())
));
}

return new CreateTableCommand(
outputNode.getSinkName().get(),
outputNode.getSchema(),
outputNode.getTimestampColumn(),
outputNode.getKsqlTopic().getKafkaTopicName(),
Formats.from(outputNode.getKsqlTopic()),
outputNode.getKsqlTopic().getKeyFormat().getWindowInfo(),
windowInfo,
Optional.of(outputNode.getOrReplace()),
Optional.of(false)
);
Expand Down Expand Up @@ -269,7 +285,8 @@ private static LogicalSchema buildSchema(
}

private static Optional<WindowInfo> getWindowInfo(final CreateSourceProperties props) {
return props.getWindowType().map(type -> WindowInfo.of(type, props.getWindowSize()));
return props.getWindowType()
.map(type -> WindowInfo.of(type, props.getWindowSize(), Optional.empty()));
}

private static String ensureTopicExists(
Expand Down