From 9af284cbe1bc13a23dc8eafffa0946eb2353f97a Mon Sep 17 00:00:00 2001 From: yvgopal Date: Tue, 14 Mar 2017 13:05:49 -0700 Subject: [PATCH] Sessions support (#25) * Chaning directory structure to align maven standard directory structure. Also changes to pom.xml files to align Maven standard build process like running tests. * Removing an unwanted file that was accidentally checked in. * Implemented Peek and PeekBatch functionality. Added some tests. * Implementation of ReceiveBySequenceNumber and Complete,Abandon,Defer,DeadLetter of messages received by sequence numbers. * WIP Sessions support * Full implementation of AcceptSession, receive features on sessions, renew lock, getState and set state. * Implementation of browse sessions. * Adding tests for topic-subscription cases --- .../servicebus/samples/ReceiveSample.java | 2 +- .../resources/access.properties.template | 27 +- .../servicebus/BrokeredMessageBrowser.java | 183 ------ .../servicebus/BrokeredMessageReceiver.java | 177 +++++- .../servicebus/BrokeredMessageSender.java | 8 +- .../servicebus/BrokeredMessageSession.java | 89 +++ .../servicebus/BrowsableMessageSession.java | 147 +++++ .../azure/servicebus/ClientFactory.java | 60 +- .../azure/servicebus/IMessageReceiver.java | 2 +- .../azure/servicebus/IMessageSession.java | 12 +- .../azure/servicebus/SessionBrowser.java | 53 ++ .../primitives/ClientConstants.java | 17 +- .../primitives/ConnectionStringBuilder.java | 10 +- .../servicebus/primitives/MessageBrowser.java | 102 ---- .../primitives/MessageReceiver.java | 305 +++++++++- .../servicebus/primitives/MessageSender.java | 8 +- .../primitives/MessagingFactory.java | 35 +- .../primitives/RequestResponseLink.java | 9 +- .../primitives/RequestResponseUtils.java | 8 +- .../azure/servicebus/primitives/Util.java | 10 + .../servicebus/QueueSendReceiveTests.java | 505 +--------------- .../azure/servicebus/QueueSessionTests.java | 16 + .../azure/servicebus/SendReceiveTests.java | 177 ++++++ .../azure/servicebus/SessionTests.java | 317 ++++++++++ .../azure/servicebus/TestCommons.java | 544 ++++++++++++++++++ .../microsoft/azure/servicebus/TestUtils.java | 70 ++- .../servicebus/TopicSendReceiveTests.java | 15 + .../azure/servicebus/TopicSessionTests.java | 16 + .../TestConnectionStringBuilder.java | 21 + 29 files changed, 2043 insertions(+), 902 deletions(-) delete mode 100644 azure-servicebus/src/main/java/com/microsoft/azure/servicebus/BrokeredMessageBrowser.java create mode 100644 azure-servicebus/src/main/java/com/microsoft/azure/servicebus/BrokeredMessageSession.java create mode 100644 azure-servicebus/src/main/java/com/microsoft/azure/servicebus/BrowsableMessageSession.java create mode 100644 azure-servicebus/src/main/java/com/microsoft/azure/servicebus/SessionBrowser.java delete mode 100644 azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/MessageBrowser.java create mode 100644 azure-servicebus/src/test/java/com/microsoft/azure/servicebus/QueueSessionTests.java create mode 100644 azure-servicebus/src/test/java/com/microsoft/azure/servicebus/SendReceiveTests.java create mode 100644 azure-servicebus/src/test/java/com/microsoft/azure/servicebus/SessionTests.java create mode 100644 azure-servicebus/src/test/java/com/microsoft/azure/servicebus/TestCommons.java create mode 100644 azure-servicebus/src/test/java/com/microsoft/azure/servicebus/TopicSendReceiveTests.java create mode 100644 azure-servicebus/src/test/java/com/microsoft/azure/servicebus/TopicSessionTests.java create mode 100644 azure-servicebus/src/test/java/com/microsoft/azure/servicebus/primitives/TestConnectionStringBuilder.java diff --git a/azure-servicebus-samples/src/main/java/com/microsoft/azure/servicebus/samples/ReceiveSample.java b/azure-servicebus-samples/src/main/java/com/microsoft/azure/servicebus/samples/ReceiveSample.java index ed9b7bcf92d05..9fef7b20ad238 100644 --- a/azure-servicebus-samples/src/main/java/com/microsoft/azure/servicebus/samples/ReceiveSample.java +++ b/azure-servicebus-samples/src/main/java/com/microsoft/azure/servicebus/samples/ReceiveSample.java @@ -34,7 +34,7 @@ private static void receiveMessages() throws InterruptedException, ServiceBusExc while (true) { IBrokeredMessage receivedMessage = receiver.receive(Duration.ofMinutes(1)); System.out.println(new String(receivedMessage.getContent())); - receiver.complete(receivedMessage); + receiver.complete(receivedMessage.getLockToken()); } } diff --git a/azure-servicebus/resources/access.properties.template b/azure-servicebus/resources/access.properties.template index 31151889611e0..076f9c1bb58cb 100644 --- a/azure-servicebus/resources/access.properties.template +++ b/azure-servicebus/resources/access.properties.template @@ -1,4 +1,23 @@ -namespacename=yournamespace -entitypath=yourentitypath -sharedaccesskeyname=yoursharedaccesskey -sharedaccesskey=yoursharedaccesskey +# non-partitioned queue with no sessions +queue.namespacename=yournamespace +queue.entitypath=yourentitypath +queue.sharedaccesskeyname=yoursharedaccesskey +queue.sharedaccesskey=yoursharedaccesskey +# non-partitioned queue with sessions +queue.sessionful.namespacename=yournamespace +queue.sessionful.entitypath=yourentitypath +queue.sessionful.sharedaccesskeyname=yoursharedaccesskey +queue.sessionful.sharedaccesskey=yoursharedaccesskey +# non-partitioned topic/subscription with no sessions +topic.namespacename=yournamespace +topic.entitypath=yourentitypath +subscription.entitypath=yoursubscriptionpath +topic.sharedaccesskeyname=yoursharedaccesskey +topic.sharedaccesskey=yoursharedaccesskey +# non-partitioned topic/subscription with sessions +topic.sessionful.namespacename=yournamespace +topic.sessionful.entitypath=yourentitypath +subscription.sessionful.entitypath=yoursubscriptionpath +topic.sessionful.sharedaccesskeyname=yoursharedaccesskey +topic.sessionful.sharedaccesskey=yoursharedaccesskey + diff --git a/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/BrokeredMessageBrowser.java b/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/BrokeredMessageBrowser.java deleted file mode 100644 index f9614998b7fdd..0000000000000 --- a/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/BrokeredMessageBrowser.java +++ /dev/null @@ -1,183 +0,0 @@ -package com.microsoft.azure.servicebus; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Iterator; -import java.util.concurrent.CompletableFuture; - -import org.apache.qpid.proton.message.Message; - -import com.microsoft.azure.servicebus.primitives.ConnectionStringBuilder; -import com.microsoft.azure.servicebus.primitives.MessageBrowser; -import com.microsoft.azure.servicebus.primitives.MessagingFactory; -import com.microsoft.azure.servicebus.primitives.ServiceBusException; -import com.microsoft.azure.servicebus.primitives.StringUtil; - -public class BrokeredMessageBrowser extends InitializableEntity implements IMessageBrowser -{ - private ConnectionStringBuilder amqpConnectionStringBuilder = null; - private String entityPath = null; - private MessagingFactory messagingFactory = null; - private boolean ownsMessagingFactory; - private boolean isInitialized = false; - private MessageBrowser internalBrowser = null; - private long lastPeekedSequenceNumber = 0; - private String sessionId = null; - - private BrokeredMessageBrowser() - { - super(StringUtil.getRandomString(), null); - } - - BrokeredMessageBrowser(ConnectionStringBuilder amqpConnectionStringBuilder) - { - this(); - - this.amqpConnectionStringBuilder = amqpConnectionStringBuilder; - this.entityPath = this.amqpConnectionStringBuilder.getEntityPath(); - this.ownsMessagingFactory = true; - } - - BrokeredMessageBrowser(MessagingFactory messagingFactory, String entityPath) - { - this(messagingFactory, entityPath, false); - } - - private BrokeredMessageBrowser(MessagingFactory messagingFactory, String entityPath, boolean ownsMessagingFactory) - { - this(); - - this.messagingFactory = messagingFactory; - this.entityPath = entityPath; - this.ownsMessagingFactory = ownsMessagingFactory; - } - - @Override - public String getEntityPath() { - return this.entityPath; - } - - @Override - synchronized CompletableFuture initializeAsync() throws IOException { - if(this.isInitialized) - { - return CompletableFuture.completedFuture(null); - } - else - { - CompletableFuture factoryFuture; - if(this.messagingFactory == null) - { - factoryFuture = MessagingFactory.createFromConnectionStringBuilderAsync(amqpConnectionStringBuilder).thenAccept((f) -> {BrokeredMessageBrowser.this.messagingFactory = f;}); - } - else - { - factoryFuture = CompletableFuture.completedFuture(null); - } - - return factoryFuture.thenCompose((v) -> - { - CompletableFuture browserFuture = MessageBrowser.create(BrokeredMessageBrowser.this.messagingFactory, StringUtil.getRandomString(), BrokeredMessageBrowser.this.entityPath); - return browserFuture.thenAccept((b) -> - { - BrokeredMessageBrowser.this.internalBrowser = b; - BrokeredMessageBrowser.this.isInitialized = true; - }); - }); - } - } - - @Override - protected CompletableFuture onClose() { - if(this.isInitialized) - { - return this.internalBrowser.closeAsync().thenComposeAsync((v) -> - { - if(BrokeredMessageBrowser.this.ownsMessagingFactory) - { - return BrokeredMessageBrowser.this.messagingFactory.closeAsync(); - } - else - { - return CompletableFuture.completedFuture(null); - } - }); - } - else - { - return CompletableFuture.completedFuture(null); - } - } - - @Override - public IBrokeredMessage peek() throws InterruptedException, ServiceBusException { - return Utils.completeFuture(this.peekAsync()); - } - - @Override - public IBrokeredMessage peek(long fromSequenceNumber) throws InterruptedException, ServiceBusException { - return Utils.completeFuture(this.peekAsync(fromSequenceNumber)); - } - - @Override - public Collection peekBatch(int messageCount) throws InterruptedException, ServiceBusException { - return Utils.completeFuture(this.peekBatchAsync(messageCount)); - } - - @Override - public Collection peekBatch(long fromSequenceNumber, int messageCount) throws InterruptedException, ServiceBusException { - return Utils.completeFuture(this.peekBatchAsync(fromSequenceNumber, messageCount)); - } - - @Override - public CompletableFuture peekAsync() { - return this.peekAsync(this.lastPeekedSequenceNumber + 1); - } - - @Override - public CompletableFuture peekAsync(long fromSequenceNumber) { - return this.peekBatchAsync(fromSequenceNumber, 1).thenApply((c) -> - { - IBrokeredMessage message = null; - Iterator iterator = c.iterator(); - if(iterator.hasNext()) - { - message = iterator.next(); - iterator.remove(); - } - return message; - }); - } - - @Override - public CompletableFuture> peekBatchAsync(int messageCount) { - return this.peekBatchAsync(this.lastPeekedSequenceNumber + 1, messageCount); - } - - @Override - public CompletableFuture> peekBatchAsync(long fromSequenceNumber, int messageCount) { - CompletableFuture> peekFuture = this.internalBrowser.peekMessages(fromSequenceNumber, messageCount, this.sessionId, this.messagingFactory.getOperationTimeout()); - return peekFuture.thenApply((peekedMessages) -> - { - ArrayList convertedMessages = new ArrayList(); - if(peekedMessages != null) - { - long sequenceNumberOfLastMessage = 0; - for(Message message : peekedMessages) - { - BrokeredMessage convertedMessage = MessageConverter.convertAmqpMessageToBrokeredMessage(message); - sequenceNumberOfLastMessage = convertedMessage.getSequenceNumber(); - convertedMessages.add(convertedMessage); - } - - if(sequenceNumberOfLastMessage > 0) - { - this.lastPeekedSequenceNumber = sequenceNumberOfLastMessage; - } - } - - return convertedMessages; - }); - } -} diff --git a/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/BrokeredMessageReceiver.java b/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/BrokeredMessageReceiver.java index 562992fcde194..039dd559565a4 100644 --- a/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/BrokeredMessageReceiver.java +++ b/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/BrokeredMessageReceiver.java @@ -14,6 +14,7 @@ import org.apache.qpid.proton.amqp.transport.ReceiverSettleMode; import org.apache.qpid.proton.amqp.transport.SenderSettleMode; +import org.apache.qpid.proton.message.Message; import com.microsoft.azure.servicebus.primitives.ClientConstants; import com.microsoft.azure.servicebus.primitives.ConnectionStringBuilder; @@ -28,28 +29,38 @@ import com.microsoft.azure.servicebus.primitives.TimerType; import com.microsoft.azure.servicebus.primitives.Util; - // TODO As part of receive, don't return messages whose lock is already expired. Can happen because of delay between prefetch and actual receive from client. -class BrokeredMessageReceiver extends InitializableEntity implements IMessageReceiver +class BrokeredMessageReceiver extends InitializableEntity implements IMessageReceiver, IMessageBrowser { private static final int DEFAULT_PREFETCH_COUNT = 100; private final ReceiveMode receiveMode; private boolean ownsMessagingFactory; + private boolean ownsInternalReceiver; private ConnectionStringBuilder amqpConnectionStringBuilder = null; private String entityPath = null; private MessagingFactory messagingFactory = null; private MessageReceiver internalReceiver = null; private boolean isInitialized = false; private int prefetchCount = DEFAULT_PREFETCH_COUNT; + private long lastPeekedSequenceNumber = 0; private final ConcurrentHashMap requestResponseLockTokensToLockTimesMap; private BrokeredMessageReceiver(ReceiveMode receiveMode) { super(StringUtil.getRandomString(), null); this.receiveMode = receiveMode; - this.requestResponseLockTokensToLockTimesMap = new ConcurrentHashMap<>(); - this.schedulePruningRequestResponseLockTokens(); + this.ownsInternalReceiver = true; + this.requestResponseLockTokensToLockTimesMap = new ConcurrentHashMap<>(); + } + + private BrokeredMessageReceiver(MessagingFactory messagingFactory, String entityPath, boolean ownsMessagingFactory, ReceiveMode receiveMode) + { + this(receiveMode); + + this.messagingFactory = messagingFactory; + this.entityPath = entityPath; + this.ownsMessagingFactory = ownsMessagingFactory; } BrokeredMessageReceiver(ConnectionStringBuilder amqpConnectionStringBuilder, ReceiveMode receiveMode) @@ -64,16 +75,15 @@ private BrokeredMessageReceiver(ReceiveMode receiveMode) BrokeredMessageReceiver(MessagingFactory messagingFactory, String entityPath, ReceiveMode receiveMode) { this(messagingFactory, entityPath, false, receiveMode); - } - - private BrokeredMessageReceiver(MessagingFactory messagingFactory, String entityPath, boolean ownsMessagingFactory, ReceiveMode receiveMode) + } + + // Only to be used by browsable sessions + BrokeredMessageReceiver(MessagingFactory messagingFactory, MessageReceiver internalReceiver, String entityPath, ReceiveMode receiveMode) { - this(receiveMode); - - this.messagingFactory = messagingFactory; - this.entityPath = entityPath; - this.ownsMessagingFactory = ownsMessagingFactory; - } + this(messagingFactory, entityPath, false, receiveMode); + this.internalReceiver = internalReceiver; + this.ownsInternalReceiver = false; + } @Override synchronized CompletableFuture initializeAsync() throws IOException @@ -87,25 +97,62 @@ synchronized CompletableFuture initializeAsync() throws IOException CompletableFuture factoryFuture; if(this.messagingFactory == null) { - factoryFuture = MessagingFactory.createFromConnectionStringBuilderAsync(amqpConnectionStringBuilder).thenAccept((f) -> {BrokeredMessageReceiver.this.messagingFactory = f;}); + factoryFuture = MessagingFactory.createFromConnectionStringBuilderAsync(amqpConnectionStringBuilder).thenAccept((f) -> {this.messagingFactory = f;}); } else { factoryFuture = CompletableFuture.completedFuture(null); - } + } return factoryFuture.thenCompose((v) -> { - CompletableFuture receiverFuture = MessageReceiver.create(BrokeredMessageReceiver.this.messagingFactory, StringUtil.getRandomString(), BrokeredMessageReceiver.this.entityPath, this.prefetchCount, getSettleModePairForRecevieMode(this.receiveMode)); - return receiverFuture.thenAccept((r) -> + CompletableFuture acceptReceiverFuture; + if(this.internalReceiver == null) + { + CompletableFuture receiverFuture; + if(BrokeredMessageReceiver.this.isSessionReceiver()) + { + receiverFuture = MessageReceiver.create(this.messagingFactory, StringUtil.getRandomString(), this.entityPath, this.getRequestedSessionId(), false, this.prefetchCount, getSettleModePairForRecevieMode(this.receiveMode)); + } + else + { + receiverFuture = MessageReceiver.create(this.messagingFactory, StringUtil.getRandomString(), this.entityPath, this.prefetchCount, getSettleModePairForRecevieMode(this.receiveMode)); + } + + acceptReceiverFuture = receiverFuture.thenAccept((r) -> + { + this.internalReceiver = r; + }); + } + else { - BrokeredMessageReceiver.this.internalReceiver = r; - BrokeredMessageReceiver.this.isInitialized = true; + acceptReceiverFuture = CompletableFuture.completedFuture(null); + } + + return acceptReceiverFuture.thenRun(() -> + { + this.isInitialized = true; + this.schedulePruningRequestResponseLockTokens(); }); }); } } + protected boolean isSessionReceiver() + { + return false; + } + + protected String getRequestedSessionId() + { + return null; + } + + protected final MessageReceiver getInternalReceiver() + { + return this.internalReceiver; + } + @Override public String getEntityPath() { return this.entityPath; @@ -118,7 +165,7 @@ public ReceiveMode getReceiveMode() { @Override public void abandon(UUID lockToken) throws InterruptedException, ServiceBusException { - Utils.completeFuture(this.abandonAsync(lockToken)); + Utils.completeFuture(this.abandonAsync(lockToken)); } @Override @@ -381,7 +428,17 @@ else if (c.isEmpty()) protected CompletableFuture onClose() { if(this.isInitialized) { - return this.internalReceiver.closeAsync().thenComposeAsync((v) -> + CompletableFuture closeReceiverFuture; + if(this.ownsInternalReceiver) + { + closeReceiverFuture = this.internalReceiver.closeAsync(); + } + else + { + closeReceiverFuture = CompletableFuture.completedFuture(null); + } + + return closeReceiverFuture.thenComposeAsync((v) -> { if(BrokeredMessageReceiver.this.ownsMessagingFactory) { @@ -509,7 +566,7 @@ public CompletableFuture> renewMessageLockBatchAsync(Collect lockTokens[messageIndex++] = lockToken; } - return this.internalReceiver.renewMessageLocksAsync(lockTokens, null, this.messagingFactory.getOperationTimeout()).thenApply( + return this.internalReceiver.renewMessageLocksAsync(lockTokens).thenApply( (newLockedUntilTimes) -> { // Assuming both collections are of same size and in same order (order doesn't really matter as all instants in the response are same). @@ -520,9 +577,9 @@ public CompletableFuture> renewMessageLockBatchAsync(Collect BrokeredMessage message = (BrokeredMessage)messageIterator.next(); Instant lockedUntilUtc = lockTimeIterator.next(); message.setLockedUntilUtc(lockedUntilUtc); - if(BrokeredMessageReceiver.this.requestResponseLockTokensToLockTimesMap.containsKey(message.getLockToken())) + if(this.requestResponseLockTokensToLockTimesMap.containsKey(message.getLockToken())) { - BrokeredMessageReceiver.this.requestResponseLockTokensToLockTimesMap.put(message.getLockToken(), lockedUntilUtc); + this.requestResponseLockTokensToLockTimesMap.put(message.getLockToken(), lockedUntilUtc); } } return newLockedUntilTimes; @@ -540,6 +597,78 @@ public Collection renewMessageLockBatch(Collection peekBatch(int messageCount) throws InterruptedException, ServiceBusException { + return Utils.completeFuture(this.peekBatchAsync(messageCount)); + } + + @Override + public Collection peekBatch(long fromSequenceNumber, int messageCount) throws InterruptedException, ServiceBusException { + return Utils.completeFuture(this.peekBatchAsync(fromSequenceNumber, messageCount)); + } + + @Override + public CompletableFuture peekAsync() { + return this.peekAsync(this.lastPeekedSequenceNumber + 1); + } + + @Override + public CompletableFuture peekAsync(long fromSequenceNumber) { + return this.peekBatchAsync(fromSequenceNumber, 1).thenApply((c) -> + { + IBrokeredMessage message = null; + Iterator iterator = c.iterator(); + if(iterator.hasNext()) + { + message = iterator.next(); + iterator.remove(); + } + return message; + }); + } + + @Override + public CompletableFuture> peekBatchAsync(int messageCount) { + return this.peekBatchAsync(this.lastPeekedSequenceNumber + 1, messageCount); + } + + @Override + public CompletableFuture> peekBatchAsync(long fromSequenceNumber, int messageCount) { + String sessionId = this.isSessionReceiver()? this.internalReceiver.getSessionId() : null; + CompletableFuture> peekFuture = this.internalReceiver.peekMessagesAsync(fromSequenceNumber, messageCount, sessionId); + return peekFuture.thenApply((peekedMessages) -> + { + ArrayList convertedMessages = new ArrayList(); + if(peekedMessages != null) + { + long sequenceNumberOfLastMessage = 0; + for(Message message : peekedMessages) + { + BrokeredMessage convertedMessage = MessageConverter.convertAmqpMessageToBrokeredMessage(message); + sequenceNumberOfLastMessage = convertedMessage.getSequenceNumber(); + convertedMessages.add(convertedMessage); + } + + if(sequenceNumberOfLastMessage > 0) + { + this.lastPeekedSequenceNumber = sequenceNumberOfLastMessage; + } + } + + return convertedMessages; + }); + } + private void schedulePruningRequestResponseLockTokens() { // Run it every 1 hour diff --git a/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/BrokeredMessageSender.java b/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/BrokeredMessageSender.java index e1a2504f94fa0..aae04f6782928 100644 --- a/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/BrokeredMessageSender.java +++ b/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/BrokeredMessageSender.java @@ -63,7 +63,7 @@ synchronized CompletableFuture initializeAsync() throws IOException CompletableFuture factoryFuture; if(this.messagingFactory == null) { - factoryFuture = MessagingFactory.createFromConnectionStringBuilderAsync(amqpConnectionStringBuilder).thenAccept((f) -> {BrokeredMessageSender.this.messagingFactory = f;}); + factoryFuture = MessagingFactory.createFromConnectionStringBuilderAsync(amqpConnectionStringBuilder).thenAccept((f) -> {this.messagingFactory = f;}); } else { @@ -72,11 +72,11 @@ synchronized CompletableFuture initializeAsync() throws IOException return factoryFuture.thenCompose((v) -> { - CompletableFuture senderFuture = MessageSender.create(BrokeredMessageSender.this.messagingFactory, StringUtil.getRandomString(), BrokeredMessageSender.this.entityPath); + CompletableFuture senderFuture = MessageSender.create(this.messagingFactory, StringUtil.getRandomString(), this.entityPath); return senderFuture.thenAccept((s) -> { - BrokeredMessageSender.this.internalSender = s; - BrokeredMessageSender.this.isInitialized = true; + this.internalSender = s; + this.isInitialized = true; }); }); } diff --git a/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/BrokeredMessageSession.java b/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/BrokeredMessageSession.java new file mode 100644 index 0000000000000..7e0ef0c150ff7 --- /dev/null +++ b/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/BrokeredMessageSession.java @@ -0,0 +1,89 @@ +package com.microsoft.azure.servicebus; + +import java.io.InputStream; +import java.time.Instant; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; + +import com.microsoft.azure.servicebus.primitives.ConnectionStringBuilder; +import com.microsoft.azure.servicebus.primitives.MessageReceiver; +import com.microsoft.azure.servicebus.primitives.MessagingFactory; +import com.microsoft.azure.servicebus.primitives.ServiceBusException; +import com.microsoft.azure.servicebus.primitives.StringUtil; + +public class BrokeredMessageSession extends BrokeredMessageReceiver implements IMessageSession +{ + private String requestedSessionId; + + BrokeredMessageSession(ConnectionStringBuilder amqpConnectionStringBuilder, String requestedSessionId, ReceiveMode receiveMode) + { + super(amqpConnectionStringBuilder, receiveMode); + this.requestedSessionId = requestedSessionId; + } + + BrokeredMessageSession(MessagingFactory messagingFactory, String entityPath, String requestedSessionId, ReceiveMode receiveMode) + { + super(messagingFactory, entityPath, receiveMode); + this.requestedSessionId = requestedSessionId; + } + + // Only to be used by browsable sessions + BrokeredMessageSession(MessagingFactory messagingFactory, MessageReceiver internalReceiver, String entityPath, ReceiveMode receiveMode) + { + super(messagingFactory, internalReceiver, entityPath, receiveMode); + this.requestedSessionId = null; + } + + @Override + protected final boolean isSessionReceiver() + { + return true; + } + + @Override + protected String getRequestedSessionId() + { + return this.requestedSessionId; + } + + @Override + public Instant getLockedUntilUtc() { + return this.getInternalReceiver().getSessionLockedUntilUtc(); + } + + @Override + public void renewLock() throws InterruptedException, ServiceBusException { + Utils.completeFuture(this.renewLockAsync()); + } + + @Override + public CompletableFuture renewLockAsync() { + return this.getInternalReceiver().renewSessionLocksAsync(); + } + + @Override + public void setState(byte[] sessionState) throws InterruptedException, ServiceBusException + { + Utils.completeFuture(this.setStateAsync(sessionState)); + } + + @Override + public CompletableFuture setStateAsync(byte[] sessionState) { + return this.getInternalReceiver().setSessionStateAsync(sessionState); + } + + @Override + public byte[] getState() throws InterruptedException, ServiceBusException { + return Utils.completeFuture(this.getStateAsync()); + } + + @Override + public CompletableFuture getStateAsync() { + return this.getInternalReceiver().getSessionStateAsync(); + } + + @Override + public String getSessionId() { + return this.getInternalReceiver().getSessionId(); + } +} diff --git a/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/BrowsableMessageSession.java b/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/BrowsableMessageSession.java new file mode 100644 index 0000000000000..0053804149292 --- /dev/null +++ b/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/BrowsableMessageSession.java @@ -0,0 +1,147 @@ +package com.microsoft.azure.servicebus; + +import java.io.IOException; +import java.time.Duration; +import java.time.Instant; +import java.util.Collection; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; + +import com.microsoft.azure.servicebus.primitives.MessageReceiver; +import com.microsoft.azure.servicebus.primitives.MessagingFactory; +import com.microsoft.azure.servicebus.primitives.ServiceBusException; + +final class BrowsableMessageSession extends BrokeredMessageSession +{ + private static final String INVALID_OPERATION_ERROR_MESSAGE = "Unsupported operation on a browse only session."; + + private final String sessionId; + + BrowsableMessageSession(String sessionId, MessagingFactory messagingFactory, MessageReceiver internalReceiver, String entityPath) + { + super(messagingFactory, internalReceiver, entityPath, ReceiveMode.PeekLock); + this.sessionId = sessionId; + try { + this.initializeAsync().get(); + } catch (InterruptedException | ExecutionException | IOException e) { + // We can ignore it, as init is a no-operation in this case + } + } + + @Override + public String getSessionId() + { + return this.sessionId; + } + + @Override + public Instant getLockedUntilUtc() { + throw new UnsupportedOperationException(INVALID_OPERATION_ERROR_MESSAGE); + } + + @Override + public int getPrefetchCount() + { + throw new UnsupportedOperationException(INVALID_OPERATION_ERROR_MESSAGE); + } + + @Override + public void setPrefetchCount(int prefetchCount) throws ServiceBusException + { + throw new UnsupportedOperationException(INVALID_OPERATION_ERROR_MESSAGE); + } + + @Override + public CompletableFuture setStateAsync(byte[] sessionState) { + throw new UnsupportedOperationException(INVALID_OPERATION_ERROR_MESSAGE); + } + + @Override + public CompletableFuture renewLockAsync() { + throw new UnsupportedOperationException(INVALID_OPERATION_ERROR_MESSAGE); + } + + @Override + public ReceiveMode getReceiveMode() { + throw new UnsupportedOperationException(INVALID_OPERATION_ERROR_MESSAGE); + } + + @Override + public CompletableFuture abandonAsync(UUID lockToken, Map propertiesToModify) + { + throw new UnsupportedOperationException(INVALID_OPERATION_ERROR_MESSAGE); + } + + @Override + public CompletableFuture completeAsync(UUID lockToken) + { + throw new UnsupportedOperationException(INVALID_OPERATION_ERROR_MESSAGE); + } + + @Override + public CompletableFuture completeBatchAsync(Collection messages) { + throw new UnsupportedOperationException(INVALID_OPERATION_ERROR_MESSAGE); + } + + @Override + public CompletableFuture deadLetterAsync(UUID lockToken, String deadLetterReason, String deadLetterErrorDescription, Map propertiesToModify) + { + throw new UnsupportedOperationException(INVALID_OPERATION_ERROR_MESSAGE); + } + + @Override + public CompletableFuture deferAsync(UUID lockToken, Map propertiesToModify) + { + throw new UnsupportedOperationException(INVALID_OPERATION_ERROR_MESSAGE); + } + + @Override + public CompletableFuture receiveAsync() + { + throw new UnsupportedOperationException(INVALID_OPERATION_ERROR_MESSAGE); + } + + @Override + public CompletableFuture receiveAsync(Duration serverWaitTime) + { + throw new UnsupportedOperationException(INVALID_OPERATION_ERROR_MESSAGE); + } + + @Override + public CompletableFuture receiveAsync(long sequenceNumber) + { + throw new UnsupportedOperationException(INVALID_OPERATION_ERROR_MESSAGE); + } + + @Override + public CompletableFuture> receiveBatchAsync(int maxMessageCount) + { + throw new UnsupportedOperationException(INVALID_OPERATION_ERROR_MESSAGE); + } + + @Override + public CompletableFuture> receiveBatchAsync(int maxMessageCount, Duration serverWaitTime) + { + throw new UnsupportedOperationException(INVALID_OPERATION_ERROR_MESSAGE); + } + + @Override + public CompletableFuture> receiveBatchAsync(Collection sequenceNumbers) + { + throw new UnsupportedOperationException(INVALID_OPERATION_ERROR_MESSAGE); + } + + @Override + public CompletableFuture renewMessageLockAsync(IBrokeredMessage message) + { + throw new UnsupportedOperationException(INVALID_OPERATION_ERROR_MESSAGE); + } + + @Override + public CompletableFuture> renewMessageLockBatchAsync(Collection messages) + { + throw new UnsupportedOperationException(INVALID_OPERATION_ERROR_MESSAGE); + } +} diff --git a/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/ClientFactory.java b/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/ClientFactory.java index a329f2b554380..cf0c0b0af913e 100644 --- a/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/ClientFactory.java +++ b/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/ClientFactory.java @@ -116,40 +116,70 @@ public static CompletableFuture createMessageReceiverFromFromE return receiver.initializeAsync().thenApply((v) -> receiver); } - // Create browser - public static IMessageBrowser createMessageBrowserFromConnectionString(String amqpConnectionString) throws InterruptedException, ServiceBusException, IOException + // Accept Session + public static IMessageSession acceptSessionFromConnectionString(String amqpConnectionString, String sessionId) throws InterruptedException, ServiceBusException, IOException { - return Utils.completeFuture(createMessageBrowserFromConnectionStringAsync(amqpConnectionString)); + return acceptSessionFromConnectionString(amqpConnectionString, sessionId, DEFAULTRECEIVEMODE); } - public static IMessageBrowser createMessageBrowserFromConnectionStringBuilder(ConnectionStringBuilder amqpConnectionStringBuilder) throws InterruptedException, ServiceBusException, IOException + public static IMessageSession acceptSessionFromConnectionString(String amqpConnectionString, String sessionId, ReceiveMode receiveMode) throws InterruptedException, ServiceBusException, IOException + { + return Utils.completeFuture(acceptSessionFromConnectionStringAsync(amqpConnectionString, sessionId, receiveMode)); + } + + public static IMessageSession acceptSessionFromConnectionStringBuilder(ConnectionStringBuilder amqpConnectionStringBuilder, String sessionId) throws InterruptedException, ServiceBusException, IOException + { + return acceptSessionFromConnectionStringBuilder(amqpConnectionStringBuilder, sessionId, DEFAULTRECEIVEMODE); + } + + public static IMessageSession acceptSessionFromConnectionStringBuilder(ConnectionStringBuilder amqpConnectionStringBuilder, String sessionId, ReceiveMode receiveMode) throws InterruptedException, ServiceBusException, IOException + { + return Utils.completeFuture(acceptSessionFromConnectionStringBuilderAsync(amqpConnectionStringBuilder, sessionId, receiveMode)); + } + + public static IMessageSession acceptSessionFromEntityPath(MessagingFactory messagingFactory, String entityPath, String sessionId) throws InterruptedException, ServiceBusException, IOException { - return Utils.completeFuture(createMessageBrowserFromConnectionStringBuilderAsync(amqpConnectionStringBuilder)); + return acceptSessionFromEntityPath(messagingFactory, entityPath, sessionId, DEFAULTRECEIVEMODE); } - public static IMessageBrowser createMessageBrowserFromEntityPath(MessagingFactory messagingFactory, String entityPath) throws InterruptedException, ServiceBusException, IOException + public static IMessageSession acceptSessionFromEntityPath(MessagingFactory messagingFactory, String entityPath, String sessionId, ReceiveMode receiveMode) throws InterruptedException, ServiceBusException, IOException { - return Utils.completeFuture(createMessageBrowserFromFromEntityPathAsync(messagingFactory, entityPath)); + return Utils.completeFuture(acceptSessionFromFromEntityPathAsync(messagingFactory, entityPath, sessionId, receiveMode)); } - public static CompletableFuture createMessageBrowserFromConnectionStringAsync(String amqpConnectionString) throws IOException + public static CompletableFuture acceptSessionFromConnectionStringAsync(String amqpConnectionString, String sessionId) throws IOException + { + return acceptSessionFromConnectionStringAsync(amqpConnectionString, sessionId, DEFAULTRECEIVEMODE); + } + + public static CompletableFuture acceptSessionFromConnectionStringAsync(String amqpConnectionString, String sessionId, ReceiveMode receiveMode) throws IOException { Utils.assertNonNull("amqpConnectionString", amqpConnectionString); - return createMessageBrowserFromConnectionStringBuilderAsync(new ConnectionStringBuilder(amqpConnectionString)); + return acceptSessionFromConnectionStringBuilderAsync(new ConnectionStringBuilder(amqpConnectionString), sessionId); + } + + public static CompletableFuture acceptSessionFromConnectionStringBuilderAsync(ConnectionStringBuilder amqpConnectionStringBuilder, String sessionId) throws IOException + { + return acceptSessionFromConnectionStringBuilderAsync(amqpConnectionStringBuilder, sessionId, DEFAULTRECEIVEMODE); } // Throwing IOException is ugly in an async method. Change it - public static CompletableFuture createMessageBrowserFromConnectionStringBuilderAsync(ConnectionStringBuilder amqpConnectionStringBuilder) throws IOException + public static CompletableFuture acceptSessionFromConnectionStringBuilderAsync(ConnectionStringBuilder amqpConnectionStringBuilder, String sessionId, ReceiveMode receiveMode) throws IOException { Utils.assertNonNull("amqpConnectionStringBuilder", amqpConnectionStringBuilder); - BrokeredMessageBrowser browser = new BrokeredMessageBrowser(amqpConnectionStringBuilder); - return browser.initializeAsync().thenApply((v) -> browser); + BrokeredMessageSession session = new BrokeredMessageSession(amqpConnectionStringBuilder, sessionId, receiveMode); + return session.initializeAsync().thenApply((v) -> session); + } + + public static CompletableFuture acceptSessionFromFromEntityPathAsync(MessagingFactory messagingFactory, String entityPath, String sessionId) throws IOException + { + return acceptSessionFromFromEntityPathAsync(messagingFactory, entityPath, sessionId, DEFAULTRECEIVEMODE); } - public static CompletableFuture createMessageBrowserFromFromEntityPathAsync(MessagingFactory messagingFactory, String entityPath) throws IOException + public static CompletableFuture acceptSessionFromFromEntityPathAsync(MessagingFactory messagingFactory, String entityPath, String sessionId, ReceiveMode receiveMode) throws IOException { Utils.assertNonNull("messagingFactory", messagingFactory); - BrokeredMessageBrowser browser = new BrokeredMessageBrowser(messagingFactory, entityPath); - return browser.initializeAsync().thenApply((v) -> browser); + BrokeredMessageSession session = new BrokeredMessageSession(messagingFactory, entityPath, sessionId, receiveMode); + return session.initializeAsync().thenApply((v) -> session); } } diff --git a/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/IMessageReceiver.java b/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/IMessageReceiver.java index d68117b64fb19..c77fe45ab0d9e 100644 --- a/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/IMessageReceiver.java +++ b/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/IMessageReceiver.java @@ -9,7 +9,7 @@ import com.microsoft.azure.servicebus.primitives.ServiceBusException; -public interface IMessageReceiver extends IMessageEntity{ +public interface IMessageReceiver extends IMessageEntity, IMessageBrowser{ ReceiveMode getReceiveMode(); void abandon(UUID lockToken) throws InterruptedException, ServiceBusException; diff --git a/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/IMessageSession.java b/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/IMessageSession.java index 3ae395387a3d9..95291cc5441d4 100644 --- a/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/IMessageSession.java +++ b/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/IMessageSession.java @@ -4,20 +4,22 @@ import java.time.Instant; import java.util.concurrent.CompletableFuture; +import com.microsoft.azure.servicebus.primitives.ServiceBusException; + public interface IMessageSession extends IMessageReceiver { String getSessionId(); Instant getLockedUntilUtc(); - void renewLock(); + void renewLock() throws InterruptedException, ServiceBusException; CompletableFuture renewLockAsync(); - void setState(InputStream stream); + void setState(byte[] state) throws InterruptedException, ServiceBusException; - CompletableFuture setStateAsync(InputStream stream); + CompletableFuture setStateAsync(byte[] state); - InputStream getState(); + byte[] getState() throws InterruptedException, ServiceBusException; - CompletableFuture getStateAsync(); + CompletableFuture getStateAsync(); } diff --git a/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/SessionBrowser.java b/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/SessionBrowser.java new file mode 100644 index 0000000000000..c3f1d5cf6a489 --- /dev/null +++ b/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/SessionBrowser.java @@ -0,0 +1,53 @@ +package com.microsoft.azure.servicebus; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Date; +import java.util.concurrent.CompletableFuture; + +import com.microsoft.azure.servicebus.primitives.MessagingFactory; +import com.microsoft.azure.servicebus.primitives.Util; + +final class SessionBrowser +{ + private static final int PAGESIZE = 100; + // .net DateTime.MaxValue need to be passed + private static final Date MAXDATE = new Date(253402300800000l); + + private final BrokeredMessageReceiver messageReceiver; + private final MessagingFactory messagingFactory; + private final String entityPath; + private String lastSessionId = null; + private int lastReceivedSkip = 0; + + SessionBrowser(MessagingFactory messagingFactory, BrokeredMessageReceiver messageReceiver, String entityPath) + { + this.messagingFactory = messagingFactory; + this.messageReceiver = messageReceiver; + this.entityPath = entityPath; + } + + public CompletableFuture> getMessageSessionsAsync() + { + return this.getMessageSessionsAsync(MAXDATE); + } + + public CompletableFuture> getMessageSessionsAsync(Date lastUpdatedTime) + { + return this.messageReceiver.getInternalReceiver().getMessageSessionsAsync(lastUpdatedTime, this.lastReceivedSkip, PAGESIZE, this.lastSessionId).thenApply((p) -> + { + ArrayList sessionsList = new ArrayList<>(); + this.lastReceivedSkip = p.getSecondItem(); + String[] sessionIds = p.getFirstItem(); + if(sessionIds != null && sessionIds.length > 0) + { + this.lastSessionId = sessionIds[sessionIds.length - 1]; + for(String sessionId : sessionIds) + { + sessionsList.add(new BrowsableMessageSession(sessionId, this.messagingFactory, this.messageReceiver.getInternalReceiver(), this.entityPath)); + } + } + return sessionsList; + }); + } +} diff --git a/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/ClientConstants.java b/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/ClientConstants.java index a6ca15728076d..c9ad1e5f7cfe5 100644 --- a/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/ClientConstants.java +++ b/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/ClientConstants.java @@ -36,6 +36,8 @@ private ClientConstants() { } public final static Symbol STORE_LOCK_LOST_ERROR = Symbol.getSymbol(AmqpConstants.VENDOR + ":store-lock-lost"); public final static Symbol PUBLISHER_REVOKED_ERROR = Symbol.getSymbol(AmqpConstants.VENDOR + ":publisher-revoked"); public final static Symbol TIMEOUT_ERROR = Symbol.getSymbol(AmqpConstants.VENDOR + ":timeout"); + public final static Symbol LINK_TIMEOUT_PROPERTY = Symbol.getSymbol(AmqpConstants.VENDOR + ":timeout"); + public final static Symbol LINK_PEEKMODE_PROPERTY = Symbol.getSymbol(AmqpConstants.VENDOR + ":peek-mode"); public final static Symbol TRACKING_ID_PROPERTY = Symbol.getSymbol(AmqpConstants.VENDOR + ":tracking-id"); public static final Symbol DEADLETTERNAME = Symbol.valueOf(AmqpConstants.VENDOR + ":dead-letter"); public static final Symbol MESSAGE_LOCK_LOST_ERROR = Symbol.getSymbol(AmqpConstants.VENDOR + ":message-lock-lost"); @@ -43,6 +45,8 @@ private ClientConstants() { } public static final Symbol SESSIONS_CANNOT_BE_LOCKED_ERROR = Symbol.getSymbol(AmqpConstants.VENDOR + ":session-cannot-be-locked"); public static final Symbol MESSAGE_NOT_FOUND_ERROR = Symbol.getSymbol(AmqpConstants.VENDOR + ":message-not-found"); public static final Symbol SESSION_NOT_FOUND_ERROR = Symbol.getSymbol(AmqpConstants.VENDOR + ":session-not-found"); + public static final Symbol SESSION_FILTER = Symbol.getSymbol(AmqpConstants.VENDOR + ":session-filter"); + public static final Symbol LOCKED_UNTIL_UTC = Symbol.getSymbol(AmqpConstants.VENDOR + ":locked-until-utc"); public static final String DEADLETTER_REASON_HEADER = "DeadLetterReason"; public static final String DEADLETTER_ERROR_DESCRIPTION_HEADER = "DeadLetterErrorDescription"; @@ -71,15 +75,21 @@ private ClientConstants() { } public static final String REQUEST_RESPONSE_OPERATION_NAME = "operation"; public static final String REQUEST_RESPONSE_TIMEOUT = AmqpConstants.VENDOR + ":server-timeout"; public static final String REQUEST_RESPONSE_RENEWLOCK_OPERATION = AmqpConstants.VENDOR + ":renew-lock"; + public static final String REQUEST_RESPONSE_RENEW_SESSIONLOCK_OPERATION = AmqpConstants.VENDOR + ":renew-session-lock"; public static final String REQUEST_RESPONSE_RECEIVE_BY_SEQUENCE_NUMBER = AmqpConstants.VENDOR + ":receive-by-sequence-number"; public static final String REQUEST_RESPONSE_SCHEDULE_MESSAGE_OPERATION = AmqpConstants.VENDOR + ":schedule-message"; public static final String REQUEST_RESPONSE_CANCEL_CHEDULE_MESSAGE_OPERATION = AmqpConstants.VENDOR + ":cancel-scheduled-message"; public static final String REQUEST_RESPONSE_PEEK_OPERATION = AmqpConstants.VENDOR + ":peek-message"; - public static final String REQUEST_RESPONSE_UPDATE_DISPOSTION = AmqpConstants.VENDOR + ":update-disposition"; + public static final String REQUEST_RESPONSE_UPDATE_DISPOSTION_OPERATION = AmqpConstants.VENDOR + ":update-disposition"; + public static final String REQUEST_RESPONSE_GET_SESSION_STATE_OPERATION = AmqpConstants.VENDOR + ":get-session-state"; + public static final String REQUEST_RESPONSE_SET_SESSION_STATE_OPERATION = AmqpConstants.VENDOR + ":set-session-state"; + public static final String REQUEST_RESPONSE_GET_MESSAGE_SESSIONS_OPERATION = AmqpConstants.VENDOR + ":get-message-sessions"; public static final String REQUEST_RESPONSE_LOCKTOKENS = "lock-tokens"; public static final String REQUEST_RESPONSE_LOCKTOKEN = "lock-token"; + public static final String REQUEST_RESPONSE_EXPIRATION = "expiration"; public static final String REQUEST_RESPONSE_EXPIRATIONS = "expirations"; public static final String REQUEST_RESPONSE_SESSIONID = "session-id"; + public static final String REQUEST_RESPONSE_SESSION_STATE = "session-state"; public static final String REQUEST_RESPONSE_SEQUENCE_NUMBERS = "sequence-numbers"; public static final String REQUEST_RESPONSE_RECEIVER_SETTLE_MODE = "receiver-settle-mode"; public static final String REQUEST_RESPONSE_MESSAGES = "messages"; @@ -96,6 +106,11 @@ private ClientConstants() { } public static final String REQUEST_RESPONSE_DEADLETTER_REASON = "deadletter-reason"; public static final String REQUEST_RESPONSE_DEADLETTER_DESCRIPTION = "deadletter-description"; public static final String REQUEST_RESPONSE_PROPERTIES_TO_MODIFY = "properties-to-modify"; + public static final String REQUEST_RESPONSE_LAST_UPDATED_TIME = "last-updated-time"; + public static final String REQUEST_RESPONSE_LAST_SESSION_ID = "last-session-id"; + public static final String REQUEST_RESPONSE_SKIP = "skip"; + public static final String REQUEST_RESPONSE_TOP = "top"; + public static final String REQUEST_RESPONSE_SESSIONIDS = "sessions-ids"; public static final String DISPOSITION_STATUS_COMPLETED = "completed"; public static final String DISPOSITION_STATUS_DEFERED = "defered"; diff --git a/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/ConnectionStringBuilder.java b/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/ConnectionStringBuilder.java index 8e4b7fa1f09cd..f384ec4ec4f7e 100644 --- a/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/ConnectionStringBuilder.java +++ b/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/ConnectionStringBuilder.java @@ -34,7 +34,7 @@ * */ public class ConnectionStringBuilder -{ +{ final static String endpointFormat = "amqps://%s.servicebus.windows.net"; final static String endpointRawFormat = "amqps://%s"; @@ -89,7 +89,7 @@ private ConnectionStringBuilder( { try { - this.endpoint = new URI(String.format(Locale.US, endpointFormat, namespaceName)); + this.endpoint = new URI(String.format(Locale.US, this.getEndPointFormat(), namespaceName)); } catch(URISyntaxException exception) { @@ -396,4 +396,10 @@ else if (key.equalsIgnoreCase(RetryPolicyConfigName)) } } } + + // Just to override in onebox tests + String getEndPointFormat() + { + return this.endpointFormat; + } } diff --git a/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/MessageBrowser.java b/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/MessageBrowser.java deleted file mode 100644 index 9ad923cbb37c0..0000000000000 --- a/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/MessageBrowser.java +++ /dev/null @@ -1,102 +0,0 @@ -package com.microsoft.azure.servicebus.primitives; - -import java.time.Duration; -import java.util.ArrayList; -import java.util.Collection; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.concurrent.CompletableFuture; - -import org.apache.qpid.proton.amqp.Binary; -import org.apache.qpid.proton.amqp.messaging.AmqpValue; -import org.apache.qpid.proton.message.Message; - -import com.microsoft.azure.servicebus.amqp.AmqpConstants; - -public class MessageBrowser extends ClientEntity -{ - private RequestResponseLink requestResponseLink; - private final MessagingFactory underlyingFactory; - private final String receivePath; - - private MessageBrowser(final MessagingFactory factory, final String name, final String recvPath) - { - super(name, factory); - this.underlyingFactory = factory; - this.receivePath = recvPath; - } - - public static CompletableFuture create( - final MessagingFactory factory, - final String name, - final String recvPath) - { - MessageBrowser msgBrowser = new MessageBrowser(factory, name, recvPath); - return msgBrowser.createLink(); - } - - private CompletableFuture createLink() - { - String requestResponseLinkPath = this.receivePath + AmqpConstants.MANAGEMENT_ADDRESS_SEGMENT; - CompletableFuture crateAndAssignRequestResponseLink = - RequestResponseLink.createAsync(this.underlyingFactory, this.getClientId() + "-RequestResponse", requestResponseLinkPath).thenAccept((rrlink) -> {MessageBrowser.this.requestResponseLink = rrlink;}); - return crateAndAssignRequestResponseLink.thenApply((v) -> this); - } - - @Override - protected CompletableFuture onClose() { - return this.requestResponseLink == null ? CompletableFuture.completedFuture(null) : this.requestResponseLink.closeAsync(); - } - - public CompletableFuture> peekMessages(long fromSequenceNumber, int messageCount, String sessionId, Duration timeout) - { - HashMap requestBodyMap = new HashMap(); - requestBodyMap.put(ClientConstants.REQUEST_RESPONSE_FROM_SEQUENCE_NUMER, fromSequenceNumber); - requestBodyMap.put(ClientConstants.REQUEST_RESPONSE_MESSAGE_COUNT, messageCount); - if(!StringUtil.isNullOrEmpty(sessionId)) - { - requestBodyMap.put(ClientConstants.REQUEST_RESPONSE_SESSIONID, sessionId); - } - Message requestMessage = RequestResponseUtils.createRequestMessage(ClientConstants.REQUEST_RESPONSE_PEEK_OPERATION, requestBodyMap, RequestResponseUtils.adjustServerTimeout(timeout)); - CompletableFuture responseFuture = this.requestResponseLink.requestAysnc(requestMessage, timeout); - return responseFuture.thenCompose((responseMessage) -> { - CompletableFuture> returningFuture = new CompletableFuture>(); - int statusCode = RequestResponseUtils.getResponseStatusCode(responseMessage); - if(statusCode == ClientConstants.REQUEST_RESPONSE_OK_STATUS_CODE) - { - List peekedMessages = new ArrayList(); - Object responseBodyMap = ((AmqpValue)responseMessage.getBody()).getValue(); - if(responseBodyMap != null && responseBodyMap instanceof Map) - { - Object messages = ((Map)responseBodyMap).get(ClientConstants.REQUEST_RESPONSE_MESSAGES); - if(messages != null && messages instanceof Iterable) - { - for(Object message : (Iterable)messages) - { - if(message instanceof Map) - { - Message peekedMessage = Message.Factory.create(); - Binary messagePayLoad = (Binary)((Map)message).get(ClientConstants.REQUEST_RESPONSE_MESSAGE); - peekedMessage.decode(messagePayLoad.getArray(), messagePayLoad.getArrayOffset(), messagePayLoad.getLength()); - peekedMessages.add(peekedMessage); - } - } - } - } - returningFuture.complete(peekedMessages); - } - else if(statusCode == ClientConstants.REQUEST_RESPONSE_NOCONTENT_STATUS_CODE || - (statusCode == ClientConstants.REQUEST_RESPONSE_NOTFOUND_STATUS_CODE && ClientConstants.MESSAGE_NOT_FOUND_ERROR.equals(RequestResponseUtils.getResponseErrorCondition(responseMessage)))) - { - returningFuture.complete(new ArrayList()); - } - else - { - // error response - returningFuture.completeExceptionally(RequestResponseUtils.genereateExceptionFromResponse(responseMessage)); - } - return returningFuture; - }); - } -} diff --git a/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/MessageReceiver.java b/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/MessageReceiver.java index 9c0cdcd2ece56..d9ab060717346 100644 --- a/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/MessageReceiver.java +++ b/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/MessageReceiver.java @@ -50,9 +50,6 @@ import org.apache.qpid.proton.amqp.messaging.Rejected; import org.apache.qpid.proton.amqp.messaging.Released; -import com.microsoft.azure.servicebus.BrokeredMessage; -import com.microsoft.azure.servicebus.IBrokeredMessage; -import com.microsoft.azure.servicebus.MessageConverter; import com.microsoft.azure.servicebus.amqp.AmqpConstants; import com.microsoft.azure.servicebus.amqp.DispatchHandler; import com.microsoft.azure.servicebus.amqp.IAmqpReceiver; @@ -80,6 +77,10 @@ public class MessageReceiver extends ClientEntity implements IAmqpReceiver, IErr private final SettleModePair settleModePair; private final RetryPolicy retryPolicy; private int prefetchCount; + private String sessionId; + private boolean isSessionReceiver; + private boolean isBrowsableSession; + private Instant sessionLockedUntilUtc; private ConcurrentLinkedQueue prefetchedMessages; private Receiver receiveLink; @@ -98,7 +99,8 @@ public class MessageReceiver extends ClientEntity implements IAmqpReceiver, IErr private MessageReceiver(final MessagingFactory factory, final String name, - final String recvPath, + final String recvPath, + final String sessionId, final int prefetchCount, final SettleModePair settleModePair) { @@ -107,6 +109,9 @@ private MessageReceiver(final MessagingFactory factory, this.underlyingFactory = factory; this.operationTimeout = factory.getOperationTimeout(); this.receivePath = recvPath; + this.sessionId = sessionId; + this.isSessionReceiver = false; + this.isBrowsableSession = false; this.prefetchCount = prefetchCount; this.settleModePair = settleModePair; this.prefetchedMessages = new ConcurrentLinkedQueue(); @@ -145,7 +150,6 @@ public void run() { Timer.schedule(timedOutUpdateStateRequestsDaemon, Duration.ofSeconds(1), TimerType.RepeatRun); } - // @param connection Connection on which the MessageReceiver's receive Amqp link need to be created on. // Connection has to be associated with Reactor before Creating a receiver on it. public static CompletableFuture create( final MessagingFactory factory, @@ -158,15 +162,37 @@ public static CompletableFuture create( factory, name, recvPath, + null, prefetchCount, settleModePair); return msgReceiver.createLink(); } + + public static CompletableFuture create( + final MessagingFactory factory, + final String name, + final String recvPath, + final String sessionId, + final boolean isBrowsableSession, + final int prefetchCount, + final SettleModePair settleModePair) + { + MessageReceiver msgReceiver = new MessageReceiver( + factory, + name, + recvPath, + sessionId, + prefetchCount, + settleModePair); + msgReceiver.isSessionReceiver = true; + msgReceiver.isBrowsableSession = isBrowsableSession; + return msgReceiver.createLink(); + } private CompletableFuture createLink() { this.linkOpen = new WorkItem(new CompletableFuture(), this.operationTimeout); - this.scheduleLinkOpenTimeout(this.linkOpen.getTimeoutTracker()); + this.scheduleLinkOpenTimeout(this.linkOpen.getTimeoutTracker()); try { this.underlyingFactory.scheduleOnReactorThread(new DispatchHandler() @@ -193,10 +219,7 @@ public void onEvent() private void createReceiveLink() { - Connection connection = this.underlyingFactory.getConnection(); - - Source source = new Source(); - source.setAddress(receivePath); + Connection connection = this.underlyingFactory.getConnection(); final Session session = connection.session(); session.setIncomingCapacity(Integer.MAX_VALUE); @@ -208,18 +231,35 @@ private void createReceiveLink() receiveLinkNamePrefix.concat(TrackingUtil.TRACKING_ID_TOKEN_SEPARATOR).concat(connection.getRemoteContainer()) : receiveLinkNamePrefix; final Receiver receiver = session.receiver(receiveLinkName); + + Source source = new Source(); + source.setAddress(receivePath); + Map linkProperties = new HashMap(); + linkProperties.put(ClientConstants.LINK_TIMEOUT_PROPERTY, Util.adjustServerTimeout(this.underlyingFactory.getOperationTimeout()).toMillis()); + + if(this.isSessionReceiver) + { + HashMap filterMap = new HashMap(); + filterMap.put(ClientConstants.SESSION_FILTER, this.sessionId); + source.setFilter(filterMap); + + linkProperties.put(ClientConstants.LINK_PEEKMODE_PROPERTY, this.isBrowsableSession); + } + receiver.setSource(source); receiver.setTarget(new Target()); // Set settle modes receiver.setSenderSettleMode(this.settleModePair.getSenderSettleMode()); receiver.setReceiverSettleMode(this.settleModePair.getReceiverSettleMode()); + + receiver.setProperties(linkProperties); final ReceiveLinkHandler handler = new ReceiveLinkHandler(this); BaseHandler.setHandler(receiver, handler); this.underlyingFactory.registerForConnectionError(receiver); - receiver.open(); + receiver.open(); if (this.receiveLink != null) { @@ -260,6 +300,25 @@ public int getPrefetchCount() return this.prefetchCount; } } + + + public String getSessionId() + { + return this.sessionId; + } + + + public Instant getSessionLockedUntilUtc() + { + if(this.isSessionReceiver) + { + return this.sessionLockedUntilUtc; + } + else + { + throw new RuntimeException("Object is not a session receiver"); + } + } public void setPrefetchCount(final int value) throws ServiceBusException { @@ -378,6 +437,32 @@ public void onOpenComplete(Exception exception) { if (exception == null) { + if(this.isSessionReceiver) + { + Map remoteSourceFilter = ((Source)this.receiveLink.getRemoteSource()).getFilter(); + if(remoteSourceFilter != null && remoteSourceFilter.containsKey(ClientConstants.SESSION_FILTER)) + { + String remoteSessionId = (String)remoteSourceFilter.get(ClientConstants.SESSION_FILTER); + this.sessionId = remoteSessionId; + + if(this.receiveLink.getRemoteProperties() != null && this.receiveLink.getRemoteProperties().containsKey(ClientConstants.LOCKED_UNTIL_UTC)) + { + this.sessionLockedUntilUtc = Util.convertDotNetTicksToInstant((long)this.receiveLink.getRemoteProperties().get(ClientConstants.LOCKED_UNTIL_UTC)); + } + else + { + this.sessionLockedUntilUtc = Instant.ofEpochMilli(0); + } + } + else + { + exception = new ServiceBusException(false, "SessionId filter not set on the remote source."); + } + } + } + + if (exception == null) + { if (this.linkOpen != null && !this.linkOpen.getWork().isDone()) { this.linkOpen.getWork().complete(this); @@ -911,17 +996,17 @@ private static ServiceBusException generateDispatacherSchedulingFailedException( return new ServiceBusException(false, operation + " failed while dispatching to Reactor, see cause for more details.", cause); } - public CompletableFuture> renewMessageLocksAsync(UUID[] lockTokens, String sessionId, Duration timeout) + public CompletableFuture> renewMessageLocksAsync(UUID[] lockTokens) { HashMap requestBodyMap = new HashMap(); requestBodyMap.put(ClientConstants.REQUEST_RESPONSE_LOCKTOKENS, lockTokens); - if(!StringUtil.isNullOrEmpty(sessionId)) + if(this.isSessionReceiver) { - requestBodyMap.put(ClientConstants.REQUEST_RESPONSE_SESSIONID, sessionId); + requestBodyMap.put(ClientConstants.REQUEST_RESPONSE_SESSIONID, this.getSessionId()); } - Message requestMessage = RequestResponseUtils.createRequestMessage(ClientConstants.REQUEST_RESPONSE_RENEWLOCK_OPERATION, requestBodyMap, RequestResponseUtils.adjustServerTimeout(timeout)); - CompletableFuture responseFuture = this.requestResponseLink.requestAysnc(requestMessage, timeout); + Message requestMessage = RequestResponseUtils.createRequestMessage(ClientConstants.REQUEST_RESPONSE_RENEWLOCK_OPERATION, requestBodyMap, Util.adjustServerTimeout(this.operationTimeout)); + CompletableFuture responseFuture = this.requestResponseLink.requestAysnc(requestMessage, this.operationTimeout); return responseFuture.thenCompose((responseMessage) -> { CompletableFuture> returningFuture = new CompletableFuture>(); int statusCode = RequestResponseUtils.getResponseStatusCode(responseMessage); @@ -944,8 +1029,12 @@ public CompletableFuture> receiveBySequenceNumb HashMap requestBodyMap = new HashMap(); requestBodyMap.put(ClientConstants.REQUEST_RESPONSE_SEQUENCE_NUMBERS, sequenceNumbers); requestBodyMap.put(ClientConstants.REQUEST_RESPONSE_RECEIVER_SETTLE_MODE, UnsignedInteger.valueOf(this.settleModePair.getReceiverSettleMode() == ReceiverSettleMode.FIRST ? 0 : 1)); + if(this.isSessionReceiver) + { + requestBodyMap.put(ClientConstants.REQUEST_RESPONSE_SESSIONID, this.getSessionId()); + } - Message requestMessage = RequestResponseUtils.createRequestMessage(ClientConstants.REQUEST_RESPONSE_RECEIVE_BY_SEQUENCE_NUMBER, requestBodyMap, RequestResponseUtils.adjustServerTimeout(this.operationTimeout)); + Message requestMessage = RequestResponseUtils.createRequestMessage(ClientConstants.REQUEST_RESPONSE_RECEIVE_BY_SEQUENCE_NUMBER, requestBodyMap, Util.adjustServerTimeout(this.operationTimeout)); CompletableFuture responseFuture = this.requestResponseLink.requestAysnc(requestMessage, this.operationTimeout); return responseFuture.thenCompose((responseMessage) -> { CompletableFuture> returningFuture = new CompletableFuture>(); @@ -1007,15 +1096,45 @@ public CompletableFuture updateDispositionAsync(UUID[] lockTokens, String if(propertiesToModify != null && propertiesToModify.size() > 0) { requestBodyMap.put(ClientConstants.REQUEST_RESPONSE_PROPERTIES_TO_MODIFY, propertiesToModify); - } + } + + if(this.isSessionReceiver) + { + requestBodyMap.put(ClientConstants.REQUEST_RESPONSE_SESSIONID, this.getSessionId()); + } + + Message requestMessage = RequestResponseUtils.createRequestMessage(ClientConstants.REQUEST_RESPONSE_UPDATE_DISPOSTION_OPERATION, requestBodyMap, Util.adjustServerTimeout(this.operationTimeout)); + CompletableFuture responseFuture = this.requestResponseLink.requestAysnc(requestMessage, this.operationTimeout); + return responseFuture.thenCompose((responseMessage) -> { + CompletableFuture returningFuture = new CompletableFuture(); + int statusCode = RequestResponseUtils.getResponseStatusCode(responseMessage); + if(statusCode == ClientConstants.REQUEST_RESPONSE_OK_STATUS_CODE) + { + returningFuture.complete(null); + } + else + { + // error response + returningFuture.completeExceptionally(RequestResponseUtils.genereateExceptionFromResponse(responseMessage)); + } + return returningFuture; + }); + } + + public CompletableFuture renewSessionLocksAsync() + { + HashMap requestBodyMap = new HashMap(); + requestBodyMap.put(ClientConstants.REQUEST_RESPONSE_SESSIONID, this.getSessionId()); - Message requestMessage = RequestResponseUtils.createRequestMessage(ClientConstants.REQUEST_RESPONSE_UPDATE_DISPOSTION, requestBodyMap, RequestResponseUtils.adjustServerTimeout(this.operationTimeout)); + Message requestMessage = RequestResponseUtils.createRequestMessage(ClientConstants.REQUEST_RESPONSE_RENEW_SESSIONLOCK_OPERATION, requestBodyMap, Util.adjustServerTimeout(this.operationTimeout)); CompletableFuture responseFuture = this.requestResponseLink.requestAysnc(requestMessage, this.operationTimeout); return responseFuture.thenCompose((responseMessage) -> { CompletableFuture returningFuture = new CompletableFuture(); int statusCode = RequestResponseUtils.getResponseStatusCode(responseMessage); if(statusCode == ClientConstants.REQUEST_RESPONSE_OK_STATUS_CODE) { + Date expiration = (Date)RequestResponseUtils.getResponseBody(responseMessage).get(ClientConstants.REQUEST_RESPONSE_EXPIRATION); + this.sessionLockedUntilUtc = expiration.toInstant(); returningFuture.complete(null); } else @@ -1026,4 +1145,152 @@ public CompletableFuture updateDispositionAsync(UUID[] lockTokens, String return returningFuture; }); } + + public CompletableFuture getSessionStateAsync() + { + HashMap requestBodyMap = new HashMap(); + requestBodyMap.put(ClientConstants.REQUEST_RESPONSE_SESSIONID, this.getSessionId()); + + Message requestMessage = RequestResponseUtils.createRequestMessage(ClientConstants.REQUEST_RESPONSE_GET_SESSION_STATE_OPERATION, requestBodyMap, Util.adjustServerTimeout(this.operationTimeout)); + CompletableFuture responseFuture = this.requestResponseLink.requestAysnc(requestMessage, this.operationTimeout); + return responseFuture.thenCompose((responseMessage) -> { + CompletableFuture returningFuture = new CompletableFuture(); + int statusCode = RequestResponseUtils.getResponseStatusCode(responseMessage); + if(statusCode == ClientConstants.REQUEST_RESPONSE_OK_STATUS_CODE) + { + byte[] receivedState = null; + Map bodyMap = RequestResponseUtils.getResponseBody(responseMessage); + if(bodyMap.containsKey(ClientConstants.REQUEST_RESPONSE_SESSION_STATE)) + { + Object sessionState = bodyMap.get(ClientConstants.REQUEST_RESPONSE_SESSION_STATE); + if(sessionState != null) + { + receivedState = ((Binary)sessionState).getArray(); + } + } + + returningFuture.complete(receivedState); + } + else + { + // error response + returningFuture.completeExceptionally(RequestResponseUtils.genereateExceptionFromResponse(responseMessage)); + } + return returningFuture; + }); + } + + // NULL session state is allowed + public CompletableFuture setSessionStateAsync(byte[] sessionState) + { + HashMap requestBodyMap = new HashMap(); + requestBodyMap.put(ClientConstants.REQUEST_RESPONSE_SESSIONID, this.getSessionId()); + requestBodyMap.put(ClientConstants.REQUEST_RESPONSE_SESSION_STATE, sessionState == null ? null : new Binary(sessionState)); + + Message requestMessage = RequestResponseUtils.createRequestMessage(ClientConstants.REQUEST_RESPONSE_SET_SESSION_STATE_OPERATION, requestBodyMap, Util.adjustServerTimeout(this.operationTimeout)); + CompletableFuture responseFuture = this.requestResponseLink.requestAysnc(requestMessage, this.operationTimeout); + return responseFuture.thenCompose((responseMessage) -> { + CompletableFuture returningFuture = new CompletableFuture(); + int statusCode = RequestResponseUtils.getResponseStatusCode(responseMessage); + if(statusCode == ClientConstants.REQUEST_RESPONSE_OK_STATUS_CODE) + { + returningFuture.complete(null); + } + else + { + // error response + returningFuture.completeExceptionally(RequestResponseUtils.genereateExceptionFromResponse(responseMessage)); + } + return returningFuture; + }); + } + + // A receiver can be used to peek messages from any session-id, useful for browsable sessions + public CompletableFuture> peekMessagesAsync(long fromSequenceNumber, int messageCount, String sessionId) + { + HashMap requestBodyMap = new HashMap(); + requestBodyMap.put(ClientConstants.REQUEST_RESPONSE_FROM_SEQUENCE_NUMER, fromSequenceNumber); + requestBodyMap.put(ClientConstants.REQUEST_RESPONSE_MESSAGE_COUNT, messageCount); + if(sessionId != null) + { + requestBodyMap.put(ClientConstants.REQUEST_RESPONSE_SESSIONID, sessionId); + } + Message requestMessage = RequestResponseUtils.createRequestMessage(ClientConstants.REQUEST_RESPONSE_PEEK_OPERATION, requestBodyMap, Util.adjustServerTimeout(this.operationTimeout)); + CompletableFuture responseFuture = this.requestResponseLink.requestAysnc(requestMessage, this.operationTimeout); + return responseFuture.thenCompose((responseMessage) -> { + CompletableFuture> returningFuture = new CompletableFuture>(); + int statusCode = RequestResponseUtils.getResponseStatusCode(responseMessage); + if(statusCode == ClientConstants.REQUEST_RESPONSE_OK_STATUS_CODE) + { + List peekedMessages = new ArrayList(); + Object responseBodyMap = ((AmqpValue)responseMessage.getBody()).getValue(); + if(responseBodyMap != null && responseBodyMap instanceof Map) + { + Object messages = ((Map)responseBodyMap).get(ClientConstants.REQUEST_RESPONSE_MESSAGES); + if(messages != null && messages instanceof Iterable) + { + for(Object message : (Iterable)messages) + { + if(message instanceof Map) + { + Message peekedMessage = Message.Factory.create(); + Binary messagePayLoad = (Binary)((Map)message).get(ClientConstants.REQUEST_RESPONSE_MESSAGE); + peekedMessage.decode(messagePayLoad.getArray(), messagePayLoad.getArrayOffset(), messagePayLoad.getLength()); + peekedMessages.add(peekedMessage); + } + } + } + } + returningFuture.complete(peekedMessages); + } + else if(statusCode == ClientConstants.REQUEST_RESPONSE_NOCONTENT_STATUS_CODE || + (statusCode == ClientConstants.REQUEST_RESPONSE_NOTFOUND_STATUS_CODE && ClientConstants.MESSAGE_NOT_FOUND_ERROR.equals(RequestResponseUtils.getResponseErrorCondition(responseMessage)))) + { + returningFuture.complete(new ArrayList()); + } + else + { + // error response + returningFuture.completeExceptionally(RequestResponseUtils.genereateExceptionFromResponse(responseMessage)); + } + return returningFuture; + }); + } + + public CompletableFuture> getMessageSessionsAsync(Date lastUpdatedTime, int skip, int top, String lastSessionId) + { + HashMap requestBodyMap = new HashMap(); + requestBodyMap.put(ClientConstants.REQUEST_RESPONSE_LAST_UPDATED_TIME, lastUpdatedTime); + requestBodyMap.put(ClientConstants.REQUEST_RESPONSE_SKIP, skip); + requestBodyMap.put(ClientConstants.REQUEST_RESPONSE_TOP, top); + if(lastSessionId != null) + { + requestBodyMap.put(ClientConstants.REQUEST_RESPONSE_LAST_SESSION_ID, lastSessionId); + } + + Message requestMessage = RequestResponseUtils.createRequestMessage(ClientConstants.REQUEST_RESPONSE_GET_MESSAGE_SESSIONS_OPERATION, requestBodyMap, Util.adjustServerTimeout(this.operationTimeout)); + CompletableFuture responseFuture = this.requestResponseLink.requestAysnc(requestMessage, this.operationTimeout); + return responseFuture.thenCompose((responseMessage) -> { + CompletableFuture> returningFuture = new CompletableFuture>(); + int statusCode = RequestResponseUtils.getResponseStatusCode(responseMessage); + if(statusCode == ClientConstants.REQUEST_RESPONSE_OK_STATUS_CODE) + { + Map responseBodyMap = RequestResponseUtils.getResponseBody(responseMessage); + int responseSkip = (int)responseBodyMap.get(ClientConstants.REQUEST_RESPONSE_SKIP); + String[] sessionIds = (String[])responseBodyMap.get(ClientConstants.REQUEST_RESPONSE_SESSIONIDS); + returningFuture.complete(new Pair<>(sessionIds, responseSkip)); + } + else if(statusCode == ClientConstants.REQUEST_RESPONSE_NOCONTENT_STATUS_CODE || + (statusCode == ClientConstants.REQUEST_RESPONSE_NOTFOUND_STATUS_CODE && ClientConstants.SESSION_NOT_FOUND_ERROR.equals(RequestResponseUtils.getResponseErrorCondition(responseMessage)))) + { + returningFuture.complete(new Pair<>(new String[0], 0)); + } + else + { + // error response + returningFuture.completeExceptionally(RequestResponseUtils.genereateExceptionFromResponse(responseMessage)); + } + return returningFuture; + }); + } } diff --git a/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/MessageSender.java b/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/MessageSender.java index aec53636fd72d..42ba1805e67f9 100644 --- a/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/MessageSender.java +++ b/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/MessageSender.java @@ -560,6 +560,10 @@ private void createSendLink() sender.setSenderSettleMode(SenderSettleMode.UNSETTLED); + Map linkProperties = new HashMap(); + linkProperties.put(ClientConstants.LINK_TIMEOUT_PROPERTY, Util.adjustServerTimeout(this.underlyingFactory.getOperationTimeout()).toMillis()); + sender.setProperties(linkProperties); + SendLinkHandler handler = new SendLinkHandler(MessageSender.this); BaseHandler.setHandler(sender, handler); @@ -895,7 +899,7 @@ public CompletableFuture scheduleMessageAsync(Message[] messages, Durati messageList.add(messageEntry); } requestBodyMap.put(ClientConstants.REQUEST_RESPONSE_MESSAGES, messageList); - Message requestMessage = RequestResponseUtils.createRequestMessage(ClientConstants.REQUEST_RESPONSE_SCHEDULE_MESSAGE_OPERATION, requestBodyMap, RequestResponseUtils.adjustServerTimeout(timeout)); + Message requestMessage = RequestResponseUtils.createRequestMessage(ClientConstants.REQUEST_RESPONSE_SCHEDULE_MESSAGE_OPERATION, requestBodyMap, Util.adjustServerTimeout(timeout)); CompletableFuture responseFuture = this.requestResponseLink.requestAysnc(requestMessage, timeout); return responseFuture.thenCompose((responseMessage) -> { CompletableFuture returningFuture = new CompletableFuture(); @@ -919,7 +923,7 @@ public CompletableFuture cancelScheduledMessageAsync(Long[] sequenceNumber HashMap requestBodyMap = new HashMap(); requestBodyMap.put(ClientConstants.REQUEST_RESPONSE_SEQUENCE_NUMBERS, sequenceNumbers); - Message requestMessage = RequestResponseUtils.createRequestMessage(ClientConstants.REQUEST_RESPONSE_CANCEL_CHEDULE_MESSAGE_OPERATION, requestBodyMap, RequestResponseUtils.adjustServerTimeout(timeout)); + Message requestMessage = RequestResponseUtils.createRequestMessage(ClientConstants.REQUEST_RESPONSE_CANCEL_CHEDULE_MESSAGE_OPERATION, requestBodyMap, Util.adjustServerTimeout(timeout)); CompletableFuture responseFuture = this.requestResponseLink.requestAysnc(requestMessage, timeout); return responseFuture.thenCompose((responseMessage) -> { CompletableFuture returningFuture = new CompletableFuture(); diff --git a/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/MessagingFactory.java b/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/MessagingFactory.java index 6b6b5a4bd1b66..7c0fe6ec24465 100644 --- a/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/MessagingFactory.java +++ b/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/MessagingFactory.java @@ -198,14 +198,13 @@ public void onConnectionError(ErrorCondition error) } else { - final Connection currentConnection = this.connection; - Iterator literator = this.registeredLinks.iterator(); + final Connection currentConnection = this.connection; + Link[] links = this.registeredLinks.toArray(new Link[0]); this.openConnection = new CompletableFuture(); - while (literator.hasNext()) - { - Link link = literator.next(); + for(Link link : links) + { if (link.getLocalState() != EndpointState.CLOSED && link.getRemoteState() != EndpointState.CLOSED) { link.close(); @@ -217,10 +216,8 @@ public void onConnectionError(ErrorCondition error) currentConnection.close(); } - literator = this.registeredLinks.iterator(); - while (literator.hasNext()) - { - Link link = literator.next(); + for(Link link : links) + { Handler handler = BaseHandler.getHandler(link); if (handler != null && handler instanceof BaseLinkHandler) { @@ -256,12 +253,12 @@ private void onReactorError(Exception cause) TRACE_LOGGER.log(Level.SEVERE, ExceptionUtil.toStackTraceString(e, "Re-starting reactor failed with error")); this.onReactorError(cause); - } - - Iterator literator = this.registeredLinks.iterator(); - while (literator.hasNext()) + } + + Link[] links = this.registeredLinks.toArray(new Link[0]); + + for(Link link : links) { - Link link = literator.next(); if (link.getLocalState() != EndpointState.CLOSED && link.getRemoteState() != EndpointState.CLOSED) { link.close(); @@ -273,17 +270,15 @@ private void onReactorError(Exception cause) currentConnection.close(); } - literator = this.registeredLinks.iterator(); - while (literator.hasNext()) + for(Link link : links) { - Link link = literator.next(); Handler handler = BaseHandler.getHandler(link); if (handler != null && handler instanceof BaseLinkHandler) { BaseLinkHandler linkHandler = (BaseLinkHandler) handler; linkHandler.processOnClose(link, cause); } - } + } } } @@ -387,13 +382,13 @@ public void run() @Override public void registerForConnectionError(Link link) { - this.registeredLinks.add(link); + this.registeredLinks.add(link); } @Override public void deregisterForConnectionError(Link link) { - this.registeredLinks.remove(link); + this.registeredLinks.remove(link); } public void scheduleOnReactorThread(final DispatchHandler handler) throws IOException diff --git a/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/RequestResponseLink.java b/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/RequestResponseLink.java index cce85b9384f1f..bd67e0e49982c 100644 --- a/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/RequestResponseLink.java +++ b/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/RequestResponseLink.java @@ -6,6 +6,7 @@ import java.time.ZonedDateTime; import java.util.Arrays; import java.util.Date; +import java.util.HashMap; import java.util.LinkedList; import java.util.Locale; import java.util.Map; @@ -121,6 +122,9 @@ private void createInternalLinks() { this.replyTo = UUID.randomUUID().toString(); + Map commonLinkProperties = new HashMap(); + commonLinkProperties.put(ClientConstants.LINK_TIMEOUT_PROPERTY, Util.adjustServerTimeout(this.underlyingFactory.getOperationTimeout()).toMillis()); + // Create send link final Connection connection = this.underlyingFactory.getConnection(); @@ -141,8 +145,8 @@ private void createInternalLinks() Source senderSource = new Source(); senderSource.setAddress(this.replyTo); sender.setSource(senderSource); - sender.setSenderSettleMode(SenderSettleMode.SETTLED); - + sender.setSenderSettleMode(SenderSettleMode.SETTLED); + sender.setProperties(commonLinkProperties); SendLinkHandler sendLinkHandler = new SendLinkHandler(this.amqpSender); BaseHandler.setHandler(sender, sendLinkHandler); @@ -165,6 +169,7 @@ private void createInternalLinks() // Set settle modes receiver.setSenderSettleMode(SenderSettleMode.SETTLED); receiver.setReceiverSettleMode(ReceiverSettleMode.FIRST); + receiver.setProperties(commonLinkProperties); final ReceiveLinkHandler receiveLinkHandler = new ReceiveLinkHandler(this.amqpReceiver); BaseHandler.setHandler(receiver, receiveLinkHandler); diff --git a/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/RequestResponseUtils.java b/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/RequestResponseUtils.java index 677ec7880dfe4..0da683832744f 100644 --- a/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/RequestResponseUtils.java +++ b/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/RequestResponseUtils.java @@ -19,17 +19,11 @@ public static Message createRequestMessage(String operation, Map propertyBag, Du requestMessage.setBody(new AmqpValue(propertyBag)); HashMap applicationPropertiesMap = new HashMap(); applicationPropertiesMap.put(ClientConstants.REQUEST_RESPONSE_OPERATION_NAME, operation); - applicationPropertiesMap.put(ClientConstants.REQUEST_RESPONSE_TIMEOUT, timeout.toMillis()); + applicationPropertiesMap.put(ClientConstants.REQUEST_RESPONSE_TIMEOUT, timeout.toMillis()); requestMessage.setApplicationProperties(new ApplicationProperties(applicationPropertiesMap)); return requestMessage; } - // Pass one second less to the server so client doesn't time out before server times out - public static Duration adjustServerTimeout(Duration clientTimeout) - { - return clientTimeout.minusMillis(100); - } - public static int getResponseStatusCode(Message responseMessage) { int statusCode = ClientConstants.REQUEST_RESPONSE_UNDEFINED_STATUS_CODE; diff --git a/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/Util.java b/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/Util.java index 2dc259de5e85c..20127eb127de0 100644 --- a/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/Util.java +++ b/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/Util.java @@ -8,8 +8,12 @@ import java.lang.reflect.Array; import java.nio.BufferOverflowException; import java.nio.ByteBuffer; +import java.time.Duration; import java.time.Instant; +import java.time.LocalDateTime; +import java.time.ZoneId; import java.util.Date; +import java.util.HashMap; import java.util.Iterator; import java.util.Locale; import java.util.Map; @@ -373,4 +377,10 @@ static int encodeMessageToCustomArray(Message message, byte[] encodedBytes, int throw new PayloadSizeExceededException(String.format("Size of the payload exceeded Maximum message size: %s kb", ClientConstants.MAX_MESSAGE_LENGTH_BYTES / 1024), exception); } } + + // Pass little less than client timeout to the server so client doesn't time out before server times out + public static Duration adjustServerTimeout(Duration clientTimeout) + { + return clientTimeout.minusMillis(100); + } } diff --git a/azure-servicebus/src/test/java/com/microsoft/azure/servicebus/QueueSendReceiveTests.java b/azure-servicebus/src/test/java/com/microsoft/azure/servicebus/QueueSendReceiveTests.java index d2a22126dccac..2bf1ba34651a2 100644 --- a/azure-servicebus/src/test/java/com/microsoft/azure/servicebus/QueueSendReceiveTests.java +++ b/azure-servicebus/src/test/java/com/microsoft/azure/servicebus/QueueSendReceiveTests.java @@ -1,503 +1,16 @@ package com.microsoft.azure.servicebus; -import java.io.IOException; -import java.time.Duration; -import java.time.Instant; -import java.util.ArrayList; -import java.util.Collection; -import java.util.HashMap; -import java.util.Iterator; -import java.util.LinkedList; -import java.util.List; -import java.util.UUID; -import java.util.concurrent.ExecutionException; - -import org.junit.After; -import org.junit.Assert; -import org.junit.Before; -import org.junit.Test; - -import com.microsoft.azure.servicebus.BrokeredMessage; -import com.microsoft.azure.servicebus.ClientFactory; -import com.microsoft.azure.servicebus.IBrokeredMessage; -import com.microsoft.azure.servicebus.IMessageReceiver; -import com.microsoft.azure.servicebus.IMessageSender; -import com.microsoft.azure.servicebus.ReceiveMode; import com.microsoft.azure.servicebus.primitives.ConnectionStringBuilder; -import com.microsoft.azure.servicebus.primitives.MessageNotFoundException; -import com.microsoft.azure.servicebus.primitives.MessagingFactory; -import com.microsoft.azure.servicebus.primitives.ServiceBusException; -public class QueueSendReceiveTests { - private ConnectionStringBuilder builder; - private MessagingFactory factory; - private IMessageSender sender; - private IMessageReceiver receiver; - private IMessageBrowser browser; - - @Before // Fix this. something goes wrong when we do this setup. - public void setup() throws IOException, InterruptedException, ExecutionException, ServiceBusException - { - this.builder = TestUtils.getConnectionStringBuilder(); - this.factory = MessagingFactory.createFromConnectionStringBuilder(builder); - this.sender = ClientFactory.createMessageSenderFromConnectionStringBuilder(builder); - - this.drainAllMessages(builder); - //Thread.sleep(60000); - } - - @After - public void tearDown() throws ServiceBusException - { - this.sender.close(); - if(this.receiver != null) - this.receiver.close(); - if(this.browser != null) - this.browser.close(); - this.factory.close(); - } - - @Test - public void testBasicQueueSend() throws InterruptedException, ServiceBusException, IOException - { - this.sender.send(new BrokeredMessage("AMQP message")); - } - - @Test - public void testBasicQueueSendBatch() throws InterruptedException, ServiceBusException, IOException - { - List messages = new ArrayList(); - for(int i=0; i<10; i++) - { - messages.add(new BrokeredMessage("AMQP message")); - } - this.sender.sendBatch(messages); - } - - @Test - public void testBasicQueueReceiveAndDelete() throws InterruptedException, ServiceBusException, IOException, ExecutionException - { - String messageId = UUID.randomUUID().toString(); - BrokeredMessage message = new BrokeredMessage("AMQP message"); - message.setMessageId(messageId); - this.sender.send(message); - - this.receiver = ClientFactory.createMessageReceiverFromEntityPath(factory, builder.getEntityPath(), ReceiveMode.ReceiveAndDelete); - IBrokeredMessage receivedMessage = this.receiver.receive(); - Assert.assertNotNull("Message not received", receivedMessage); - Assert.assertEquals("Message Id did not match", messageId, receivedMessage.getMessageId()); - receivedMessage = this.receiver.receive(); - Assert.assertNull("Message received again", receivedMessage); - } - - @Test - public void testBasicQueueReceiveBatchAndDelete() throws InterruptedException, ServiceBusException, IOException, ExecutionException - { - int numMessages = 10; - List messages = new ArrayList(); - for(int i=0; i receivedMessages = this.receiver.receiveBatch(numMessages); - while(receivedMessages != null && receivedMessages.size() > 0) - { - totalReceivedMessages += receivedMessages.size(); - receivedMessages = this.receiver.receiveBatch(numMessages); - } - - Assert.assertEquals("All messages not received", numMessages, totalReceivedMessages); - receivedMessages = this.receiver.receiveBatch(numMessages); - Assert.assertNull("Messages received again", receivedMessages); - } - - @Test - public void testBasicQueueReceiveAndComplete() throws InterruptedException, ServiceBusException, IOException, ExecutionException - { - String messageId = UUID.randomUUID().toString(); - BrokeredMessage message = new BrokeredMessage("AMQP message"); - message.setMessageId(messageId); - this.sender.send(message); - - this.receiver = ClientFactory.createMessageReceiverFromEntityPath(factory, builder.getEntityPath(), ReceiveMode.PeekLock); - IBrokeredMessage receivedMessage = this.receiver.receive(); - Assert.assertNotNull("Message not received", receivedMessage); - Assert.assertEquals("Message Id did not match", messageId, receivedMessage.getMessageId()); - this.receiver.complete(receivedMessage.getLockToken()); - receivedMessage = this.receiver.receive(); - Assert.assertNull("Message was not properly completed", receivedMessage); - } - - @Test - public void testBasicQueueReceiveAndAbandon() throws InterruptedException, ServiceBusException, IOException, ExecutionException - { - String messageId = UUID.randomUUID().toString(); - BrokeredMessage message = new BrokeredMessage("AMQP message"); - message.setMessageId(messageId); - this.sender.send(message); - - this.receiver = ClientFactory.createMessageReceiverFromEntityPath(factory, builder.getEntityPath(), ReceiveMode.PeekLock); - IBrokeredMessage receivedMessage = this.receiver.receive(); - Assert.assertNotNull("Message not received", receivedMessage); - Assert.assertEquals("Message Id did not match", messageId, receivedMessage.getMessageId()); - long deliveryCount = receivedMessage.getDeliveryCount(); - this.receiver.abandon(receivedMessage.getLockToken()); - receivedMessage = this.receiver.receive(); - Assert.assertNotNull("Message not received", receivedMessage); - Assert.assertEquals("DeliveryCount not incremented", deliveryCount+1, receivedMessage.getDeliveryCount()); - this.receiver.complete(receivedMessage.getLockToken()); - } - - @Test - public void testBasicQueueReceiveAndDeadLetter() throws InterruptedException, ServiceBusException, IOException, ExecutionException - { - String messageId = UUID.randomUUID().toString(); - BrokeredMessage message = new BrokeredMessage("AMQP message"); - message.setMessageId(messageId); - this.sender.send(message); - - this.receiver = ClientFactory.createMessageReceiverFromEntityPath(factory, builder.getEntityPath(), ReceiveMode.PeekLock); - IBrokeredMessage receivedMessage = this.receiver.receive(); - Assert.assertNotNull("Message not received", receivedMessage); - Assert.assertEquals("Message Id did not match", messageId, receivedMessage.getMessageId()); - String deadLetterReason = "java client deadletter test"; - this.receiver.deadLetter(receivedMessage.getLockToken(), deadLetterReason, null); - receivedMessage = this.receiver.receive(); - Assert.assertNull("Message was not properly deadlettered", receivedMessage); - } - - @Test - public void testBasicQueueReceiveAndRenewLock() throws InterruptedException, ServiceBusException, IOException, ExecutionException - { - String messageId = UUID.randomUUID().toString(); - BrokeredMessage message = new BrokeredMessage("AMQP message"); - message.setMessageId(messageId); - this.sender.send(message); - - this.receiver = ClientFactory.createMessageReceiverFromEntityPath(factory, builder.getEntityPath(), ReceiveMode.PeekLock); - IBrokeredMessage receivedMessage = this.receiver.receive(); - Assert.assertNotNull("Message not received", receivedMessage); - Assert.assertEquals("Message Id did not match", messageId, receivedMessage.getMessageId()); - Instant oldLockedUntilTime = receivedMessage.getLockedUntilUtc(); - Thread.sleep(1000); - Instant newLockedUntilUtc = this.receiver.renewMessageLock(receivedMessage); - Assert.assertTrue("Lock not renewed. OldLockedUntilUtc:" + oldLockedUntilTime.toString() + ", newLockedUntilUtc:" + newLockedUntilUtc, newLockedUntilUtc.isAfter(oldLockedUntilTime)); - Assert.assertEquals("Renewed lockeduntil time not set in Message", newLockedUntilUtc, receivedMessage.getLockedUntilUtc()); - this.receiver.complete(receivedMessage.getLockToken()); +public class QueueSendReceiveTests extends SendReceiveTests +{ + @Override + public ConnectionStringBuilder getSenderConnectionStringBuilder() { + return TestUtils.getQueueConnectionStringBuilder(); } - - @Test - public void testBasicQueueReceiveAndRenewLockBatch() throws InterruptedException, ServiceBusException, IOException, ExecutionException - { - int numMessages = 10; - List messages = new ArrayList(); - for(int i=0; i totalReceivedMessages = new ArrayList<>(); - - Collection receivedMessages = this.receiver.receiveBatch(numMessages); - totalReceivedMessages.addAll(receivedMessages); - while(receivedMessages != null && receivedMessages.size() > 0 && totalReceivedMessages.size() < numMessages) - { - receivedMessages = this.receiver.receiveBatch(numMessages); - totalReceivedMessages.addAll(receivedMessages); - } - Assert.assertEquals("All messages not received", numMessages, totalReceivedMessages.size()); - - ArrayList oldLockTimes = new ArrayList(); - for(IBrokeredMessage message : totalReceivedMessages) - { - oldLockTimes.add(message.getLockedUntilUtc()); - } - - Thread.sleep(1000); - Collection newLockTimes = ((BrokeredMessageReceiver)this.receiver).renewMessageLockBatch(totalReceivedMessages); - Assert.assertEquals("RenewLock didn't return one instant per message in the collection", totalReceivedMessages.size(), newLockTimes.size()); - Iterator newLockTimeIterator = newLockTimes.iterator(); - Iterator oldLockTimeIterator = oldLockTimes.iterator(); - for(IBrokeredMessage message : totalReceivedMessages) - { - Instant oldLockTime = oldLockTimeIterator.next(); - Instant newLockTime = newLockTimeIterator.next(); - Assert.assertTrue("Lock not renewed. OldLockedUntilUtc:" + oldLockTime.toString() + ", newLockedUntilUtc:" + newLockTime.toString(), newLockTime.isAfter(oldLockTime)); - Assert.assertEquals("Renewed lockeduntil time not set in Message", newLockTime, message.getLockedUntilUtc()); - this.receiver.complete(message.getLockToken()); - } - } - - @Test - public void testBasicQueueReceiveBatchAndComplete() throws InterruptedException, ServiceBusException, IOException, ExecutionException - { - int numMessages = 10; - List messages = new ArrayList(); - for(int i=0; i receivedMessages = this.receiver.receiveBatch(numMessages); - while(receivedMessages != null && receivedMessages.size() > 0) - { - totalMessagesReceived += receivedMessages.size(); - for(IBrokeredMessage message : receivedMessages) - { - //System.out.println(message.getLockToken()); - this.receiver.complete(message.getLockToken()); - } - receivedMessages = this.receiver.receiveBatch(numMessages); - } - Assert.assertEquals("All messages not received", numMessages, totalMessagesReceived); - - receivedMessages = this.receiver.receiveBatch(numMessages); - Assert.assertNull("Messages received again", receivedMessages); - } - - @Test - public void testSendSceduledMessageAndReceive() throws InterruptedException, ServiceBusException, IOException - { - int secondsToWaitBeforeScheduling = 30; - String msgId1 = UUID.randomUUID().toString(); - String msgId2 = UUID.randomUUID().toString(); - BrokeredMessage message1 = new BrokeredMessage("AMQP Scheduled message"); - message1.setMessageId(msgId1); - BrokeredMessage message2 = new BrokeredMessage("AMQP Scheduled message2"); - message2.setMessageId(msgId2); - - this.sender.scheduleMessage(message1, Instant.now().plusSeconds(secondsToWaitBeforeScheduling)); - this.sender.scheduleMessage(message2, Instant.now().plusSeconds(secondsToWaitBeforeScheduling)); - Thread.sleep(secondsToWaitBeforeScheduling * 1000); - this.receiver = ClientFactory.createMessageReceiverFromEntityPath(factory, this.builder.getEntityPath(), ReceiveMode.ReceiveAndDelete); - Collection allReceivedMessages = new LinkedList(); - Collection receivedMessages = this.receiver.receiveBatch(10); - - while(receivedMessages != null && receivedMessages.size() > 0) - { - allReceivedMessages.addAll(receivedMessages); - receivedMessages = this.receiver.receiveBatch(10); - } - - boolean firstMessageReceived = false; - boolean secondMessageReceived = false; - for(IBrokeredMessage message : allReceivedMessages) - { - if(message.getMessageId().equals(msgId1)) - firstMessageReceived = true; - else if(message.getMessageId().equals(msgId2)) - secondMessageReceived = true; - } - - Assert.assertTrue("Scheduled messages not received", firstMessageReceived && secondMessageReceived); - } - - @Test - public void testSendSceduledMessageAndCancel() throws InterruptedException, ServiceBusException, IOException - { - int secondsToWaitBeforeScheduling = 30; - String msgId1 = UUID.randomUUID().toString(); - String msgId2 = UUID.randomUUID().toString(); - BrokeredMessage message1 = new BrokeredMessage("AMQP Scheduled message"); - BrokeredMessage message2 = new BrokeredMessage("AMQP Scheduled message2"); - message1.setMessageId(msgId1); - message2.setMessageId(msgId2); - - this.sender.scheduleMessage(message1, Instant.now().plusSeconds(secondsToWaitBeforeScheduling)); - long sequnceNumberMsg2 = this.sender.scheduleMessage(message2, Instant.now().plusSeconds(secondsToWaitBeforeScheduling)); - this.sender.cancelScheduledMessage(sequnceNumberMsg2); - Thread.sleep(secondsToWaitBeforeScheduling * 1000); - this.receiver = ClientFactory.createMessageReceiverFromEntityPath(factory, this.builder.getEntityPath(), ReceiveMode.ReceiveAndDelete); - Collection allReceivedMessages = new LinkedList(); - Collection receivedMessages = this.receiver.receiveBatch(10); - while(receivedMessages != null && receivedMessages.size() > 0) - { - allReceivedMessages.addAll(receivedMessages); - receivedMessages = this.receiver.receiveBatch(10); - } - - Assert.assertTrue("Scheduled messages not received", allReceivedMessages.removeIf(msg -> msg.getMessageId().equals(msgId1))); - Assert.assertFalse("Cancelled scheduled messages also received", allReceivedMessages.removeIf(msg -> msg.getMessageId().equals(msgId2))); - } - - @Test - public void testPeekMessage() throws InterruptedException, ServiceBusException, IOException - { - this.sender.send(new BrokeredMessage("AMQP Scheduled message")); - this.sender.send(new BrokeredMessage("AMQP Scheduled message2")); - - this.browser = ClientFactory.createMessageBrowserFromEntityPath(factory, this.builder.getEntityPath()); - IBrokeredMessage peekedMessage1 = this.browser.peek(); - long firstMessageSequenceNumber = peekedMessage1.getSequenceNumber(); - IBrokeredMessage peekedMessage2 = this.browser.peek(); - Assert.assertNotEquals("Peek returned the same message again.", firstMessageSequenceNumber, peekedMessage2.getSequenceNumber()); - - // Now peek with fromSequnceNumber.. May not work for partitioned entities - IBrokeredMessage peekedMessage5 = this.browser.peek(firstMessageSequenceNumber); - Assert.assertEquals("Peek with sequence number failed.", firstMessageSequenceNumber, peekedMessage5.getSequenceNumber()); - } - - @Test - public void testPeekMessageBatch() throws InterruptedException, ServiceBusException, IOException - { - this.sender.send(new BrokeredMessage("AMQP Scheduled message")); - this.sender.send(new BrokeredMessage("AMQP Scheduled message2")); - - this.browser = ClientFactory.createMessageBrowserFromEntityPath(factory, this.builder.getEntityPath()); - Collection peekedMessages = this.browser.peekBatch(10); - Assert.assertEquals("PeekBatch didnot return all messages.", 2, peekedMessages.size()); - long firstMessageSequenceNumber = peekedMessages.iterator().next().getSequenceNumber(); - - // Now peek with fromSequnceNumber.. May not work for partitioned entities - Collection peekedMessagesBatch2 = this.browser.peekBatch(firstMessageSequenceNumber, 10); - Assert.assertEquals("PeekBatch with sequence number didnot return all messages.", 2, peekedMessagesBatch2.size()); - Assert.assertEquals("PeekBatch with sequence number failed.", firstMessageSequenceNumber, peekedMessagesBatch2.iterator().next().getSequenceNumber()); - } - - @Test - public void testReceiveBySequenceNumberAndComplete() throws InterruptedException, ServiceBusException, IOException - { - this.sender.send(new BrokeredMessage("AMQP message")); - - this.receiver = ClientFactory.createMessageReceiverFromEntityPath(factory, this.builder.getEntityPath(), ReceiveMode.PeekLock); - IBrokeredMessage receivedMessage = this.receiver.receive(); - long sequenceNumber = receivedMessage.getSequenceNumber(); - String messageId = receivedMessage.getMessageId(); - this.receiver.defer(receivedMessage.getLockToken()); - - // Now receive by sequence number - receivedMessage = this.receiver.receive(sequenceNumber); - Assert.assertEquals("ReceiveBySequenceNumber didn't receive the right message.", sequenceNumber, receivedMessage.getSequenceNumber()); - Assert.assertEquals("ReceiveBySequenceNumber didn't receive the right message.", messageId, receivedMessage.getMessageId()); - this.receiver.complete(receivedMessage.getLockToken()); - - // Try to receive by sequence number again - try - { - receivedMessage = this.receiver.receive(sequenceNumber); - Assert.fail("Message recieved by sequnce number was not properly completed."); - } - catch(MessageNotFoundException e) - { - // Expected - } - } - - @Test - public void testReceiveBySequenceNumberAndAbandon() throws InterruptedException, ServiceBusException, IOException - { - this.sender.send(new BrokeredMessage("AMQP message")); - - this.receiver = ClientFactory.createMessageReceiverFromEntityPath(factory, this.builder.getEntityPath(), ReceiveMode.PeekLock); - IBrokeredMessage receivedMessage = this.receiver.receive(); - long sequenceNumber = receivedMessage.getSequenceNumber(); - String messageId = receivedMessage.getMessageId(); - this.receiver.defer(receivedMessage.getLockToken()); - - // Now receive by sequence number - receivedMessage = this.receiver.receive(sequenceNumber); - Assert.assertEquals("ReceiveBySequenceNumber didn't receive the right message.", sequenceNumber, receivedMessage.getSequenceNumber()); - Assert.assertEquals("ReceiveBySequenceNumber didn't receive the right message.", messageId, receivedMessage.getMessageId()); - long deliveryCount = receivedMessage.getDeliveryCount(); - this.receiver.abandon(receivedMessage.getLockToken()); - - // Try to receive by sequence number again - receivedMessage = this.receiver.receive(sequenceNumber); - Assert.assertEquals("Abandon didn't increase the delivery count for the message received by sequence number.", deliveryCount + 1, receivedMessage.getDeliveryCount()); - this.receiver.complete(receivedMessage.getLockToken()); - } - - @Test - public void testReceiveBySequenceNumberAndDefer() throws InterruptedException, ServiceBusException, IOException - { - BrokeredMessage sentMessage = new BrokeredMessage("AMQP message"); - HashMap customProperties = new HashMap(); - customProperties.put("phase", "before defer"); - sentMessage.setProperties(customProperties); - this.sender.send(sentMessage); - - this.receiver = ClientFactory.createMessageReceiverFromEntityPath(factory, this.builder.getEntityPath(), ReceiveMode.PeekLock); - IBrokeredMessage receivedMessage = this.receiver.receive(); - long sequenceNumber = receivedMessage.getSequenceNumber(); - String messageId = receivedMessage.getMessageId(); - this.receiver.defer(receivedMessage.getLockToken()); - - // Now receive by sequence number - receivedMessage = this.receiver.receive(sequenceNumber); - Assert.assertEquals("ReceiveBySequenceNumber didn't receive the right message.", sequenceNumber, receivedMessage.getSequenceNumber()); - Assert.assertEquals("ReceiveBySequenceNumber didn't receive the right message.", messageId, receivedMessage.getMessageId()); - customProperties.put("phase", "after defer"); - this.receiver.defer(receivedMessage.getLockToken(), customProperties); - - // Try to receive by sequence number again - receivedMessage = this.receiver.receive(sequenceNumber); - Assert.assertEquals("ReceiveBySequenceNumber didn't receive the right message after deferrring", sequenceNumber, receivedMessage.getSequenceNumber()); - Assert.assertEquals("Defer didn't update properties of the message received by sequence number", "after defer", receivedMessage.getProperties().get("phase")); - this.receiver.complete(receivedMessage.getLockToken()); - } - - @Test - public void testReceiveBySequenceNumberAndDeadletter() throws InterruptedException, ServiceBusException, IOException - { - this.sender.send(new BrokeredMessage("AMQP message")); - - this.receiver = ClientFactory.createMessageReceiverFromEntityPath(factory, this.builder.getEntityPath(), ReceiveMode.PeekLock); - IBrokeredMessage receivedMessage = this.receiver.receive(); - long sequenceNumber = receivedMessage.getSequenceNumber(); - String messageId = receivedMessage.getMessageId(); - this.receiver.defer(receivedMessage.getLockToken()); - - // Now receive by sequence number - receivedMessage = this.receiver.receive(sequenceNumber); - Assert.assertEquals("ReceiveBySequenceNumber didn't receive the right message.", sequenceNumber, receivedMessage.getSequenceNumber()); - Assert.assertEquals("ReceiveBySequenceNumber didn't receive the right message.", messageId, receivedMessage.getMessageId()); - String deadLetterReason = "java client deadletter test"; - this.receiver.deadLetter(receivedMessage.getLockToken(), deadLetterReason, null); - - // Try to receive by sequence number again - try - { - receivedMessage = this.receiver.receive(sequenceNumber); - Assert.fail("Message received by sequence number was not properly deadlettered"); - } - catch(MessageNotFoundException e) - { - // Expected - } - } - - private void drainAllMessages(ConnectionStringBuilder builder) throws IOException, InterruptedException, ExecutionException, ServiceBusException - { - Duration waitTime = Duration.ofSeconds(5); - final int batchSize = 10; - IMessageReceiver receiver = ClientFactory.createMessageReceiverFromEntityPath(this.factory, this.builder.getEntityPath(), ReceiveMode.ReceiveAndDelete); - Collection messages = receiver.receiveBatch(batchSize, waitTime); - while(messages !=null && messages.size() > 0) - { - messages = receiver.receiveBatch(batchSize, waitTime); - } - - receiver.close(); + + @Override + public ConnectionStringBuilder getReceiverConnectionStringBuilder() { + return TestUtils.getQueueConnectionStringBuilder(); } - - - // Test send batch - // Test send and expect timeout - - // receive with timeout - // receive batch - // receive batch with timeout - // timed out receive should return null - // Send message with Id, receive and verify Id - // Send message with various properties like ttl.., receive it and verify them - } diff --git a/azure-servicebus/src/test/java/com/microsoft/azure/servicebus/QueueSessionTests.java b/azure-servicebus/src/test/java/com/microsoft/azure/servicebus/QueueSessionTests.java new file mode 100644 index 0000000000000..d9f5ba6324060 --- /dev/null +++ b/azure-servicebus/src/test/java/com/microsoft/azure/servicebus/QueueSessionTests.java @@ -0,0 +1,16 @@ +package com.microsoft.azure.servicebus; + +import com.microsoft.azure.servicebus.primitives.ConnectionStringBuilder; + +public class QueueSessionTests extends SessionTests +{ + @Override + public ConnectionStringBuilder getSenderConnectionStringBuilder() { + return TestUtils.getSessionfulQueueConnectionStringBuilder(); + } + + @Override + public ConnectionStringBuilder getReceiverConnectionStringBuilder() { + return TestUtils.getSessionfulQueueConnectionStringBuilder(); + } +} diff --git a/azure-servicebus/src/test/java/com/microsoft/azure/servicebus/SendReceiveTests.java b/azure-servicebus/src/test/java/com/microsoft/azure/servicebus/SendReceiveTests.java new file mode 100644 index 0000000000000..3585bcffa7f1d --- /dev/null +++ b/azure-servicebus/src/test/java/com/microsoft/azure/servicebus/SendReceiveTests.java @@ -0,0 +1,177 @@ +package com.microsoft.azure.servicebus; + +import java.io.IOException; +import java.util.concurrent.ExecutionException; + +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import com.microsoft.azure.servicebus.primitives.ConnectionStringBuilder; +import com.microsoft.azure.servicebus.primitives.MessagingFactory; +import com.microsoft.azure.servicebus.primitives.ServiceBusException; + +public abstract class SendReceiveTests { + private ConnectionStringBuilder sendBuilder; + private ConnectionStringBuilder receiveBuilder; + private MessagingFactory factory; + private IMessageSender sender; + private IMessageReceiver receiver; + private final String sessionId = null; + + @Before + public void setup() throws IOException, InterruptedException, ExecutionException, ServiceBusException + { + this.sendBuilder = this.getSenderConnectionStringBuilder(); + this.receiveBuilder = this.getReceiverConnectionStringBuilder(); + this.factory = MessagingFactory.createFromConnectionStringBuilder(this.sendBuilder); + this.sender = ClientFactory.createMessageSenderFromConnectionStringBuilder(this.sendBuilder); + } + + @After + public void tearDown() throws ServiceBusException, IOException, InterruptedException, ExecutionException + { + this.drainAllMessages(); + + this.sender.close(); + if(this.receiver != null) + this.receiver.close(); + this.factory.close(); + } + + public abstract ConnectionStringBuilder getSenderConnectionStringBuilder(); + + public abstract ConnectionStringBuilder getReceiverConnectionStringBuilder(); + + @Test + public void testBasicSend() throws InterruptedException, ServiceBusException, IOException + { + TestCommons.testBasicSend(this.sender); + } + + @Test + public void testBasicSendBatch() throws InterruptedException, ServiceBusException, IOException + { + TestCommons.testBasicSendBatch(this.sender); + } + + @Test + public void testBasicReceiveAndDelete() throws InterruptedException, ServiceBusException, IOException, ExecutionException + { + this.receiver = ClientFactory.createMessageReceiverFromEntityPath(factory, this.receiveBuilder.getEntityPath(), ReceiveMode.ReceiveAndDelete); + TestCommons.testBasicReceiveAndDelete(this.sender, this.sessionId, this.receiver); + } + + @Test + public void testBasicReceiveBatchAndDelete() throws InterruptedException, ServiceBusException, IOException, ExecutionException + { + this.receiver = ClientFactory.createMessageReceiverFromEntityPath(factory, this.receiveBuilder.getEntityPath(), ReceiveMode.ReceiveAndDelete); + TestCommons.testBasicReceiveBatchAndDelete(this.sender, this.sessionId, this.receiver); + } + + @Test + public void testBasicReceiveAndComplete() throws InterruptedException, ServiceBusException, IOException, ExecutionException + { + this.receiver = ClientFactory.createMessageReceiverFromEntityPath(factory, this.receiveBuilder.getEntityPath(), ReceiveMode.PeekLock); + TestCommons.testBasicReceiveAndComplete(this.sender, this.sessionId, this.receiver); + } + + @Test + public void testBasicReceiveAndAbandon() throws InterruptedException, ServiceBusException, IOException, ExecutionException + { + this.receiver = ClientFactory.createMessageReceiverFromEntityPath(factory, this.receiveBuilder.getEntityPath(), ReceiveMode.PeekLock); + TestCommons.testBasicReceiveAndAbandon(this.sender, this.sessionId, this.receiver); + } + + @Test + public void testBasicReceiveAndDeadLetter() throws InterruptedException, ServiceBusException, IOException, ExecutionException + { + this.receiver = ClientFactory.createMessageReceiverFromEntityPath(factory, this.receiveBuilder.getEntityPath(), ReceiveMode.PeekLock); + TestCommons.testBasicReceiveAndDeadLetter(this.sender, this.sessionId, this.receiver); + } + + @Test + public void testBasicReceiveAndRenewLock() throws InterruptedException, ServiceBusException, IOException, ExecutionException + { + this.receiver = ClientFactory.createMessageReceiverFromEntityPath(factory, this.receiveBuilder.getEntityPath(), ReceiveMode.PeekLock); + TestCommons.testBasicReceiveAndRenewLock(this.sender, this.sessionId, this.receiver); + } + + @Test + public void testBasicReceiveAndRenewLockBatch() throws InterruptedException, ServiceBusException, IOException, ExecutionException + { + this.receiver = ClientFactory.createMessageReceiverFromEntityPath(factory, this.receiveBuilder.getEntityPath(), ReceiveMode.PeekLock); + TestCommons.testBasicReceiveAndRenewLockBatch(this.sender, this.sessionId, this.receiver); + } + + @Test + public void testBasicReceiveBatchAndComplete() throws InterruptedException, ServiceBusException, IOException, ExecutionException + { + this.receiver = ClientFactory.createMessageReceiverFromEntityPath(factory, this.receiveBuilder.getEntityPath(), ReceiveMode.PeekLock); + TestCommons.testBasicReceiveBatchAndComplete(this.sender, this.sessionId, this.receiver); + } + + @Test + public void testSendSceduledMessageAndReceive() throws InterruptedException, ServiceBusException, IOException + { + this.receiver = ClientFactory.createMessageReceiverFromEntityPath(factory, this.receiveBuilder.getEntityPath(), ReceiveMode.ReceiveAndDelete); + TestCommons.testSendSceduledMessageAndReceive(this.sender, this.sessionId, this.receiver); + } + + @Test + public void testSendSceduledMessageAndCancel() throws InterruptedException, ServiceBusException, IOException + { + this.receiver = ClientFactory.createMessageReceiverFromEntityPath(factory, this.receiveBuilder.getEntityPath(), ReceiveMode.ReceiveAndDelete); + TestCommons.testSendSceduledMessageAndCancel(this.sender, this.sessionId, this.receiver); + } + + @Test + public void testPeekMessage() throws InterruptedException, ServiceBusException, IOException + { + this.receiver = ClientFactory.createMessageReceiverFromEntityPath(factory, receiveBuilder.getEntityPath(), ReceiveMode.PeekLock); + TestCommons.testPeekMessage(this.sender, this.sessionId, this.receiver); + } + + @Test + public void testPeekMessageBatch() throws InterruptedException, ServiceBusException, IOException + { + this.receiver = ClientFactory.createMessageReceiverFromEntityPath(factory, receiveBuilder.getEntityPath(), ReceiveMode.PeekLock); + TestCommons.testPeekMessageBatch(this.sender, this.sessionId, this.receiver); + } + + @Test + public void testReceiveBySequenceNumberAndComplete() throws InterruptedException, ServiceBusException, IOException + { + this.receiver = ClientFactory.createMessageReceiverFromEntityPath(factory, this.receiveBuilder.getEntityPath(), ReceiveMode.PeekLock); + TestCommons.testReceiveBySequenceNumberAndComplete(this.sender, this.sessionId, this.receiver); + } + + @Test + public void testReceiveBySequenceNumberAndAbandon() throws InterruptedException, ServiceBusException, IOException + { + this.receiver = ClientFactory.createMessageReceiverFromEntityPath(factory, this.receiveBuilder.getEntityPath(), ReceiveMode.PeekLock); + TestCommons.testReceiveBySequenceNumberAndAbandon(this.sender, this.sessionId, this.receiver); + } + + @Test + public void testReceiveBySequenceNumberAndDefer() throws InterruptedException, ServiceBusException, IOException + { + this.receiver = ClientFactory.createMessageReceiverFromEntityPath(factory, this.receiveBuilder.getEntityPath(), ReceiveMode.PeekLock); + TestCommons.testReceiveBySequenceNumberAndDefer(this.sender, this.sessionId, this.receiver); + } + + @Test + public void testReceiveBySequenceNumberAndDeadletter() throws InterruptedException, ServiceBusException, IOException + { + this.receiver = ClientFactory.createMessageReceiverFromEntityPath(factory, this.receiveBuilder.getEntityPath(), ReceiveMode.PeekLock); + TestCommons.testReceiveBySequenceNumberAndDeadletter(this.sender, this.sessionId, this.receiver); + } + + private void drainAllMessages() throws IOException, InterruptedException, ServiceBusException + { + if(this.receiver != null) + { + TestCommons.drainAllMessagesFromReceiver(this.receiver); + } + } +} diff --git a/azure-servicebus/src/test/java/com/microsoft/azure/servicebus/SessionTests.java b/azure-servicebus/src/test/java/com/microsoft/azure/servicebus/SessionTests.java new file mode 100644 index 0000000000000..16b3f9bb4e89d --- /dev/null +++ b/azure-servicebus/src/test/java/com/microsoft/azure/servicebus/SessionTests.java @@ -0,0 +1,317 @@ +package com.microsoft.azure.servicebus; + +import java.io.IOException; +import java.time.Instant; +import java.util.Collection; +import java.util.UUID; +import java.util.concurrent.ExecutionException; + +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import com.microsoft.azure.servicebus.primitives.ConnectionStringBuilder; +import com.microsoft.azure.servicebus.primitives.MessagingFactory; +import com.microsoft.azure.servicebus.primitives.ServiceBusException; +import com.microsoft.azure.servicebus.primitives.TimeoutException; + +public abstract class SessionTests { + private static ConnectionStringBuilder sendBuilder; + private static ConnectionStringBuilder receiveBuilder; + private static MessagingFactory factory; + private IMessageSender sender; + private IMessageSession session; + + @Before + public void setup() throws IOException, InterruptedException, ExecutionException, ServiceBusException + { + sendBuilder = this.getSenderConnectionStringBuilder(); + receiveBuilder = this.getReceiverConnectionStringBuilder(); + factory = MessagingFactory.createFromConnectionStringBuilder(sendBuilder); + this.sender = ClientFactory.createMessageSenderFromConnectionStringBuilder(sendBuilder); + } + + private static String getRandomString() + { + return UUID.randomUUID().toString(); + } + + @After + public void tearDown() throws ServiceBusException, InterruptedException, IOException + { + this.drainAllMessages(); + + this.sender.close(); + if(this.session != null) + this.session.close(); + + } + + @AfterClass + public static void afterClass() throws ServiceBusException, InterruptedException, IOException + { +// drainAllSessions(); + factory.close(); + } + + public abstract ConnectionStringBuilder getSenderConnectionStringBuilder(); + + public abstract ConnectionStringBuilder getReceiverConnectionStringBuilder(); + + @Test + public void testBasicReceiveAndDelete() throws InterruptedException, ServiceBusException, IOException, ExecutionException + { + String sessionId = getRandomString(); + this.session = ClientFactory.acceptSessionFromEntityPath(factory, receiveBuilder.getEntityPath(), sessionId, ReceiveMode.ReceiveAndDelete); + TestCommons.testBasicReceiveAndDelete(this.sender, sessionId, this.session); + } + + @Test + public void testBasicReceiveBatchAndDelete() throws InterruptedException, ServiceBusException, IOException, ExecutionException + { + String sessionId = getRandomString(); + this.session = ClientFactory.acceptSessionFromEntityPath(factory, receiveBuilder.getEntityPath(), sessionId, ReceiveMode.ReceiveAndDelete); + TestCommons.testBasicReceiveBatchAndDelete(this.sender, sessionId, this.session); + } + + @Test + public void testBasicReceiveAndComplete() throws InterruptedException, ServiceBusException, IOException, ExecutionException + { + String sessionId = getRandomString(); + this.session = ClientFactory.acceptSessionFromEntityPath(factory, receiveBuilder.getEntityPath(), sessionId, ReceiveMode.PeekLock); + TestCommons.testBasicReceiveAndComplete(this.sender, sessionId, this.session); + } + + @Test + public void testBasicReceiveAndAbandon() throws InterruptedException, ServiceBusException, IOException, ExecutionException + { + String sessionId = getRandomString(); + this.session = ClientFactory.acceptSessionFromEntityPath(factory, receiveBuilder.getEntityPath(), sessionId, ReceiveMode.PeekLock); + TestCommons.testBasicReceiveAndAbandon(this.sender, sessionId, this.session); + } + + @Test + public void testBasicReceiveAndDeadLetter() throws InterruptedException, ServiceBusException, IOException, ExecutionException + { + String sessionId = getRandomString(); + this.session = ClientFactory.acceptSessionFromEntityPath(factory, receiveBuilder.getEntityPath(), sessionId, ReceiveMode.PeekLock); + TestCommons.testBasicReceiveAndDeadLetter(this.sender, sessionId, this.session); + } + + @Test + public void testBasicReceiveAndRenewLock() throws InterruptedException, ServiceBusException, IOException, ExecutionException + { + String sessionId = getRandomString(); + this.session = ClientFactory.acceptSessionFromEntityPath(factory, receiveBuilder.getEntityPath(), sessionId, ReceiveMode.PeekLock); + TestCommons.testBasicReceiveAndRenewLock(this.sender, sessionId, this.session); + } + + @Test + public void testBasicReceiveAndRenewLockBatch() throws InterruptedException, ServiceBusException, IOException, ExecutionException + { + String sessionId = getRandomString(); + this.session = ClientFactory.acceptSessionFromEntityPath(factory, receiveBuilder.getEntityPath(), sessionId, ReceiveMode.PeekLock); + TestCommons.testBasicReceiveAndRenewLockBatch(this.sender, sessionId, this.session); + } + + @Test + public void testBasicReceiveBatchAndComplete() throws InterruptedException, ServiceBusException, IOException, ExecutionException + { + String sessionId = getRandomString(); + this.session = ClientFactory.acceptSessionFromEntityPath(factory, receiveBuilder.getEntityPath(), sessionId, ReceiveMode.PeekLock); + TestCommons.testBasicReceiveBatchAndComplete(this.sender, sessionId, this.session); + } + + @Test + public void testSendSceduledMessageAndReceive() throws InterruptedException, ServiceBusException, IOException + { + String sessionId = getRandomString(); + this.session = ClientFactory.acceptSessionFromEntityPath(factory, receiveBuilder.getEntityPath(), sessionId, ReceiveMode.ReceiveAndDelete); + TestCommons.testSendSceduledMessageAndReceive(this.sender, sessionId, this.session); + } + + @Test + public void testSendSceduledMessageAndCancel() throws InterruptedException, ServiceBusException, IOException + { + String sessionId = getRandomString(); + this.session = ClientFactory.acceptSessionFromEntityPath(factory, receiveBuilder.getEntityPath(), sessionId, ReceiveMode.ReceiveAndDelete); + TestCommons.testSendSceduledMessageAndCancel(this.sender, sessionId, this.session); + } + + @Test + public void testPeekMessage() throws InterruptedException, ServiceBusException, IOException + { + String sessionId = getRandomString(); + this.session = ClientFactory.acceptSessionFromEntityPath(factory, receiveBuilder.getEntityPath(), sessionId, ReceiveMode.PeekLock); + TestCommons.testPeekMessage(this.sender, sessionId, this.session); + } + + @Test + public void testPeekMessageBatch() throws InterruptedException, ServiceBusException, IOException + { + String sessionId = getRandomString(); + this.session = ClientFactory.acceptSessionFromEntityPath(factory, receiveBuilder.getEntityPath(), sessionId, ReceiveMode.PeekLock); + TestCommons.testPeekMessageBatch(this.sender, sessionId, this.session); + } + + @Test + public void testReceiveBySequenceNumberAndComplete() throws InterruptedException, ServiceBusException, IOException + { + String sessionId = getRandomString(); + this.session = ClientFactory.acceptSessionFromEntityPath(factory, receiveBuilder.getEntityPath(), sessionId, ReceiveMode.PeekLock); + TestCommons.testReceiveBySequenceNumberAndComplete(this.sender, sessionId, this.session); + } + + @Test + public void testReceiveBySequenceNumberAndAbandon() throws InterruptedException, ServiceBusException, IOException + { + String sessionId = getRandomString(); + this.session = ClientFactory.acceptSessionFromEntityPath(factory, receiveBuilder.getEntityPath(), sessionId, ReceiveMode.PeekLock); + TestCommons.testReceiveBySequenceNumberAndAbandon(this.sender, sessionId, this.session); + } + + @Test + public void testReceiveBySequenceNumberAndDefer() throws InterruptedException, ServiceBusException, IOException + { + String sessionId = getRandomString(); + this.session = ClientFactory.acceptSessionFromEntityPath(factory, receiveBuilder.getEntityPath(), sessionId, ReceiveMode.PeekLock); + TestCommons.testReceiveBySequenceNumberAndDefer(this.sender, sessionId, this.session); + } + + @Test + public void testReceiveBySequenceNumberAndDeadletter() throws InterruptedException, ServiceBusException, IOException + { + String sessionId = getRandomString(); + this.session = ClientFactory.acceptSessionFromEntityPath(factory, receiveBuilder.getEntityPath(), sessionId, ReceiveMode.PeekLock); + TestCommons.testReceiveBySequenceNumberAndDeadletter(this.sender, sessionId, this.session); + } + + @Test + public void testAcceptAnySession() throws InterruptedException, ServiceBusException, IOException + { + String sessionId = getRandomString(); + String messageId = getRandomString(); + BrokeredMessage message = new BrokeredMessage("AMQP message"); + message.setMessageId(messageId); + if(sessionId != null) + { + message.setSessionId(sessionId); + } + this.sender.send(message); + + this.session = ClientFactory.acceptSessionFromEntityPath(factory, receiveBuilder.getEntityPath(), null, ReceiveMode.PeekLock); + Assert.assertNotNull("Did not receive a session", this.session); + } + + @Test + public void testRenewSessionLock() throws InterruptedException, ServiceBusException, IOException + { + String sessionId = getRandomString(); + this.session = ClientFactory.acceptSessionFromEntityPath(factory, receiveBuilder.getEntityPath(), sessionId, ReceiveMode.PeekLock); + Instant initialValidity = this.session.getLockedUntilUtc(); + this.session.renewLock(); + Instant renewedValidity = this.session.getLockedUntilUtc(); + Assert.assertTrue("RenewSessionLock did not renew session lockeduntil time.", renewedValidity.isAfter(initialValidity)); + this.session.renewLock(); + Instant renewedValidity2 = this.session.getLockedUntilUtc(); + Assert.assertTrue("RenewSessionLock did not renew session lockeduntil time.", renewedValidity2.isAfter(renewedValidity)); + } + + @Test + public void testGetAndSetState() throws InterruptedException, ServiceBusException, IOException + { + String sessionId = getRandomString(); + this.session = ClientFactory.acceptSessionFromEntityPath(factory, receiveBuilder.getEntityPath(), sessionId, ReceiveMode.PeekLock); + byte[] initialState = this.session.getState(); + Assert.assertNull("Session state is not null for a new session", initialState); + byte[] customState = "Custom Session State".getBytes(); + this.session.setState(customState); + byte[] updatedState = this.session.getState(); + Assert.assertArrayEquals("Session state not updated properly", customState, updatedState); + this.session.setState(null); + updatedState = this.session.getState(); + Assert.assertNull("Session state is not removed by setting a null state", updatedState); + this.session.setState(customState); + updatedState = this.session.getState(); + Assert.assertArrayEquals("Session state not updated properly", customState, updatedState); + } + + @Test + public void testGetMessageSessions() throws InterruptedException, ServiceBusException, IOException + { + int defaultPageSize = 100; + int numSessions = 110; // More than default page size + String[] sessionIds = new String[numSessions]; + for(int i=0; i sessions = Utils.completeFuture(sessionBrowser.getMessageSessionsAsync()); + Assert.assertEquals("GetMessageSessions returned more than " + defaultPageSize + " sessions", defaultPageSize, sessions.size()); + Collection remainingSessions = Utils.completeFuture(sessionBrowser.getMessageSessionsAsync()); + Assert.assertTrue("GetMessageSessions didnot return all sessions", numSessions >= defaultPageSize); + + IMessageSession anySession = (IMessageSession)remainingSessions.toArray()[0]; + try{ + anySession.receive(); + Assert.fail("Browsable session should not support receive operation"); + } + catch(UnsupportedOperationException e) + { + // Expected + } + + try{ + anySession.setState(null); + Assert.fail("Browsable session should not support setstate operation"); + } + catch(UnsupportedOperationException e) + { + // Expected + } + + // shouldn't throw an exception + byte[] sessionState = anySession.getState(); + + IBrokeredMessage peekedMessage = anySession.peek(); + Assert.assertNotNull("Peek on a browsable session failed.", peekedMessage); + } + + private void drainAllMessages() throws IOException, InterruptedException, ServiceBusException + { + if(this.session != null) + { + TestCommons.drainAllMessagesFromReceiver(this.session); + } + } + + private static void drainAllSessions() throws InterruptedException, ServiceBusException, IOException + { + int count = 0; + while(true) + { + try + { + IMessageSession session = ClientFactory.acceptSessionFromEntityPath(factory, receiveBuilder.getEntityPath(), null, ReceiveMode.ReceiveAndDelete); + count++; + TestCommons.drainAllMessagesFromReceiver(session); + session.setState(null); + session.close(); + } + catch(TimeoutException te) + { + System.out.println("Breaking.. on count:" + count); + // Session not found + break; + } + } + } +} diff --git a/azure-servicebus/src/test/java/com/microsoft/azure/servicebus/TestCommons.java b/azure-servicebus/src/test/java/com/microsoft/azure/servicebus/TestCommons.java new file mode 100644 index 0000000000000..2201b13605a97 --- /dev/null +++ b/azure-servicebus/src/test/java/com/microsoft/azure/servicebus/TestCommons.java @@ -0,0 +1,544 @@ +package com.microsoft.azure.servicebus; + +import java.io.IOException; +import java.time.Duration; +import java.time.Instant; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.UUID; +import java.util.concurrent.ExecutionException; + +import org.junit.Assert; + +import com.microsoft.azure.servicebus.primitives.MessageNotFoundException; +import com.microsoft.azure.servicebus.primitives.ServiceBusException; + +public class TestCommons { + + private static Duration shortWaitTime = Duration.ofSeconds(5); + + public static void testBasicSend(IMessageSender sender) throws InterruptedException, ServiceBusException, IOException + { + sender.send(new BrokeredMessage("AMQP message")); + } + + public static void testBasicSendBatch(IMessageSender sender) throws InterruptedException, ServiceBusException, IOException + { + List messages = new ArrayList(); + for(int i=0; i<10; i++) + { + messages.add(new BrokeredMessage("AMQP message")); + } + sender.sendBatch(messages); + } + + public static void testBasicReceiveAndDelete(IMessageSender sender, String sessionId, IMessageReceiver receiver) throws InterruptedException, ServiceBusException, IOException, ExecutionException + { + String messageId = UUID.randomUUID().toString(); + BrokeredMessage message = new BrokeredMessage("AMQP message"); + message.setMessageId(messageId); + if(sessionId != null) + { + message.setSessionId(sessionId); + } + sender.send(message); + + IBrokeredMessage receivedMessage = receiver.receive(); + Assert.assertNotNull("Message not received", receivedMessage); + Assert.assertEquals("Message Id did not match", messageId, receivedMessage.getMessageId()); + receivedMessage = receiver.receive(shortWaitTime); + Assert.assertNull("Message received again", receivedMessage); + } + + public static void testBasicReceiveBatchAndDelete(IMessageSender sender, String sessionId, IMessageReceiver receiver) throws InterruptedException, ServiceBusException, IOException, ExecutionException + { + int numMessages = 10; + List messages = new ArrayList(); + for(int i=0; i receivedMessages = receiver.receiveBatch(numMessages); + while(receivedMessages != null && receivedMessages.size() > 0) + { + totalReceivedMessages += receivedMessages.size(); + receivedMessages = receiver.receiveBatch(numMessages); + } + + Assert.assertEquals("All messages not received", numMessages, totalReceivedMessages); + receivedMessages = receiver.receiveBatch(numMessages, shortWaitTime); + Assert.assertNull("Messages received again", receivedMessages); + } + + public static void testBasicReceiveAndComplete(IMessageSender sender, String sessionId, IMessageReceiver receiver) throws InterruptedException, ServiceBusException, IOException, ExecutionException + { + String messageId = UUID.randomUUID().toString(); + BrokeredMessage message = new BrokeredMessage("AMQP message"); + message.setMessageId(messageId); + if(sessionId != null) + { + message.setSessionId(sessionId); + } + sender.send(message); + + IBrokeredMessage receivedMessage = receiver.receive(); + Assert.assertNotNull("Message not received", receivedMessage); + Assert.assertEquals("Message Id did not match", messageId, receivedMessage.getMessageId()); + receiver.complete(receivedMessage.getLockToken()); + receivedMessage = receiver.receive(shortWaitTime); + Assert.assertNull("Message was not properly completed", receivedMessage); + } + + public static void testBasicReceiveAndAbandon(IMessageSender sender, String sessionId, IMessageReceiver receiver) throws InterruptedException, ServiceBusException, IOException, ExecutionException + { + String messageId = UUID.randomUUID().toString(); + BrokeredMessage message = new BrokeredMessage("AMQP message"); + message.setMessageId(messageId); + if(sessionId != null) + { + message.setSessionId(sessionId); + } + sender.send(message); + + IBrokeredMessage receivedMessage = receiver.receive(); + Assert.assertNotNull("Message not received", receivedMessage); + Assert.assertEquals("Message Id did not match", messageId, receivedMessage.getMessageId()); + long deliveryCount = receivedMessage.getDeliveryCount(); + receiver.abandon(receivedMessage.getLockToken()); + receivedMessage = receiver.receive(); + Assert.assertNotNull("Message not received", receivedMessage); + Assert.assertEquals("DeliveryCount not incremented", deliveryCount+1, receivedMessage.getDeliveryCount()); + receiver.complete(receivedMessage.getLockToken()); + } + + public static void testBasicReceiveAndDeadLetter(IMessageSender sender, String sessionId, IMessageReceiver receiver) throws InterruptedException, ServiceBusException, IOException, ExecutionException + { + String messageId = UUID.randomUUID().toString(); + BrokeredMessage message = new BrokeredMessage("AMQP message"); + message.setMessageId(messageId); + if(sessionId != null) + { + message.setSessionId(sessionId); + } + sender.send(message); + + IBrokeredMessage receivedMessage = receiver.receive(); + Assert.assertNotNull("Message not received", receivedMessage); + Assert.assertEquals("Message Id did not match", messageId, receivedMessage.getMessageId()); + String deadLetterReason = "java client deadletter test"; + receiver.deadLetter(receivedMessage.getLockToken(), deadLetterReason, null); + receivedMessage = receiver.receive(shortWaitTime); + Assert.assertNull("Message was not properly deadlettered", receivedMessage); + } + + public static void testBasicReceiveAndRenewLock(IMessageSender sender, String sessionId, IMessageReceiver receiver) throws InterruptedException, ServiceBusException, IOException, ExecutionException + { + String messageId = UUID.randomUUID().toString(); + BrokeredMessage message = new BrokeredMessage("AMQP message"); + message.setMessageId(messageId); + if(sessionId != null) + { + message.setSessionId(sessionId); + } + sender.send(message); + + IBrokeredMessage receivedMessage = receiver.receive(); + Assert.assertNotNull("Message not received", receivedMessage); + Assert.assertEquals("Message Id did not match", messageId, receivedMessage.getMessageId()); + Instant oldLockedUntilTime = receivedMessage.getLockedUntilUtc(); + Thread.sleep(1000); + Instant newLockedUntilUtc = receiver.renewMessageLock(receivedMessage); + Assert.assertTrue("Lock not renewed. OldLockedUntilUtc:" + oldLockedUntilTime.toString() + ", newLockedUntilUtc:" + newLockedUntilUtc, newLockedUntilUtc.isAfter(oldLockedUntilTime)); + Assert.assertEquals("Renewed lockeduntil time not set in Message", newLockedUntilUtc, receivedMessage.getLockedUntilUtc()); + receiver.complete(receivedMessage.getLockToken()); + } + + public static void testBasicReceiveAndRenewLockBatch(IMessageSender sender, String sessionId, IMessageReceiver receiver) throws InterruptedException, ServiceBusException, IOException, ExecutionException + { + int numMessages = 10; + List messages = new ArrayList(); + for(int i=0; i totalReceivedMessages = new ArrayList<>(); + + Collection receivedMessages = receiver.receiveBatch(numMessages); + totalReceivedMessages.addAll(receivedMessages); + while(receivedMessages != null && receivedMessages.size() > 0 && totalReceivedMessages.size() < numMessages) + { + receivedMessages = receiver.receiveBatch(numMessages); + totalReceivedMessages.addAll(receivedMessages); + } + Assert.assertEquals("All messages not received", numMessages, totalReceivedMessages.size()); + + ArrayList oldLockTimes = new ArrayList(); + for(IBrokeredMessage message : totalReceivedMessages) + { + oldLockTimes.add(message.getLockedUntilUtc()); + } + + Thread.sleep(1000); + Collection newLockTimes = ((BrokeredMessageReceiver)receiver).renewMessageLockBatch(totalReceivedMessages); + Assert.assertEquals("RenewLock didn't return one instant per message in the collection", totalReceivedMessages.size(), newLockTimes.size()); + Iterator newLockTimeIterator = newLockTimes.iterator(); + Iterator oldLockTimeIterator = oldLockTimes.iterator(); + for(IBrokeredMessage message : totalReceivedMessages) + { + Instant oldLockTime = oldLockTimeIterator.next(); + Instant newLockTime = newLockTimeIterator.next(); + Assert.assertTrue("Lock not renewed. OldLockedUntilUtc:" + oldLockTime.toString() + ", newLockedUntilUtc:" + newLockTime.toString(), newLockTime.isAfter(oldLockTime)); + Assert.assertEquals("Renewed lockeduntil time not set in Message", newLockTime, message.getLockedUntilUtc()); + receiver.complete(message.getLockToken()); + } + } + + public static void testBasicReceiveBatchAndComplete(IMessageSender sender, String sessionId, IMessageReceiver receiver) throws InterruptedException, ServiceBusException, IOException, ExecutionException + { + int numMessages = 10; + List messages = new ArrayList(); + for(int i=0; i receivedMessages = receiver.receiveBatch(numMessages); + while(receivedMessages != null && receivedMessages.size() > 0) + { + totalMessagesReceived += receivedMessages.size(); + for(IBrokeredMessage message : receivedMessages) + { + //System.out.println(message.getLockToken()); + receiver.complete(message.getLockToken()); + } + receivedMessages = receiver.receiveBatch(numMessages); + } + Assert.assertEquals("All messages not received", numMessages, totalMessagesReceived); + + receivedMessages = receiver.receiveBatch(numMessages, shortWaitTime); + Assert.assertNull("Messages received again", receivedMessages); + } + + public static void testSendSceduledMessageAndReceive(IMessageSender sender, String sessionId, IMessageReceiver receiver) throws InterruptedException, ServiceBusException, IOException + { + int secondsToWaitBeforeScheduling = 30; + String msgId1 = UUID.randomUUID().toString(); + String msgId2 = UUID.randomUUID().toString(); + BrokeredMessage message1 = new BrokeredMessage("AMQP Scheduled message"); + message1.setMessageId(msgId1); + BrokeredMessage message2 = new BrokeredMessage("AMQP Scheduled message2"); + message2.setMessageId(msgId2); + if(sessionId != null) + { + message1.setSessionId(sessionId); + message2.setSessionId(sessionId); + } + + sender.scheduleMessage(message1, Instant.now().plusSeconds(secondsToWaitBeforeScheduling)); + sender.scheduleMessage(message2, Instant.now().plusSeconds(secondsToWaitBeforeScheduling)); + Thread.sleep(secondsToWaitBeforeScheduling * 1000); + + Collection allReceivedMessages = new LinkedList(); + Collection receivedMessages = receiver.receiveBatch(10); + + while(receivedMessages != null && receivedMessages.size() > 0) + { + allReceivedMessages.addAll(receivedMessages); + receivedMessages = receiver.receiveBatch(10); + } + + boolean firstMessageReceived = false; + boolean secondMessageReceived = false; + for(IBrokeredMessage message : allReceivedMessages) + { + if(message.getMessageId().equals(msgId1)) + firstMessageReceived = true; + else if(message.getMessageId().equals(msgId2)) + secondMessageReceived = true; + } + + Assert.assertTrue("Scheduled messages not received", firstMessageReceived && secondMessageReceived); + } + + public static void testSendSceduledMessageAndCancel(IMessageSender sender, String sessionId, IMessageReceiver receiver) throws InterruptedException, ServiceBusException, IOException + { + int secondsToWaitBeforeScheduling = 30; + String msgId1 = UUID.randomUUID().toString(); + String msgId2 = UUID.randomUUID().toString(); + BrokeredMessage message1 = new BrokeredMessage("AMQP Scheduled message"); + BrokeredMessage message2 = new BrokeredMessage("AMQP Scheduled message2"); + message1.setMessageId(msgId1); + message2.setMessageId(msgId2); + if(sessionId != null) + { + message1.setSessionId(sessionId); + message2.setSessionId(sessionId); + } + + sender.scheduleMessage(message1, Instant.now().plusSeconds(secondsToWaitBeforeScheduling)); + long sequnceNumberMsg2 = sender.scheduleMessage(message2, Instant.now().plusSeconds(secondsToWaitBeforeScheduling)); + sender.cancelScheduledMessage(sequnceNumberMsg2); + Thread.sleep(secondsToWaitBeforeScheduling * 1000); + + Collection allReceivedMessages = new LinkedList(); + Collection receivedMessages = receiver.receiveBatch(10); + while(receivedMessages != null && receivedMessages.size() > 0) + { + allReceivedMessages.addAll(receivedMessages); + receivedMessages = receiver.receiveBatch(10); + } + + Assert.assertTrue("Scheduled messages not received", allReceivedMessages.removeIf(msg -> msg.getMessageId().equals(msgId1))); + Assert.assertFalse("Cancelled scheduled messages also received", allReceivedMessages.removeIf(msg -> msg.getMessageId().equals(msgId2))); + } + + public static void testPeekMessage(IMessageSender sender, String sessionId, IMessageBrowser browser) throws InterruptedException, ServiceBusException, IOException + { + BrokeredMessage message = new BrokeredMessage("AMQP Scheduled message"); + if(sessionId != null) + { + message.setSessionId(sessionId); + } + sender.send(message); + message = new BrokeredMessage("AMQP Scheduled message2"); + if(sessionId != null) + { + message.setSessionId(sessionId); + } + sender.send(message); + Thread.sleep(5000); + IBrokeredMessage peekedMessage1 = browser.peek(); + long firstMessageSequenceNumber = peekedMessage1.getSequenceNumber(); + IBrokeredMessage peekedMessage2 = browser.peek(); + Assert.assertNotEquals("Peek returned the same message again.", firstMessageSequenceNumber, peekedMessage2.getSequenceNumber()); + + // Now peek with fromSequnceNumber.. May not work for partitioned entities + IBrokeredMessage peekedMessage5 = browser.peek(firstMessageSequenceNumber); + Assert.assertEquals("Peek with sequence number failed.", firstMessageSequenceNumber, peekedMessage5.getSequenceNumber()); + } + + public static void testPeekMessageBatch(IMessageSender sender, String sessionId, IMessageBrowser browser) throws InterruptedException, ServiceBusException, IOException + { + BrokeredMessage message = new BrokeredMessage("AMQP Scheduled message"); + if(sessionId != null) + { + message.setSessionId(sessionId); + } + sender.send(message); + message = new BrokeredMessage("AMQP Scheduled message2"); + if(sessionId != null) + { + message.setSessionId(sessionId); + } + sender.send(message); + Thread.sleep(5000); + Collection peekedMessages = browser.peekBatch(10); + long firstMessageSequenceNumber = peekedMessages.iterator().next().getSequenceNumber(); + int peekedMessagesCount = peekedMessages.size(); + if(peekedMessagesCount < 2) + { + // Not all messages peeked. May be topic pump hasn't finished pumping all messages + peekedMessages = browser.peekBatch(10); + peekedMessagesCount += peekedMessages.size(); + } + Assert.assertEquals("PeekBatch didnot return all messages.", 2, peekedMessagesCount); + + // Now peek with fromSequnceNumber.. May not work for partitioned entities + Collection peekedMessagesBatch2 = browser.peekBatch(firstMessageSequenceNumber, 10); + Assert.assertEquals("PeekBatch with sequence number didnot return all messages.", 2, peekedMessagesBatch2.size()); + Assert.assertEquals("PeekBatch with sequence number failed.", firstMessageSequenceNumber, peekedMessagesBatch2.iterator().next().getSequenceNumber()); + } + + public static void testReceiveBySequenceNumberAndComplete(IMessageSender sender, String sessionId, IMessageReceiver receiver) throws InterruptedException, ServiceBusException, IOException + { + BrokeredMessage message = new BrokeredMessage("AMQP Scheduled message"); + if(sessionId != null) + { + message.setSessionId(sessionId); + } + sender.send(message); + + IBrokeredMessage receivedMessage = receiver.receive(); + long sequenceNumber = receivedMessage.getSequenceNumber(); + String messageId = receivedMessage.getMessageId(); + receiver.defer(receivedMessage.getLockToken()); + + // Now receive by sequence number + receivedMessage = receiver.receive(sequenceNumber); + Assert.assertEquals("ReceiveBySequenceNumber didn't receive the right message.", sequenceNumber, receivedMessage.getSequenceNumber()); + Assert.assertEquals("ReceiveBySequenceNumber didn't receive the right message.", messageId, receivedMessage.getMessageId()); + receiver.complete(receivedMessage.getLockToken()); + + // Try to receive by sequence number again + try + { + receivedMessage = receiver.receive(sequenceNumber); + Assert.fail("Message recieved by sequnce number was not properly completed."); + } + catch(MessageNotFoundException e) + { + // Expected + } + } + + public static void testReceiveBySequenceNumberAndAbandon(IMessageSender sender, String sessionId, IMessageReceiver receiver) throws InterruptedException, ServiceBusException, IOException + { + BrokeredMessage message = new BrokeredMessage("AMQP Scheduled message"); + if(sessionId != null) + { + message.setSessionId(sessionId); + } + sender.send(message); + + IBrokeredMessage receivedMessage = receiver.receive(); + long sequenceNumber = receivedMessage.getSequenceNumber(); + String messageId = receivedMessage.getMessageId(); + receiver.defer(receivedMessage.getLockToken()); + + // Now receive by sequence number + receivedMessage = receiver.receive(sequenceNumber); + Assert.assertEquals("ReceiveBySequenceNumber didn't receive the right message.", sequenceNumber, receivedMessage.getSequenceNumber()); + Assert.assertEquals("ReceiveBySequenceNumber didn't receive the right message.", messageId, receivedMessage.getMessageId()); + long deliveryCount = receivedMessage.getDeliveryCount(); + receiver.abandon(receivedMessage.getLockToken()); + + // Try to receive by sequence number again + receivedMessage = receiver.receive(sequenceNumber); + Assert.assertEquals("Abandon didn't increase the delivery count for the message received by sequence number.", deliveryCount + 1, receivedMessage.getDeliveryCount()); + receiver.complete(receivedMessage.getLockToken()); + } + + public static void testReceiveBySequenceNumberAndDefer(IMessageSender sender, String sessionId, IMessageReceiver receiver) throws InterruptedException, ServiceBusException, IOException + { + // Use longer strings with each defer to avoid an assert check in debug builds of service + String phaseKey = "phase"; + String initialPhase = "undeferred"; + String firstDeferredPhase = "deferred first time"; + String secondDeferredPhase = "deferred first time and second time"; + + BrokeredMessage sentMessage = new BrokeredMessage("AMQP message"); + HashMap customProperties = new HashMap(); + customProperties.put(phaseKey, initialPhase); + sentMessage.setProperties(customProperties); + if(sessionId != null) + { + sentMessage.setSessionId(sessionId); + } + sender.send(sentMessage); + + IBrokeredMessage receivedMessage = receiver.receive(); + long sequenceNumber = receivedMessage.getSequenceNumber(); + String messageId = receivedMessage.getMessageId(); + customProperties.put(phaseKey, firstDeferredPhase); + receiver.defer(receivedMessage.getLockToken(), customProperties); + + // Now receive by sequence number + receivedMessage = receiver.receive(sequenceNumber); + Assert.assertEquals("ReceiveBySequenceNumber didn't receive the right message.", sequenceNumber, receivedMessage.getSequenceNumber()); + Assert.assertEquals("ReceiveBySequenceNumber didn't receive the right message.", messageId, receivedMessage.getMessageId()); + Assert.assertEquals("Defer didn't update properties of the message received by sequence number", firstDeferredPhase, receivedMessage.getProperties().get(phaseKey)); + customProperties.put(phaseKey, secondDeferredPhase); + receiver.defer(receivedMessage.getLockToken(), customProperties); + + // Try to receive by sequence number again + receivedMessage = receiver.receive(sequenceNumber); + Assert.assertEquals("ReceiveBySequenceNumber didn't receive the right message after deferrring", sequenceNumber, receivedMessage.getSequenceNumber()); + Assert.assertEquals("Defer didn't update properties of the message received by sequence number", secondDeferredPhase, receivedMessage.getProperties().get(phaseKey)); + receiver.complete(receivedMessage.getLockToken()); + } + + public static void testReceiveBySequenceNumberAndDeadletter(IMessageSender sender, String sessionId, IMessageReceiver receiver) throws InterruptedException, ServiceBusException, IOException + { + BrokeredMessage message = new BrokeredMessage("AMQP Scheduled message"); + if(sessionId != null) + { + message.setSessionId(sessionId); + } + sender.send(message); + + IBrokeredMessage receivedMessage = receiver.receive(); + long sequenceNumber = receivedMessage.getSequenceNumber(); + String messageId = receivedMessage.getMessageId(); + receiver.defer(receivedMessage.getLockToken()); + + // Now receive by sequence number + receivedMessage = receiver.receive(sequenceNumber); + Assert.assertEquals("ReceiveBySequenceNumber didn't receive the right message.", sequenceNumber, receivedMessage.getSequenceNumber()); + Assert.assertEquals("ReceiveBySequenceNumber didn't receive the right message.", messageId, receivedMessage.getMessageId()); + String deadLetterReason = "java client deadletter test"; + receiver.deadLetter(receivedMessage.getLockToken(), deadLetterReason, null); + + // Try to receive by sequence number again + try + { + receivedMessage = receiver.receive(sequenceNumber); + Assert.fail("Message received by sequence number was not properly deadlettered"); + } + catch(MessageNotFoundException e) + { + // Expected + } + } + + public static void drainAllMessagesFromReceiver(IMessageReceiver receiver) throws InterruptedException, ServiceBusException + { + Duration waitTime = Duration.ofSeconds(5); + final int batchSize = 10; + Collection messages = receiver.receiveBatch(batchSize, waitTime); + while(messages !=null && messages.size() > 0) + { + if(receiver.getReceiveMode() == ReceiveMode.PeekLock) + { + for(IBrokeredMessage message: messages) + { + receiver.complete(message.getLockToken()); + } + } + messages = receiver.receiveBatch(batchSize, waitTime); + } + + IBrokeredMessage peekedMessage; + while((peekedMessage = receiver.peek()) != null) + { + try + { + IBrokeredMessage message = receiver.receive(peekedMessage.getSequenceNumber()); + if(receiver.getReceiveMode() == ReceiveMode.PeekLock) + { + receiver.complete(message.getLockToken()); + } + } + catch(MessageNotFoundException mnfe) + { + // Ignore. May be there were no deferred messages + break; + } + } + } +} diff --git a/azure-servicebus/src/test/java/com/microsoft/azure/servicebus/TestUtils.java b/azure-servicebus/src/test/java/com/microsoft/azure/servicebus/TestUtils.java index 77c6f2acf6151..8b6a00190b163 100644 --- a/azure-servicebus/src/test/java/com/microsoft/azure/servicebus/TestUtils.java +++ b/azure-servicebus/src/test/java/com/microsoft/azure/servicebus/TestUtils.java @@ -3,17 +3,43 @@ import java.io.File; import java.io.FileReader; import java.io.IOException; +import java.time.Duration; +import java.util.Collection; import java.util.Properties; import com.microsoft.azure.servicebus.primitives.ConnectionStringBuilder; +import com.microsoft.azure.servicebus.primitives.TestConnectionStringBuilder; public class TestUtils { private static final String TEST_DIR_NAME = "resources"; private static final String ACCESS_PROPERTIES_FILE_NAME = "access.properties"; - private static final String NAMESPACENAME_PROPERTY = "namespacename"; - private static final String ENTITYPATH_PROPERTY = "entitypath"; - private static final String SHAREDACCESSKEYNAME_PROPERTY = "sharedaccesskeyname"; - private static final String SHAREDACCESSKEY_PROPERTY = "sharedaccesskey"; + + //Queue + private static final String QUEUE_NAMESPACENAME_PROPERTY = "queue.namespacename"; + private static final String QUEUE_ENTITYPATH_PROPERTY = "queue.entitypath"; + private static final String QUEUE_SHAREDACCESSKEYNAME_PROPERTY = "queue.sharedaccesskeyname"; + private static final String QUEUE_SHAREDACCESSKEY_PROPERTY = "queue.sharedaccesskey"; + + //Sessionful Queue + private static final String SESSIONFUL_QUEUE_NAMESPACENAME_PROPERTY = "queue.sessionful.namespacename"; + private static final String SESSIONFUL_QUEUE_ENTITYPATH_PROPERTY = "queue.sessionful.entitypath"; + private static final String SESSIONFUL_QUEUE_SHAREDACCESSKEYNAME_PROPERTY = "queue.sessionful.sharedaccesskeyname"; + private static final String SESSIONFUL_QUEUE_SHAREDACCESSKEY_PROPERTY = "queue.sessionful.sharedaccesskey"; + + //Topic and Subscription + private static final String TOPIC_NAMESPACENAME_PROPERTY = "topic.namespacename"; + private static final String TOPIC_ENTITYPATH_PROPERTY = "topic.entitypath"; + private static final String SUBSCRIPTION_ENTITYPATH_PROPERTY = "subscription.entitypath"; + private static final String TOPIC_SHAREDACCESSKEYNAME_PROPERTY = "topic.sharedaccesskeyname"; + private static final String TOPIC_SHAREDACCESSKEY_PROPERTY = "topic.sharedaccesskey"; + + //Sessionful Topic and Subscription + private static final String SESSIONFUL_TOPIC_NAMESPACENAME_PROPERTY = "topic.sessionful.namespacename"; + private static final String SESSIONFUL_TOPIC_ENTITYPATH_PROPERTY = "topic.sessionful.entitypath"; + private static final String SESSIONFUL_SUBSCRIPTION_ENTITYPATH_PROPERTY = "subscription.sessionful.entitypath"; + private static final String SESSIONFUL_TOPIC_SHAREDACCESSKEYNAME_PROPERTY = "topic.sessionful.sharedaccesskeyname"; + private static final String SESSIONFUL_TOPIC_SHAREDACCESSKEY_PROPERTY = "topic.sessionful.sharedaccesskey"; + private static Properties accessProperties; static @@ -31,28 +57,44 @@ public class TestUtils { } } - public static String getNamespace() + private static String getProperty(String propertyName) { - return accessProperties.getProperty(NAMESPACENAME_PROPERTY, ""); + return accessProperties.getProperty(propertyName, ""); + } + + public static ConnectionStringBuilder getQueueConnectionStringBuilder() + { + return new TestConnectionStringBuilder(getProperty(QUEUE_NAMESPACENAME_PROPERTY), getProperty(QUEUE_ENTITYPATH_PROPERTY), + getProperty(QUEUE_SHAREDACCESSKEYNAME_PROPERTY), getProperty(QUEUE_SHAREDACCESSKEY_PROPERTY)); } - public static String getEntityPath() + public static ConnectionStringBuilder getSessionfulQueueConnectionStringBuilder() { - return accessProperties.getProperty(ENTITYPATH_PROPERTY, ""); + return new TestConnectionStringBuilder(getProperty(SESSIONFUL_QUEUE_NAMESPACENAME_PROPERTY), getProperty(SESSIONFUL_QUEUE_ENTITYPATH_PROPERTY), + getProperty(SESSIONFUL_QUEUE_SHAREDACCESSKEYNAME_PROPERTY), getProperty(SESSIONFUL_QUEUE_SHAREDACCESSKEY_PROPERTY)); } - public static String getSharedAccessKeyName() + public static ConnectionStringBuilder getTopicConnectionStringBuilder() { - return accessProperties.getProperty(SHAREDACCESSKEYNAME_PROPERTY, ""); + return new TestConnectionStringBuilder(getProperty(TOPIC_NAMESPACENAME_PROPERTY), getProperty(TOPIC_ENTITYPATH_PROPERTY), + getProperty(TOPIC_SHAREDACCESSKEYNAME_PROPERTY), getProperty(TOPIC_SHAREDACCESSKEY_PROPERTY)); } - public static String getSharedAccessKey() + public static ConnectionStringBuilder getSessionfulTopicConnectionStringBuilder() { - return accessProperties.getProperty(SHAREDACCESSKEY_PROPERTY, ""); + return new TestConnectionStringBuilder(getProperty(SESSIONFUL_TOPIC_NAMESPACENAME_PROPERTY), getProperty(SESSIONFUL_TOPIC_ENTITYPATH_PROPERTY), + getProperty(SESSIONFUL_TOPIC_SHAREDACCESSKEYNAME_PROPERTY), getProperty(SESSIONFUL_TOPIC_SHAREDACCESSKEY_PROPERTY)); } - public static ConnectionStringBuilder getConnectionStringBuilder() + public static ConnectionStringBuilder getSubscriptionConnectionStringBuilder() { - return new ConnectionStringBuilder(getNamespace(), getEntityPath(), getSharedAccessKeyName(), getSharedAccessKey()); + return new TestConnectionStringBuilder(getProperty(TOPIC_NAMESPACENAME_PROPERTY), getProperty(TOPIC_ENTITYPATH_PROPERTY) + "/subscriptions/" + getProperty(SUBSCRIPTION_ENTITYPATH_PROPERTY), + getProperty(TOPIC_SHAREDACCESSKEYNAME_PROPERTY), getProperty(TOPIC_SHAREDACCESSKEY_PROPERTY)); } + + public static ConnectionStringBuilder getSessionfulSubscriptionConnectionStringBuilder() + { + return new TestConnectionStringBuilder(getProperty(SESSIONFUL_TOPIC_NAMESPACENAME_PROPERTY), getProperty(SESSIONFUL_TOPIC_ENTITYPATH_PROPERTY) + "/subscriptions/" + getProperty(SESSIONFUL_SUBSCRIPTION_ENTITYPATH_PROPERTY), + getProperty(SESSIONFUL_TOPIC_SHAREDACCESSKEYNAME_PROPERTY), getProperty(SESSIONFUL_TOPIC_SHAREDACCESSKEY_PROPERTY)); + } } diff --git a/azure-servicebus/src/test/java/com/microsoft/azure/servicebus/TopicSendReceiveTests.java b/azure-servicebus/src/test/java/com/microsoft/azure/servicebus/TopicSendReceiveTests.java new file mode 100644 index 0000000000000..5ae09b95486d3 --- /dev/null +++ b/azure-servicebus/src/test/java/com/microsoft/azure/servicebus/TopicSendReceiveTests.java @@ -0,0 +1,15 @@ +package com.microsoft.azure.servicebus; + +import com.microsoft.azure.servicebus.primitives.ConnectionStringBuilder; + +public class TopicSendReceiveTests extends SendReceiveTests { + @Override + public ConnectionStringBuilder getSenderConnectionStringBuilder() { + return TestUtils.getTopicConnectionStringBuilder(); + } + + @Override + public ConnectionStringBuilder getReceiverConnectionStringBuilder() { + return TestUtils.getSubscriptionConnectionStringBuilder(); + } +} diff --git a/azure-servicebus/src/test/java/com/microsoft/azure/servicebus/TopicSessionTests.java b/azure-servicebus/src/test/java/com/microsoft/azure/servicebus/TopicSessionTests.java new file mode 100644 index 0000000000000..afbacf8e698c3 --- /dev/null +++ b/azure-servicebus/src/test/java/com/microsoft/azure/servicebus/TopicSessionTests.java @@ -0,0 +1,16 @@ +package com.microsoft.azure.servicebus; + +import com.microsoft.azure.servicebus.primitives.ConnectionStringBuilder; + +public class TopicSessionTests extends SessionTests { + + @Override + public ConnectionStringBuilder getSenderConnectionStringBuilder() { + return TestUtils.getSessionfulTopicConnectionStringBuilder(); + } + + @Override + public ConnectionStringBuilder getReceiverConnectionStringBuilder() { + return TestUtils.getSessionfulSubscriptionConnectionStringBuilder(); + } +} diff --git a/azure-servicebus/src/test/java/com/microsoft/azure/servicebus/primitives/TestConnectionStringBuilder.java b/azure-servicebus/src/test/java/com/microsoft/azure/servicebus/primitives/TestConnectionStringBuilder.java new file mode 100644 index 0000000000000..4710ff9f97429 --- /dev/null +++ b/azure-servicebus/src/test/java/com/microsoft/azure/servicebus/primitives/TestConnectionStringBuilder.java @@ -0,0 +1,21 @@ +package com.microsoft.azure.servicebus.primitives; + +public class TestConnectionStringBuilder extends ConnectionStringBuilder +{ + final static String oneBoxEndpointFormat = "amqps://%s.servicebus.onebox.windows-int.net"; + + public TestConnectionStringBuilder( + final String namespaceName, + final String entityPath, + final String sharedAccessKeyName, + final String sharedAccessKey) + { + super(namespaceName, entityPath, sharedAccessKeyName, sharedAccessKey); + } + + @Override + String getEndPointFormat() + { + return oneBoxEndpointFormat; + } +}