Skip to content

Commit

Permalink
fix: don't use overlapping session ids in MqttFlowTest
Browse files Browse the repository at this point in the history
While looking into apache#468, I noticed the two failing tests were
sharing the same session id, which reminded of apache#456.

While in this case the two tests aren't sharing the same
session, and I haven't investigated the details of this
codebase further, I'm curious to see if the problem remains
when we use unique session ids.
  • Loading branch information
raboof committed Jul 7, 2024
1 parent 4086be1 commit 01e48e0
Showing 1 changed file with 4 additions and 2 deletions.
6 changes: 4 additions & 2 deletions mqtt-streaming/src/test/java/docs/javadsl/MqttFlowTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ public void establishClientBidirectionalConnectionAndSubscribeToATopic()
throws InterruptedException, ExecutionException, TimeoutException {
String clientId = "source-spec/flow";
String topic = "source-spec/topic1";
ByteString uniqueSessionId = ByteString.fromString("establishClientBidirectionalConnectionAndSubscribeToATopic-session");

// #create-streaming-flow
MqttSessionSettings settings = MqttSessionSettings.create();
Expand All @@ -111,7 +112,7 @@ public void establishClientBidirectionalConnectionAndSubscribeToATopic()
Tcp.get(system).outgoingConnection("localhost", 1883);

Flow<Command<Object>, DecodeErrorOrEvent<Object>, NotUsed> mqttFlow =
Mqtt.clientSessionFlow(session, ByteString.fromString("1")).join(connection);
Mqtt.clientSessionFlow(session, uniqueSessionId).join(connection);
// #create-streaming-flow

// #run-streaming-flow
Expand Down Expand Up @@ -159,6 +160,7 @@ public void establishServerBidirectionalConnectionAndSubscribeToATopic()
throws InterruptedException, ExecutionException, TimeoutException {
String clientId = "flow-spec/flow";
String topic = "source-spec/topic1";
ByteString uniqueSessionId = ByteString.fromString("establishServerBidirectionalConnectionAndSubscribeToATopic-connection");
String host = "localhost";
int port = 9884;

Expand Down Expand Up @@ -251,7 +253,7 @@ public void establishServerBidirectionalConnectionAndSubscribeToATopic()
MqttClientSession clientSession = new ActorMqttClientSession(settings, system);

Flow<Command<Object>, DecodeErrorOrEvent<Object>, NotUsed> mqttFlow =
Mqtt.clientSessionFlow(clientSession, ByteString.fromString("1")).join(connection);
Mqtt.clientSessionFlow(clientSession, uniqueSessionId).join(connection);

Pair<SourceQueueWithComplete<Command<Object>>, CompletionStage<Publish>> run =
Source.<Command<Object>>queue(3, OverflowStrategy.fail())
Expand Down

0 comments on commit 01e48e0

Please sign in to comment.