This is a wrapper around asyncronous facilities provided by the official Apache Pulsar Java Client using Reactor Core interfaces.
<dependency>
<groupId>com.rpuch.pulsar-reactive-client</groupId>
<artifactId>pulsar-client-reactor</artifactId>
<version>1.1.0</version>
</dependency>
implementation 'com.rpuch.pulsar-reactive-client:pulsar-client-reactor:1.1.0'
PulsarClient coreClient = PulsarClient.builder().serviceUrl(pulsarBrokerUrl).build();
ReactivePulsarClient client = ReactivePulsarClient.from(coreClient);
MessageId messageId = client.newProducer()
.topic("my-topic")
.forOne(producer -> producer.send("Hello!".bytes()))
.block();
Consume an infinite stream of messaging acknowledging each after processing it starting at the very beginning of a topic
client.newConsumer()
.topic("my-topic")
.subscriptionName("my-subscription")
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
.forMany(consumer -> consumer.messages().concatMap(message -> {
String str = new String(msg.getData());
System.out.println(str);
return consumer.acknowledge(message);
}))
.subscribe();
Flux<Message<byte[]>> messages = client.newReader()
.topic("my-topic")
.startMessageId(MessageId.earliest)
.messages();
messages.map(msg -> new String(msg.getData())).subscribe(System.out::println);
- Support for fast message publishing, batches and chunked messages
- Support for transactions
- Addition of support for
RxJava
and alternatives Reactive Streams implementations (like Mutiny) is under consideration