Skip to content

Commit

Permalink
QueueClient, TopicClient, SubscriptionClient for java client (#27)
Browse files Browse the repository at this point in the history
* Chaning directory structure to align maven standard directory structure. Also changes to pom.xml files to align Maven standard build process like running tests.

* Removing an unwanted file that was accidentally checked in.

* Implemented Peek and PeekBatch functionality. Added some tests.

* Implementation of ReceiveBySequenceNumber and Complete,Abandon,Defer,DeadLetter of messages received by sequence numbers.

* WIP Sessions support

* Full implementation of AcceptSession, receive features on sessions, renew lock, getState and set state.

* Implementation of browse sessions.

* Adding tests for topic-subscription cases

* Implemented TopicClient that support both SEND and BROWSE

* Added support for AddRule/RemoveRule on susbscription client. Also added skeletion clients.

* QueueClient and Subscription Client first part. Completed implementation of onMessage pump and tests.

* Implementation of message pump and session pump.

* Fixing minor issues in session pump.

* Renaming all Brokered* classes to just Senders, Receivers.

* Hiding ClientFactory methods

* Deleting BrokeredMessageSession.java as it was replaced by MessageSession.java
  • Loading branch information
yvgopal authored and jtaubensee committed Apr 4, 2017
1 parent 3bbba3a commit d3b107d
Show file tree
Hide file tree
Showing 64 changed files with 4,332 additions and 1,057 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import java.time.Duration;

import com.microsoft.azure.servicebus.ClientFactory;
import com.microsoft.azure.servicebus.IBrokeredMessage;
import com.microsoft.azure.servicebus.IMessage;
import com.microsoft.azure.servicebus.IMessageReceiver;
import com.microsoft.azure.servicebus.ReceiveMode;
import com.microsoft.azure.servicebus.primitives.ConnectionStringBuilder;
Expand Down Expand Up @@ -32,7 +32,7 @@ public static void main(String[] args) throws Exception {

private static void receiveMessages() throws InterruptedException, ServiceBusException {
while (true) {
IBrokeredMessage receivedMessage = receiver.receive(Duration.ofMinutes(1));
IMessage receivedMessage = receiver.receive(Duration.ofMinutes(1));
System.out.println(new String(receivedMessage.getContent()));
receiver.complete(receivedMessage.getLockToken());
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package com.microsoft.azure.servicebus.samples;

import com.microsoft.azure.servicebus.BrokeredMessage;
import com.microsoft.azure.servicebus.Message;
import com.microsoft.azure.servicebus.ClientFactory;
import com.microsoft.azure.servicebus.IMessageSender;
import com.microsoft.azure.servicebus.primitives.ConnectionStringBuilder;
Expand Down Expand Up @@ -31,7 +31,7 @@ public static void main(String[] args) throws Exception {
private static void sendMessages(int numMessages) throws InterruptedException, ServiceBusException {
for(int i = 0; i < numMessages; i ++) {
String messageBody = "MessageNumber: " + i;
BrokeredMessage message = new BrokeredMessage(messageBody.getBytes());
Message message = new Message(messageBody.getBytes());
sender.send(message);
System.out.println("Sending message " + i);
}
Expand Down
4 changes: 2 additions & 2 deletions azure-servicebus/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,11 @@
<artifactId>proton-j</artifactId>
<version>${proton-j-version}</version>
</dependency>
<!--<dependency>
<dependency>
<groupId>org.bouncycastle</groupId>
<artifactId>bcpkix-jdk15on</artifactId>
<version>1.49</version>
</dependency>-->
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,31 +9,34 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

import com.microsoft.azure.servicebus.primitives.MessageReceiver;
import com.microsoft.azure.servicebus.primitives.CoreMessageReceiver;
import com.microsoft.azure.servicebus.primitives.MessagingFactory;
import com.microsoft.azure.servicebus.primitives.ServiceBusException;

final class BrowsableMessageSession extends BrokeredMessageSession
final class BrowsableMessageSession extends MessageSession
{
private static final String INVALID_OPERATION_ERROR_MESSAGE = "Unsupported operation on a browse only session.";
private static final String INVALID_OPERATION_ERROR_MESSAGE = "Unsupported operation on a browse only session.";

private final String sessionId;
BrowsableMessageSession(String sessionId, MessagingFactory messagingFactory, String entityPath)
{
super(messagingFactory, entityPath, sessionId, ReceiveMode.PeekLock);
// try {
// this.initializeAsync().get();
// } catch (InterruptedException | ExecutionException e) {
//
// }
}

BrowsableMessageSession(String sessionId, MessagingFactory messagingFactory, MessageReceiver internalReceiver, String entityPath)
@Override
protected boolean isBrowsableSession()
{
super(messagingFactory, internalReceiver, entityPath, ReceiveMode.PeekLock);
this.sessionId = sessionId;
try {
this.initializeAsync().get();
} catch (InterruptedException | ExecutionException | IOException e) {
// We can ignore it, as init is a no-operation in this case
}
return true;
}

@Override
public String getSessionId()
{
return this.sessionId;
return this.getRequestedSessionId();
}

@Override
Expand All @@ -42,13 +45,13 @@ public Instant getLockedUntilUtc() {
}

@Override
public int getPrefetchCount()
public int getMessagePrefetchCount()
{
throw new UnsupportedOperationException(INVALID_OPERATION_ERROR_MESSAGE);
}

@Override
public void setPrefetchCount(int prefetchCount) throws ServiceBusException
public void setMessagePrefetchCount(int prefetchCount) throws ServiceBusException
{
throw new UnsupportedOperationException(INVALID_OPERATION_ERROR_MESSAGE);
}
Expand Down Expand Up @@ -81,7 +84,7 @@ public CompletableFuture<Void> completeAsync(UUID lockToken)
}

@Override
public CompletableFuture<Void> completeBatchAsync(Collection<? extends IBrokeredMessage> messages) {
public CompletableFuture<Void> completeBatchAsync(Collection<? extends IMessage> messages) {
throw new UnsupportedOperationException(INVALID_OPERATION_ERROR_MESSAGE);
}

Expand All @@ -98,49 +101,49 @@ public CompletableFuture<Void> deferAsync(UUID lockToken, Map<String, Object> pr
}

@Override
public CompletableFuture<IBrokeredMessage> receiveAsync()
public CompletableFuture<IMessage> receiveAsync()
{
throw new UnsupportedOperationException(INVALID_OPERATION_ERROR_MESSAGE);
}

@Override
public CompletableFuture<IBrokeredMessage> receiveAsync(Duration serverWaitTime)
public CompletableFuture<IMessage> receiveAsync(Duration serverWaitTime)
{
throw new UnsupportedOperationException(INVALID_OPERATION_ERROR_MESSAGE);
}

@Override
public CompletableFuture<IBrokeredMessage> receiveAsync(long sequenceNumber)
public CompletableFuture<IMessage> receiveAsync(long sequenceNumber)
{
throw new UnsupportedOperationException(INVALID_OPERATION_ERROR_MESSAGE);
}

@Override
public CompletableFuture<Collection<IBrokeredMessage>> receiveBatchAsync(int maxMessageCount)
public CompletableFuture<Collection<IMessage>> receiveBatchAsync(int maxMessageCount)
{
throw new UnsupportedOperationException(INVALID_OPERATION_ERROR_MESSAGE);
}

@Override
public CompletableFuture<Collection<IBrokeredMessage>> receiveBatchAsync(int maxMessageCount, Duration serverWaitTime)
public CompletableFuture<Collection<IMessage>> receiveBatchAsync(int maxMessageCount, Duration serverWaitTime)
{
throw new UnsupportedOperationException(INVALID_OPERATION_ERROR_MESSAGE);
}

@Override
public CompletableFuture<Collection<IBrokeredMessage>> receiveBatchAsync(Collection<Long> sequenceNumbers)
public CompletableFuture<Collection<IMessage>> receiveBatchAsync(Collection<Long> sequenceNumbers)
{
throw new UnsupportedOperationException(INVALID_OPERATION_ERROR_MESSAGE);
}

@Override
public CompletableFuture<Instant> renewMessageLockAsync(IBrokeredMessage message)
public CompletableFuture<Instant> renewMessageLockAsync(IMessage message)
{
throw new UnsupportedOperationException(INVALID_OPERATION_ERROR_MESSAGE);
}

@Override
public CompletableFuture<Collection<Instant>> renewMessageLockBatchAsync(Collection<? extends IBrokeredMessage> messages)
public CompletableFuture<Collection<Instant>> renewMessageLockBatchAsync(Collection<? extends IMessage> messages)
{
throw new UnsupportedOperationException(INVALID_OPERATION_ERROR_MESSAGE);
}
Expand Down
Loading

0 comments on commit d3b107d

Please sign in to comment.