From 75df395915dc480de373256e317dde97e8429afb Mon Sep 17 00:00:00 2001 From: Srikanta Date: Mon, 6 Jan 2020 17:56:38 -0800 Subject: [PATCH 1/4] Improve logging and update consumer close logic --- .../blob/BlobCheckpointStore.java | 97 +++++++++++-------- .../checkpointstore/blob/Messages.java | 4 +- .../eventhubs/PartitionBasedLoadBalancer.java | 6 +- .../eventhubs/PartitionPumpManager.java | 7 +- 4 files changed, 71 insertions(+), 43 deletions(-) diff --git a/sdk/eventhubs/azure-messaging-eventhubs-checkpointstore-blob/src/main/java/com/azure/messaging/eventhubs/checkpointstore/blob/BlobCheckpointStore.java b/sdk/eventhubs/azure-messaging-eventhubs-checkpointstore-blob/src/main/java/com/azure/messaging/eventhubs/checkpointstore/blob/BlobCheckpointStore.java index a4711d3633614..aed945ae82903 100644 --- a/sdk/eventhubs/azure-messaging-eventhubs-checkpointstore-blob/src/main/java/com/azure/messaging/eventhubs/checkpointstore/blob/BlobCheckpointStore.java +++ b/sdk/eventhubs/azure-messaging-eventhubs-checkpointstore-blob/src/main/java/com/azure/messaging/eventhubs/checkpointstore/blob/BlobCheckpointStore.java @@ -41,9 +41,9 @@ */ public class BlobCheckpointStore implements CheckpointStore { - private static final String SEQUENCE_NUMBER = "SequenceNumber"; - private static final String OFFSET = "Offset"; - private static final String OWNER_ID = "OwnerId"; + private static final String SEQUENCE_NUMBER = "sequencenumber"; + private static final String OFFSET = "offset"; + private static final String OWNER_ID = "ownerid"; private static final String ETAG = "eTag"; private static final String BLOB_PATH_SEPARATOR = "/"; @@ -109,14 +109,27 @@ private Mono convertToCheckpoint(BlobItem blobItem) { } Map metadata = blobItem.getMetadata(); + logger.info(Messages.CHECKPOINT_INFO, blobItem.getName(), metadata.get(SEQUENCE_NUMBER), + metadata.get(OFFSET)); + + Long sequenceNumber = null; + Long offset = null; + if (!CoreUtils.isNullOrEmpty(metadata.get(SEQUENCE_NUMBER))) { + sequenceNumber = Long.parseLong(metadata.get(SEQUENCE_NUMBER)); + } + + if (!CoreUtils.isNullOrEmpty(metadata.get(OFFSET))) { + offset = Long.parseLong(metadata.get(OFFSET)); + } + Checkpoint checkpoint = new Checkpoint() .setFullyQualifiedNamespace(names[0]) .setEventHubName(names[1]) .setConsumerGroup(names[2]) // names[3] is "checkpoint" .setPartitionId(names[4]) - .setSequenceNumber(Long.parseLong(metadata.get(SEQUENCE_NUMBER))) - .setOffset(Long.parseLong(metadata.get(OFFSET))); + .setSequenceNumber(sequenceNumber) + .setOffset(offset); return Mono.just(checkpoint); } @@ -134,38 +147,44 @@ private Mono convertToCheckpoint(BlobItem blobItem) { public Flux claimOwnership(List requestedPartitionOwnerships) { return Flux.fromIterable(requestedPartitionOwnerships).flatMap(partitionOwnership -> { - String partitionId = partitionOwnership.getPartitionId(); - String blobName = getBlobName(partitionOwnership.getFullyQualifiedNamespace(), - partitionOwnership.getEventHubName(), partitionOwnership.getConsumerGroup(), partitionId, - OWNERSHIP_PATH); - - if (!blobClients.containsKey(blobName)) { - blobClients.put(blobName, blobContainerAsyncClient.getBlobAsyncClient(blobName)); - } - - BlobAsyncClient blobAsyncClient = blobClients.get(blobName); - - Map metadata = new HashMap<>(); - metadata.put(OWNER_ID, partitionOwnership.getOwnerId()); - - BlobRequestConditions blobRequestConditions = new BlobRequestConditions(); - if (CoreUtils.isNullOrEmpty(partitionOwnership.getETag())) { - // New blob should be created - blobRequestConditions.setIfNoneMatch("*"); - return blobAsyncClient.getBlockBlobAsyncClient() - .uploadWithResponse(Flux.just(UPLOAD_DATA), 0, null, metadata, null, null, blobRequestConditions) - .flatMapMany(response -> updateOwnershipETag(response, partitionOwnership), error -> { - logger.info(Messages.CLAIM_ERROR, partitionId, error.getMessage()); - return Mono.empty(); - }, Mono::empty); - } else { - // update existing blob - blobRequestConditions.setIfMatch(partitionOwnership.getETag()); - return blobAsyncClient.setMetadataWithResponse(metadata, blobRequestConditions) - .flatMapMany(response -> updateOwnershipETag(response, partitionOwnership), error -> { - logger.info(Messages.CLAIM_ERROR, partitionId, error.getMessage()); - return Mono.empty(); - }, Mono::empty); + try { + String partitionId = partitionOwnership.getPartitionId(); + String blobName = getBlobName(partitionOwnership.getFullyQualifiedNamespace(), + partitionOwnership.getEventHubName(), partitionOwnership.getConsumerGroup(), partitionId, + OWNERSHIP_PATH); + + if (!blobClients.containsKey(blobName)) { + blobClients.put(blobName, blobContainerAsyncClient.getBlobAsyncClient(blobName)); + } + + BlobAsyncClient blobAsyncClient = blobClients.get(blobName); + + Map metadata = new HashMap<>(); + metadata.put(OWNER_ID, partitionOwnership.getOwnerId()); + + BlobRequestConditions blobRequestConditions = new BlobRequestConditions(); + if (CoreUtils.isNullOrEmpty(partitionOwnership.getETag())) { + // New blob should be created + blobRequestConditions.setIfNoneMatch("*"); + return blobAsyncClient.getBlockBlobAsyncClient() + .uploadWithResponse(Flux.just(UPLOAD_DATA), 0, null, metadata, null, null, + blobRequestConditions) + .flatMapMany(response -> updateOwnershipETag(response, partitionOwnership), error -> { + logger.info(Messages.CLAIM_ERROR, partitionId, error.getMessage()); + return Mono.empty(); + }, Mono::empty); + } else { + // update existing blob + blobRequestConditions.setIfMatch(partitionOwnership.getETag()); + return blobAsyncClient.setMetadataWithResponse(metadata, blobRequestConditions) + .flatMapMany(response -> updateOwnershipETag(response, partitionOwnership), error -> { + logger.info(Messages.CLAIM_ERROR, partitionId, error); + return Mono.empty(); + }, Mono::empty); + } + } catch (Exception ex) { + logger.warning(Messages.CLAIM_ERROR, partitionOwnership.getPartitionId(), ex); + return Mono.empty(); } }); } @@ -237,6 +256,8 @@ private Mono convertToPartitionOwnership(BlobItem blobItem) logger.warning(Messages.NO_METADATA_AVAILABLE_FOR_BLOB, blobItem.getName()); return Mono.empty(); } + logger + .info(Messages.BLOB_OWNER_INFO, blobItem.getName(), blobItem.getMetadata().getOrDefault(OWNER_ID, "")); BlobItemProperties blobProperties = blobItem.getProperties(); PartitionOwnership partitionOwnership = new PartitionOwnership() @@ -245,7 +266,7 @@ private Mono convertToPartitionOwnership(BlobItem blobItem) .setConsumerGroup(names[2]) // names[3] is "ownership" .setPartitionId(names[4]) - .setOwnerId(blobItem.getMetadata().get(OWNER_ID)) + .setOwnerId(blobItem.getMetadata().getOrDefault(OWNER_ID, "")) .setLastModifiedTime(blobProperties.getLastModified().toInstant().toEpochMilli()) .setETag(blobProperties.getETag()); return Mono.just(partitionOwnership); diff --git a/sdk/eventhubs/azure-messaging-eventhubs-checkpointstore-blob/src/main/java/com/azure/messaging/eventhubs/checkpointstore/blob/Messages.java b/sdk/eventhubs/azure-messaging-eventhubs-checkpointstore-blob/src/main/java/com/azure/messaging/eventhubs/checkpointstore/blob/Messages.java index 058f7800f967c..ddf6ac0793017 100644 --- a/sdk/eventhubs/azure-messaging-eventhubs-checkpointstore-blob/src/main/java/com/azure/messaging/eventhubs/checkpointstore/blob/Messages.java +++ b/sdk/eventhubs/azure-messaging-eventhubs-checkpointstore-blob/src/main/java/com/azure/messaging/eventhubs/checkpointstore/blob/Messages.java @@ -18,8 +18,10 @@ public enum Messages { private static Properties properties; private static final String PATH = "com/azure/messaging/eventhubs/checkpointstore/blob/messages.properties"; public static final String NO_METADATA_AVAILABLE_FOR_BLOB = "No metadata available for blob {}"; - public static final String CLAIM_ERROR = "Couldn't claim ownership of partition {}, error {}"; + public static final String CLAIM_ERROR = "Couldn't claim ownership of partition {}"; public static final String FOUND_BLOB_FOR_PARTITION = "Found blob for partition {}"; + public static final String BLOB_OWNER_INFO = "Blob {} is owned by {}"; + public static final String CHECKPOINT_INFO = "Blob {} has checkpoint with sequence number {} and offset {}"; private static synchronized Properties getProperties() { if (properties != null) { diff --git a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/PartitionBasedLoadBalancer.java b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/PartitionBasedLoadBalancer.java index 6caee9db26a73..8026ae465bcb6 100644 --- a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/PartitionBasedLoadBalancer.java +++ b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/PartitionBasedLoadBalancer.java @@ -205,7 +205,8 @@ private Mono loadBalance(final Tuple2, Lis if (isLoadBalanced(minPartitionsPerEventProcessor, numberOfEventProcessorsWithAdditionalPartition, ownerPartitionMap)) { // If the partitions are evenly distributed among all active event processors, no change required. - logger.info("Load is balanced"); + logger.info("Load is balanced with this event processor owning {} partitions", + ownerPartitionMap.get(ownerId).size()); // renew ownership of already owned partitions checkpointStore.claimOwnership(partitionPumpManager.getPartitionPumps().keySet() .stream() @@ -231,7 +232,8 @@ private Mono loadBalance(final Tuple2, Lis // If we have reached this stage, this event processor has to claim/steal ownership of at least 1 // more partition logger.info( - "Load is unbalanced and this event processor should own more partitions"); + "Load is unbalanced and this event processor owns {} partitions and should own more partitions", + ownerPartitionMap.get(ownerId).size()); /* * If some partitions are unclaimed, this could be because an event processor is down and * it's partitions are now available for others to own or because event processors are just diff --git a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/PartitionPumpManager.java b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/PartitionPumpManager.java index 6694fd6741f4c..c07a3935cf67f 100644 --- a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/PartitionPumpManager.java +++ b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/PartitionPumpManager.java @@ -148,8 +148,11 @@ void startPartitionPump(PartitionOwnership claimedOwnership, Checkpoint checkpoi partitionEvent), /* EventHubConsumer receive() returned an error */ ex -> handleError(claimedOwnership, eventHubConsumer, partitionProcessor, ex, partitionContext), - () -> partitionProcessor.close(new CloseContext(partitionContext, - CloseReason.EVENT_PROCESSOR_SHUTDOWN))); + () -> { + partitionProcessor.close(new CloseContext(partitionContext, + CloseReason.EVENT_PROCESSOR_SHUTDOWN)); + partitionPumps.remove(claimedOwnership.getPartitionId()); + }); } catch (Exception ex) { if (partitionPumps.containsKey(claimedOwnership.getPartitionId())) { cleanup(claimedOwnership, partitionPumps.get(claimedOwnership.getPartitionId())); From 12d4511e613574967db22b367f67e74b5ab3f21c Mon Sep 17 00:00:00 2001 From: Srikanta Date: Mon, 6 Jan 2020 18:12:03 -0800 Subject: [PATCH 2/4] Update localization text message --- .../eventhubs/checkpointstore/blob/messages.properties | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/sdk/eventhubs/azure-messaging-eventhubs-checkpointstore-blob/src/main/resources/com/azure/messaging/eventhubs/checkpointstore/blob/messages.properties b/sdk/eventhubs/azure-messaging-eventhubs-checkpointstore-blob/src/main/resources/com/azure/messaging/eventhubs/checkpointstore/blob/messages.properties index 35583072b0b85..76dcc830fd7b6 100644 --- a/sdk/eventhubs/azure-messaging-eventhubs-checkpointstore-blob/src/main/resources/com/azure/messaging/eventhubs/checkpointstore/blob/messages.properties +++ b/sdk/eventhubs/azure-messaging-eventhubs-checkpointstore-blob/src/main/resources/com/azure/messaging/eventhubs/checkpointstore/blob/messages.properties @@ -1,3 +1,5 @@ NO_METADATA_AVAILABLE_FOR_BLOB=No metadata available for blob {} -CLAIM_ERROR=Couldn't claim ownership of partition {}, error {} +CLAIM_ERROR=Couldn't claim ownership of partition {} FOUND_BLOB_FOR_PARTITION=Found blob for partition {} +BLOB_OWNER_INFO=Blob {} is owned by {} +CHECKPOINT_INFO=Blob {} has checkpoint with sequence number {} and offset {} From e521ca7b7b7bb87630280a3e41ec2eb3efd3bdc8 Mon Sep 17 00:00:00 2001 From: Srikanta Date: Tue, 7 Jan 2020 00:54:44 -0800 Subject: [PATCH 3/4] Fix unit test --- .../com/azure/messaging/eventhubs/PartitionPumpManager.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/PartitionPumpManager.java b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/PartitionPumpManager.java index c07a3935cf67f..77ee1126ef568 100644 --- a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/PartitionPumpManager.java +++ b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/PartitionPumpManager.java @@ -151,7 +151,7 @@ void startPartitionPump(PartitionOwnership claimedOwnership, Checkpoint checkpoi () -> { partitionProcessor.close(new CloseContext(partitionContext, CloseReason.EVENT_PROCESSOR_SHUTDOWN)); - partitionPumps.remove(claimedOwnership.getPartitionId()); + cleanup(claimedOwnership, eventHubConsumer); }); } catch (Exception ex) { if (partitionPumps.containsKey(claimedOwnership.getPartitionId())) { From 305711b7f08dea861a4383f9846b55cd82dce5c7 Mon Sep 17 00:00:00 2001 From: Srikanta Date: Tue, 7 Jan 2020 03:24:11 -0800 Subject: [PATCH 4/4] Fix checkpointstore tests --- .../blob/BlobEventProcessorClientStoreTest.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/sdk/eventhubs/azure-messaging-eventhubs-checkpointstore-blob/src/test/java/com/azure/messaging/eventhubs/checkpointstore/blob/BlobEventProcessorClientStoreTest.java b/sdk/eventhubs/azure-messaging-eventhubs-checkpointstore-blob/src/test/java/com/azure/messaging/eventhubs/checkpointstore/blob/BlobEventProcessorClientStoreTest.java index 40df3b74448e8..be5eac43e4854 100644 --- a/sdk/eventhubs/azure-messaging-eventhubs-checkpointstore-blob/src/test/java/com/azure/messaging/eventhubs/checkpointstore/blob/BlobEventProcessorClientStoreTest.java +++ b/sdk/eventhubs/azure-messaging-eventhubs-checkpointstore-blob/src/test/java/com/azure/messaging/eventhubs/checkpointstore/blob/BlobEventProcessorClientStoreTest.java @@ -210,9 +210,9 @@ private BlobItem getBlobItem(String owner, String sequenceNumber, String offset, private Map getMetadata(String owner, String sequenceNumber, String offset) { Map metadata = new HashMap<>(); - metadata.put("OwnerId", owner); - metadata.put("SequenceNumber", sequenceNumber); - metadata.put("Offset", offset); + metadata.put("ownerid", owner); + metadata.put("sequencenumber", sequenceNumber); + metadata.put("offset", offset); return metadata; } }