Skip to content

Commit

Permalink
AllowChangeFeedQueryToCompleteAfterFetchedChangesAvailableNow (Azure#…
Browse files Browse the repository at this point in the history
…42160)

* allow change feed query to complete once fetched all changes available now

---------

Co-authored-by: annie-mac <[email protected]>
  • Loading branch information
xinlian12 and annie-mac authored Oct 5, 2024
1 parent cb5947e commit 7d04bb8
Show file tree
Hide file tree
Showing 11 changed files with 229 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import com.azure.cosmos.models.ChangeFeedPolicy;
import com.azure.cosmos.models.CosmosChangeFeedRequestOptions;
import com.azure.cosmos.models.CosmosContainerProperties;
import com.azure.cosmos.models.CosmosContainerRequestOptions;
import com.azure.cosmos.models.CosmosContainerResponse;
import com.azure.cosmos.models.CosmosItemResponse;
import com.azure.cosmos.models.FeedRange;
Expand All @@ -37,6 +38,7 @@
import com.azure.cosmos.test.faultinjection.FaultInjectionRuleBuilder;
import com.azure.cosmos.test.faultinjection.FaultInjectionServerErrorType;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import org.testng.annotations.AfterClass;
Expand All @@ -63,6 +65,7 @@
import java.util.Map;
import java.util.Queue;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.stream.Collectors;
Expand All @@ -83,6 +86,17 @@ public class CosmosContainerChangeFeedTest extends TestSuiteBase {
private final Multimap<String, ObjectNode> partitionKeyToDocuments = ArrayListMultimap.create();
private final String preExistingDatabaseId = CosmosDatabaseForTest.generateId();

@DataProvider(name = "changeFeedQueryCompleteAfterAvailableNowDataProvider")
public static Object[][] changeFeedQueryCompleteAfterAvailableNowDataProvider() {
return new Object[][]{
// container RU, continuous ingest items
{ 400, true },
{ 400, false },
{ 11000, true },
{ 11000, false },
};
}

@DataProvider(name = "changeFeedSplitHandlingDataProvider")
public static Object[][] changeFeedSplitHandlingDataProvider() {
return new Object[][]{
Expand Down Expand Up @@ -829,10 +843,64 @@ public void split_only_notModified() throws Exception {
assertThat(stateAfterLastDrainAttempt.getContinuation().getCompositeContinuationTokens()).hasSize(3);
}

@Test(groups = { "emulator" }, dataProvider = "changeFeedQueryCompleteAfterAvailableNowDataProvider", timeOut = 100 * TIMEOUT)
public void changeFeedQueryCompleteAfterAvailableNow(
int throughput,
boolean shouldContinuouslyIngestItems) {
String testContainerId = UUID.randomUUID().toString();

try {
CosmosContainerProperties containerProperties = new CosmosContainerProperties(testContainerId, "/mypk");
CosmosAsyncContainer testContainer =
createCollection(
this.createdAsyncDatabase,
containerProperties,
new CosmosContainerRequestOptions(),
throughput);

List<FeedRange> feedRanges = testContainer.getFeedRanges().block();
AtomicInteger currentPageCount = new AtomicInteger(0);

insertDocuments(1, 5, testContainer);
CosmosChangeFeedRequestOptions cosmosChangeFeedRequestOptions =
CosmosChangeFeedRequestOptions.createForProcessingFromBeginning(FeedRange.forFullRange());

cosmosChangeFeedRequestOptions.setCompleteAfterAllCurrentChangesRetrieved(true);
AtomicInteger totalQueryCount = new AtomicInteger(0);
testContainer.queryChangeFeed(cosmosChangeFeedRequestOptions, JsonNode.class)
.byPage(1)
.flatMap(response -> {
int currentPage = currentPageCount.incrementAndGet();
totalQueryCount.set(totalQueryCount.get() + response.getResults().size());

// Only start creating new items once we have looped through all feedRanges once to make the test behavior more deterministic
if (shouldContinuouslyIngestItems && currentPage >= feedRanges.size()) {
return testContainer
.createItem(getDocumentDefinition(UUID.randomUUID().toString())).then();
} else {
return Mono.empty();
}
})
.blockLast();

assertThat(totalQueryCount.get()).isEqualTo(5);
} finally {
safeDeleteCollection(this.createdAsyncDatabase.getContainer(testContainerId));
}
}

void insertDocuments(
int partitionCount,
int documentCount) {

insertDocuments(partitionCount, documentCount, this.createdAsyncContainer);
}

void insertDocuments(
int partitionCount,
int documentCount,
CosmosAsyncContainer container) {

List<ObjectNode> docs = new ArrayList<>();

for (int i = 0; i < partitionCount; i++) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ public void setUp() {
this.consistencyLevelMock = Mockito.mock(ConsistencyLevel.class);
this.configsMock = Mockito.mock(Configs.class);
this.cosmosAuthorizationTokenResolverMock = Mockito.mock(CosmosAuthorizationTokenResolver.class);
this.azureKeyCredentialMock = Mockito.mock(AzureKeyCredential.class);
this.azureKeyCredentialMock = new AzureKeyCredential("fakeKey");
this.metadataCachesSnapshotMock = Mockito.mock(CosmosClientMetadataCachesSnapshot.class);
this.apiTypeMock = Mockito.mock(ApiType.class);
this.cosmosClientTelemetryConfigMock = Mockito.mock(CosmosClientTelemetryConfig.class);
Expand Down
1 change: 1 addition & 0 deletions sdk/cosmos/azure-cosmos/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

#### Features Added
* Added an API to retrieve diagnostics from the change feed processor context. - See [PR 41738](https://github.com/Azure/azure-sdk-for-java/pull/41738)
* Added support to allow `queryChangeFeed` to complete when all changes available when the query starts have been fetched. - See [PR 42160](https://github.com/Azure/azure-sdk-for-java/pull/42160)

#### Breaking Changes

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ public Flux<FeedResponse<T>> executeAsync() {
this.options.getMaxItemCount(),
this.options.getMaxPrefetchPageCount(),
ModelBridgeInternal.getChangeFeedIsSplitHandlingDisabled(this.options),
this.options.isCompleteAfterAllCurrentChangesRetrieved(),
ImplementationBridgeHelpers
.CosmosChangeFeedRequestOptionsHelper
.getCosmosChangeFeedRequestOptionsAccessor()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
public final class CosmosChangeFeedRequestOptionsImpl implements OverridableRequestOptions {
private static final int DEFAULT_MAX_ITEM_COUNT = 100;
private static final int DEFAULT_MAX_PREFETCH_PAGE_COUNT = 1;
private static final boolean DEFAULT_COMPLETE_AFTER_ALL_CURRENT_CHANGES_RETRIEVED = false;
private final ChangeFeedState continuationState;
private final FeedRangeInternal feedRangeInternal;
private final Map<String, Object> properties;
Expand All @@ -47,6 +48,7 @@ public final class CosmosChangeFeedRequestOptionsImpl implements OverridableRequ
private PartitionKeyDefinition partitionKeyDefinition;
private String collectionRid;
private Set<String> keywordIdentifiers;
private boolean completeAfterAllCurrentChangesRetrieved;

public CosmosChangeFeedRequestOptionsImpl(CosmosChangeFeedRequestOptionsImpl toBeCloned) {
this.continuationState = toBeCloned.continuationState;
Expand All @@ -67,6 +69,7 @@ public CosmosChangeFeedRequestOptionsImpl(CosmosChangeFeedRequestOptionsImpl toB
this.collectionRid = toBeCloned.collectionRid;
this.partitionKeyDefinition = toBeCloned.partitionKeyDefinition;
this.keywordIdentifiers = toBeCloned.keywordIdentifiers;
this.completeAfterAllCurrentChangesRetrieved = toBeCloned.completeAfterAllCurrentChangesRetrieved;
}

public CosmosChangeFeedRequestOptionsImpl(
Expand Down Expand Up @@ -104,6 +107,7 @@ public CosmosChangeFeedRequestOptionsImpl(

this.properties = new HashMap<>();
this.isSplitHandlingDisabled = false;
this.completeAfterAllCurrentChangesRetrieved = DEFAULT_COMPLETE_AFTER_ALL_CURRENT_CHANGES_RETRIEVED;
}

public ChangeFeedState getContinuation() {
Expand Down Expand Up @@ -359,6 +363,14 @@ public Set<String> getKeywordIdentifiers() {
return this.keywordIdentifiers;
}

public boolean isCompleteAfterAllCurrentChangesRetrieved() {
return this.completeAfterAllCurrentChangesRetrieved;
}

public void setCompleteAfterAllCurrentChangesRetrieved(boolean queryAvailableNow) {
this.completeAfterAllCurrentChangesRetrieved = queryAvailableNow;
}

@Override
public void override(CosmosRequestOptions cosmosRequestOptions) {
this.maxItemCount = overrideOption(cosmosRequestOptions.getMaxItemCount(), this.maxItemCount);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import com.azure.cosmos.implementation.ShouldRetryResult;
import com.azure.cosmos.implementation.Strings;
import com.azure.cosmos.implementation.Utils;
import com.azure.cosmos.implementation.apachecommons.lang.StringUtils;
import com.azure.cosmos.implementation.caches.RxPartitionKeyRangeCache;
import com.azure.cosmos.implementation.query.CompositeContinuationToken;
import com.azure.cosmos.implementation.routing.Range;
Expand All @@ -28,8 +29,10 @@
import java.util.Comparator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

import static com.azure.cosmos.implementation.guava25.base.Preconditions.checkNotNull;
Expand All @@ -41,10 +44,15 @@
final class FeedRangeCompositeContinuationImpl extends FeedRangeContinuation {

private final static Logger LOGGER = LoggerFactory.getLogger(FeedRangeCompositeContinuationImpl.class);
private static final String PK_RANGE_ID_SEPARATOR = ":";
private static final String SEGMENT_SEPARATOR = "#";

private final Queue<CompositeContinuationToken> compositeContinuationTokens;
private CompositeContinuationToken currentToken;
private String initialNoResultsRange;
private final AtomicLong continuousNotModifiedSinceInitialNoResultsRangeCaptured = new AtomicLong(0);
private final Map<Range<String>, FeedRangeLSNContext> feedRangeLSNContextMap = new ConcurrentHashMap<>();


public FeedRangeCompositeContinuationImpl(
String containerRid,
Expand Down Expand Up @@ -260,6 +268,25 @@ public <T> ShouldRetryResult handleChangeFeedNotModified(final FeedResponse<T> r
return ShouldRetryResult.NO_RETRY;
}

@Override
public <T> boolean hasFetchedAllChangesAvailableNow(FeedResponse<T> response) {
FeedRangeLSNContext feedRangeLSNContext =
this.updateFeedRangeEndLSNIfAbsent(
this.currentToken.getRange(),
response.getSessionToken());
feedRangeLSNContext.handleLSNFromContinuation(this.currentToken);

// find next token which can fetch more
Range<String> initialToken = this.currentToken.getRange();
do {
this.moveToNextToken();
} while (
!this.currentToken.getRange().equals(initialToken) &&
this.hasFetchAllChangesAvailableNowForFeedRange(this.currentToken.getRange()));

return this.hasFetchAllChangesAvailableNowForFeedRange(this.currentToken.getRange());
}

@Override
public Mono<ShouldRetryResult> handleFeedRangeGone(final RxDocumentClientImpl client,
final GoneException goneException) {
Expand Down Expand Up @@ -298,6 +325,37 @@ public Mono<ShouldRetryResult> handleFeedRangeGone(final RxDocumentClientImpl cl
});
}

private Long getLatestLsnFromSessionToken(String sessionToken) {
String parsedSessionToken = sessionToken.substring(
sessionToken.indexOf(PK_RANGE_ID_SEPARATOR));
String[] segments = StringUtils.split(parsedSessionToken, SEGMENT_SEPARATOR);
String latestLsn = segments[0];
if (segments.length >= 2) {
// default to Global LSN
latestLsn = segments[1];
}

return Long.parseLong(latestLsn);
}

private FeedRangeLSNContext updateFeedRangeEndLSNIfAbsent(
Range<String> targetedRange,
String sessionToken) {
return this.feedRangeLSNContextMap.computeIfAbsent(
targetedRange,
(range) -> {
return new FeedRangeLSNContext(
targetedRange,
this.getLatestLsnFromSessionToken(sessionToken)
);
});
}

private boolean hasFetchAllChangesAvailableNowForFeedRange(Range<String> range) {
return this.feedRangeLSNContextMap.containsKey(range) &&
this.feedRangeLSNContextMap.get(range).hasCompleted;
}

/**
* Used for deserializtion only
*/
Expand Down Expand Up @@ -524,4 +582,32 @@ public int compare(PartitionKeyRange o1, PartitionKeyRange o2) {
return o1.getMinInclusive().compareTo(o2.getMinInclusive());
}
}

final static class FeedRangeLSNContext {
private Range<String> range;
private Long endLSN;
private boolean hasCompleted;

public FeedRangeLSNContext(Range<String> range, Long endLSN) {
this.range = range;
this.endLSN = endLSN;
this.hasCompleted = false;
}

public void handleLSNFromContinuation(CompositeContinuationToken compositeContinuationToken) {
if (!compositeContinuationToken.getRange().equals(this.range)) {
throw new IllegalStateException(
"Range in FeedRangeAvailableNowContext is different than the range in the continuationToken");
}

String lsnFromContinuationToken = compositeContinuationToken.getToken();
if (lsnFromContinuationToken.startsWith("\"")) {
lsnFromContinuationToken = lsnFromContinuationToken.substring(1, lsnFromContinuationToken.length() - 1);
}

if (Long.parseLong(lsnFromContinuationToken) >= endLSN) {
this.hasCompleted = true;
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,9 @@ public static FeedRangeContinuation create(
public abstract <T> ShouldRetryResult handleChangeFeedNotModified(
FeedResponse<T> responseMessage);

public abstract <T> boolean hasFetchedAllChangesAvailableNow(
FeedResponse<T> responseMessage);

public abstract Mono<ShouldRetryResult> handleFeedRangeGone(
RxDocumentClientImpl client,
GoneException goneException);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ class ChangeFeedFetcher<T> extends Fetcher<T> {
private final ChangeFeedState changeFeedState;
private final Supplier<RxDocumentServiceRequest> createRequestFunc;
private final Supplier<DocumentClientRetryPolicy> feedRangeContinuationRetryPolicySupplier;
private final boolean completeAfterAllCurrentChangesRetrieved;

public ChangeFeedFetcher(
RxDocumentClientImpl client,
Expand All @@ -52,6 +53,7 @@ public ChangeFeedFetcher(
int top,
int maxItemCount,
boolean isSplitHandlingDisabled,
boolean completeAfterAllCurrentChangesRetrieved,
OperationContextAndListenerTuple operationContext,
GlobalEndpointManager globalEndpointManager,
GlobalPartitionEndpointManagerForCircuitBreaker globalPartitionEndpointManagerForCircuitBreaker) {
Expand All @@ -76,6 +78,7 @@ public ChangeFeedFetcher(
collectionLink,
isSplitHandlingDisabled);
this.createRequestFunc = createRequestFunc;
this.completeAfterAllCurrentChangesRetrieved = completeAfterAllCurrentChangesRetrieved;
}

@Override
Expand Down Expand Up @@ -112,13 +115,32 @@ private Mono<FeedResponse<T>> nextPageInternal(DocumentClientRetryPolicy retryPo
FeedRangeContinuation continuationSnapshot =
this.changeFeedState.getContinuation();

if (continuationSnapshot != null &&
continuationSnapshot.handleChangeFeedNotModified(r) == ShouldRetryResult.RETRY_NOW) {

// not all continuations have been drained yet
// repeat with the next continuation
this.reEnableShouldFetchMoreForRetry();
return Mono.empty();
if (this.completeAfterAllCurrentChangesRetrieved) {
if (continuationSnapshot != null) {
//track the end-LSN available now for each sub-feedRange and then find the next sub-feedRange to fetch more changes
boolean shouldComplete = continuationSnapshot.hasFetchedAllChangesAvailableNow(r);
if (shouldComplete) {
this.disableShouldFetchMore();
return Mono.just(r);
}

if (ModelBridgeInternal.<T>noChanges(r)) {
// if we have reached here, it means we have got 304 for the current feedRange,
// but we need to continue drain the changes from other sub-feedRange
this.reEnableShouldFetchMoreForRetry();
return Mono.empty();
}
}
} else {
// complete query based on 304s
if (continuationSnapshot != null &&
continuationSnapshot.handleChangeFeedNotModified(r) == ShouldRetryResult.RETRY_NOW) {

// not all continuations have been drained yet
// repeat with the next continuation
this.reEnableShouldFetchMoreForRetry();
return Mono.empty();
}
}

return Mono.just(r);
Expand All @@ -133,7 +155,7 @@ protected String applyServerResponseContinuation(
FeedResponse<T> response) {

boolean isNoChanges = feedResponseAccessor.getNoChanges(response);
boolean shouldMoveToNextTokenOnETagReplace = !isNoChanges;
boolean shouldMoveToNextTokenOnETagReplace = !isNoChanges && !this.completeAfterAllCurrentChangesRetrieved;
return this.changeFeedState.applyServerResponseContinuation(
serverContinuationToken, request, shouldMoveToNextTokenOnETagReplace);
}
Expand Down
Loading

0 comments on commit 7d04bb8

Please sign in to comment.