Skip to content

Commit

Permalink
Merge pull request #217 from quarkiverse/feature/stream-configuration
Browse files Browse the repository at this point in the history
Fixed update of stream configurations
  • Loading branch information
kjeldpaw authored Oct 24, 2024
2 parents e071948 + bd4f689 commit 8800cb3
Show file tree
Hide file tree
Showing 29 changed files with 1,177 additions and 327 deletions.
2 changes: 1 addition & 1 deletion .github/project.yml
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
release:
current-version: "3.15.8"
current-version: "3.15.9"
next-version: "3.16.0-SNAPSHOT"

Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,17 @@
import io.quarkiverse.reactive.messaging.nats.jetstream.JetStreamBuildConfiguration;
import io.quarkiverse.reactive.messaging.nats.jetstream.JetStreamConnector;
import io.quarkiverse.reactive.messaging.nats.jetstream.JetStreamRecorder;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.ConnectionFactory;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.DefaultConnectionFactory;
import io.quarkiverse.reactive.messaging.nats.jetstream.mapper.ConsumerMapperImpl;
import io.quarkiverse.reactive.messaging.nats.jetstream.mapper.DefaultMessageMapper;
import io.quarkiverse.reactive.messaging.nats.jetstream.mapper.DefaultPayloadMapper;
import io.quarkiverse.reactive.messaging.nats.jetstream.mapper.StreamStateMapperImpl;
import io.quarkiverse.reactive.messaging.nats.jetstream.tracing.JetStreamInstrument;
import io.quarkus.arc.deployment.AdditionalBeanBuildItem;
import io.quarkus.arc.deployment.SyntheticBeansRuntimeInitBuildItem;
import io.quarkus.deployment.annotations.BuildProducer;
import io.quarkus.deployment.annotations.BuildStep;
import io.quarkus.deployment.annotations.Consume;
import io.quarkus.deployment.annotations.Record;
import io.quarkus.deployment.builditem.ExtensionSslNativeSupportBuildItem;
import io.quarkus.deployment.builditem.FeatureBuildItem;
Expand Down Expand Up @@ -53,7 +55,7 @@ void createNatsConnector(BuildProducer<AdditionalBeanBuildItem> buildProducer) {
buildProducer.produce(AdditionalBeanBuildItem.unremovableOf(JetStreamConnector.class));
buildProducer.produce(AdditionalBeanBuildItem.unremovableOf(JetStreamInstrument.class));
buildProducer.produce(AdditionalBeanBuildItem.unremovableOf(ExecutionHolder.class));
buildProducer.produce(AdditionalBeanBuildItem.unremovableOf(ConnectionFactory.class));
buildProducer.produce(AdditionalBeanBuildItem.unremovableOf(DefaultConnectionFactory.class));
buildProducer.produce(AdditionalBeanBuildItem.unremovableOf(DefaultPayloadMapper.class));
buildProducer.produce(AdditionalBeanBuildItem.unremovableOf(DefaultMessageMapper.class));
buildProducer.produce(AdditionalBeanBuildItem.unremovableOf(ConsumerMapperImpl.class));
Expand All @@ -62,10 +64,11 @@ void createNatsConnector(BuildProducer<AdditionalBeanBuildItem> buildProducer) {

@BuildStep
@Record(RUNTIME_INIT)
@Consume(SyntheticBeansRuntimeInitBuildItem.class)
public void configureJetStream(JetStreamRecorder recorder,
JetStreamBuildConfiguration buildConfig) {
if (buildConfig.autoConfigure()) {
recorder.setupStreams();
recorder.setup();
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,215 @@
package io.quarkiverse.reactive.messaging.nats.jetstream.test;

import static org.assertj.core.api.Assertions.assertThat;

import java.time.Duration;
import java.util.List;
import java.util.Optional;
import java.util.Set;

import jakarta.inject.Inject;

import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

import io.nats.client.api.CompressionOption;
import io.nats.client.api.DiscardPolicy;
import io.nats.client.api.RetentionPolicy;
import io.nats.client.api.StorageType;
import io.quarkiverse.reactive.messaging.nats.NatsConfiguration;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.ConnectionFactory;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.DefaultConnectionListener;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.api.StreamSetupConfiguration;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.api.StreamStatus;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.configuration.ConnectionConfiguration;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.configuration.StreamConfiguration;
import io.quarkus.test.QuarkusUnitTest;

public class StreamSetupTest {

@RegisterExtension
static QuarkusUnitTest runner = new QuarkusUnitTest()
.withConfigurationResource("application-stream.properties");

@Inject
NatsConfiguration natsConfiguration;

@Inject
ConnectionFactory connectionFactory;

@Test
void updateConfiguration() throws Exception {
try (final var connection = connectionFactory.create(ConnectionConfiguration.of(natsConfiguration),
new DefaultConnectionListener()).await().atMost(Duration.ofSeconds(30))) {

final var currentConfiguration = connection.getStreamConfiguration("stream-test").await()
.atMost(Duration.ofSeconds(30));
assertThat(currentConfiguration).isNotNull();
assertThat(currentConfiguration.subjects()).containsExactly("stream-data");
assertThat(currentConfiguration.storageType()).isEqualTo(StorageType.File);
assertThat(currentConfiguration.retentionPolicy()).isEqualTo(RetentionPolicy.Interest);

final var notModifiedResult = connection
.addStreams(List.of(StreamSetupConfiguration.builder()
.configuration(update(Set.of("stream-data"), RetentionPolicy.Interest)).overwrite(false).build()))
.await().atMost(Duration.ofSeconds(30));
assertThat(notModifiedResult).hasSize(1);
assertThat(notModifiedResult.get(0).status()).isEqualTo(StreamStatus.NotModified);

final var notModifiedConfiguration = connection.getStreamConfiguration("stream-test").await()
.atMost(Duration.ofSeconds(30));
assertThat(notModifiedConfiguration).isNotNull();
assertThat(notModifiedConfiguration.subjects()).containsExactly("stream-data");
assertThat(notModifiedConfiguration.storageType()).isEqualTo(StorageType.File);
assertThat(notModifiedConfiguration.retentionPolicy()).isEqualTo(RetentionPolicy.Interest);

final var updatedSubjectsResult = connection
.addStreams(List.of(StreamSetupConfiguration.builder()
.configuration(update(Set.of("stream-data", "stream-data-2"), RetentionPolicy.Interest))
.overwrite(false).build()))
.await().atMost(Duration.ofSeconds(30));
assertThat(updatedSubjectsResult).hasSize(1);
assertThat(updatedSubjectsResult.get(0).status()).isEqualTo(StreamStatus.Updated);

final var updatedSubjectsConfiguration = connection.getStreamConfiguration("stream-test").await()
.atMost(Duration.ofSeconds(30));
assertThat(updatedSubjectsConfiguration).isNotNull();
assertThat(updatedSubjectsConfiguration.subjects()).containsExactlyInAnyOrder("stream-data", "stream-data-2");

final var updatedRetentionPolicyResult = connection
.addStreams(List.of(StreamSetupConfiguration.builder()
.configuration(update(Set.of("stream-data", "stream-data-2"), RetentionPolicy.WorkQueue))
.overwrite(true).build()))
.await().atMost(Duration.ofSeconds(30));
assertThat(updatedRetentionPolicyResult).hasSize(1);
assertThat(updatedRetentionPolicyResult.get(0).status()).isEqualTo(StreamStatus.Created);

final var retentionPolicyConfiguration = connection.getStreamConfiguration("stream-test").await()
.atMost(Duration.ofSeconds(30));
assertThat(retentionPolicyConfiguration).isNotNull();
assertThat(retentionPolicyConfiguration.subjects()).containsExactlyInAnyOrder("stream-data", "stream-data-2");
assertThat(retentionPolicyConfiguration.storageType()).isEqualTo(StorageType.File);
assertThat(retentionPolicyConfiguration.retentionPolicy()).isEqualTo(RetentionPolicy.WorkQueue);
}
}

private StreamConfiguration update(final Set<String> subjects, final RetentionPolicy retentionPolicy) {
return new StreamConfiguration() {

@Override
public String name() {
return "stream-test";
}

@Override
public Optional<String> description() {
return Optional.empty();
}

@Override
public Set<String> subjects() {
return subjects;
}

@Override
public Integer replicas() {
return 1;
}

@Override
public StorageType storageType() {
return StorageType.File;
}

@Override
public RetentionPolicy retentionPolicy() {
return retentionPolicy;
}

@Override
public CompressionOption compressionOption() {
return CompressionOption.None;
}

@Override
public Optional<Long> maximumConsumers() {
return Optional.empty();
}

@Override
public Optional<Long> maximumMessages() {
return Optional.empty();
}

@Override
public Optional<Long> maximumMessagesPerSubject() {
return Optional.empty();
}

@Override
public Optional<Long> maximumBytes() {
return Optional.empty();
}

@Override
public Optional<Duration> maximumAge() {
return Optional.empty();
}

@Override
public Optional<Integer> maximumMessageSize() {
return Optional.empty();
}

@Override
public Optional<String> templateOwner() {
return Optional.empty();
}

@Override
public Optional<DiscardPolicy> discardPolicy() {
return Optional.empty();
}

@Override
public Optional<Duration> duplicateWindow() {
return Optional.empty();
}

@Override
public Optional<Boolean> allowRollup() {
return Optional.empty();
}

@Override
public Optional<Boolean> allowDirect() {
return Optional.empty();
}

@Override
public Optional<Boolean> mirrorDirect() {
return Optional.empty();
}

@Override
public Optional<Boolean> denyDelete() {
return Optional.empty();
}

@Override
public Optional<Boolean> denyPurge() {
return Optional.empty();
}

@Override
public Optional<Boolean> discardNewPerSubject() {
return Optional.empty();
}

@Override
public Optional<Long> firstSequence() {
return Optional.empty();
}
};
}
}
2 changes: 2 additions & 0 deletions deployment/src/test/resources/application-stream.properties
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
quarkus.messaging.nats.jet-stream.streams[0].name=stream-test
quarkus.messaging.nats.jet-stream.streams[0].subjects[0]=stream-data
2 changes: 1 addition & 1 deletion docs/modules/ROOT/pages/includes/attributes.adoc
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
:project-version: 3.15.8
:project-version: 3.15.9

:examples-dir: ./../examples/
19 changes: 19 additions & 0 deletions docs/modules/ROOT/pages/index.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,25 @@ quarkus.messaging.nats.jet-stream.streams[i].storage-type,The storage type for s
quarkus.messaging.nats.jet-stream.streams[i].retention-policy,Declares the retention policy for the stream (Limits or Interest)
quarkus.messaging.nats.jet-stream.streams[i].name,Name of the stream to setup if auto-configure is enabled
quarkus.messaging.nats.jet-stream.streams[i].subjects[n],Name of the subject in stream to setup if auto-configure is enabled
quarkus.messaging.nats.jet-stream.streams[i].overwrite,If the stream already exists it will be overwritten
quarkus.messaging.nats.jet-stream.streams[i].description,Description of stream
quarkus.messaging.nats.jet-stream.streams[i].compression-option,The compression option for this stream
quarkus.messaging.nats.jet-stream.streams[i].maximum-consumers,The maximum number of consumers for this stream
quarkus.messaging.nats.jet-stream.streams[i].maximum-messages,The maximum messages for this stream
quarkus.messaging.nats.jet-stream.streams[i].maximum-messages-per-subject,The maximum messages per subject for this stream
quarkus.messaging.nats.jet-stream.streams[i].maximum-bytes,The maximum number of bytes for this stream
quarkus.messaging.nats.jet-stream.streams[i].maximum-age,The maximum message age for this stream
quarkus.messaging.nats.jet-stream.streams[i].maximum-message-size,The maximum message size for this stream
quarkus.messaging.nats.jet-stream.streams[i].template-owner,The template json for this stream
quarkus.messaging.nats.jet-stream.streams[i].discard-policy,The discard policy for this stream
quarkus.messaging.nats.jet-stream.streams[i].duplicate-window,The duplicate checking window stream configuration. Duration. ZERO means duplicate checking is not enabled
quarkus.messaging.nats.jet-stream.streams[i].allow-rollup,The flag indicating if the stream allows rollup
quarkus.messaging.nats.jet-stream.streams[i].allow-direct,The flag indicating if the stream allows direct message access
quarkus.messaging.nats.jet-stream.streams[i].mirror-direct,The flag indicating if the stream allows higher performance and unified direct access for mirrors as well
quarkus.messaging.nats.jet-stream.streams[i].deny-delete,The flag indicating if deny delete is set for the stream
quarkus.messaging.nats.jet-stream.streams[i].deny-purge,The flag indicating if deny purge is set for the stream
quarkus.messaging.nats.jet-stream.streams[i].discard-new-per-subject,Whether discard policy with max message per subject is applied per subject
quarkus.messaging.nats.jet-stream.streams[i].first-sequence,The first sequence used in the stream
quarkus.messaging.nats.jet-stream.key-value-stores[i].bucket-name,Name of Key-Value store
quarkus.messaging.nats.jet-stream.key-value-stores[i].description,Description of Key-Value store
quarkus.messaging.nats.jet-stream.key-value-stores[i].storage-type,The storage type (File or Memory)
Expand Down
5 changes: 1 addition & 4 deletions runtime/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,6 @@
<groupId>io.opentelemetry.instrumentation</groupId>
<artifactId>opentelemetry-instrumentation-api-semconv</artifactId>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
</dependency>
<dependency>
<groupId>io.nats</groupId>
<artifactId>jnats</artifactId>
Expand All @@ -55,6 +51,7 @@
<dependency>
<groupId>org.mapstruct</groupId>
<artifactId>mapstruct</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>io.smallrye.reactive</groupId>
Expand Down
Loading

0 comments on commit 8800cb3

Please sign in to comment.