diff --git a/build.gradle b/build.gradle index b9a156d44..3d82df2ef 100644 --- a/build.gradle +++ b/build.gradle @@ -44,6 +44,8 @@ allprojects { } mavenCentral() + + maven { url "https://packages.confluent.io/maven/" } } spotless { diff --git a/dependencies.gradle b/dependencies.gradle index 041ae741f..88d4bbd86 100644 --- a/dependencies.gradle +++ b/dependencies.gradle @@ -29,12 +29,16 @@ ext { asyncapiCoreVersion = '1.0.0-EAP-2' + avroVersion = '1.11.3' + awaitilityVersion = '4.2.0' commonsLang3Version = '3.14.0' jsr305Version = '3.0.2' + kafkaAvroSerializerVersion = '7.5.1' + kafkaClientsVersion = '3.6.1' kafkaStreamsVersion = '3.6.1' diff --git a/springwolf-core/src/main/java/io/github/stavshamir/springwolf/SpringwolfAutoConfiguration.java b/springwolf-core/src/main/java/io/github/stavshamir/springwolf/SpringwolfAutoConfiguration.java index 57b4d0f7b..ef70e0f8e 100644 --- a/springwolf-core/src/main/java/io/github/stavshamir/springwolf/SpringwolfAutoConfiguration.java +++ b/springwolf-core/src/main/java/io/github/stavshamir/springwolf/SpringwolfAutoConfiguration.java @@ -17,12 +17,17 @@ import io.github.stavshamir.springwolf.schemas.SchemasService; import io.github.stavshamir.springwolf.schemas.example.ExampleGenerator; import io.github.stavshamir.springwolf.schemas.example.ExampleJsonGenerator; +import io.github.stavshamir.springwolf.schemas.postprocessor.AvroSchemaPostProcessor; +import io.github.stavshamir.springwolf.schemas.postprocessor.ExampleGeneratorPostProcessor; +import io.github.stavshamir.springwolf.schemas.postprocessor.SchemasPostProcessor; +import io.github.stavshamir.springwolf.schemas.postprocessor.SwaggerSchemaPostProcessor; import io.swagger.v3.core.converter.ModelConverter; import org.springframework.boot.autoconfigure.AutoConfiguration; import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Import; +import org.springframework.core.annotation.Order; import java.util.List; @@ -68,9 +73,9 @@ public ChannelsService channelsService(List channelsS @ConditionalOnMissingBean public SchemasService schemasService( List modelConverters, - ExampleGenerator exampleGenerator, + List schemaPostProcessors, SpringwolfConfigProperties springwolfConfigProperties) { - return new DefaultSchemasService(modelConverters, exampleGenerator, springwolfConfigProperties); + return new DefaultSchemasService(modelConverters, schemaPostProcessors, springwolfConfigProperties); } @Bean @@ -79,6 +84,27 @@ public AsyncApiDocketService asyncApiDocketService(SpringwolfConfigProperties sp return new DefaultAsyncApiDocketService(springwolfConfigProperties); } + @Bean + @ConditionalOnMissingBean + @Order(0) + public AvroSchemaPostProcessor avroSchemaPostProcessor() { + return new AvroSchemaPostProcessor(); + } + + @Bean + @ConditionalOnMissingBean + @Order(10) + public ExampleGeneratorPostProcessor exampleGeneratorPostProcessor(ExampleGenerator exampleGenerator) { + return new ExampleGeneratorPostProcessor(exampleGenerator); + } + + @Bean + @ConditionalOnMissingBean + @Order(100) + public SwaggerSchemaPostProcessor swaggerSchemaPostProcessor() { + return new SwaggerSchemaPostProcessor(); + } + @Bean @ConditionalOnMissingBean public ExampleGenerator exampleGenerator() { diff --git a/springwolf-core/src/main/java/io/github/stavshamir/springwolf/schemas/DefaultSchemasService.java b/springwolf-core/src/main/java/io/github/stavshamir/springwolf/schemas/DefaultSchemasService.java index df89736b8..9687e50e7 100644 --- a/springwolf-core/src/main/java/io/github/stavshamir/springwolf/schemas/DefaultSchemasService.java +++ b/springwolf-core/src/main/java/io/github/stavshamir/springwolf/schemas/DefaultSchemasService.java @@ -3,7 +3,7 @@ import io.github.stavshamir.springwolf.asyncapi.types.channel.operation.message.header.AsyncHeaders; import io.github.stavshamir.springwolf.configuration.properties.SpringwolfConfigProperties; -import io.github.stavshamir.springwolf.schemas.example.ExampleGenerator; +import io.github.stavshamir.springwolf.schemas.postprocessor.SchemasPostProcessor; import io.swagger.v3.core.converter.ModelConverter; import io.swagger.v3.core.converter.ModelConverters; import io.swagger.v3.core.jackson.TypeNameResolver; @@ -23,18 +23,18 @@ public class DefaultSchemasService implements SchemasService { private final ModelConverters converter = ModelConverters.getInstance(); - private final ExampleGenerator exampleGenerator; + private final List schemaPostProcessors; private final SpringwolfConfigProperties properties; private final Map definitions = new HashMap<>(); public DefaultSchemasService( List externalModelConverters, - ExampleGenerator exampleGenerator, + List schemaPostProcessors, SpringwolfConfigProperties properties) { externalModelConverters.forEach(converter::addConverter); - this.exampleGenerator = exampleGenerator; + this.schemaPostProcessors = schemaPostProcessors; this.properties = properties; } @@ -107,25 +107,6 @@ private R runWithFqnSetting(Function callable) { } private void postProcessSchema(Schema schema) { - generateExampleWhenMissing(schema); - removeSwaggerSchemaFields(schema); - } - - private void removeSwaggerSchemaFields(Schema schema) { - schema.setAdditionalProperties(null); - - Map properties = schema.getProperties(); - if (properties != null) { - properties.values().forEach(this::removeSwaggerSchemaFields); - } - } - - private void generateExampleWhenMissing(Schema schema) { - if (schema.getExample() == null) { - log.debug("Generate example for {}", schema.getName()); - - Object example = exampleGenerator.fromSchema(schema, definitions); - schema.setExample(example); - } + schemaPostProcessors.forEach(processor -> processor.process(schema, definitions)); } } diff --git a/springwolf-core/src/main/java/io/github/stavshamir/springwolf/schemas/example/ExampleJsonGenerator.java b/springwolf-core/src/main/java/io/github/stavshamir/springwolf/schemas/example/ExampleJsonGenerator.java index 78803866f..20e1cff67 100644 --- a/springwolf-core/src/main/java/io/github/stavshamir/springwolf/schemas/example/ExampleJsonGenerator.java +++ b/springwolf-core/src/main/java/io/github/stavshamir/springwolf/schemas/example/ExampleJsonGenerator.java @@ -59,7 +59,7 @@ public Object fromSchema(Schema schema, Map definitions) { String exampleString = buildSchema(schema, definitions); return objectMapper.readValue(exampleString, Object.class); } catch (JsonProcessingException | ExampleGeneratingException ex) { - log.warn("Failed to build json example for schema {}", schema.getName()); + log.info("Failed to build json example for schema {}", schema.getName(), ex); } return null; } diff --git a/springwolf-core/src/main/java/io/github/stavshamir/springwolf/schemas/postprocessor/AvroSchemaPostProcessor.java b/springwolf-core/src/main/java/io/github/stavshamir/springwolf/schemas/postprocessor/AvroSchemaPostProcessor.java new file mode 100644 index 000000000..1de83a3e4 --- /dev/null +++ b/springwolf-core/src/main/java/io/github/stavshamir/springwolf/schemas/postprocessor/AvroSchemaPostProcessor.java @@ -0,0 +1,46 @@ +// SPDX-License-Identifier: Apache-2.0 +package io.github.stavshamir.springwolf.schemas.postprocessor; + +import io.swagger.v3.oas.models.media.Schema; +import org.springframework.util.StringUtils; + +import java.util.Map; + +/** + * Removes internal avro fields and classes from the schema. + *
+ * For now, this class is located in springwolf-core as it provides value for the cloud-stream and kafka plugin. + * To avoid to many (one-class) add-ons, this class was not moved to yet another artifact - as also no new dependencies are required. + * This may change in the future. + */ +public class AvroSchemaPostProcessor implements SchemasPostProcessor { + private static final String SCHEMA_PROPERTY = "schema"; + private static final String SPECIFIC_DATA_PROPERTY = "specificData"; + private static final String SCHEMA_REF = "org.apache.avro.Schema"; + private static final String SPECIFIC_DAT_REF = "org.apache.avro.specific.SpecificData"; + + @Override + public void process(Schema schema, Map definitions) { + removeAvroSchemas(definitions); + removeAvroProperties(schema); + } + + private void removeAvroProperties(Schema schema) { + Map properties = schema.getProperties(); + if (properties != null) { + Schema schemaPropertySchema = properties.getOrDefault(SCHEMA_PROPERTY, null); + Schema specificDataPropertySchema = properties.getOrDefault(SPECIFIC_DATA_PROPERTY, null); + if (schemaPropertySchema != null && specificDataPropertySchema != null) { + if (StringUtils.endsWithIgnoreCase(schemaPropertySchema.get$ref(), SCHEMA_REF) + && StringUtils.endsWithIgnoreCase(specificDataPropertySchema.get$ref(), SPECIFIC_DAT_REF)) { + properties.remove(SCHEMA_PROPERTY); + properties.remove(SPECIFIC_DATA_PROPERTY); + } + } + } + } + + private void removeAvroSchemas(Map definitions) { + definitions.entrySet().removeIf(entry -> StringUtils.startsWithIgnoreCase(entry.getKey(), "org.apache.avro")); + } +} diff --git a/springwolf-core/src/main/java/io/github/stavshamir/springwolf/schemas/postprocessor/ExampleGeneratorPostProcessor.java b/springwolf-core/src/main/java/io/github/stavshamir/springwolf/schemas/postprocessor/ExampleGeneratorPostProcessor.java new file mode 100644 index 000000000..e5270cc15 --- /dev/null +++ b/springwolf-core/src/main/java/io/github/stavshamir/springwolf/schemas/postprocessor/ExampleGeneratorPostProcessor.java @@ -0,0 +1,25 @@ +// SPDX-License-Identifier: Apache-2.0 +package io.github.stavshamir.springwolf.schemas.postprocessor; + +import io.github.stavshamir.springwolf.schemas.example.ExampleGenerator; +import io.swagger.v3.oas.models.media.Schema; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; + +import java.util.Map; + +@RequiredArgsConstructor +@Slf4j +public class ExampleGeneratorPostProcessor implements SchemasPostProcessor { + private final ExampleGenerator exampleGenerator; + + @Override + public void process(Schema schema, Map definitions) { + if (schema.getExample() == null) { + log.debug("Generate example for {}", schema.getName()); + + Object example = exampleGenerator.fromSchema(schema, definitions); + schema.setExample(example); + } + } +} diff --git a/springwolf-core/src/main/java/io/github/stavshamir/springwolf/schemas/postprocessor/SchemasPostProcessor.java b/springwolf-core/src/main/java/io/github/stavshamir/springwolf/schemas/postprocessor/SchemasPostProcessor.java new file mode 100644 index 000000000..3cb450cf4 --- /dev/null +++ b/springwolf-core/src/main/java/io/github/stavshamir/springwolf/schemas/postprocessor/SchemasPostProcessor.java @@ -0,0 +1,15 @@ +// SPDX-License-Identifier: Apache-2.0 +package io.github.stavshamir.springwolf.schemas.postprocessor; + +import io.swagger.v3.oas.models.media.Schema; + +import java.util.Map; + +/** + * Internal interface to allow post-processing of a new schema (and their definition) after detection. + *
+ * It is closely coupled with the data structure of the SchemaService. + */ +public interface SchemasPostProcessor { + void process(Schema schema, Map definitions); +} diff --git a/springwolf-core/src/main/java/io/github/stavshamir/springwolf/schemas/postprocessor/SwaggerSchemaPostProcessor.java b/springwolf-core/src/main/java/io/github/stavshamir/springwolf/schemas/postprocessor/SwaggerSchemaPostProcessor.java new file mode 100644 index 000000000..14049976f --- /dev/null +++ b/springwolf-core/src/main/java/io/github/stavshamir/springwolf/schemas/postprocessor/SwaggerSchemaPostProcessor.java @@ -0,0 +1,22 @@ +// SPDX-License-Identifier: Apache-2.0 +package io.github.stavshamir.springwolf.schemas.postprocessor; + +import io.swagger.v3.oas.models.media.Schema; + +import java.util.Map; + +public class SwaggerSchemaPostProcessor implements SchemasPostProcessor { + @Override + public void process(Schema schema, Map definitions) { + removeAdditionalProperties(schema); + } + + private void removeAdditionalProperties(Schema schema) { + schema.setAdditionalProperties(null); + + Map properties = schema.getProperties(); + if (properties != null) { + properties.values().forEach((property) -> removeAdditionalProperties(property)); + } + } +} diff --git a/springwolf-core/src/test/java/io/github/stavshamir/springwolf/asyncapi/scanners/channels/annotation/AsyncAnnotationChannelsScannerTest.java b/springwolf-core/src/test/java/io/github/stavshamir/springwolf/asyncapi/scanners/channels/annotation/AsyncAnnotationChannelsScannerTest.java index 1330c0e3f..16a0aa651 100644 --- a/springwolf-core/src/test/java/io/github/stavshamir/springwolf/asyncapi/scanners/channels/annotation/AsyncAnnotationChannelsScannerTest.java +++ b/springwolf-core/src/test/java/io/github/stavshamir/springwolf/asyncapi/scanners/channels/annotation/AsyncAnnotationChannelsScannerTest.java @@ -23,7 +23,6 @@ import io.github.stavshamir.springwolf.configuration.properties.SpringwolfConfigProperties; import io.github.stavshamir.springwolf.schemas.DefaultSchemasService; import io.github.stavshamir.springwolf.schemas.SchemasService; -import io.github.stavshamir.springwolf.schemas.example.ExampleJsonGenerator; import io.swagger.v3.oas.annotations.media.Schema; import lombok.Data; import lombok.NoArgsConstructor; @@ -73,8 +72,7 @@ public OperationData.OperationType getOperationType() { }; private final SpringwolfConfigProperties properties = new SpringwolfConfigProperties(); private final ClassScanner classScanner = mock(ClassScanner.class); - private final SchemasService schemasService = - new DefaultSchemasService(emptyList(), new ExampleJsonGenerator(), properties); + private final SchemasService schemasService = new DefaultSchemasService(emptyList(), emptyList(), properties); private final AsyncApiDocketService asyncApiDocketService = mock(AsyncApiDocketService.class); private final PayloadClassExtractor payloadClassExtractor = new PayloadClassExtractor(properties); diff --git a/springwolf-core/src/test/java/io/github/stavshamir/springwolf/schemas/DefaultSchemasServiceTest.java b/springwolf-core/src/test/java/io/github/stavshamir/springwolf/schemas/DefaultSchemasServiceTest.java index 604c132f4..5e1b49b16 100644 --- a/springwolf-core/src/test/java/io/github/stavshamir/springwolf/schemas/DefaultSchemasServiceTest.java +++ b/springwolf-core/src/test/java/io/github/stavshamir/springwolf/schemas/DefaultSchemasServiceTest.java @@ -7,8 +7,10 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.SerializationFeature; import io.github.stavshamir.springwolf.configuration.properties.SpringwolfConfigProperties; -import io.github.stavshamir.springwolf.schemas.example.ExampleGenerator; import io.github.stavshamir.springwolf.schemas.example.ExampleJsonGenerator; +import io.github.stavshamir.springwolf.schemas.postprocessor.ExampleGeneratorPostProcessor; +import io.github.stavshamir.springwolf.schemas.postprocessor.SchemasPostProcessor; +import io.github.stavshamir.springwolf.schemas.postprocessor.SwaggerSchemaPostProcessor; import io.swagger.v3.core.util.Json; import io.swagger.v3.oas.annotations.media.Schema; import jakarta.annotation.Nullable; @@ -16,6 +18,7 @@ import lombok.NoArgsConstructor; import org.junit.jupiter.api.Nested; import org.junit.jupiter.api.Test; +import org.mockito.Mockito; import java.io.IOException; import java.io.InputStream; @@ -30,12 +33,18 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.verify; class DefaultSchemasServiceTest { - - private final ExampleGenerator exampleGenerator = new ExampleJsonGenerator(); - private final SchemasService schemasService = - new DefaultSchemasService(List.of(), exampleGenerator, new SpringwolfConfigProperties()); + private final SchemasPostProcessor schemasPostProcessor = Mockito.mock(SchemasPostProcessor.class); + private final SchemasService schemasService = new DefaultSchemasService( + List.of(), + List.of( + new ExampleGeneratorPostProcessor(new ExampleJsonGenerator()), + schemasPostProcessor, + new SwaggerSchemaPostProcessor()), + new SpringwolfConfigProperties()); private static final ObjectMapper objectMapper = Json.mapper().enable(SerializationFeature.ORDER_MAP_ENTRIES_BY_KEYS); @@ -111,7 +120,7 @@ void getDefinitionWithFqnClassName() throws IOException { SpringwolfConfigProperties properties = new SpringwolfConfigProperties(); properties.setUseFqn(true); - SchemasService schemasServiceWithFqn = new DefaultSchemasService(List.of(), exampleGenerator, properties); + SchemasService schemasServiceWithFqn = new DefaultSchemasService(List.of(), List.of(), properties); // when Class clazz = @@ -127,6 +136,13 @@ void getDefinitionWithFqnClassName() throws IOException { assertThat(fqnClassName.length()).isGreaterThan(clazz.getSimpleName().length()); } + @Test + void postProcessorsAreCalled() { + schemasService.register(FooWithEnum.class); + + verify(schemasPostProcessor).process(any(), any()); + } + private String jsonResource(String path) throws IOException { InputStream s = this.getClass().getResourceAsStream(path); return new String(s.readAllBytes(), StandardCharsets.UTF_8); diff --git a/springwolf-core/src/test/java/io/github/stavshamir/springwolf/schemas/postprocessor/AvroSchemaPostProcessorTest.java b/springwolf-core/src/test/java/io/github/stavshamir/springwolf/schemas/postprocessor/AvroSchemaPostProcessorTest.java new file mode 100644 index 000000000..e0578a996 --- /dev/null +++ b/springwolf-core/src/test/java/io/github/stavshamir/springwolf/schemas/postprocessor/AvroSchemaPostProcessorTest.java @@ -0,0 +1,40 @@ +// SPDX-License-Identifier: Apache-2.0 +package io.github.stavshamir.springwolf.schemas.postprocessor; + +import io.swagger.v3.oas.models.media.StringSchema; +import org.junit.jupiter.api.Test; + +import java.util.HashMap; +import java.util.Map; + +import static org.assertj.core.api.Assertions.assertThat; + +class AvroSchemaPostProcessorTest { + SchemasPostProcessor processor = new AvroSchemaPostProcessor(); + + @Test + void avroSchemasAreRemovedTest() { + // given + var avroSchema = new io.swagger.v3.oas.models.media.Schema(); + avroSchema.set$ref("#/components/schemas/org.apache.avro.Schema"); + + var avroSpecificData = new io.swagger.v3.oas.models.media.Schema(); + avroSpecificData.set$ref("#/components/schemas/org.apache.avro.specific.SpecificData"); + + var schema = new io.swagger.v3.oas.models.media.Schema(); + schema.setProperties(new HashMap<>( + Map.of("foo", new StringSchema(), "schema", avroSchema, "specificData", avroSpecificData))); + + var definitions = new HashMap(); + definitions.put("customClassRefUnusedInThisTest", new StringSchema()); + definitions.put("org.apache.avro.Schema", new io.swagger.v3.oas.models.media.Schema()); + definitions.put("org.apache.avro.ConversionJava.lang.Object", new io.swagger.v3.oas.models.media.Schema()); + + // when + processor.process(schema, definitions); + + // then + assertThat(schema.getProperties()).isEqualTo(Map.of("foo", new StringSchema())); + assertThat(definitions).isEqualTo(Map.of("customClassRefUnusedInThisTest", new StringSchema())); + } +} diff --git a/springwolf-examples/springwolf-cloud-stream-example/docker-compose.yml b/springwolf-examples/springwolf-cloud-stream-example/docker-compose.yml index 7bcdb20b4..c65ed0a53 100644 --- a/springwolf-examples/springwolf-cloud-stream-example/docker-compose.yml +++ b/springwolf-examples/springwolf-cloud-stream-example/docker-compose.yml @@ -10,24 +10,25 @@ services: - "8080:8080" depends_on: - kafka - - zookeeper - - zookeeper: - image: confluentinc/cp-zookeeper:latest - environment: - ZOOKEEPER_CLIENT_PORT: 2181 - ZOOKEEPER_TICK_TIME: kafka: image: confluentinc/cp-kafka:latest - depends_on: - - zookeeper ports: - - "29092:29092" + - "9092:9092" # plaintext - no authentication + - "9093:9093" # sasl + volumes: + - ./broker_jaas.conf:/etc/kafka/secrets/broker_jaas.conf environment: - KAFKA_BROKER_ID: 1 - KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 - KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092 - KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT - KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT + KAFKA_NODE_ID: 1 + CLUSTER_ID: 'ciWo7IWazngRchmPES6q5A==' + KAFKA_LISTENERS: PLAINTEXT://localhost:9092,PLAINTEXT_HOST://kafka:29092, SASL_PLAINTEXT://localhost:9093,SASL_PLAINTEXT_HOST://kafka:29093, CONTROLLER://kafka:29099 + KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092,PLAINTEXT_HOST://kafka:29092, SASL_PLAINTEXT://localhost:9093,SASL_PLAINTEXT_HOST://kafka:29093 + KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT, SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_PLAINTEXT_HOST:SASL_PLAINTEXT, CONTROLLER:PLAINTEXT + KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT_HOST + KAFKA_PROCESS_ROLES: 'controller,broker' + KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER' + KAFKA_CONTROLLER_QUORUM_VOTERS: '1@kafka:29099' KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 + KAFKA_SASL_ENABLED_MECHANISMS: PLAIN + KAFKA_SASL_MECHANISM_INTER_BROKER_PROTOCOL: PLAIN + KAFKA_OPTS: -Djava.security.auth.login.config=/etc/kafka/secrets/broker_jaas.conf diff --git a/springwolf-examples/springwolf-kafka-example/README.md b/springwolf-examples/springwolf-kafka-example/README.md index d48aa8526..c1f2eda4d 100644 --- a/springwolf-examples/springwolf-kafka-example/README.md +++ b/springwolf-examples/springwolf-kafka-example/README.md @@ -5,12 +5,12 @@ 2. Run `$ docker-compose up`. 3. Visit `localhost:8080/springwolf/asyncapi-ui.html` or try the API: `$ curl localhost:8080/springwolf/docs`. +Note: kafka-schema-registry (avro) and akhq (a kafka ui) are not started by default + ### Run with gradle -1. Verify zookeeper and kafka are running. +1. Verify kafka (and kafka-schema-registry if you want to test avro as well) are running. 2. If your kafka is not configured to automatically add topics, manually add a topic named `example-topic`. 3. Define an environment variable with the bootstrap server details: `$ export BOOTSTRAP_SERVER=localhost:9092`. 4. Clone this repository: `$ git clone https://github.com/springwolf/springwolf-core.git`. 5. Start the application: `$ cd springwolf-core && ./gradlew build -p springwolf-examples/springwolf-kafka-example bootRun`. 6. Visit `localhost:8080/springwolf/asyncapi-ui.html` or try the API: `$ curl localhost:8080/springwolf/docs`. - -_Tested with Kafka version 2.12_0.10.2.2._ diff --git a/springwolf-examples/springwolf-kafka-example/broker_jaas.conf b/springwolf-examples/springwolf-kafka-example/broker_jaas.conf index 7ba4f0f37..3980c93fd 100644 --- a/springwolf-examples/springwolf-kafka-example/broker_jaas.conf +++ b/springwolf-examples/springwolf-kafka-example/broker_jaas.conf @@ -6,9 +6,3 @@ KafkaServer { user_client="client-secret" user_connect="connect-secret"; }; - -Client { - org.apache.zookeeper.server.auth.DigestLoginModule required - username="kafka" - password="kafkasecret"; -}; diff --git a/springwolf-examples/springwolf-kafka-example/build.gradle b/springwolf-examples/springwolf-kafka-example/build.gradle index 6b4b10e1c..10f98bf54 100644 --- a/springwolf-examples/springwolf-kafka-example/build.gradle +++ b/springwolf-examples/springwolf-kafka-example/build.gradle @@ -7,6 +7,8 @@ plugins { id 'com.bmuschko.docker-spring-boot-application' id 'org.springdoc.openapi-gradle-plugin' version '1.8.0' + + id 'com.github.davidmc24.gradle.plugin.avro' version '1.9.1' } dependencies { @@ -32,6 +34,10 @@ dependencies { implementation "org.springframework.security:spring-security-config" implementation "org.springframework.security:spring-security-web" + implementation "org.apache.avro:avro:${avroVersion}@jar" + implementation "io.confluent:kafka-avro-serializer:${kafkaAvroSerializerVersion}" + permitUnusedDeclared "io.confluent:kafka-avro-serializer:${kafkaAvroSerializerVersion}@jar" + implementation "io.swagger.core.v3:swagger-annotations:${swaggerVersion}" implementation "org.slf4j:slf4j-api:${slf4jApiVersion}" diff --git a/springwolf-examples/springwolf-kafka-example/docker-compose.yml b/springwolf-examples/springwolf-kafka-example/docker-compose.yml index 90551044f..683dd0941 100644 --- a/springwolf-examples/springwolf-kafka-example/docker-compose.yml +++ b/springwolf-examples/springwolf-kafka-example/docker-compose.yml @@ -7,40 +7,72 @@ services: environment: BOOTSTRAP_SERVER: kafka:29092 BOOTSTRAP_SERVER_SASL: kafka:29093 + KAFKA_SCHEMA_REGISTRY_URL: http://kafka-schema-registry:8081 ports: - "8080:8080" depends_on: - kafka - - zookeeper - - zookeeper: - image: confluentinc/cp-zookeeper:latest - environment: - ZOOKEEPER_CLIENT_PORT: 2181 - ZOOKEEPER_TICK_TIME: - KAFKA_OPTS: - -Djava.security.auth.login.config=/etc/kafka/secrets/zookeeper_jaas.conf - -Dzookeeper.authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider - -DrequireClientAuthScheme=sasl - volumes: - - ./zookeeper_jaas.conf:/etc/kafka/secrets/zookeeper_jaas.conf kafka: image: confluentinc/cp-kafka:latest - depends_on: - - zookeeper ports: - "9092:9092" # plaintext - no authentication - "9093:9093" # sasl volumes: - ./broker_jaas.conf:/etc/kafka/secrets/broker_jaas.conf environment: - KAFKA_BROKER_ID: 1 - KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 - KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092,PLAINTEXT_HOST://kafka:29092, SASL_PLAINTEXT://localhost:9093,SASL_PLAINTEXT_HOST://kafka:29093 - KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT, SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_PLAINTEXT_HOST:SASL_PLAINTEXT - KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT_HOST + KAFKA_NODE_ID: 1 + CLUSTER_ID: 'MkU3OEVBNTcwNTJENDM2Qk==' + KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092, INTERNAL://kafka:29092, SASL_PLAINTEXT://0.0.0.0:9093, SASL_INTERNAL://kafka:29093, CONTROLLER://kafka:29099 + KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092,INTERNAL://kafka:29092, SASL_PLAINTEXT://localhost:9093,SASL_INTERNAL://kafka:29093 + KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,INTERNAL:PLAINTEXT, SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_INTERNAL:SASL_PLAINTEXT, CONTROLLER:PLAINTEXT + KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL + KAFKA_PROCESS_ROLES: 'controller,broker' + KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER' + KAFKA_CONTROLLER_QUORUM_VOTERS: '1@kafka:29099' KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 + KAFKA_SCHEMA_REGISTRY_URL: 'kafka-schema-registry:8081' KAFKA_SASL_ENABLED_MECHANISMS: PLAIN KAFKA_SASL_MECHANISM_INTER_BROKER_PROTOCOL: PLAIN KAFKA_OPTS: -Djava.security.auth.login.config=/etc/kafka/secrets/broker_jaas.conf + + kafka-schema-registry: + image: confluentinc/cp-schema-registry:latest + ports: + - "8081:8081" + profiles: + - test # avoid starting (and pulling) this container by default + environment: + SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: kafka:29092 + SCHEMA_REGISTRY_HOST_NAME: kafka-schema-registry + SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8081 + depends_on: + - kafka + links: + - kafka + + akhq: + image: tchiotludo/akhq:latest + ports: + - "8085:8080" + profiles: + - test # avoid starting (and pulling) this container by default + environment: + AKHQ_CONFIGURATION: | + akhq: + connections: + local: + properties: + bootstrap.servers: kafka:29092 + schema-registry: + url: http://kafka-schema-registry:8081 + type: confluent + local-sasl: + properties: + bootstrap.servers: kafka:29093 + security.protocol: SASL_PLAINTEXT + sasl.jaas.config: org.apache.kafka.common.security.plain.PlainLoginModule required username="broker" password="broker-secret"; + sasl.mechanism: PLAIN + schema-registry: + url: http://kafka-schema-registry:8081 + type: confluent diff --git a/springwolf-examples/springwolf-kafka-example/src/main/avro/ExamplePayloadAvroDto.avsc b/springwolf-examples/springwolf-kafka-example/src/main/avro/ExamplePayloadAvroDto.avsc new file mode 100644 index 000000000..23f6dc3a4 --- /dev/null +++ b/springwolf-examples/springwolf-kafka-example/src/main/avro/ExamplePayloadAvroDto.avsc @@ -0,0 +1,26 @@ +{ + "type": "record", + "name": "ExamplePayloadAvroDto", + "namespace": "io.github.stavshamir.springwolf.example.kafka.dto.avro", + "fields": [{ + "name": "someString", + "type": ["null", "string"], + "default": null + }, { + "name": "someLong", + "type": ["null", "int"], + "default": null + }, { + "name": "someEnum", + "type": { + "type": "enum", + "name": "ExampleEnum", + "symbols": [ + "FOO1", + "FOO2", + "FOO3" + ] + }, + "default": "FOO1" + }] +} diff --git a/springwolf-examples/springwolf-kafka-example/src/main/java/io/github/stavshamir/springwolf/example/kafka/configuration/CustomSpringwolfKafkaProducer.java b/springwolf-examples/springwolf-kafka-example/src/main/java/io/github/stavshamir/springwolf/example/kafka/configuration/CustomSpringwolfKafkaProducer.java new file mode 100644 index 000000000..2258286db --- /dev/null +++ b/springwolf-examples/springwolf-kafka-example/src/main/java/io/github/stavshamir/springwolf/example/kafka/configuration/CustomSpringwolfKafkaProducer.java @@ -0,0 +1,54 @@ +// SPDX-License-Identifier: Apache-2.0 +package io.github.stavshamir.springwolf.example.kafka.configuration; + +import io.github.stavshamir.springwolf.producer.SpringwolfKafkaTemplateFromProperties; +import io.github.stavshamir.springwolf.producer.SpringwolfKafkaTemplateProvider; +import lombok.RequiredArgsConstructor; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Primary; +import org.springframework.kafka.core.DefaultKafkaProducerFactory; +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.stereotype.Component; + +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; + +@Component +@Primary +@RequiredArgsConstructor +public class CustomSpringwolfKafkaProducer implements SpringwolfKafkaTemplateProvider { + private final SpringwolfKafkaTemplateFromProperties springwolfKafkaTemplateFromProperties; + + @Value("${KAFKA_SCHEMA_REGISTRY_URL:http://localhost:8081}") + private String kafkaSchemaRegistryUrl; + + @Override + public boolean isPresent() { + return springwolfKafkaTemplateFromProperties.isPresent(); + } + + @Override + public Optional> get(String topic) { + return springwolfKafkaTemplateFromProperties + .get(topic) + .map(objectObjectKafkaTemplate -> customize(objectObjectKafkaTemplate, topic)); + } + + private KafkaTemplate customize(KafkaTemplate kafkaTemplate, String topic) { + if (topic.contains("avro")) { + Map producerProperties = + new HashMap<>(kafkaTemplate.getProducerFactory().getConfigurationProperties()); + + // configure the producerProperties to use avro serializer/deserializer + producerProperties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); + producerProperties.put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer"); + producerProperties.put("schema.registry.url", kafkaSchemaRegistryUrl); + + DefaultKafkaProducerFactory producerFactory = + new DefaultKafkaProducerFactory<>(producerProperties); + return new KafkaTemplate<>(producerFactory); + } + return kafkaTemplate; + } +} diff --git a/springwolf-examples/springwolf-kafka-example/src/main/java/io/github/stavshamir/springwolf/example/kafka/consumers/AvroConsumer.java b/springwolf-examples/springwolf-kafka-example/src/main/java/io/github/stavshamir/springwolf/example/kafka/consumers/AvroConsumer.java new file mode 100644 index 000000000..8b22d2b53 --- /dev/null +++ b/springwolf-examples/springwolf-kafka-example/src/main/java/io/github/stavshamir/springwolf/example/kafka/consumers/AvroConsumer.java @@ -0,0 +1,37 @@ +// SPDX-License-Identifier: Apache-2.0 +package io.github.stavshamir.springwolf.example.kafka.consumers; + +import io.github.stavshamir.springwolf.asyncapi.scanners.channels.operationdata.annotation.AsyncListener; +import io.github.stavshamir.springwolf.asyncapi.scanners.channels.operationdata.annotation.AsyncOperation; +import io.github.stavshamir.springwolf.asyncapi.scanners.channels.operationdata.annotation.KafkaAsyncOperationBinding; +import io.github.stavshamir.springwolf.example.kafka.dto.avro.ExamplePayloadAvroDto; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.kafka.annotation.KafkaListener; +import org.springframework.stereotype.Component; + +@Component +@RequiredArgsConstructor +@Slf4j +public class AvroConsumer { + @KafkaListener( + topics = "avro-topic", + properties = { + "specific.avro.reader=true", + "schema.registry.url=${KAFKA_SCHEMA_REGISTRY_URL:http://localhost:8081}", + "key.deserializer=org.springframework.kafka.support.serializer.ErrorHandlingDeserializer", + "value.deserializer=org.springframework.kafka.support.serializer.ErrorHandlingDeserializer", + "spring.deserializer.key.delegate.class=org.apache.kafka.common.serialization.StringDeserializer", + "spring.deserializer.value.delegate.class=io.confluent.kafka.serializers.KafkaAvroDeserializer" + }) + @AsyncListener( + operation = + @AsyncOperation( + channelName = "avro-topic", + description = + "Requires a running kafka-schema-registry. See docker-compose.yml to start it")) + @KafkaAsyncOperationBinding + public void receiveExampleAvroPayload(ExamplePayloadAvroDto payloads) { + log.info("Received new message in avro-topic: {}", payloads.toString()); + } +} diff --git a/springwolf-examples/springwolf-kafka-example/src/test/java/io/github/stavshamir/springwolf/example/kafka/ProducerSystemTest.java b/springwolf-examples/springwolf-kafka-example/src/test/java/io/github/stavshamir/springwolf/example/kafka/ProducerSystemTest.java index fc46ec712..8836f160c 100644 --- a/springwolf-examples/springwolf-kafka-example/src/test/java/io/github/stavshamir/springwolf/example/kafka/ProducerSystemTest.java +++ b/springwolf-examples/springwolf-kafka-example/src/test/java/io/github/stavshamir/springwolf/example/kafka/ProducerSystemTest.java @@ -2,11 +2,15 @@ package io.github.stavshamir.springwolf.example.kafka; import io.github.stavshamir.springwolf.configuration.properties.SpringwolfKafkaConfigProperties; +import io.github.stavshamir.springwolf.example.kafka.consumers.AvroConsumer; import io.github.stavshamir.springwolf.example.kafka.consumers.ExampleConsumer; +import io.github.stavshamir.springwolf.example.kafka.dto.avro.ExampleEnum; +import io.github.stavshamir.springwolf.example.kafka.dto.avro.ExamplePayloadAvroDto; import io.github.stavshamir.springwolf.example.kafka.dtos.ExamplePayloadDto; import io.github.stavshamir.springwolf.producer.SpringwolfKafkaProducer; import org.apache.kafka.clients.admin.AdminClient; import org.apache.kafka.clients.admin.KafkaAdminClient; +import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.MethodOrderer.OrderAnnotation; import org.junit.jupiter.api.Order; import org.junit.jupiter.api.Test; @@ -50,6 +54,9 @@ public class ProducerSystemTest { @SpyBean ExampleConsumer exampleConsumer; + @SpyBean + AvroConsumer avroConsumer; + @Autowired SpringwolfKafkaConfigProperties properties; @@ -82,4 +89,18 @@ void producerCanUseSpringwolfConfigurationToSendMessage() { // then verify(exampleConsumer, timeout(10000)).receiveExamplePayload(payload); } + + @Test + @Order(3) + @Disabled("because it requires a running kafka-schema-registry instance (docker image= ~1GB).") + void producerCanUseSpringwolfConfigurationToSendAvroMessage() { + // given + ExamplePayloadAvroDto payload = new ExamplePayloadAvroDto("foo", 5, ExampleEnum.FOO1); + + // when + springwolfKafkaProducer.send("avro-topic", "key", Map.of(), payload); + + // then + verify(avroConsumer, timeout(10000)).receiveExampleAvroPayload(payload); + } } diff --git a/springwolf-examples/springwolf-kafka-example/src/test/java/io/github/stavshamir/springwolf/example/kafka/SpringContextIntegrationTest.java b/springwolf-examples/springwolf-kafka-example/src/test/java/io/github/stavshamir/springwolf/example/kafka/SpringContextIntegrationTest.java index 67f22580d..9d0274d23 100644 --- a/springwolf-examples/springwolf-kafka-example/src/test/java/io/github/stavshamir/springwolf/example/kafka/SpringContextIntegrationTest.java +++ b/springwolf-examples/springwolf-kafka-example/src/test/java/io/github/stavshamir/springwolf/example/kafka/SpringContextIntegrationTest.java @@ -48,7 +48,7 @@ void testContextWithApplicationProperties() { @Test void testAllChannelsAreFound() { - assertThat(asyncApiService.getAsyncAPI().getChannels()).hasSize(4); + assertThat(asyncApiService.getAsyncAPI().getChannels()).hasSize(5); } } diff --git a/springwolf-examples/springwolf-kafka-example/src/test/resources/asyncapi.json b/springwolf-examples/springwolf-kafka-example/src/test/resources/asyncapi.json index 51d971788..273d368cf 100644 --- a/springwolf-examples/springwolf-kafka-example/src/test/resources/asyncapi.json +++ b/springwolf-examples/springwolf-kafka-example/src/test/resources/asyncapi.json @@ -60,6 +60,33 @@ } } }, + "avro-topic": { + "publish": { + "operationId": "avro-topic_publish", + "description": "Requires a running kafka-schema-registry. See docker-compose.yml to start it", + "bindings": { + "kafka": { + "bindingVersion": "0.4.0" + } + }, + "message": { + "schemaFormat": "application/vnd.oai.openapi+json;version=3.0.0", + "name": "io.github.stavshamir.springwolf.example.kafka.dto.avro.ExamplePayloadAvroDto", + "title": "ExamplePayloadAvroDto", + "payload": { + "$ref": "#/components/schemas/io.github.stavshamir.springwolf.example.kafka.dto.avro.ExamplePayloadAvroDto" + }, + "headers": { + "$ref": "#/components/schemas/HeadersNotDocumented" + }, + "bindings": { + "kafka": { + "bindingVersion": "0.4.0" + } + } + } + } + }, "example-topic": { "publish": { "operationId": "example-topic_publish_receiveExamplePayload", @@ -488,6 +515,56 @@ "type": "object" } }, + "io.github.stavshamir.springwolf.example.kafka.dto.avro.ExamplePayloadAvroDto": { + "type": "object", + "properties": { + "someEnum": { + "type": "string", + "enum": [ + "FOO1", + "FOO2", + "FOO3" + ] + }, + "someLong": { + "type": "integer", + "format": "int32" + }, + "someString": { + "type": "string" + } + }, + "example": { + "someEnum": "FOO1", + "someLong": 0, + "someString": "string" + }, + "x-json-schema": { + "$schema": "https://json-schema.org/draft-04/schema#", + "name": "io.github.stavshamir.springwolf.example.kafka.dto.avro.ExamplePayloadAvroDto", + "properties": { + "someEnum": { + "enum": [ + "FOO1", + "FOO2", + "FOO3" + ], + "name": "someEnum", + "type": "string" + }, + "someLong": { + "format": "int32", + "name": "someLong", + "type": "integer" + }, + "someString": { + "name": "someString", + "type": "string" + } + }, + "type": "object" + } + }, "io.github.stavshamir.springwolf.example.kafka.dtos.AnotherPayloadDto": { "required": [ "example" diff --git a/springwolf-examples/springwolf-kafka-example/zookeeper_jaas.conf b/springwolf-examples/springwolf-kafka-example/zookeeper_jaas.conf deleted file mode 100644 index ebf7fc033..000000000 --- a/springwolf-examples/springwolf-kafka-example/zookeeper_jaas.conf +++ /dev/null @@ -1,5 +0,0 @@ -Server { - org.apache.zookeeper.server.auth.DigestLoginModule required - user_super="adminsecret" - user_kafka="kafkasecret"; -}; diff --git a/springwolf-plugins/springwolf-kafka-plugin/src/main/java/io/github/stavshamir/springwolf/asyncapi/kafka/SpringwolfKafkaProducerConfiguration.java b/springwolf-plugins/springwolf-kafka-plugin/src/main/java/io/github/stavshamir/springwolf/asyncapi/kafka/SpringwolfKafkaProducerConfiguration.java index df9d2d366..f45dc1f91 100644 --- a/springwolf-plugins/springwolf-kafka-plugin/src/main/java/io/github/stavshamir/springwolf/asyncapi/kafka/SpringwolfKafkaProducerConfiguration.java +++ b/springwolf-plugins/springwolf-kafka-plugin/src/main/java/io/github/stavshamir/springwolf/asyncapi/kafka/SpringwolfKafkaProducerConfiguration.java @@ -6,6 +6,7 @@ import io.github.stavshamir.springwolf.configuration.properties.SpringwolfKafkaConfigProperties; import io.github.stavshamir.springwolf.producer.SpringwolfKafkaProducer; import io.github.stavshamir.springwolf.producer.SpringwolfKafkaTemplateFromProperties; +import io.github.stavshamir.springwolf.producer.SpringwolfKafkaTemplateProvider; import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.context.annotation.Bean; @@ -31,8 +32,8 @@ public SpringwolfKafkaController springwolfKafkaController( @Bean @ConditionalOnMissingBean public SpringwolfKafkaProducer springwolfKafkaProducer( - SpringwolfKafkaTemplateFromProperties springwolfKafkaTemplateFromProperties) { - return new SpringwolfKafkaProducer(springwolfKafkaTemplateFromProperties); + SpringwolfKafkaTemplateProvider springwolfKafkaTemplateProvider) { + return new SpringwolfKafkaProducer(springwolfKafkaTemplateProvider); } @Bean