Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve logging and update consumer close logic #7175

Merged
merged 4 commits into from
Jan 7, 2020
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,9 @@
*/
public class BlobCheckpointStore implements CheckpointStore {

private static final String SEQUENCE_NUMBER = "SequenceNumber";
private static final String OFFSET = "Offset";
private static final String OWNER_ID = "OwnerId";
private static final String SEQUENCE_NUMBER = "sequencenumber";
private static final String OFFSET = "offset";
private static final String OWNER_ID = "ownerid";
private static final String ETAG = "eTag";

private static final String BLOB_PATH_SEPARATOR = "/";
Expand Down Expand Up @@ -109,14 +109,27 @@ private Mono<Checkpoint> convertToCheckpoint(BlobItem blobItem) {
}

Map<String, String> metadata = blobItem.getMetadata();
logger.info(Messages.CHECKPOINT_INFO, blobItem.getName(), metadata.get(SEQUENCE_NUMBER),
metadata.get(OFFSET));

Long sequenceNumber = null;
Long offset = null;
if (!CoreUtils.isNullOrEmpty(metadata.get(SEQUENCE_NUMBER))) {
sequenceNumber = Long.parseLong(metadata.get(SEQUENCE_NUMBER));
}

if (!CoreUtils.isNullOrEmpty(metadata.get(OFFSET))) {
offset = Long.parseLong(metadata.get(OFFSET));
}

Checkpoint checkpoint = new Checkpoint()
.setFullyQualifiedNamespace(names[0])
.setEventHubName(names[1])
.setConsumerGroup(names[2])
// names[3] is "checkpoint"
.setPartitionId(names[4])
.setSequenceNumber(Long.parseLong(metadata.get(SEQUENCE_NUMBER)))
.setOffset(Long.parseLong(metadata.get(OFFSET)));
.setSequenceNumber(sequenceNumber)
.setOffset(offset);

return Mono.just(checkpoint);
}
Expand All @@ -134,38 +147,44 @@ private Mono<Checkpoint> convertToCheckpoint(BlobItem blobItem) {
public Flux<PartitionOwnership> claimOwnership(List<PartitionOwnership> requestedPartitionOwnerships) {

return Flux.fromIterable(requestedPartitionOwnerships).flatMap(partitionOwnership -> {
String partitionId = partitionOwnership.getPartitionId();
String blobName = getBlobName(partitionOwnership.getFullyQualifiedNamespace(),
partitionOwnership.getEventHubName(), partitionOwnership.getConsumerGroup(), partitionId,
OWNERSHIP_PATH);

if (!blobClients.containsKey(blobName)) {
blobClients.put(blobName, blobContainerAsyncClient.getBlobAsyncClient(blobName));
}

BlobAsyncClient blobAsyncClient = blobClients.get(blobName);

Map<String, String> metadata = new HashMap<>();
metadata.put(OWNER_ID, partitionOwnership.getOwnerId());

BlobRequestConditions blobRequestConditions = new BlobRequestConditions();
if (CoreUtils.isNullOrEmpty(partitionOwnership.getETag())) {
// New blob should be created
blobRequestConditions.setIfNoneMatch("*");
return blobAsyncClient.getBlockBlobAsyncClient()
.uploadWithResponse(Flux.just(UPLOAD_DATA), 0, null, metadata, null, null, blobRequestConditions)
.flatMapMany(response -> updateOwnershipETag(response, partitionOwnership), error -> {
logger.info(Messages.CLAIM_ERROR, partitionId, error.getMessage());
return Mono.empty();
}, Mono::empty);
} else {
// update existing blob
blobRequestConditions.setIfMatch(partitionOwnership.getETag());
return blobAsyncClient.setMetadataWithResponse(metadata, blobRequestConditions)
.flatMapMany(response -> updateOwnershipETag(response, partitionOwnership), error -> {
logger.info(Messages.CLAIM_ERROR, partitionId, error.getMessage());
return Mono.empty();
}, Mono::empty);
try {
String partitionId = partitionOwnership.getPartitionId();
String blobName = getBlobName(partitionOwnership.getFullyQualifiedNamespace(),
partitionOwnership.getEventHubName(), partitionOwnership.getConsumerGroup(), partitionId,
OWNERSHIP_PATH);

if (!blobClients.containsKey(blobName)) {
blobClients.put(blobName, blobContainerAsyncClient.getBlobAsyncClient(blobName));
}

BlobAsyncClient blobAsyncClient = blobClients.get(blobName);

Map<String, String> metadata = new HashMap<>();
metadata.put(OWNER_ID, partitionOwnership.getOwnerId());

BlobRequestConditions blobRequestConditions = new BlobRequestConditions();
if (CoreUtils.isNullOrEmpty(partitionOwnership.getETag())) {
// New blob should be created
blobRequestConditions.setIfNoneMatch("*");
return blobAsyncClient.getBlockBlobAsyncClient()
.uploadWithResponse(Flux.just(UPLOAD_DATA), 0, null, metadata, null, null,
blobRequestConditions)
.flatMapMany(response -> updateOwnershipETag(response, partitionOwnership), error -> {
logger.info(Messages.CLAIM_ERROR, partitionId, error.getMessage());
return Mono.empty();
}, Mono::empty);
} else {
// update existing blob
blobRequestConditions.setIfMatch(partitionOwnership.getETag());
return blobAsyncClient.setMetadataWithResponse(metadata, blobRequestConditions)
.flatMapMany(response -> updateOwnershipETag(response, partitionOwnership), error -> {
logger.info(Messages.CLAIM_ERROR, partitionId, error);
return Mono.empty();
}, Mono::empty);
}
} catch (Exception ex) {
logger.warning(Messages.CLAIM_ERROR, partitionOwnership.getPartitionId(), ex);
return Mono.empty();
}
});
}
Expand Down Expand Up @@ -237,6 +256,8 @@ private Mono<PartitionOwnership> convertToPartitionOwnership(BlobItem blobItem)
logger.warning(Messages.NO_METADATA_AVAILABLE_FOR_BLOB, blobItem.getName());
return Mono.empty();
}
logger
.info(Messages.BLOB_OWNER_INFO, blobItem.getName(), blobItem.getMetadata().getOrDefault(OWNER_ID, ""));

BlobItemProperties blobProperties = blobItem.getProperties();
PartitionOwnership partitionOwnership = new PartitionOwnership()
Expand All @@ -245,7 +266,7 @@ private Mono<PartitionOwnership> convertToPartitionOwnership(BlobItem blobItem)
.setConsumerGroup(names[2])
// names[3] is "ownership"
.setPartitionId(names[4])
.setOwnerId(blobItem.getMetadata().get(OWNER_ID))
.setOwnerId(blobItem.getMetadata().getOrDefault(OWNER_ID, ""))
.setLastModifiedTime(blobProperties.getLastModified().toInstant().toEpochMilli())
.setETag(blobProperties.getETag());
return Mono.just(partitionOwnership);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,10 @@ public enum Messages {
private static Properties properties;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch - updated!

private static final String PATH = "com/azure/messaging/eventhubs/checkpointstore/blob/messages.properties";
public static final String NO_METADATA_AVAILABLE_FOR_BLOB = "No metadata available for blob {}";
public static final String CLAIM_ERROR = "Couldn't claim ownership of partition {}, error {}";
public static final String CLAIM_ERROR = "Couldn't claim ownership of partition {}";
public static final String FOUND_BLOB_FOR_PARTITION = "Found blob for partition {}";
public static final String BLOB_OWNER_INFO = "Blob {} is owned by {}";
public static final String CHECKPOINT_INFO = "Blob {} has checkpoint with sequence number {} and offset {}";

private static synchronized Properties getProperties() {
if (properties != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,8 @@ private Mono<Void> loadBalance(final Tuple2<Map<String, PartitionOwnership>, Lis
if (isLoadBalanced(minPartitionsPerEventProcessor, numberOfEventProcessorsWithAdditionalPartition,
ownerPartitionMap)) {
// If the partitions are evenly distributed among all active event processors, no change required.
logger.info("Load is balanced");
logger.info("Load is balanced with this event processor owning {} partitions",
ownerPartitionMap.get(ownerId).size());
// renew ownership of already owned partitions
checkpointStore.claimOwnership(partitionPumpManager.getPartitionPumps().keySet()
.stream()
Expand All @@ -231,7 +232,8 @@ private Mono<Void> loadBalance(final Tuple2<Map<String, PartitionOwnership>, Lis
// If we have reached this stage, this event processor has to claim/steal ownership of at least 1
// more partition
logger.info(
"Load is unbalanced and this event processor should own more partitions");
"Load is unbalanced and this event processor owns {} partitions and should own more partitions",
ownerPartitionMap.get(ownerId).size());
/*
* If some partitions are unclaimed, this could be because an event processor is down and
* it's partitions are now available for others to own or because event processors are just
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,8 +148,11 @@ void startPartitionPump(PartitionOwnership claimedOwnership, Checkpoint checkpoi
partitionEvent),
/* EventHubConsumer receive() returned an error */
ex -> handleError(claimedOwnership, eventHubConsumer, partitionProcessor, ex, partitionContext),
() -> partitionProcessor.close(new CloseContext(partitionContext,
CloseReason.EVENT_PROCESSOR_SHUTDOWN)));
() -> {
partitionProcessor.close(new CloseContext(partitionContext,
CloseReason.EVENT_PROCESSOR_SHUTDOWN));
partitionPumps.remove(claimedOwnership.getPartitionId());
});
} catch (Exception ex) {
if (partitionPumps.containsKey(claimedOwnership.getPartitionId())) {
cleanup(claimedOwnership, partitionPumps.get(claimedOwnership.getPartitionId()));
Expand Down