Skip to content

Commit

Permalink
PrepareMergeSupportForChangeFeedProcessor (#31428)
Browse files Browse the repository at this point in the history
* Refactor changeFeedProcessor to prepare for merge support

Co-authored-by: annie-mac <[email protected]>
Co-authored-by: annie-mac <[email protected]>
  • Loading branch information
3 people authored Oct 25, 2022
1 parent 8a479e0 commit 6c7f6f6
Show file tree
Hide file tree
Showing 70 changed files with 2,171 additions and 1,322 deletions.
6 changes: 6 additions & 0 deletions sdk/cosmos/azure-cosmos/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,9 @@ Licensed under the MIT License.
--add-opens com.azure.cosmos/com.azure.cosmos.rx=ALL-UNNAMED
--add-opens com.azure.cosmos/com.azure.cosmos.rx=com.fasterxml.jackson.databind
--add-opens com.azure.cosmos/com.azure.cosmos.rx.proxy=ALL-UNNAMED
--add-opens com.azure.cosmos/com.azure.cosmos.rx.changefeed=ALL-UNNAMED
--add-opens com.azure.cosmos/com.azure.cosmos.rx.changefeed.pkversion=ALL-UNNAMED
--add-opens com.azure.cosmos/com.azure.cosmos.rx.changefeed.epkversion=ALL-UNNAMED

--add-opens com.azure.cosmos/com.azure.cosmos.implementation=ALL-UNNAMED
--add-opens com.azure.cosmos/com.azure.cosmos.implementation.batch=ALL-UNNAMED
Expand All @@ -64,6 +67,9 @@ Licensed under the MIT License.
--add-opens com.azure.cosmos/com.azure.cosmos.implementation.throughputControl=ALL-UNNAMED
--add-opens com.azure.cosmos/com.azure.cosmos.implementation.throughputControl.controller=ALL-UNNAMED
--add-opens com.azure.cosmos/com.azure.cosmos.implementation.throughputControl.controller.request=ALL-UNNAMED
--add-opens com.azure.cosmos/com.azure.cosmos.implementation.changefeed=ALL-UNNAMED
--add-opens com.azure.cosmos/com.azure.cosmos.implementation.changefeed.pkversion=ALL-UNNAMED
--add-opens com.azure.cosmos/com.azure.cosmos.implementation.changefeed.epkversion=ALL-UNNAMED
</javaModulesSurefireArgLine>

<!-- Prevents Checkstyle validating implementation files. -->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,11 @@
// Licensed under the MIT License.
package com.azure.cosmos;

import com.azure.cosmos.implementation.changefeed.incremental.ChangeFeedProcessorBuilderImpl;
import com.azure.cosmos.implementation.apachecommons.lang.StringUtils;
import com.azure.cosmos.implementation.changefeed.common.ChangeFeedMode;
import com.azure.cosmos.implementation.changefeed.common.LeaseVersion;
import com.azure.cosmos.implementation.changefeed.pkversion.IncrementalChangeFeedProcessorImpl;
import com.azure.cosmos.implementation.changefeed.epkversion.FullFidelityChangeFeedProcessorImpl;
import com.azure.cosmos.models.ChangeFeedProcessorItem;
import com.azure.cosmos.models.ChangeFeedProcessorOptions;
import com.azure.cosmos.util.Beta;
Expand All @@ -12,6 +15,9 @@
import java.util.List;
import java.util.function.Consumer;

import static com.azure.cosmos.implementation.guava25.base.Preconditions.checkArgument;
import static com.azure.cosmos.implementation.guava25.base.Preconditions.checkNotNull;

/**
* Helper class to build a {@link ChangeFeedProcessor} instance.
*
Expand Down Expand Up @@ -54,9 +60,10 @@ public class ChangeFeedProcessorBuilder {
private CosmosAsyncContainer feedContainer;
private CosmosAsyncContainer leaseContainer;
private ChangeFeedProcessorOptions changeFeedProcessorOptions;
private Consumer<List<JsonNode>> partitionKeyBasedLeaseConsumer;
private Consumer<List<ChangeFeedProcessorItem>> epkRangeBasedLeaseConsumer;
private Consumer<List<JsonNode>> incrementalModeLeaseConsumer;
private Consumer<List<ChangeFeedProcessorItem>> fullFidelityModeLeaseConsumer;
private ChangeFeedMode changeFeedMode = ChangeFeedMode.INCREMENTAL;
private LeaseVersion leaseVersion = LeaseVersion.PARTITION_KEY_BASED_LEASE;

/**
* Instantiates a new ChangeFeedProcessor builder.
Expand Down Expand Up @@ -117,8 +124,12 @@ public ChangeFeedProcessorBuilder leaseContainer(CosmosAsyncContainer leaseConta
* @return current Builder.
*/
public ChangeFeedProcessorBuilder handleChanges(Consumer<List<JsonNode>> consumer) {
this.partitionKeyBasedLeaseConsumer = consumer;
checkNotNull(consumer, "Argument 'consumer' can not be null");
checkArgument(this.incrementalModeLeaseConsumer == null, "consumer has already been defined");

this.incrementalModeLeaseConsumer = consumer;
this.changeFeedMode = ChangeFeedMode.INCREMENTAL;
this.leaseVersion = LeaseVersion.PARTITION_KEY_BASED_LEASE;
return this;
}

Expand All @@ -140,8 +151,9 @@ public ChangeFeedProcessorBuilder handleChanges(Consumer<List<JsonNode>> consume
*/
@Beta(value = Beta.SinceVersion.V4_37_0, warningText = Beta.PREVIEW_SUBJECT_TO_CHANGE_WARNING)
public ChangeFeedProcessorBuilder handleAllVersionsAndDeletesChanges(Consumer<List<ChangeFeedProcessorItem>> consumer) {
this.epkRangeBasedLeaseConsumer = consumer;
this.fullFidelityModeLeaseConsumer = consumer;
this.changeFeedMode = ChangeFeedMode.FULL_FIDELITY;
this.leaseVersion = LeaseVersion.EPK_RANGE_BASED_LEASE;
return this;
}

Expand Down Expand Up @@ -174,44 +186,50 @@ public ChangeFeedProcessorBuilder options(ChangeFeedProcessorOptions changeFeedP
public ChangeFeedProcessor buildChangeFeedProcessor() {
validateChangeFeedProcessorBuilder();

if (ChangeFeedMode.INCREMENTAL.equals(changeFeedMode)) {
ChangeFeedProcessorBuilderImpl builder = new ChangeFeedProcessorBuilderImpl()
.hostName(this.hostName)
.feedContainer(this.feedContainer)
.leaseContainer(this.leaseContainer)
.handleChanges(this.partitionKeyBasedLeaseConsumer);

if (this.changeFeedProcessorOptions != null) {
builder.options(this.changeFeedProcessorOptions);
ChangeFeedProcessor changeFeedProcessor = null;
// Lease version will decide which version of the changeFeed processor we are going to use internally
// PARTITION_KEY_BASED_LEASE -> ChangeFeedProcessor pkversion
// EPK_RANGE_BASED_LEASE -> ChangeFeedProcessor epkversion
if (this.leaseVersion == LeaseVersion.EPK_RANGE_BASED_LEASE) {
switch (this.changeFeedMode) {
case FULL_FIDELITY:
changeFeedProcessor = new FullFidelityChangeFeedProcessorImpl(
this.hostName,
this.feedContainer,
this.leaseContainer,
this.fullFidelityModeLeaseConsumer,
this.changeFeedProcessorOptions);
break;
case INCREMENTAL:
changeFeedProcessor = new com.azure.cosmos.implementation.changefeed.epkversion.IncrementalChangeFeedProcessorImpl(
this.hostName,
this.feedContainer,
this.leaseContainer,
this.incrementalModeLeaseConsumer,
this.changeFeedProcessorOptions);
break;
default:
throw new IllegalStateException("ChangeFeed mode " + this.changeFeedMode + " is not supported");
}

return builder.build();
} else {
com.azure.cosmos.implementation.changefeed.fullfidelity.ChangeFeedProcessorBuilderImpl builder =
new com.azure.cosmos.implementation.changefeed.fullfidelity.ChangeFeedProcessorBuilderImpl()
.hostName(this.hostName)
.feedContainer(this.feedContainer)
.leaseContainer(this.leaseContainer)
.handleChanges(this.epkRangeBasedLeaseConsumer);
if (this.changeFeedProcessorOptions != null) {
builder.options(this.changeFeedProcessorOptions);
}
return builder.build();
changeFeedProcessor = new IncrementalChangeFeedProcessorImpl(
this.hostName,
this.feedContainer,
this.leaseContainer,
this.incrementalModeLeaseConsumer,
this.changeFeedProcessorOptions);
}

return changeFeedProcessor;
}

private void validateChangeFeedProcessorBuilder() {
if (hostName == null || hostName.isEmpty()) {
throw new IllegalArgumentException("hostName cannot be null or empty");
}
if (feedContainer == null) {
throw new IllegalArgumentException("feedContainer cannot be null");
}
if (leaseContainer == null) {
throw new IllegalArgumentException("leaseContainer cannot be null");
}
if ((partitionKeyBasedLeaseConsumer == null && epkRangeBasedLeaseConsumer == null)
|| (partitionKeyBasedLeaseConsumer != null && epkRangeBasedLeaseConsumer != null)) {
checkArgument(StringUtils.isNotEmpty(hostName), "hostName cannot be null or empty");
checkNotNull(feedContainer, "Argument 'feedContainer' can not be null");
checkNotNull(leaseContainer, "Argument 'leaseContainer' can not be null");

if ((incrementalModeLeaseConsumer == null && fullFidelityModeLeaseConsumer == null)
|| (incrementalModeLeaseConsumer != null && fullFidelityModeLeaseConsumer != null)) {
throw new IllegalArgumentException("expecting either LatestVersion or AllVersionsAndDeletes consumer for handling change feed processor changes");
}
validateChangeFeedProcessorOptions();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import com.azure.cosmos.CosmosAsyncContainer;
import com.azure.cosmos.CosmosAsyncDatabase;
import com.azure.cosmos.implementation.PartitionKeyRange;
import com.azure.cosmos.implementation.routing.Range;
import com.azure.cosmos.models.CosmosChangeFeedRequestOptions;
import com.azure.cosmos.models.CosmosContainerProperties;
import com.azure.cosmos.models.CosmosContainerRequestOptions;
Expand All @@ -22,6 +23,7 @@
import reactor.core.scheduler.Scheduler;

import java.net.URI;
import java.util.List;

/**
* The interface that captures the APIs required to handle change feed processing logic.
Expand Down Expand Up @@ -161,4 +163,13 @@ <T> Mono<CosmosItemResponse<T>> readItem(String itemId, PartitionKey partitionKe
* @param scheduler a {@link Scheduler} that hosts a pool of ExecutorService-based workers.
*/
void setScheduler(Scheduler scheduler);

/**
* Gets the partition key ranges that overlap the given range.
*
* @param range the range for which overlapping partition key ranges are requested.
*
* @return a {@link Mono} that emits the list of partition key ranges overlapping {@code range}.
*/
Mono<List<PartitionKeyRange>> getOverlappingRanges(Range<String> range);
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
package com.azure.cosmos.implementation.changefeed;

import com.azure.cosmos.ChangeFeedProcessor;
import com.azure.cosmos.implementation.changefeed.common.ChangeFeedMode;
import com.azure.cosmos.implementation.changefeed.common.ChangeFeedState;
import com.azure.cosmos.implementation.changefeed.common.LeaseVersion;
import com.azure.cosmos.implementation.feedranges.FeedRangeInternal;
Expand Down Expand Up @@ -77,11 +78,7 @@ public interface Lease {
*/
String getTimestamp();

ChangeFeedState getPartitionKeyBasedContinuationState(
String containerRid,
FeedRangeInternal feedRange);

ChangeFeedState getEpkRangeBasedContinuationState(String containerRid);
ChangeFeedState getContinuationState(String containerRid, ChangeFeedMode changeFeedMode);

/**
* Gets the continuation token used to determine the last processed point of the Change Feed.
Expand Down Expand Up @@ -169,7 +166,7 @@ ChangeFeedState getPartitionKeyBasedContinuationState(
*
* @param properties the custom lease item.
*/
void setProperties(Map<String,String> properties);
void setProperties(Map<String, String> properties);

/**
* Sets the lease properties.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ interface LeaseStoreManagerBuilderDefinition {

LeaseStoreManagerBuilderDefinition hostName(String hostName);

Mono<LeaseStoreManager> build();
LeaseStoreManager build();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import com.azure.cosmos.implementation.Document;
import com.azure.cosmos.implementation.PartitionKeyRange;
import com.azure.cosmos.implementation.changefeed.ChangeFeedContextClient;
import com.azure.cosmos.implementation.routing.Range;
import com.azure.cosmos.models.CosmosChangeFeedRequestOptions;
import com.azure.cosmos.models.CosmosContainerProperties;
import com.azure.cosmos.models.CosmosContainerRequestOptions;
Expand All @@ -23,21 +24,27 @@
import com.azure.cosmos.models.ModelBridgeInternal;
import com.azure.cosmos.models.PartitionKey;
import com.azure.cosmos.models.SqlQuerySpec;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

import java.net.URI;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;

import static com.azure.cosmos.CosmosBridgeInternal.getContextClient;
import static com.azure.cosmos.implementation.guava25.base.Preconditions.checkNotNull;

/**
* Implementation of {@link ChangeFeedContextClient}.
*/
public class ChangeFeedContextClientImpl implements ChangeFeedContextClient {
private static final Logger logger = LoggerFactory.getLogger(ChangeFeedContextClientImpl.class);

private final AsyncDocumentClient documentClient;
private final CosmosAsyncContainer cosmosContainer;
private Scheduler scheduler;
Expand All @@ -47,13 +54,7 @@ public class ChangeFeedContextClientImpl implements ChangeFeedContextClient {
* @param cosmosContainer existing client.
*/
public ChangeFeedContextClientImpl(CosmosAsyncContainer cosmosContainer) {
if (cosmosContainer == null) {
throw new IllegalArgumentException("cosmosContainer");
}

this.cosmosContainer = cosmosContainer;
this.documentClient = getContextClient(cosmosContainer);
this.scheduler = Schedulers.boundedElastic();
this(cosmosContainer, Schedulers.boundedElastic());
}

/**
Expand All @@ -62,9 +63,7 @@ public ChangeFeedContextClientImpl(CosmosAsyncContainer cosmosContainer) {
* @param scheduler the Reactor {@link Scheduler} to observe on.
*/
public ChangeFeedContextClientImpl(CosmosAsyncContainer cosmosContainer, Scheduler scheduler) {
if (cosmosContainer == null) {
throw new IllegalArgumentException("cosmosContainer");
}
checkNotNull(cosmosContainer, "Argument 'cosmosContainer' can not be null");

this.cosmosContainer = cosmosContainer;
this.documentClient = getContextClient(cosmosContainer);
Expand All @@ -82,6 +81,33 @@ public void setScheduler(Scheduler scheduler) {
this.scheduler = scheduler;
}

/**
 * Looks up the partition key ranges that overlap {@code range} for the container wrapped by this client.
 * <p>
 * The container is first resolved through the collection cache; its resource id is then used to query
 * the partition key range cache. When the cache yields no value, a warning is logged and an empty,
 * mutable list is emitted instead.
 *
 * @param range the range to find overlapping partition key ranges for.
 * @return a {@link Mono} emitting the overlapping partition key ranges, published on this client's scheduler.
 */
@Override
public Mono<List<PartitionKeyRange>> getOverlappingRanges(Range<String> range) {
    final AsyncDocumentClient asyncClient =
        CosmosBridgeInternal.getAsyncDocumentClient(this.cosmosContainer.getDatabase());

    return asyncClient
        .getCollectionCache()
        .resolveByNameAsync(null, BridgeInternal.extractContainerSelfLink(this.cosmosContainer), null)
        // NOTE(review): the literal 'true' is presumably a force-refresh flag -- confirm against
        // the partition key range cache API.
        .flatMap(resolvedCollection ->
            asyncClient.getPartitionKeyRangeCache().tryGetOverlappingRangesAsync(
                null,
                resolvedCollection.getResourceId(),
                range,
                true,
                null))
        .map(holder -> {
            final List<PartitionKeyRange> overlapping;
            if (holder != null && holder.v != null) {
                overlapping = holder.v;
            } else {
                logger.warn("There are no overlapping ranges found for range {}", range);
                overlapping = new ArrayList<PartitionKeyRange>();
            }
            return overlapping;
        })
        .publishOn(this.scheduler);
}

@Override
public Flux<FeedResponse<PartitionKeyRange>> readPartitionKeyRangeFeed(String partitionKeyRangesOrCollectionLink, CosmosQueryRequestOptions cosmosQueryRequestOptions) {
return this.documentClient.readPartitionKeyRanges(partitionKeyRangesOrCollectionLink, cosmosQueryRequestOptions)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
* Implement static methods used for various simple transformations and tasks.
*/
public class ChangeFeedHelper {
private static final String DEFAULT_USER_AGENT_SUFFIX = "changefeed-2.2.6";

public static final int HTTP_STATUS_CODE_NOT_FOUND = 404;
public static final int HTTP_STATUS_CODE_CONFLICT = 409;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@ public class ExceptionClassifier {
// 410: partition key range is gone.
public static final int SubStatusCode_PartitionKeyRangeGone = 1002;

// 410: partition splitting.
public static final int SubStatusCode_Splitting = 1007;
// 410: split and merge use the same exception status code and substatus code.
public static final int SubStatusCode_Splitting_Or_Merging = 1007;

// 404: LSN in session token is higher.
public static final int SubStatusCode_ReadSessionNotAvailable = 1002;
Expand All @@ -27,8 +27,9 @@ public static StatusCodeErrorType classifyClientException(CosmosException client
return StatusCodeErrorType.PARTITION_NOT_FOUND;
}

if (clientException.getStatusCode() == ChangeFeedHelper.HTTP_STATUS_CODE_GONE && (subStatusCode == SubStatusCode_PartitionKeyRangeGone || subStatusCode == SubStatusCode_Splitting)) {
return StatusCodeErrorType.PARTITION_SPLIT;
if (clientException.getStatusCode() == ChangeFeedHelper.HTTP_STATUS_CODE_GONE
&& (subStatusCode == SubStatusCode_PartitionKeyRangeGone || subStatusCode == SubStatusCode_Splitting_Or_Merging)) {
return StatusCodeErrorType.PARTITION_SPLIT_OR_MERGE;
}

if (clientException.getStatusCode() == ChangeFeedHelper.HTTP_STATUS_CODE_TOO_MANY_REQUESTS || clientException.getStatusCode() >= ChangeFeedHelper.HTTP_STATUS_CODE_INTERNAL_SERVER_ERROR) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
public enum StatusCodeErrorType {
UNDEFINED,
PARTITION_NOT_FOUND,
// Returned by ExceptionClassifier.classifyClientException for HTTP 410 (Gone) with sub-status 1002 or 1007.
// NOTE(review): PARTITION_SPLIT_OR_MERGE is returned for the same condition in this view --
// confirm whether this constant is still needed.
PARTITION_SPLIT,
// Split and merge surface with the same status code and sub-status code, so a single category covers both.
PARTITION_SPLIT_OR_MERGE,
// NOTE(review): presumably used for HTTP 429 and 5xx responses -- confirm against ExceptionClassifier.
TRANSIENT_ERROR,
MAX_ITEM_COUNT_TOO_LARGE
}
Loading

0 comments on commit 6c7f6f6

Please sign in to comment.