Skip to content

Commit

Permalink
EventHubs integration part 1 (Azure#3100)
Browse files Browse the repository at this point in the history
* eventhubs-java: pom and code changes

* eventhubs-java: client.test.live.yml updated

* LicenseJava: add license info for all java files

* PR-3067: Eventhub-java: integrate event hubs into azure-sdk-for-java repo [Follow up]

* fix: update jproxy link for unit tests

* fix<test-proxy-server>: add external proxy server lib source file for proxy tests

* fix<checkstyle>: fixed all indentation errors

* fix<checkstyle>: FileTabCharacter rule fixed

* fix<checkstyle>: fixed OperatorWrap rule

* fix<checkStyle>: fixed AvoidStarImport rule

* fix<checkstyle>: WhitespaceAround rule

* fix<checkstyle>: NoWhitespaceBefore and WhitespaceAfter rules

* fix<checkstyle>: WhitespaceAround rule

* fix<checkstyle>: RegexpSingleline rule

* fix<checkstyle>: ArrayTypeStyle rule

* fix<checkstyle>: NewlineAtEndOfFile rule

* fix<checkstyle>: UnusedImports

* fix<checkstyle>: ConstantName

* fix<checkstyle>: MethodName and StaticVariableName rules

* fix<checkstyle>: VisibilityModifier

* fix<checkstyle>: EmptyBlock, InnerAssignment

* fix<checkstyle>: ModifierOrder

* fix<checkstyle>: RedundantModifier

* fix<checkstyle-warning>: LeftCurly

* fix<revert>: revert client.test.live.yml changes, exclude event hub

* fix<checkstyle>: additional 10 Indentation errors

* fix<checkstyle, pom>: updates changes for connie's code changes request

* test<live-test>: enable live test for eventhubs

* fix(CheckStyle): update to latest checkstyle-suppressions that matched spotbugs
  • Loading branch information
mssfang authored and sima-zhu committed Mar 21, 2019
1 parent 3e2cf7a commit 34f1e57
Show file tree
Hide file tree
Showing 61 changed files with 907 additions and 796 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,21 @@

<!-- Public API already released with incorrect constant variable naming -->
<suppress checks="ConstantName" files="AlgorithmResolver.java"/>
<suppress checks="ConstantName" files="AmqpErrorCode.java"/>
<suppress checks="ConstantName" files="BatchErrorCodeStrings.java"/>
<suppress checks="ConstantName" files="MessagingFactory.java"/>
<suppress checks="ConstantName" files="PartitionManagerOptions.java"/>
<suppress checks="ConstantName" files="RsaKey.java"/>
<suppress checks="ConstantName" files="SymmetricKey.java"/>
<suppress checks="ConstantName" files="BatchErrorCodeStrings.java"/>
<suppress checks="ConstantName" files="TaskFailureInformationCodes.java"/>

<!-- Public API already released with incorrect static variable naming -->
<suppress checks="StaticVariableName" files="EventHubClientImpl.java"/>

<!-- Public API already released with visibility modifier -->
<suppress checks="VisibilityModifier" files="BatchOptions.java"/>
<suppress checks="VisibilityModifier" files="EventHubClientImpl.java"/>

<!-- Public API already released without final modifier -->
<suppress checks="FinalClass" files="BatchClient.java"/>
</suppressions>

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
public class BaseLease implements Comparable<BaseLease> {
private final String partitionId;
private String owner = "";
private transient boolean isOwned = false; // do not serialize
private transient boolean isOwned = false; // do not serialize

/**
* Do not use; added only for GSon deserializer
Expand All @@ -45,9 +45,9 @@ public BaseLease(String partitionId) {
* @param isOwned True if the lease is owned, false if not.
*/
public BaseLease(String partitionId, String owner, boolean isOwned) {
this.partitionId = partitionId;
this.owner = owner;
this.isOwned = isOwned;
this.partitionId = partitionId;
this.owner = owner;
this.isOwned = isOwned;
}

/**
Expand Down Expand Up @@ -85,17 +85,17 @@ public void setOwner(String owner) {
* @param newState true if the lease is owned, or false if it is not
*/
public void setIsOwned(boolean newState) {
this.isOwned = newState;
this.isOwned = newState;
}

/**
* Get the owned state of the lease.
*
* @return true if the lease is owned, or false if it is not
*/
public boolean getIsOwned() {
return this.isOwned;
}
/**
* Get the owned state of the lease.
*
* @return true if the lease is owned, or false if it is not
*/
public boolean getIsOwned() {
return this.isOwned;
}

/**
* Convenience function for comparing possibleOwner against this.owner
Expand All @@ -120,11 +120,11 @@ public String getPartitionId() {
return this.partitionId;
}

// Compares by partition id
@Override
public int compareTo(BaseLease other) {
return this.partitionId.compareTo(other.getPartitionId());
}
// Compares by partition id
@Override
public int compareTo(BaseLease other) {
return this.partitionId.compareTo(other.getPartitionId());
}

String getStateDebug() {
return "N/A";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,10 @@ class Closable {

// null parent means top-level
Closable(Closable parent) {
this.syncClose = new Object();
this.parent = parent;
this.isClosing = false;
this.isClosed = false;
this.syncClose = new Object();
this.parent = parent;
this.isClosing = false;
this.isClosed = false;
}

protected final boolean getIsClosed() {
Expand All @@ -33,29 +33,29 @@ protected final boolean getIsClosingOrClosed() {
}

protected final void setClosing() {
synchronized (this.syncClose) {
this.isClosing = true;
}
synchronized (this.syncClose) {
this.isClosing = true;
}
}

protected final void setClosed() {
synchronized (this.syncClose) {
this.isClosing = false;
this.isClosed = true;
}
synchronized (this.syncClose) {
this.isClosing = false;
this.isClosed = true;
}
}

protected final void throwIfClosingOrClosed(String message) {
if (getIsClosingOrClosed()) {
throw new ClosingException(message);
}
if (getIsClosingOrClosed()) {
throw new ClosingException(message);
}
}

class ClosingException extends RuntimeException {
private static final long serialVersionUID = 1138985585921317036L;
private static final long serialVersionUID = 1138985585921317036L;

ClosingException(String message) {
super(message);
}
ClosingException(String message) {
super(message);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ public class CompleteLease extends BaseLease {
* Do not use; added only for GSon deserializer
*/
protected CompleteLease() {
super();
super();
}

/**
Expand All @@ -31,7 +31,7 @@ protected CompleteLease() {
* @param partitionId Partition id for this lease.
*/
public CompleteLease(String partitionId) {
super(partitionId);
super(partitionId);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,14 @@
import java.net.URISyntaxException;
import java.security.InvalidKeyException;
import java.util.UUID;
import java.util.concurrent.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/***
Expand Down Expand Up @@ -500,8 +507,7 @@ public CompletableFuture<Void> unregisterEventProcessor() {
// If we own the executor, stop it also.
// Owned executor is also created in constructor.
if (this.weOwnExecutor) {
this.unregistered = this.unregistered.thenRunAsync(() ->
{
this.unregistered = this.unregistered.thenRunAsync(() -> {
// IMPORTANT: run this last stage in the default threadpool!
// If a task running in a threadpool waits for that threadpool to terminate, it's going to wait a long time...

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,27 +8,27 @@
* They describe what activity was taking place when the exception occurred.
*/
public final class EventProcessorHostActionStrings {
public final static String ACQUIRING_LEASE = "Acquiring Lease";
public final static String CHECKING_CHECKPOINT_STORE = "Checking Checpoint Store Existence";
public final static String CHECKING_LEASES = "Checking Leases";
public final static String CHECKING_LEASE_STORE = "Checking Lease Store Existence";
public final static String CLOSING_EVENT_PROCESSOR = "Closing Event Processor";
public final static String CREATING_CHECKPOINTS = "Creating Checkpoint Holders";
public final static String CREATING_CHECKPOINT_STORE = "Creating Checkpoint Store";
public final static String CREATING_EVENT_HUB_CLIENT = "Creating Event Hub Client";
public final static String CREATING_EVENT_PROCESSOR = "Creating Event Processor";
public final static String CREATING_LEASES = "Creating Leases";
public final static String CREATING_LEASE_STORE = "Creating Lease Store";
public final static String DELETING_LEASE = "Deleting Lease";
public final static String GETTING_CHECKPOINT = "Getting Checkpoint Details";
public final static String GETTING_LEASE = "Getting Lease Details";
public final static String INITIALIZING_STORES = "Initializing Stores";
public final static String OPENING_EVENT_PROCESSOR = "Opening Event Processor";
public final static String PARTITION_MANAGER_CLEANUP = "Partition Manager Cleanup";
public final static String PARTITION_MANAGER_MAIN_LOOP = "Partition Manager Main Loop";
public final static String RELEASING_LEASE = "Releasing Lease";
public final static String RENEWING_LEASE = "Renewing Lease";
public final static String STEALING_LEASE = "Stealing Lease";
public final static String UPDATING_CHECKPOINT = "Updating Checkpoint";
public final static String UPDATING_LEASE = "Updating Lease";
public static final String ACQUIRING_LEASE = "Acquiring Lease";
public static final String CHECKING_CHECKPOINT_STORE = "Checking Checpoint Store Existence";
public static final String CHECKING_LEASES = "Checking Leases";
public static final String CHECKING_LEASE_STORE = "Checking Lease Store Existence";
public static final String CLOSING_EVENT_PROCESSOR = "Closing Event Processor";
public static final String CREATING_CHECKPOINTS = "Creating Checkpoint Holders";
public static final String CREATING_CHECKPOINT_STORE = "Creating Checkpoint Store";
public static final String CREATING_EVENT_HUB_CLIENT = "Creating Event Hub Client";
public static final String CREATING_EVENT_PROCESSOR = "Creating Event Processor";
public static final String CREATING_LEASES = "Creating Leases";
public static final String CREATING_LEASE_STORE = "Creating Lease Store";
public static final String DELETING_LEASE = "Deleting Lease";
public static final String GETTING_CHECKPOINT = "Getting Checkpoint Details";
public static final String GETTING_LEASE = "Getting Lease Details";
public static final String INITIALIZING_STORES = "Initializing Stores";
public static final String OPENING_EVENT_PROCESSOR = "Opening Event Processor";
public static final String PARTITION_MANAGER_CLEANUP = "Partition Manager Cleanup";
public static final String PARTITION_MANAGER_MAIN_LOOP = "Partition Manager Main Loop";
public static final String RELEASING_LEASE = "Releasing Lease";
public static final String RENEWING_LEASE = "Renewing Lease";
public static final String STEALING_LEASE = "Stealing Lease";
public static final String UPDATING_CHECKPOINT = "Updating Checkpoint";
public static final String UPDATING_LEASE = "Updating Lease";
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,21 +8,21 @@
import java.util.concurrent.ScheduledExecutorService;

final class HostContext {
final private ScheduledExecutorService executor;
private final ScheduledExecutorService executor;

// Ideally we wouldn't need the host, but there are certain things which can be dynamically changed
// by the user via APIs on the host and which need to be exposed on the HostContext. Passing the
// call through is easier and safer than trying to keep two copies in sync.
final private EventProcessorHost host;
final private String hostName;
private final EventProcessorHost host;
private final String hostName;

final private String eventHubPath;
final private String consumerGroupName;
final private String eventHubConnectionString;
final private RetryPolicy retryPolicy;
private final String eventHubPath;
private final String consumerGroupName;
private final String eventHubConnectionString;
private final RetryPolicy retryPolicy;

final private ILeaseManager leaseManager;
final private ICheckpointManager checkpointManager;
private final ILeaseManager leaseManager;
private final ICheckpointManager checkpointManager;

// Cannot be final because it is not available at HostContext construction time.
private EventProcessorOptions eventProcessorOptions = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,8 @@ public interface ICheckpointManager {
public CompletableFuture<Checkpoint> getCheckpoint(String partitionId);

/***
* Creates the checkpoint HOLDERs for the given partitions. Does nothing for any checkpoint HOLDERs
* that already exist.
* Creates the checkpoint HOLDERs for the given partitions. Does nothing for any checkpoint HOLDERs
* that already exist.
*
* The semantics of this are complicated because it is possible to use the same store for both
* leases and checkpoints (the Azure Storage implementation does so) and it is required to
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,22 +86,22 @@ public CompletableFuture<Checkpoint> getCheckpoint(String partitionId) {

@Override
public CompletableFuture<Void> createAllCheckpointsIfNotExists(List<String> partitionIds) {
for (String id : partitionIds) {
Checkpoint checkpointInStore = InMemoryCheckpointStore.singleton.getCheckpoint(id);
if (checkpointInStore != null) {
TRACE_LOGGER.debug(this.hostContext.withHostAndPartition(id,
"createCheckpointIfNotExists() found existing checkpoint, OK"));
} else {
TRACE_LOGGER.debug(this.hostContext.withHostAndPartition(id,
"createCheckpointIfNotExists() creating new checkpoint"));
Checkpoint newStoreCheckpoint = new Checkpoint(id);
// This API actually creates the holder, not the checkpoint itself. In this implementation, we do create a Checkpoint object
// and put it in the store, but the values are set to indicate that it is not initialized.
newStoreCheckpoint.setOffset(null);
newStoreCheckpoint.setSequenceNumber(-1);
InMemoryCheckpointStore.singleton.setOrReplaceCheckpoint(newStoreCheckpoint);
}
}
for (String id : partitionIds) {
Checkpoint checkpointInStore = InMemoryCheckpointStore.singleton.getCheckpoint(id);
if (checkpointInStore != null) {
TRACE_LOGGER.debug(this.hostContext.withHostAndPartition(id,
"createCheckpointIfNotExists() found existing checkpoint, OK"));
} else {
TRACE_LOGGER.debug(this.hostContext.withHostAndPartition(id,
"createCheckpointIfNotExists() creating new checkpoint"));
Checkpoint newStoreCheckpoint = new Checkpoint(id);
// This API actually creates the holder, not the checkpoint itself. In this implementation, we do create a Checkpoint object
// and put it in the store, but the values are set to indicate that it is not initialized.
newStoreCheckpoint.setOffset(null);
newStoreCheckpoint.setSequenceNumber(-1);
InMemoryCheckpointStore.singleton.setOrReplaceCheckpoint(newStoreCheckpoint);
}
}
return CompletableFuture.completedFuture(null);
}

Expand Down Expand Up @@ -129,7 +129,7 @@ public CompletableFuture<Void> deleteCheckpoint(String partitionId) {


private static class InMemoryCheckpointStore {
final static InMemoryCheckpointStore singleton = new InMemoryCheckpointStore();
static final InMemoryCheckpointStore singleton = new InMemoryCheckpointStore();

private ConcurrentHashMap<String, Checkpoint> inMemoryCheckpointsPrivate = null;

Expand Down
Loading

0 comments on commit 34f1e57

Please sign in to comment.