From c87aa6940aa480dbecf1393ff16a70d53a9d51d5 Mon Sep 17 00:00:00 2001 From: Almog Gavra Date: Wed, 15 Jul 2020 18:50:34 -0700 Subject: [PATCH] fix: always use the changelog subject in table state stores (#5823) (#5837) --- .../processing/LoggingDeserializer.java | 69 +++++++- .../ksql/serde/StaticTopicSerde.java | 152 ++++++++++++++++ .../processing/LoggingDeserializerTest.java | 36 ++++ .../ksql/serde/StaticTopicSerdeTest.java | 166 ++++++++++++++++++ .../streams/RegisterSchemaCallback.java | 60 +++++++ .../ksql/execution/streams/SourceBuilder.java | 78 +++++++- .../streams/RegisterSchemaCallbackTest.java | 58 ++++++ .../execution/streams/SourceBuilderTest.java | 40 +++++ 8 files changed, 650 insertions(+), 9 deletions(-) create mode 100644 ksqldb-serde/src/main/java/io/confluent/ksql/serde/StaticTopicSerde.java create mode 100644 ksqldb-serde/src/test/java/io/confluent/ksql/serde/StaticTopicSerdeTest.java create mode 100644 ksqldb-streams/src/main/java/io/confluent/ksql/execution/streams/RegisterSchemaCallback.java create mode 100644 ksqldb-streams/src/test/java/io/confluent/ksql/execution/streams/RegisterSchemaCallbackTest.java diff --git a/ksqldb-serde/src/main/java/io/confluent/ksql/logging/processing/LoggingDeserializer.java b/ksqldb-serde/src/main/java/io/confluent/ksql/logging/processing/LoggingDeserializer.java index d25c9ed95a2b..57bb91bb0829 100644 --- a/ksqldb-serde/src/main/java/io/confluent/ksql/logging/processing/LoggingDeserializer.java +++ b/ksqldb-serde/src/main/java/io/confluent/ksql/logging/processing/LoggingDeserializer.java @@ -17,6 +17,7 @@ import static java.util.Objects.requireNonNull; +import com.google.common.annotations.VisibleForTesting; import java.util.Map; import java.util.Optional; import org.apache.kafka.common.serialization.Deserializer; @@ -41,11 +42,26 @@ public void configure(final Map configs, final boolean isKey) { @Override public T deserialize(final String topic, final byte[] bytes) { + return tryDeserialize(topic, bytes).get(); + } + + /** + * Similar to {@link #deserialize(String, byte[])}, but allows the erroneous case + * to delay the log-and-throw behavior until {@link DelayedResult#get()} is called. + * + *

This can be used in the scenarios when an error is expected, such as if a retry + * will likely solve the problem, to avoid spamming the processing logger with messages + * that are not helpful to the end user.

+ */ + public DelayedResult tryDeserialize(final String topic, final byte[] bytes) { try { - return delegate.deserialize(topic, bytes); + return new DelayedResult(delegate.deserialize(topic, bytes)); } catch (final RuntimeException e) { - processingLogger.error(new DeserializationError(e, Optional.ofNullable(bytes), topic)); - throw e; + return new DelayedResult( + e, + new DeserializationError(e, Optional.ofNullable(bytes), topic), + processingLogger + ); } } @@ -53,4 +69,51 @@ public T deserialize(final String topic, final byte[] bytes) { public void close() { delegate.close(); } + + public static class DelayedResult { + + private final T result; + private final RuntimeException error; + private final ProcessingLogger processingLogger; + private final DeserializationError deserializationError; + + public DelayedResult( + final RuntimeException error, + final DeserializationError deserializationError, + final ProcessingLogger processingLogger + ) { + this.result = null; + this.error = error; + this.deserializationError = requireNonNull(deserializationError, "deserializationError"); + this.processingLogger = requireNonNull(processingLogger, "processingLogger"); + } + + public DelayedResult(final T result) { + this.result = result; + this.error = null; + this.deserializationError = null; + this.processingLogger = null; + } + + public boolean isError() { + return error != null; + } + + @VisibleForTesting + RuntimeException getError() { + return error; + } + + @SuppressWarnings("ConstantConditions") + public T get() { + if (isError()) { + processingLogger.error(deserializationError); + throw error; + } + + return result; + } + + } + } diff --git a/ksqldb-serde/src/main/java/io/confluent/ksql/serde/StaticTopicSerde.java b/ksqldb-serde/src/main/java/io/confluent/ksql/serde/StaticTopicSerde.java new file mode 100644 index 000000000000..241ff28edba9 --- /dev/null +++ b/ksqldb-serde/src/main/java/io/confluent/ksql/serde/StaticTopicSerde.java @@ -0,0 +1,152 @@ +/* + * Copyright 2020 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"; you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.confluent.ksql.serde; + +import com.google.common.annotations.VisibleForTesting; +import io.confluent.ksql.logging.processing.LoggingDeserializer; +import io.confluent.ksql.logging.processing.LoggingDeserializer.DelayedResult; +import java.util.Map; +import java.util.Objects; +import org.apache.kafka.common.serialization.Deserializer; +import org.apache.kafka.common.serialization.Serde; +import org.apache.kafka.common.serialization.Serializer; + +/** + * The {@code StaticTopicSerde} hard codes the topic name that is passed + * to the delegate Serde, regardless to what the caller passes in as the + * topic. The only exception is that if a deserialization attempt fails, + * the deserializer will attempt one more time using the topic that was + * passed to the Serde (instead of the hard coded value). In this situation, + * the {@code onFailure} callback is called so that the user of this class + * can remedy the issue (i.e. register an extra schema under the hard coded + * topic). The callback will not be called if both serialization attempts fail. + * + *

This class is intended as a workaround for the issues described in + * both KAFKA-10179 and KSQL-5673; specifically, it allows a materialized + * state store to use a different topic name than that which Kafka Streams + * passes in to the Serde.

+ * + *

Think carefully before reusing this class! It's inteded use case is + * very narrow.

+ */ +public final class StaticTopicSerde implements Serde { + + public interface Callback { + + /** + * This method is called when the {@link Serde#deserializer()}'s produced by + * this class' {@link Deserializer#deserialize(String, byte[])} method fails + * using the static topic but succeeds using the source topic. + * + * @param sourceTopic the original topic that was passed in to the deserializer + * @param staticTopic the hard coded topic that was passed into the {@code StaticTopicSerde} + * @param data the data that failed deserialization + */ + void onDeserializationFailure(String sourceTopic, String staticTopic, byte[] data); + } + + private final Serde delegate; + private final String topic; + private final Callback onFailure; + + /** + * @param topic the topic to hardcode + * @param serde the delegate serde + * @param onFailure a callback to call on failure + * + * @return a serde which delegates to {@code serde} but passes along {@code topic} + * in place of whatever the actual topic is + */ + public static Serde wrap( + final String topic, + final Serde serde, + final Callback onFailure + ) { + return new StaticTopicSerde<>(topic, serde, onFailure); + } + + private StaticTopicSerde( + final String topic, + final Serde delegate, + final Callback onFailure + ) { + this.topic = Objects.requireNonNull(topic, "topic"); + this.delegate = Objects.requireNonNull(delegate, "delegate"); + this.onFailure = Objects.requireNonNull(onFailure, "onFailure"); + } + + @Override + public void configure(final Map configs, final boolean isKey) { + delegate.configure(configs, isKey); + } + + @Override + public void close() { + delegate.close(); + } + + @Override + public Serializer serializer() { + final Serializer serializer = delegate.serializer(); + return (topic, data) -> serializer.serialize(this.topic, data); + } + + @Override + public Deserializer deserializer() { + final Deserializer deserializer = delegate.deserializer(); + + if (deserializer instanceof LoggingDeserializer) { + final LoggingDeserializer loggingDeserializer = (LoggingDeserializer) deserializer; + + return (topic, data) -> { + final DelayedResult staticResult = loggingDeserializer.tryDeserialize(this.topic, data); + if (!staticResult.isError()) { + return staticResult.get(); + } + + // if both attempts error, then staticResult.get() will log the error to + // the processing log and throw - do not call the callback in this case + final DelayedResult sourceResult = loggingDeserializer.tryDeserialize(topic, data); + if (sourceResult.isError()) { + return staticResult.get(); + } + + onFailure.onDeserializationFailure(topic, this.topic, data); + return sourceResult.get(); + }; + } + + return (topic, data) -> { + try { + return deserializer.deserialize(this.topic, data); + } catch (final Exception e) { + final T object = deserializer.deserialize(topic, data); + onFailure.onDeserializationFailure(topic, this.topic, data); + return object; + } + }; + } + + @VisibleForTesting + public String getTopic() { + return topic; + } + + @VisibleForTesting + public Callback getOnFailure() { + return onFailure; + } +} diff --git a/ksqldb-serde/src/test/java/io/confluent/ksql/logging/processing/LoggingDeserializerTest.java b/ksqldb-serde/src/test/java/io/confluent/ksql/logging/processing/LoggingDeserializerTest.java index 09093a810d09..d61d23757054 100644 --- a/ksqldb-serde/src/test/java/io/confluent/ksql/logging/processing/LoggingDeserializerTest.java +++ b/ksqldb-serde/src/test/java/io/confluent/ksql/logging/processing/LoggingDeserializerTest.java @@ -16,7 +16,10 @@ package io.confluent.ksql.logging.processing; import static io.confluent.ksql.GenericRow.genericRow; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.*; import static org.junit.Assert.assertThrows; +import static org.junit.Assert.assertTrue; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -24,12 +27,14 @@ import com.google.common.collect.ImmutableMap; import com.google.common.testing.NullPointerTester; import io.confluent.ksql.GenericRow; +import io.confluent.ksql.logging.processing.LoggingDeserializer.DelayedResult; import java.nio.charset.StandardCharsets; import java.util.Map; import java.util.Optional; import java.util.function.Function; import org.apache.kafka.common.serialization.Deserializer; import org.apache.kafka.connect.data.SchemaAndValue; +import org.hamcrest.Matchers; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; @@ -99,6 +104,20 @@ public void shouldDeserializeWithDelegate() { verify(delegate).deserialize("some topic", SOME_BYTES); } + @Test + public void shouldTryDeserializeWithDelegate() { + // Given: + when(delegate.deserialize(any(), any())).thenReturn(SOME_ROW); + + // When: + final DelayedResult result = deserializer.tryDeserialize("some topic", SOME_BYTES); + + // Then: + verify(delegate).deserialize("some topic", SOME_BYTES); + assertThat(result.isError(), is(false)); + assertThat(result.get(), is(SOME_ROW)); + } + @Test(expected = ArithmeticException.class) public void shouldThrowIfDelegateThrows() { // Given: @@ -126,4 +145,21 @@ public void shouldLogOnException() { // Then: verify(processingLogger).error(new DeserializationError(e, Optional.of(SOME_BYTES), "t")); } + + @Test + public void shouldDelayLogOnException() { + // Given: + when(delegate.deserialize(any(), any())) + .thenThrow(new RuntimeException("outer", + new RuntimeException("inner", new RuntimeException("cause")))); + + // When: + final DelayedResult result = deserializer.tryDeserialize("t", SOME_BYTES); + + // Then: + assertTrue(result.isError()); + assertThrows(RuntimeException.class, result::get); + verify(processingLogger) + .error(new DeserializationError(result.getError(), Optional.of(SOME_BYTES), "t")); + } } \ No newline at end of file diff --git a/ksqldb-serde/src/test/java/io/confluent/ksql/serde/StaticTopicSerdeTest.java b/ksqldb-serde/src/test/java/io/confluent/ksql/serde/StaticTopicSerdeTest.java new file mode 100644 index 000000000000..53e776bcfa3d --- /dev/null +++ b/ksqldb-serde/src/test/java/io/confluent/ksql/serde/StaticTopicSerdeTest.java @@ -0,0 +1,166 @@ +/* + * Copyright 2020 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"; you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.confluent.ksql.serde; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.is; +import static org.junit.Assert.assertThrows; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyZeroInteractions; +import static org.mockito.Mockito.when; + +import io.confluent.ksql.logging.processing.DeserializationError; +import io.confluent.ksql.logging.processing.LoggingDeserializer; +import io.confluent.ksql.logging.processing.ProcessingLogger; +import io.confluent.ksql.serde.StaticTopicSerde.Callback; +import java.util.Optional; +import org.apache.kafka.common.serialization.Deserializer; +import org.apache.kafka.common.serialization.Serde; +import org.apache.kafka.common.serialization.Serdes.WrapperSerde; +import org.apache.kafka.common.serialization.Serializer; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.junit.MockitoJUnitRunner; + +@RunWith(MockitoJUnitRunner.class) +public class StaticTopicSerdeTest { + + private static final byte[] SOME_BYTES = new byte[]{1, 2, 3}; + private static final Object SOME_OBJECT = 1; + private static final String STATIC_TOPIC = "static"; + private static final String SOURCE_TOPIC = "source"; + + @Mock + private Serializer delegateS; + @Mock + private Deserializer delegateD; + @Mock + private Callback callback; + + private Serde staticSerde; + + @Before + public void setUp() { + final Serde delegate = new WrapperSerde<>(delegateS, delegateD); + + when(delegateS.serialize(Mockito.any(), Mockito.any())).thenReturn(SOME_BYTES); + when(delegateD.deserialize(Mockito.any(), Mockito.any())).thenReturn(SOME_OBJECT); + + staticSerde = StaticTopicSerde.wrap(STATIC_TOPIC, delegate, callback); + } + + @Test + public void shouldUseDelegateSerializerWithStaticTopic() { + // When: + final byte[] serialized = staticSerde.serializer().serialize(SOURCE_TOPIC, SOME_OBJECT); + + // Then: + verify(delegateS).serialize(STATIC_TOPIC, SOME_OBJECT); + assertThat(serialized, is(SOME_BYTES)); + verifyZeroInteractions(callback); + } + + @Test + public void shouldUseDelegateDeserializerWithStaticTopic() { + // When: + final Object deserialized = staticSerde.deserializer().deserialize(SOURCE_TOPIC, SOME_BYTES); + + // Then: + verify(delegateD).deserialize(STATIC_TOPIC, SOME_BYTES); + assertThat(deserialized, is(SOME_OBJECT)); + verifyZeroInteractions(callback); + } + + @Test + public void shouldUseDelegateLoggingDeserializerWithStaticTopic() { + // Given: + final ProcessingLogger logger = mock(ProcessingLogger.class); + final LoggingDeserializer loggingDelegate = new LoggingDeserializer<>(delegateD, logger); + final Serde delegate = new WrapperSerde<>(delegateS, loggingDelegate); + + staticSerde = StaticTopicSerde.wrap(STATIC_TOPIC, delegate, callback); + + // When: + final Object deserialized = staticSerde.deserializer().deserialize(SOURCE_TOPIC, SOME_BYTES); + + // Then: + verify(delegateD).deserialize(STATIC_TOPIC, SOME_BYTES); + assertThat(deserialized, is(SOME_OBJECT)); + verifyZeroInteractions(callback); + verifyZeroInteractions(logger); + } + + @Test + public void shouldTrySourceTopicAndCallCallbackOnDeserializationFailure() { + // Given: + when(delegateD.deserialize(Mockito.eq(STATIC_TOPIC), Mockito.any())).thenThrow(new RuntimeException()); + + // When: + final Object deserialized = staticSerde.deserializer().deserialize(SOURCE_TOPIC, SOME_BYTES); + + // Then: + verify(delegateD).deserialize(STATIC_TOPIC, SOME_BYTES); + verify(delegateD).deserialize(SOURCE_TOPIC, SOME_BYTES); + verify(callback).onDeserializationFailure(SOURCE_TOPIC, STATIC_TOPIC, SOME_BYTES); + assertThat(deserialized, is(SOME_OBJECT)); + } + + @Test + public void shouldTrySourceTopicAndCallCallbackOnDeserializationFailureWithLoggingDeserializer() { + // Given: + when(delegateD.deserialize(Mockito.eq(STATIC_TOPIC), Mockito.any())).thenThrow(new RuntimeException()); + + final ProcessingLogger logger = mock(ProcessingLogger.class); + final LoggingDeserializer loggingDelegate = new LoggingDeserializer<>(delegateD, logger); + final Serde delegate = new WrapperSerde<>(delegateS, loggingDelegate); + + staticSerde = StaticTopicSerde.wrap(STATIC_TOPIC, delegate, callback); + + // When: + final Object deserialized = staticSerde.deserializer().deserialize(SOURCE_TOPIC, SOME_BYTES); + + // Then: + verifyZeroInteractions(logger); + verify(callback).onDeserializationFailure(SOURCE_TOPIC, STATIC_TOPIC, SOME_BYTES); + assertThat(deserialized, is(SOME_OBJECT)); + } + + @Test + public void shouldLogOriginalFailureIfBothFail() { + // Given: + when(delegateD.deserialize(Mockito.any(), Mockito.any())).thenThrow(new RuntimeException()); + + final ProcessingLogger logger = mock(ProcessingLogger.class); + final LoggingDeserializer loggingDelegate = new LoggingDeserializer<>(delegateD, logger); + final Serde delegate = new WrapperSerde<>(delegateS, loggingDelegate); + + staticSerde = StaticTopicSerde.wrap(STATIC_TOPIC, delegate, callback); + + // When: + final RuntimeException err = assertThrows( + RuntimeException.class, + () -> staticSerde.deserializer().deserialize(SOURCE_TOPIC, SOME_BYTES)); + + // Then: + verify(logger).error(new DeserializationError(err, Optional.of(SOME_BYTES), STATIC_TOPIC)); + verifyZeroInteractions(callback); + } + +} \ No newline at end of file diff --git a/ksqldb-streams/src/main/java/io/confluent/ksql/execution/streams/RegisterSchemaCallback.java b/ksqldb-streams/src/main/java/io/confluent/ksql/execution/streams/RegisterSchemaCallback.java new file mode 100644 index 000000000000..a33c2a955351 --- /dev/null +++ b/ksqldb-streams/src/main/java/io/confluent/ksql/execution/streams/RegisterSchemaCallback.java @@ -0,0 +1,60 @@ +/* + * Copyright 2020 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"; you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.confluent.ksql.execution.streams; + +import io.confluent.kafka.schemaregistry.ParsedSchema; +import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient; +import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException; +import io.confluent.ksql.serde.StaticTopicSerde; +import io.confluent.ksql.util.KsqlConstants; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.Objects; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +class RegisterSchemaCallback implements StaticTopicSerde.Callback { + + private static final Logger LOG = LoggerFactory.getLogger(RegisterSchemaCallback.class); + private final SchemaRegistryClient srClient; + + RegisterSchemaCallback(final SchemaRegistryClient srClient) { + this.srClient = Objects.requireNonNull(srClient, "srClient"); + } + + @Override + public void onDeserializationFailure( + final String source, + final String changelog, + final byte[] data + ) { + final String sourceSubject = source + KsqlConstants.SCHEMA_REGISTRY_VALUE_SUFFIX; + final String changelogSubject = changelog + KsqlConstants.SCHEMA_REGISTRY_VALUE_SUFFIX; + try { + // all schema registry events start with a magic byte 0x0 and then four bytes + // indicating the schema id - we extract that schema id from the data that failed + // to deserialize and then register it into the changelog subject + final int id = ByteBuffer.wrap(data, 1, Integer.BYTES).getInt(); + + LOG.info("Trying to fetch & register schema id {} under subject {}", id, changelogSubject); + final ParsedSchema schema = srClient.getSchemaBySubjectAndId(sourceSubject, id); + srClient.register(changelogSubject, schema); + } catch (IOException | RestClientException e) { + LOG.warn("Failed during deserialization callback for topic " + source, e); + } + } + +} diff --git a/ksqldb-streams/src/main/java/io/confluent/ksql/execution/streams/SourceBuilder.java b/ksqldb-streams/src/main/java/io/confluent/ksql/execution/streams/SourceBuilder.java index f3b6d9c18b80..0634b5fe21bf 100644 --- a/ksqldb-streams/src/main/java/io/confluent/ksql/execution/streams/SourceBuilder.java +++ b/ksqldb-streams/src/main/java/io/confluent/ksql/execution/streams/SourceBuilder.java @@ -14,6 +14,8 @@ package io.confluent.ksql.execution.streams; +import static io.confluent.ksql.util.KsqlConfig.KSQL_PERSISTENT_QUERY_NAME_PREFIX_CONFIG; +import static io.confluent.ksql.util.KsqlConfig.KSQL_SERVICE_ID_CONFIG; import static java.util.Objects.requireNonNull; import com.google.common.collect.ImmutableList; @@ -36,8 +38,13 @@ import io.confluent.ksql.schema.ksql.Column; import io.confluent.ksql.schema.ksql.LogicalSchema; import io.confluent.ksql.schema.ksql.PhysicalSchema; +import io.confluent.ksql.serde.FormatFactory; +import io.confluent.ksql.serde.FormatInfo; +import io.confluent.ksql.serde.StaticTopicSerde; +import io.confluent.ksql.serde.StaticTopicSerde.Callback; import io.confluent.ksql.serde.WindowInfo; import io.confluent.ksql.util.KsqlConfig; +import io.confluent.ksql.util.ReservedInternalTopics; import java.util.Arrays; import java.util.Collection; import java.util.Collections; @@ -63,9 +70,12 @@ import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.TimestampExtractor; import org.apache.kafka.streams.state.KeyValueStore; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public final class SourceBuilder { + private static final Logger LOG = LoggerFactory.getLogger(SourceBuilder.class); private static final Collection NULL_WINDOWED_KEY_COLUMNS = Collections.unmodifiableList( Arrays.asList(null, null, null) ); @@ -176,11 +186,12 @@ public static KTableHolder buildTable( consumedFactory ); + final String stateStoreName = tableChangeLogOpName(source.getProperties()); final Materialized> materialized = materializedFactory.create( keySerde, valueSerde, - tableChangeLogOpName(source.getProperties()) + stateStoreName ); final KTable ktable = buildKTable( @@ -188,7 +199,9 @@ public static KTableHolder buildTable( queryBuilder, consumed, nonWindowedKeyGenerator(source.getSourceSchema()), - materialized + materialized, + valueSerde, + stateStoreName ); return KTableHolder.unmaterialized( @@ -225,11 +238,12 @@ static KTableHolder> buildWindowedTable( consumedFactory ); + final String stateStoreName = tableChangeLogOpName(source.getProperties()); final Materialized, GenericRow, KeyValueStore> materialized = materializedFactory.create( keySerde, valueSerde, - tableChangeLogOpName(source.getProperties()) + stateStoreName ); final KTable, GenericRow> ktable = buildKTable( @@ -237,7 +251,9 @@ static KTableHolder> buildWindowedTable( queryBuilder, consumed, windowedKeyGenerator(source.getSourceSchema()), - materialized + materialized, + valueSerde, + stateStoreName ); return KTableHolder.unmaterialized( @@ -292,16 +308,26 @@ private static KTable buildKTable( final KsqlQueryBuilder queryBuilder, final Consumed consumed, final Function> keyGenerator, - final Materialized> materialized + final Materialized> materialized, + final Serde valueSerde, + final String stateStoreName ) { final boolean forceChangelog = streamSource instanceof TableSource && ((TableSource) streamSource).isForceChangelog(); final KTable table; if (!forceChangelog) { + final String changelogTopic = changelogTopic(queryBuilder, stateStoreName); + final Callback onFailure = getRegisterCallback( + queryBuilder, streamSource.getFormats().getValueFormat()); + table = queryBuilder .getStreamsBuilder() - .table(streamSource.getTopicName(), consumed, materialized); + .table( + streamSource.getTopicName(), + consumed.withValueSerde(StaticTopicSerde.wrap(changelogTopic, valueSerde, onFailure)), + materialized + ); } else { final KTable source = queryBuilder .getStreamsBuilder() @@ -318,6 +344,46 @@ private static KTable buildKTable( .transformValues(new AddKeyAndTimestampColumns<>(keyGenerator)); } + private static StaticTopicSerde.Callback getRegisterCallback( + final KsqlQueryBuilder builder, + final FormatInfo valueFormat + ) { + final boolean schemaRegistryEnabled = !builder + .getKsqlConfig() + .getString(KsqlConfig.SCHEMA_REGISTRY_URL_PROPERTY) + .isEmpty(); + + final boolean useSR = FormatFactory + .fromName(valueFormat.getFormat()) + .supportsSchemaInference(); + + if (!schemaRegistryEnabled || !useSR) { + return (t1, t2, data) -> { }; + } + + return new RegisterSchemaCallback(builder.getServiceContext().getSchemaRegistryClient()); + } + + /** + * This code mirrors the logic that generates the name for changelog topics + * in kafka streams, which follows the pattern: + *
+   *    applicationID + "-" + stateStoreName + "-changelog".
+   * 
+ */ + private static String changelogTopic( + final KsqlQueryBuilder queryBuilder, + final String stateStoreName + ) { + return ReservedInternalTopics.KSQL_INTERNAL_TOPIC_PREFIX + + queryBuilder.getKsqlConfig().getString(KSQL_SERVICE_ID_CONFIG) + + queryBuilder.getKsqlConfig().getString(KSQL_PERSISTENT_QUERY_NAME_PREFIX_CONFIG) + + queryBuilder.getQueryId().toString() + + "-" + + stateStoreName + + "-changelog"; + } + private static TimestampExtractor timestampExtractor( final KsqlConfig ksqlConfig, final LogicalSchema sourceSchema, diff --git a/ksqldb-streams/src/test/java/io/confluent/ksql/execution/streams/RegisterSchemaCallbackTest.java b/ksqldb-streams/src/test/java/io/confluent/ksql/execution/streams/RegisterSchemaCallbackTest.java new file mode 100644 index 000000000000..b65434e5bdb7 --- /dev/null +++ b/ksqldb-streams/src/test/java/io/confluent/ksql/execution/streams/RegisterSchemaCallbackTest.java @@ -0,0 +1,58 @@ +/* + * Copyright 2020 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"; you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.confluent.ksql.execution.streams; + +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import io.confluent.kafka.schemaregistry.ParsedSchema; +import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient; +import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException; +import io.confluent.ksql.util.KsqlConstants; +import java.io.IOException; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.junit.MockitoJUnitRunner; + +@RunWith(MockitoJUnitRunner.class) +public class RegisterSchemaCallbackTest { + + private static final String SUFFIX = KsqlConstants.SCHEMA_REGISTRY_VALUE_SUFFIX; + private static final String SOURCE = "s1"; + private static final String CHANGELOG = "s2"; + private static final int ID = 1; + private static final byte[] SOME_DATA = new byte[]{0x0, 0x0, 0x0, 0x0, 0x1}; + + @Mock + private SchemaRegistryClient srClient; + @Mock + private ParsedSchema schema; + + @Test + public void shouldRegisterIdFromData() throws IOException, RestClientException { + // Given: + when(srClient.getSchemaBySubjectAndId(SOURCE + SUFFIX, ID)).thenReturn(schema); + final RegisterSchemaCallback call = new RegisterSchemaCallback(srClient); + + // When: + call.onDeserializationFailure(SOURCE, CHANGELOG, SOME_DATA); + + // Then: + verify(srClient).register(CHANGELOG + SUFFIX, schema); + } + +} \ No newline at end of file diff --git a/ksqldb-streams/src/test/java/io/confluent/ksql/execution/streams/SourceBuilderTest.java b/ksqldb-streams/src/test/java/io/confluent/ksql/execution/streams/SourceBuilderTest.java index 45e1f5a78c19..f07c53b5b6b6 100644 --- a/ksqldb-streams/src/test/java/io/confluent/ksql/execution/streams/SourceBuilderTest.java +++ b/ksqldb-streams/src/test/java/io/confluent/ksql/execution/streams/SourceBuilderTest.java @@ -18,8 +18,10 @@ import static io.confluent.ksql.GenericRow.genericRow; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.is; import static org.junit.Assert.assertThrows; +import static org.junit.Assert.assertTrue; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.ArgumentMatchers.eq; @@ -31,6 +33,7 @@ import static org.mockito.Mockito.when; import com.google.common.collect.ImmutableMap; +import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient; import io.confluent.ksql.GenericRow; import io.confluent.ksql.execution.builder.KsqlQueryBuilder; import io.confluent.ksql.execution.context.QueryContext; @@ -48,12 +51,16 @@ import io.confluent.ksql.execution.timestamp.TimestampColumn; import io.confluent.ksql.logging.processing.ProcessingLogger; import io.confluent.ksql.name.ColumnName; +import io.confluent.ksql.query.QueryId; import io.confluent.ksql.schema.ksql.LogicalSchema; import io.confluent.ksql.schema.ksql.PhysicalSchema; import io.confluent.ksql.schema.ksql.types.SqlTypes; +import io.confluent.ksql.serde.FormatFactory; import io.confluent.ksql.serde.FormatInfo; import io.confluent.ksql.serde.SerdeOption; +import io.confluent.ksql.serde.StaticTopicSerde; import io.confluent.ksql.serde.WindowInfo; +import io.confluent.ksql.services.ServiceContext; import io.confluent.ksql.util.KsqlConfig; import java.util.HashSet; import java.util.Optional; @@ -170,10 +177,16 @@ public class SourceBuilderTest { private Materialized> materialized; @Mock private ProcessingLogger processingLogger; + @Mock + private ServiceContext serviceContext; + @Mock + private SchemaRegistryClient srClient; @Captor private ArgumentCaptor> transformSupplierCaptor; @Captor private ArgumentCaptor timestampExtractorCaptor; + @Captor + private ArgumentCaptor> serdeCaptor; private final GenericRow row = genericRow("baz", 123); private PlanBuilder planBuilder; @@ -199,12 +212,16 @@ public void setup() { when(kTable.transformValues(any(ValueTransformerWithKeySupplier.class))).thenReturn(kTable); when(queryBuilder.buildKeySerde(any(), any(), any())).thenReturn(keySerde); when(queryBuilder.buildValueSerde(any(), any(), any())).thenReturn(valueSerde); + when(queryBuilder.getServiceContext()).thenReturn(serviceContext); + when(queryBuilder.getQueryId()).thenReturn(new QueryId("id")); when(queryBuilder.getKsqlConfig()).thenReturn(KSQL_CONFIG); when(processorCtx.timestamp()).thenReturn(A_ROWTIME); + when(serviceContext.getSchemaRegistryClient()).thenReturn(srClient); when(streamsFactories.getConsumedFactory()).thenReturn(consumedFactory); when(streamsFactories.getMaterializedFactory()).thenReturn(materializationFactory); when(materializationFactory.create(any(), any(), any())) .thenReturn((Materialized) materialized); + when(valueFormatInfo.getFormat()).thenReturn(FormatFactory.AVRO.name()); planBuilder = new KSPlanBuilder( queryBuilder, @@ -315,6 +332,7 @@ public void shouldApplyCorrectTransformationsToSourceTable() { public void shouldApplyCorrectTransformationsToSourceTableWithoutForcingChangelog() { // Given: givenUnwindowedSourceTable(false); + when(consumed.withValueSerde(any())).thenReturn(consumed); // When: final KTableHolder builtKTable = tableSource.build(planBuilder); @@ -329,6 +347,28 @@ public void shouldApplyCorrectTransformationsToSourceTableWithoutForcingChangelo verify(consumedFactory).create(keySerde, valueSerde); verify(consumed).withTimestampExtractor(any()); verify(consumed).withOffsetResetPolicy(AutoOffsetReset.EARLIEST); + + verify(consumed).withValueSerde(serdeCaptor.capture()); + final StaticTopicSerde value = serdeCaptor.getValue(); + assertThat(value.getTopic(), is("_confluent-ksql-default_query_id-base-Reduce-changelog")); + } + + @Test + public void shouldApplyCreateSchemaRegistryCallbackIfSchemaRegistryIsEnabled() { + // Given: + when(queryBuilder.getKsqlConfig()).thenReturn( + KSQL_CONFIG.cloneWithPropertyOverwrite( + ImmutableMap.of(KsqlConfig.SCHEMA_REGISTRY_URL_PROPERTY, "foo"))); + givenUnwindowedSourceTable(false); + when(consumed.withValueSerde(any())).thenReturn(consumed); + + // When: + tableSource.build(planBuilder); + + // Then: + verify(consumed).withValueSerde(serdeCaptor.capture()); + final StaticTopicSerde value = serdeCaptor.getValue(); + assertThat(value.getOnFailure(), instanceOf(RegisterSchemaCallback.class)); }