From 7d04bb8bf1cf5345521c6085ef39ac2084b40559 Mon Sep 17 00:00:00 2001 From: Annie Liang <64233642+xinlian12@users.noreply.github.com> Date: Fri, 4 Oct 2024 21:29:00 -0700 Subject: [PATCH] AllowChangeFeedQueryToCompleteAfterFetchedChangesAvailableNow (#42160) * allow change feed query to complete once fetched all changes available now --------- Co-authored-by: annie-mac --- .../cosmos/CosmosContainerChangeFeedTest.java | 68 +++++++++++++++ .../RxDocumentClientImplTest.java | 2 +- sdk/cosmos/azure-cosmos/CHANGELOG.md | 1 + .../implementation/ChangeFeedQueryImpl.java | 1 + .../CosmosChangeFeedRequestOptionsImpl.java | 12 +++ .../FeedRangeCompositeContinuationImpl.java | 86 +++++++++++++++++++ .../feedranges/FeedRangeContinuation.java | 3 + .../query/ChangeFeedFetcher.java | 38 ++++++-- .../cosmos/implementation/query/Fetcher.java | 4 + .../implementation/query/Paginator.java | 2 + .../CosmosChangeFeedRequestOptions.java | 21 +++++ 11 files changed, 229 insertions(+), 9 deletions(-) diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/CosmosContainerChangeFeedTest.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/CosmosContainerChangeFeedTest.java index 9b902f0346859..aebc1c8db9459 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/CosmosContainerChangeFeedTest.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/CosmosContainerChangeFeedTest.java @@ -20,6 +20,7 @@ import com.azure.cosmos.models.ChangeFeedPolicy; import com.azure.cosmos.models.CosmosChangeFeedRequestOptions; import com.azure.cosmos.models.CosmosContainerProperties; +import com.azure.cosmos.models.CosmosContainerRequestOptions; import com.azure.cosmos.models.CosmosContainerResponse; import com.azure.cosmos.models.CosmosItemResponse; import com.azure.cosmos.models.FeedRange; @@ -37,6 +38,7 @@ import com.azure.cosmos.test.faultinjection.FaultInjectionRuleBuilder; import com.azure.cosmos.test.faultinjection.FaultInjectionServerErrorType; import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.node.ObjectNode; import org.testng.annotations.AfterClass; @@ -63,6 +65,7 @@ import java.util.Map; import java.util.Queue; import java.util.UUID; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Function; import java.util.stream.Collectors; @@ -83,6 +86,17 @@ public class CosmosContainerChangeFeedTest extends TestSuiteBase { private final Multimap partitionKeyToDocuments = ArrayListMultimap.create(); private final String preExistingDatabaseId = CosmosDatabaseForTest.generateId(); + @DataProvider(name = "changeFeedQueryCompleteAfterAvailableNowDataProvider") + public static Object[][] changeFeedQueryCompleteAfterAvailableNowDataProvider() { + return new Object[][]{ + // container RU, continuous ingest items + { 400, true }, + { 400, false }, + { 11000, true }, + { 11000, false }, + }; + } + @DataProvider(name = "changeFeedSplitHandlingDataProvider") public static Object[][] changeFeedSplitHandlingDataProvider() { return new Object[][]{ @@ -829,10 +843,64 @@ public void split_only_notModified() throws Exception { assertThat(stateAfterLastDrainAttempt.getContinuation().getCompositeContinuationTokens()).hasSize(3); } + @Test(groups = { "emulator" }, dataProvider = "changeFeedQueryCompleteAfterAvailableNowDataProvider", timeOut = 100 * TIMEOUT) + public void changeFeedQueryCompleteAfterAvailableNow( + int throughput, + boolean shouldContinuouslyIngestItems) { + String testContainerId = UUID.randomUUID().toString(); + + try { + CosmosContainerProperties containerProperties = new CosmosContainerProperties(testContainerId, "/mypk"); + CosmosAsyncContainer testContainer = + createCollection( + this.createdAsyncDatabase, + containerProperties, + new CosmosContainerRequestOptions(), + throughput); + + List feedRanges = testContainer.getFeedRanges().block(); + AtomicInteger currentPageCount = new AtomicInteger(0); + + insertDocuments(1, 5, testContainer); + CosmosChangeFeedRequestOptions cosmosChangeFeedRequestOptions = + CosmosChangeFeedRequestOptions.createForProcessingFromBeginning(FeedRange.forFullRange()); + + cosmosChangeFeedRequestOptions.setCompleteAfterAllCurrentChangesRetrieved(true); + AtomicInteger totalQueryCount = new AtomicInteger(0); + testContainer.queryChangeFeed(cosmosChangeFeedRequestOptions, JsonNode.class) + .byPage(1) + .flatMap(response -> { + int currentPage = currentPageCount.incrementAndGet(); + totalQueryCount.set(totalQueryCount.get() + response.getResults().size()); + + // Only start creating new items once we have looped through all feedRanges once to make the test behavior more deterministic + if (shouldContinuouslyIngestItems && currentPage >= feedRanges.size()) { + return testContainer + .createItem(getDocumentDefinition(UUID.randomUUID().toString())).then(); + } else { + return Mono.empty(); + } + }) + .blockLast(); + + assertThat(totalQueryCount.get()).isEqualTo(5); + } finally { + safeDeleteCollection(this.createdAsyncDatabase.getContainer(testContainerId)); + } + } + void insertDocuments( int partitionCount, int documentCount) { + insertDocuments(partitionCount, documentCount, this.createdAsyncContainer); + } + + void insertDocuments( + int partitionCount, + int documentCount, + CosmosAsyncContainer container) { + List docs = new ArrayList<>(); for (int i = 0; i < partitionCount; i++) { diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/RxDocumentClientImplTest.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/RxDocumentClientImplTest.java index 0bafee942c5ff..7ea3c5b87322f 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/RxDocumentClientImplTest.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/RxDocumentClientImplTest.java @@ -96,7 +96,7 @@ public void setUp() { this.consistencyLevelMock = Mockito.mock(ConsistencyLevel.class); this.configsMock = Mockito.mock(Configs.class); this.cosmosAuthorizationTokenResolverMock = Mockito.mock(CosmosAuthorizationTokenResolver.class); - this.azureKeyCredentialMock = Mockito.mock(AzureKeyCredential.class); + this.azureKeyCredentialMock = new AzureKeyCredential("fakeKey"); this.metadataCachesSnapshotMock = Mockito.mock(CosmosClientMetadataCachesSnapshot.class); this.apiTypeMock = Mockito.mock(ApiType.class); this.cosmosClientTelemetryConfigMock = Mockito.mock(CosmosClientTelemetryConfig.class); diff --git a/sdk/cosmos/azure-cosmos/CHANGELOG.md b/sdk/cosmos/azure-cosmos/CHANGELOG.md index 3948323964bac..dbf1c82c8496b 100644 --- a/sdk/cosmos/azure-cosmos/CHANGELOG.md +++ b/sdk/cosmos/azure-cosmos/CHANGELOG.md @@ -4,6 +4,7 @@ #### Features Added * Added an API to retrieve diagnostics from the change feed processor context. - See [PR 41738](https://github.com/Azure/azure-sdk-for-java/pull/41738) +* Added support to allow `queryChangeFeed` to complete when all changes available when the query starts have been fetched. - See [PR 42160](https://github.com/Azure/azure-sdk-for-java/pull/42160) #### Breaking Changes diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/ChangeFeedQueryImpl.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/ChangeFeedQueryImpl.java index af5a391ba72d0..1ee45b8830fd1 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/ChangeFeedQueryImpl.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/ChangeFeedQueryImpl.java @@ -113,6 +113,7 @@ public Flux> executeAsync() { this.options.getMaxItemCount(), this.options.getMaxPrefetchPageCount(), ModelBridgeInternal.getChangeFeedIsSplitHandlingDisabled(this.options), + this.options.isCompleteAfterAllCurrentChangesRetrieved(), ImplementationBridgeHelpers .CosmosChangeFeedRequestOptionsHelper .getCosmosChangeFeedRequestOptionsAccessor() diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/CosmosChangeFeedRequestOptionsImpl.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/CosmosChangeFeedRequestOptionsImpl.java index 34eccc9231c2c..c887021e557de 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/CosmosChangeFeedRequestOptionsImpl.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/CosmosChangeFeedRequestOptionsImpl.java @@ -29,6 +29,7 @@ public final class CosmosChangeFeedRequestOptionsImpl implements OverridableRequestOptions { private static final int DEFAULT_MAX_ITEM_COUNT = 100; private static final int DEFAULT_MAX_PREFETCH_PAGE_COUNT = 1; + private static final boolean DEFAULT_COMPLETE_AFTER_ALL_CURRENT_CHANGES_RETRIEVED = false; private final ChangeFeedState continuationState; private final FeedRangeInternal feedRangeInternal; private final Map properties; @@ -47,6 +48,7 @@ public final class CosmosChangeFeedRequestOptionsImpl implements OverridableRequ private PartitionKeyDefinition partitionKeyDefinition; private String collectionRid; private Set keywordIdentifiers; + private boolean completeAfterAllCurrentChangesRetrieved; public CosmosChangeFeedRequestOptionsImpl(CosmosChangeFeedRequestOptionsImpl toBeCloned) { this.continuationState = toBeCloned.continuationState; @@ -67,6 +69,7 @@ public CosmosChangeFeedRequestOptionsImpl(CosmosChangeFeedRequestOptionsImpl toB this.collectionRid = toBeCloned.collectionRid; this.partitionKeyDefinition = toBeCloned.partitionKeyDefinition; this.keywordIdentifiers = toBeCloned.keywordIdentifiers; + this.completeAfterAllCurrentChangesRetrieved = toBeCloned.completeAfterAllCurrentChangesRetrieved; } public CosmosChangeFeedRequestOptionsImpl( @@ -104,6 +107,7 @@ public CosmosChangeFeedRequestOptionsImpl( this.properties = new HashMap<>(); this.isSplitHandlingDisabled = false; + this.completeAfterAllCurrentChangesRetrieved = DEFAULT_COMPLETE_AFTER_ALL_CURRENT_CHANGES_RETRIEVED; } public ChangeFeedState getContinuation() { @@ -359,6 +363,14 @@ public Set getKeywordIdentifiers() { return this.keywordIdentifiers; } + public boolean isCompleteAfterAllCurrentChangesRetrieved() { + return this.completeAfterAllCurrentChangesRetrieved; + } + + public void setCompleteAfterAllCurrentChangesRetrieved(boolean queryAvailableNow) { + this.completeAfterAllCurrentChangesRetrieved = queryAvailableNow; + } + @Override public void override(CosmosRequestOptions cosmosRequestOptions) { this.maxItemCount = overrideOption(cosmosRequestOptions.getMaxItemCount(), this.maxItemCount); diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/feedranges/FeedRangeCompositeContinuationImpl.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/feedranges/FeedRangeCompositeContinuationImpl.java index 3b8e4fc579e24..a0a65f5d74d32 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/feedranges/FeedRangeCompositeContinuationImpl.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/feedranges/FeedRangeCompositeContinuationImpl.java @@ -12,6 +12,7 @@ import com.azure.cosmos.implementation.ShouldRetryResult; import com.azure.cosmos.implementation.Strings; import com.azure.cosmos.implementation.Utils; +import com.azure.cosmos.implementation.apachecommons.lang.StringUtils; import com.azure.cosmos.implementation.caches.RxPartitionKeyRangeCache; import com.azure.cosmos.implementation.query.CompositeContinuationToken; import com.azure.cosmos.implementation.routing.Range; @@ -28,8 +29,10 @@ import java.util.Comparator; import java.util.LinkedList; import java.util.List; +import java.util.Map; import java.util.Objects; import java.util.Queue; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicLong; import static com.azure.cosmos.implementation.guava25.base.Preconditions.checkNotNull; @@ -41,10 +44,15 @@ final class FeedRangeCompositeContinuationImpl extends FeedRangeContinuation { private final static Logger LOGGER = LoggerFactory.getLogger(FeedRangeCompositeContinuationImpl.class); + private static final String PK_RANGE_ID_SEPARATOR = ":"; + private static final String SEGMENT_SEPARATOR = "#"; + private final Queue compositeContinuationTokens; private CompositeContinuationToken currentToken; private String initialNoResultsRange; private final AtomicLong continuousNotModifiedSinceInitialNoResultsRangeCaptured = new AtomicLong(0); + private final Map, FeedRangeLSNContext> feedRangeLSNContextMap = new ConcurrentHashMap<>(); + public FeedRangeCompositeContinuationImpl( String containerRid, @@ -260,6 +268,25 @@ public ShouldRetryResult handleChangeFeedNotModified(final FeedResponse r return ShouldRetryResult.NO_RETRY; } + @Override + public boolean hasFetchedAllChangesAvailableNow(FeedResponse response) { + FeedRangeLSNContext feedRangeLSNContext = + this.updateFeedRangeEndLSNIfAbsent( + this.currentToken.getRange(), + response.getSessionToken()); + feedRangeLSNContext.handleLSNFromContinuation(this.currentToken); + + // find next token which can fetch more + Range initialToken = this.currentToken.getRange(); + do { + this.moveToNextToken(); + } while ( + !this.currentToken.getRange().equals(initialToken) && + this.hasFetchAllChangesAvailableNowForFeedRange(this.currentToken.getRange())); + + return this.hasFetchAllChangesAvailableNowForFeedRange(this.currentToken.getRange()); + } + @Override public Mono handleFeedRangeGone(final RxDocumentClientImpl client, final GoneException goneException) { @@ -298,6 +325,37 @@ public Mono handleFeedRangeGone(final RxDocumentClientImpl cl }); } + private Long getLatestLsnFromSessionToken(String sessionToken) { + String parsedSessionToken = sessionToken.substring( + sessionToken.indexOf(PK_RANGE_ID_SEPARATOR)); + String[] segments = StringUtils.split(parsedSessionToken, SEGMENT_SEPARATOR); + String latestLsn = segments[0]; + if (segments.length >= 2) { + // default to Global LSN + latestLsn = segments[1]; + } + + return Long.parseLong(latestLsn); + } + + private FeedRangeLSNContext updateFeedRangeEndLSNIfAbsent( + Range targetedRange, + String sessionToken) { + return this.feedRangeLSNContextMap.computeIfAbsent( + targetedRange, + (range) -> { + return new FeedRangeLSNContext( + targetedRange, + this.getLatestLsnFromSessionToken(sessionToken) + ); + }); + } + + private boolean hasFetchAllChangesAvailableNowForFeedRange(Range range) { + return this.feedRangeLSNContextMap.containsKey(range) && + this.feedRangeLSNContextMap.get(range).hasCompleted; + } + /** * Used for deserializtion only */ @@ -524,4 +582,32 @@ public int compare(PartitionKeyRange o1, PartitionKeyRange o2) { return o1.getMinInclusive().compareTo(o2.getMinInclusive()); } } + + final static class FeedRangeLSNContext { + private Range range; + private Long endLSN; + private boolean hasCompleted; + + public FeedRangeLSNContext(Range range, Long endLSN) { + this.range = range; + this.endLSN = endLSN; + this.hasCompleted = false; + } + + public void handleLSNFromContinuation(CompositeContinuationToken compositeContinuationToken) { + if (!compositeContinuationToken.getRange().equals(this.range)) { + throw new IllegalStateException( + "Range in FeedRangeAvailableNowContext is different than the range in the continuationToken"); + } + + String lsnFromContinuationToken = compositeContinuationToken.getToken(); + if (lsnFromContinuationToken.startsWith("\"")) { + lsnFromContinuationToken = lsnFromContinuationToken.substring(1, lsnFromContinuationToken.length() - 1); + } + + if (Long.parseLong(lsnFromContinuationToken) >= endLSN) { + this.hasCompleted = true; + } + } + } } diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/feedranges/FeedRangeContinuation.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/feedranges/FeedRangeContinuation.java index 459af958b5fdc..1d0d67e6dc4a3 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/feedranges/FeedRangeContinuation.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/feedranges/FeedRangeContinuation.java @@ -136,6 +136,9 @@ public static FeedRangeContinuation create( public abstract ShouldRetryResult handleChangeFeedNotModified( FeedResponse responseMessage); + public abstract boolean hasFetchedAllChangesAvailableNow( + FeedResponse responseMessage); + public abstract Mono handleFeedRangeGone( RxDocumentClientImpl client, GoneException goneException); diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/ChangeFeedFetcher.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/ChangeFeedFetcher.java index 2067165f1cffc..ff1b37acad6cc 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/ChangeFeedFetcher.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/ChangeFeedFetcher.java @@ -42,6 +42,7 @@ class ChangeFeedFetcher extends Fetcher { private final ChangeFeedState changeFeedState; private final Supplier createRequestFunc; private final Supplier feedRangeContinuationRetryPolicySupplier; + private final boolean completeAfterAllCurrentChangesRetrieved; public ChangeFeedFetcher( RxDocumentClientImpl client, @@ -52,6 +53,7 @@ public ChangeFeedFetcher( int top, int maxItemCount, boolean isSplitHandlingDisabled, + boolean completeAfterAllCurrentChangesRetrieved, OperationContextAndListenerTuple operationContext, GlobalEndpointManager globalEndpointManager, GlobalPartitionEndpointManagerForCircuitBreaker globalPartitionEndpointManagerForCircuitBreaker) { @@ -76,6 +78,7 @@ public ChangeFeedFetcher( collectionLink, isSplitHandlingDisabled); this.createRequestFunc = createRequestFunc; + this.completeAfterAllCurrentChangesRetrieved = completeAfterAllCurrentChangesRetrieved; } @Override @@ -112,13 +115,32 @@ private Mono> nextPageInternal(DocumentClientRetryPolicy retryPo FeedRangeContinuation continuationSnapshot = this.changeFeedState.getContinuation(); - if (continuationSnapshot != null && - continuationSnapshot.handleChangeFeedNotModified(r) == ShouldRetryResult.RETRY_NOW) { - - // not all continuations have been drained yet - // repeat with the next continuation - this.reEnableShouldFetchMoreForRetry(); - return Mono.empty(); + if (this.completeAfterAllCurrentChangesRetrieved) { + if (continuationSnapshot != null) { + //track the end-LSN available now for each sub-feedRange and then find the next sub-feedRange to fetch more changes + boolean shouldComplete = continuationSnapshot.hasFetchedAllChangesAvailableNow(r); + if (shouldComplete) { + this.disableShouldFetchMore(); + return Mono.just(r); + } + + if (ModelBridgeInternal.noChanges(r)) { + // if we have reached here, it means we have got 304 for the current feedRange, + // but we need to continue drain the changes from other sub-feedRange + this.reEnableShouldFetchMoreForRetry(); + return Mono.empty(); + } + } + } else { + // complete query based on 304s + if (continuationSnapshot != null && + continuationSnapshot.handleChangeFeedNotModified(r) == ShouldRetryResult.RETRY_NOW) { + + // not all continuations have been drained yet + // repeat with the next continuation + this.reEnableShouldFetchMoreForRetry(); + return Mono.empty(); + } } return Mono.just(r); @@ -133,7 +155,7 @@ protected String applyServerResponseContinuation( FeedResponse response) { boolean isNoChanges = feedResponseAccessor.getNoChanges(response); - boolean shouldMoveToNextTokenOnETagReplace = !isNoChanges; + boolean shouldMoveToNextTokenOnETagReplace = !isNoChanges && !this.completeAfterAllCurrentChangesRetrieved; return this.changeFeedState.applyServerResponseContinuation( serverContinuationToken, request, shouldMoveToNextTokenOnETagReplace); } diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/Fetcher.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/Fetcher.java index 7c41b7f39a39c..dad3ff190abba 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/Fetcher.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/Fetcher.java @@ -148,6 +148,10 @@ protected void reEnableShouldFetchMoreForRetry() { this.shouldFetchMore.set(true); } + protected void disableShouldFetchMore() { + this.shouldFetchMore.set(false); + } + protected RxDocumentServiceRequest createRequest(DocumentClientRetryPolicy documentClientRetryPolicy) { if (!shouldFetchMore.get()) { // this should never happen diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/Paginator.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/Paginator.java index d8e220af41316..c2b300350ed56 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/Paginator.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/Paginator.java @@ -97,6 +97,7 @@ public static Flux> getChangeFeedQueryResultAsObservable( int maxPageSize, int preFetchCount, boolean isSplitHandlingDisabled, + boolean completeAfterAllCurrentChangesRetrieved, OperationContextAndListenerTuple operationContext) { return getPaginatedQueryResultAsObservable( @@ -109,6 +110,7 @@ public static Flux> getChangeFeedQueryResultAsObservable( top, maxPageSize, isSplitHandlingDisabled, + completeAfterAllCurrentChangesRetrieved, operationContext, client.getGlobalEndpointManager(), client.getGlobalPartitionEndpointManagerForCircuitBreaker() diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/models/CosmosChangeFeedRequestOptions.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/models/CosmosChangeFeedRequestOptions.java index 3772200f593d4..48f14c241d9b3 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/models/CosmosChangeFeedRequestOptions.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/models/CosmosChangeFeedRequestOptions.java @@ -184,6 +184,27 @@ public CosmosChangeFeedRequestOptions setCustomItemSerializer(CosmosItemSerializ return this; } + /*** + * Whether the query should be completed when all available changes when the query starts have been fetched. + * + * @param completeAfterAllCurrentChangesRetrieved flag to indicate whether to complete the query when all changes up to current moment have been fetched. + * @return the CosmosChangeFeedRequestOptions. + */ + public CosmosChangeFeedRequestOptions setCompleteAfterAllCurrentChangesRetrieved( + boolean completeAfterAllCurrentChangesRetrieved) { + this.actualRequestOptions.setCompleteAfterAllCurrentChangesRetrieved(completeAfterAllCurrentChangesRetrieved); + return this; + } + + /*** + * Whether the query should be completed when all available changes when the query starts have been fetched. + * + * @return true if complete the query when all changes up to the current moment have been fetched. + */ + public boolean isCompleteAfterAllCurrentChangesRetrieved() { + return this.actualRequestOptions.isCompleteAfterAllCurrentChangesRetrieved(); + } + boolean isSplitHandlingDisabled() { return this.actualRequestOptions.isSplitHandlingDisabled(); }