Skip to content

Commit

Permalink
Add default serialization config to apps (#239)
Browse files Browse the repository at this point in the history
Fixes #235
  • Loading branch information
philipp94831 authored Jul 25, 2024
1 parent 9b67bc7 commit cc19d5e
Show file tree
Hide file tree
Showing 33 changed files with 515 additions and 125 deletions.
22 changes: 18 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -58,15 +58,17 @@ and `getUniqueAppId()`. You can define the topology of your application in `buil

```java
import com.bakdata.kafka.KafkaStreamsApplication;
import com.bakdata.kafka.SerdeConfig;
import com.bakdata.kafka.StreamsApp;
import com.bakdata.kafka.StreamsTopicConfig;
import com.bakdata.kafka.TopologyBuilder;
import java.util.Map;
import org.apache.kafka.common.serialization.Serdes.StringSerde;
import org.apache.kafka.streams.kstream.KStream;

public class StreamsBootstrapApplication extends KafkaStreamsApplication {
public class MyStreamsApplication extends KafkaStreamsApplication {
public static void main(final String[] args) {
startApplication(new StreamsBootstrapApplication(), args);
startApplication(new MyStreamsApplication(), args);
}

@Override
Expand All @@ -86,6 +88,11 @@ public class StreamsBootstrapApplication extends KafkaStreamsApplication {
return "streams-bootstrap-app-" + topics.getOutputTopic();
}

@Override
public SerdeConfig defaultSerializationConfig() {
return new SerdeConfig(StringSerde.class, StringSerde.class);
}

// Optionally you can define custom Kafka properties
@Override
public Map<String, Object> createKafkaProperties() {
Expand Down Expand Up @@ -142,12 +149,14 @@ import com.bakdata.kafka.KafkaProducerApplication;
import com.bakdata.kafka.ProducerApp;
import com.bakdata.kafka.ProducerBuilder;
import com.bakdata.kafka.ProducerRunnable;
import com.bakdata.kafka.SerializerConfig;
import java.util.Map;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.common.serialization.StringSerializer;

public class StreamsBootstrapApplication extends KafkaProducerApplication {
public class MyProducerApplication extends KafkaProducerApplication {
public static void main(final String[] args) {
startApplication(new StreamsBootstrapApplication(), args);
startApplication(new MyProducerApplication(), args);
}

@Override
Expand All @@ -162,6 +171,11 @@ public class StreamsBootstrapApplication extends KafkaProducerApplication {
};
}

@Override
public SerializerConfig defaultSerializationConfig() {
return new SerializerConfig(StringSerializer.class, StringSerializer.class);
}

// Optionally you can define custom Kafka properties
@Override
public Map<String, Object> createKafkaProperties() {
Expand Down
41 changes: 41 additions & 0 deletions streams-bootstrap-cli/src/test/java/com/bakdata/kafka/CliTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import net.mguenther.kafka.junit.SendKeyValues;
import net.mguenther.kafka.junit.TopicConfig;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serdes.StringSerde;
import org.apache.kafka.streams.kstream.Consumed;
import org.junit.jupiter.api.Test;

Expand All @@ -63,6 +64,11 @@ public void buildTopology(final TopologyBuilder builder) {
public String getUniqueAppId(final StreamsTopicConfig topics) {
throw new UnsupportedOperationException();
}

@Override
public SerdeConfig defaultSerializationConfig() {
throw new UnsupportedOperationException();
}
};
}

Expand Down Expand Up @@ -91,6 +97,11 @@ public void buildTopology(final TopologyBuilder builder) {
public String getUniqueAppId(final StreamsTopicConfig topics) {
throw new UnsupportedOperationException();
}

@Override
public SerdeConfig defaultSerializationConfig() {
throw new UnsupportedOperationException();
}
}), new String[]{
"--brokers", "localhost:9092",
"--schema-registry-url", "http://localhost:8081",
Expand All @@ -115,6 +126,11 @@ public void buildTopology(final TopologyBuilder builder) {
public String getUniqueAppId(final StreamsTopicConfig topics) {
throw new UnsupportedOperationException();
}

@Override
public SerdeConfig defaultSerializationConfig() {
throw new UnsupportedOperationException();
}
};
}

Expand Down Expand Up @@ -147,6 +163,11 @@ public void buildTopology(final TopologyBuilder builder) {
public String getUniqueAppId(final StreamsTopicConfig topics) {
throw new UnsupportedOperationException();
}

@Override
public SerdeConfig defaultSerializationConfig() {
throw new UnsupportedOperationException();
}
};
}

Expand Down Expand Up @@ -179,6 +200,11 @@ public void buildTopology(final TopologyBuilder builder) {
public String getUniqueAppId(final StreamsTopicConfig topics) {
return "app";
}

@Override
public SerdeConfig defaultSerializationConfig() {
throw new UnsupportedOperationException();
}
})) {
kafkaCluster.start();
kafkaCluster.createTopic(TopicConfig.withName(input).build());
Expand Down Expand Up @@ -210,6 +236,11 @@ public void buildTopology(final TopologyBuilder builder) {
public String getUniqueAppId(final StreamsTopicConfig topics) {
return "app";
}

@Override
public SerdeConfig defaultSerializationConfig() {
return new SerdeConfig(StringSerde.class, StringSerde.class);
}
})) {
kafkaCluster.start();
kafkaCluster.createTopic(TopicConfig.withName(input).build());
Expand Down Expand Up @@ -249,6 +280,11 @@ public void buildTopology(final TopologyBuilder builder) {
public String getUniqueAppId(final StreamsTopicConfig topics) {
throw new UnsupportedOperationException();
}

@Override
public SerdeConfig defaultSerializationConfig() {
throw new UnsupportedOperationException();
}
};
}
}, new String[]{
Expand All @@ -275,6 +311,11 @@ public void buildTopology(final TopologyBuilder builder) {
public String getUniqueAppId(final StreamsTopicConfig topics) {
throw new UnsupportedOperationException();
}

@Override
public SerdeConfig defaultSerializationConfig() {
throw new UnsupportedOperationException();
}
};
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
import org.apache.kafka.common.serialization.Serdes.StringSerde;
import org.apache.kafka.streams.kstream.KStream;

@NoArgsConstructor
Expand Down Expand Up @@ -57,6 +58,11 @@ public String getUniqueAppId(final StreamsTopicConfig topics) {
return CloseFlagApp.this.getClass().getSimpleName() + "-" + topics.getOutputTopic();
}

@Override
public SerdeConfig defaultSerializationConfig() {
return new SerdeConfig(StringSerde.class, StringSerde.class);
}

@Override
public void close() {
CloseFlagApp.this.appClosed = true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,19 +32,20 @@
import com.bakdata.kafka.ProducerApp;
import com.bakdata.kafka.ProducerBuilder;
import com.bakdata.kafka.ProducerRunnable;
import com.bakdata.kafka.SerializerConfig;
import com.bakdata.kafka.SimpleKafkaProducerApplication;
import com.bakdata.kafka.TestRecord;
import com.bakdata.schemaregistrymock.junit5.SchemaRegistryMockExtension;
import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig;
import io.confluent.kafka.streams.serdes.avro.SpecificAvroDeserializer;
import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerializer;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import net.mguenther.kafka.junit.EmbeddedKafkaCluster;
import net.mguenther.kafka.junit.ReadKeyValues;
import net.mguenther.kafka.junit.TopicConfig;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
Expand Down Expand Up @@ -85,10 +86,8 @@ public ProducerRunnable buildRunnable(final ProducerBuilder builder) {
}

@Override
public Map<String, Object> createKafkaProperties() {
return Map.of(
ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class
);
public SerializerConfig defaultSerializationConfig() {
return new SerializerConfig(StringSerializer.class, SpecificAvroSerializer.class);
}
})) {
app.setBrokers(this.kafkaCluster.getBrokerList());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,12 @@

package com.bakdata.kafka.test_applications;

import com.bakdata.kafka.SerdeConfig;
import com.bakdata.kafka.StreamsApp;
import com.bakdata.kafka.StreamsTopicConfig;
import com.bakdata.kafka.TopologyBuilder;
import lombok.NoArgsConstructor;
import org.apache.kafka.common.serialization.Serdes.StringSerde;
import org.apache.kafka.streams.kstream.KStream;

@NoArgsConstructor
Expand All @@ -43,4 +45,9 @@ public String getUniqueAppId(final StreamsTopicConfig topics) {
return this.getClass().getSimpleName() + "-" + topics.getOutputTopic();
}

@Override
public SerdeConfig defaultSerializationConfig() {
return new SerdeConfig(StringSerde.class, StringSerde.class);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,15 @@

package com.bakdata.kafka.test_applications;

import com.bakdata.kafka.SerdeConfig;
import com.bakdata.kafka.StreamsApp;
import com.bakdata.kafka.StreamsTopicConfig;
import com.bakdata.kafka.TopologyBuilder;
import java.util.Arrays;
import java.util.regex.Pattern;
import lombok.NoArgsConstructor;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serdes.StringSerde;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
Expand All @@ -56,4 +58,9 @@ public void buildTopology(final TopologyBuilder builder) {
public String getUniqueAppId(final StreamsTopicConfig topics) {
return this.getClass().getSimpleName() + "-" + topics.getOutputTopic();
}

@Override
public SerdeConfig defaultSerializationConfig() {
return new SerdeConfig(StringSerde.class, StringSerde.class);
}
}
3 changes: 2 additions & 1 deletion streams-bootstrap-core/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ dependencies {
api(group = "org.apache.kafka", name = "kafka-streams", version = kafkaVersion)
api(group = "org.apache.kafka", name = "kafka-clients", version = kafkaVersion)
val confluentVersion: String by project
implementation(group = "io.confluent", name = "kafka-streams-avro-serde", version = confluentVersion)
implementation(group = "io.confluent", name = "kafka-schema-serializer", version = confluentVersion)
api(group = "io.confluent", name = "kafka-schema-registry-client", version = confluentVersion)
api(
group = "org.slf4j",
Expand Down Expand Up @@ -42,6 +42,7 @@ dependencies {
testImplementation(group = "net.mguenther.kafka", name = "kafka-junit", version = kafkaJunitVersion) {
exclude(group = "org.slf4j", module = "slf4j-log4j12")
}
testImplementation(group = "io.confluent", name = "kafka-streams-avro-serde", version = confluentVersion)
val log4jVersion: String by project
testImplementation(group = "org.apache.logging.log4j", name = "log4j-slf4j2-impl", version = log4jVersion)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@
* @param <T> type of topic config
* @param <C> type of clean up config
*/
@FunctionalInterface
public interface App<T, C> extends AutoCloseable {

/**
Expand Down Expand Up @@ -63,4 +62,10 @@ default Map<String, Object> createKafkaProperties() {
default void setup(final EffectiveAppConfiguration<T> configuration) {
// do nothing by default
}

/**
* Configure default serialization behavior
* @return {@code SerializationConfig}
*/
SerializationConfig defaultSerializationConfig();
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,12 @@

import static java.util.Collections.emptyMap;

import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerializer;
import java.util.HashMap;
import java.util.Map;
import lombok.Getter;
import lombok.NonNull;
import lombok.RequiredArgsConstructor;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;

/**
* A {@link ProducerApp} with a corresponding {@link AppConfiguration}
Expand All @@ -45,17 +43,9 @@ public class ConfiguredProducerApp<T extends ProducerApp> implements ConfiguredA
private final @NonNull T app;
private final @NonNull AppConfiguration<ProducerTopicConfig> configuration;

private static Map<String, Object> createBaseConfig(final KafkaEndpointConfig endpointConfig) {
private static Map<String, Object> createBaseConfig() {
final Map<String, Object> kafkaConfig = new HashMap<>();

if (endpointConfig.isSchemaRegistryConfigured()) {
kafkaConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, SpecificAvroSerializer.class);
kafkaConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, SpecificAvroSerializer.class);
} else {
kafkaConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
kafkaConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
}

kafkaConfig.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1);
kafkaConfig.put(ProducerConfig.ACKS_CONFIG, "all");

Expand All @@ -70,12 +60,6 @@ private static Map<String, Object> createBaseConfig(final KafkaEndpointConfig en
* Configuration is created in the following order
* <ul>
* <li>
* {@link ProducerConfig#KEY_SERIALIZER_CLASS_CONFIG} and
* {@link ProducerConfig#VALUE_SERIALIZER_CLASS_CONFIG} are configured based on
* {@link KafkaEndpointConfig#isSchemaRegistryConfigured()}.
* If Schema Registry is configured, {@link SpecificAvroSerializer} is used, otherwise
* {@link StringSerializer} is used.
* Additionally, the following is configured:
* <pre>
* max.in.flight.requests.per.connection=1
* acks=all
Expand All @@ -95,6 +79,11 @@ private static Map<String, Object> createBaseConfig(final KafkaEndpointConfig en
* <li>
* Configs provided by {@link KafkaEndpointConfig#createKafkaProperties()}
* </li>
* <li>
* {@link ProducerConfig#KEY_SERIALIZER_CLASS_CONFIG} and
 * {@link ProducerConfig#VALUE_SERIALIZER_CLASS_CONFIG} are configured using
* {@link ProducerApp#defaultSerializationConfig()}
* </li>
* </ul>
*
* @param endpointConfig endpoint to run app on
Expand Down Expand Up @@ -130,7 +119,7 @@ public void close() {
}

private KafkaPropertiesFactory createPropertiesFactory(final KafkaEndpointConfig endpointConfig) {
final Map<String, Object> baseConfig = createBaseConfig(endpointConfig);
final Map<String, Object> baseConfig = createBaseConfig();
return KafkaPropertiesFactory.builder()
.baseConfig(baseConfig)
.app(this.app)
Expand Down
Loading

0 comments on commit cc19d5e

Please sign in to comment.