Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Change Feed Processor support for user serializer #944

Merged
merged 8 commits into from
Nov 10, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ namespace Microsoft.Azure.Cosmos.ChangeFeed.FeedManagement
using System.Threading.Tasks;
using Microsoft.Azure.Cosmos;
using Microsoft.Azure.Cosmos.ChangeFeed.LeaseManagement;
using Microsoft.Azure.Cosmos.ChangeFeed.Utils;
using Microsoft.Azure.Cosmos.Core.Trace;
using Microsoft.Azure.Documents;
using Newtonsoft.Json.Linq;
Expand All @@ -24,7 +25,6 @@ internal sealed class RemainingWorkEstimatorCore : RemainingWorkEstimator
private const char PKRangeIdSeparator = ':';
private const char SegmentSeparator = '#';
private const string LSNPropertyName = "_lsn";
private static readonly CosmosSerializer DefaultSerializer = new CosmosJsonDotNetSerializer();
private readonly Func<string, string, bool, FeedIterator> feedCreator;
private readonly DocumentServiceLeaseContainer leaseContainer;
private readonly int degreeOfParallelism;
Expand Down Expand Up @@ -174,7 +174,7 @@ private static Collection<JObject> GetItemsFromResponse(ResponseMessage response
return new Collection<JObject>();
}

return RemainingWorkEstimatorCore.DefaultSerializer.FromStream<CosmosFeedResponseUtil<JObject>>(response.Content).Data;
return CosmosContainerExtensions.DefaultJsonSerializer.FromStream<CosmosFeedResponseUtil<JObject>>(response.Content).Data;
}

private async Task<long> GetRemainingWorkAsync(DocumentServiceLease existingLease, CancellationToken cancellationToken)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ namespace Microsoft.Azure.Cosmos.ChangeFeed.LeaseManagement
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using Microsoft.Azure.Cosmos.ChangeFeed.Utils;

internal sealed class DocumentServiceLeaseContainerCosmos : DocumentServiceLeaseContainer
{
Expand All @@ -29,7 +30,7 @@ public override async Task<IReadOnlyList<DocumentServiceLease>> GetAllLeasesAsyn

public override async Task<IEnumerable<DocumentServiceLease>> GetOwnedLeasesAsync()
{
var ownedLeases = new List<DocumentServiceLease>();
List<DocumentServiceLease> ownedLeases = new List<DocumentServiceLease>();
foreach (DocumentServiceLease lease in await this.GetAllLeasesAsync().ConfigureAwait(false))
{
if (string.Compare(lease.Owner, this.options.HostName, StringComparison.OrdinalIgnoreCase) == 0)
Expand All @@ -46,15 +47,18 @@ private async Task<IReadOnlyList<DocumentServiceLeaseCore>> ListDocumentsAsync(s
if (string.IsNullOrEmpty(prefix))
throw new ArgumentException("Prefix must be non-empty string", nameof(prefix));

var query = this.container.GetItemQueryIterator<DocumentServiceLeaseCore>(
FeedIterator iterator = this.container.GetItemQueryStreamIterator(
"SELECT * FROM c WHERE STARTSWITH(c.id, '" + prefix + "')",
continuationToken: null,
requestOptions: queryRequestOptions);

var leases = new List<DocumentServiceLeaseCore>();
while (query.HasMoreResults)
List<DocumentServiceLeaseCore> leases = new List<DocumentServiceLeaseCore>();
while (iterator.HasMoreResults)
{
leases.AddRange(await query.ReadNextAsync().ConfigureAwait(false));
using (ResponseMessage responseMessage = await iterator.ReadNextAsync().ConfigureAwait(false))
{
leases.AddRange(CosmosContainerExtensions.DefaultJsonSerializer.FromStream<CosmosFeedResponseUtil<DocumentServiceLeaseCore>>(responseMessage.Content).Data);
}
}

return leases;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ public override async Task<DocumentServiceLease> CreateLeaseIfNotExistAsync(stri
throw new ArgumentNullException(nameof(leaseToken));

string leaseDocId = this.GetDocumentId(leaseToken);
var documentServiceLease = new DocumentServiceLeaseCore
DocumentServiceLeaseCore documentServiceLease = new DocumentServiceLeaseCore
{
LeaseId = leaseDocId,
LeaseToken = leaseToken,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
namespace Microsoft.Azure.Cosmos.ChangeFeed.LeaseManagement
{
using System;
using System.IO;
using System.Threading.Tasks;
using Microsoft.Azure.Cosmos;
using Microsoft.Azure.Cosmos.ChangeFeed.Utils;
Expand Down Expand Up @@ -40,18 +41,24 @@ public override async Task<bool> IsInitializedAsync()
public override async Task MarkInitializedAsync()
{
string markerDocId = this.GetStoreMarkerName();
var containerDocument = new { id = markerDocId };
dynamic containerDocument = new { id = markerDocId };

await this.container.CreateItemAsync<dynamic>(
item: containerDocument,
partitionKey: this.requestOptionsFactory.GetPartitionKey(markerDocId)).ConfigureAwait(false);
using (Stream itemStream = CosmosContainerExtensions.DefaultJsonSerializer.ToStream(containerDocument))
{
using (ResponseMessage responseMessage = await this.container.CreateItemStreamAsync(
itemStream,
this.requestOptionsFactory.GetPartitionKey(markerDocId)).ConfigureAwait(false))
{
responseMessage.EnsureSuccessStatusCode();
}
}
}

public override async Task<bool> AcquireInitializationLockAsync(TimeSpan lockTime)
{
string lockId = this.GetStoreLockName();
var containerDocument = new LockDocument() { Id = lockId, TimeToLive = (int)lockTime.TotalSeconds };
var document = await this.container.TryCreateItemAsync<LockDocument>(
LockDocument containerDocument = new LockDocument() { Id = lockId, TimeToLive = (int)lockTime.TotalSeconds };
ItemResponse<LockDocument> document = await this.container.TryCreateItemAsync<LockDocument>(
this.requestOptionsFactory.GetPartitionKey(lockId),
containerDocument).ConfigureAwait(false);

Expand All @@ -67,17 +74,17 @@ public override async Task<bool> AcquireInitializationLockAsync(TimeSpan lockTim
public override async Task<bool> ReleaseInitializationLockAsync()
{
string lockId = this.GetStoreLockName();
var requestOptions = new ItemRequestOptions()
ItemRequestOptions requestOptions = new ItemRequestOptions()
{
IfMatchEtag = this.lockETag,
};

var document = await this.container.TryDeleteItemAsync<LockDocument>(
bool deleted = await this.container.TryDeleteItemAsync<LockDocument>(
this.requestOptionsFactory.GetPartitionKey(lockId),
lockId,
requestOptions).ConfigureAwait(false);

if (document != null)
if (deleted)
{
this.lockETag = null;
return true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ namespace Microsoft.Azure.Cosmos.ChangeFeed.LeaseManagement
using System.Threading.Tasks;
using Microsoft.Azure.Cosmos;
using Microsoft.Azure.Cosmos.ChangeFeed.Exceptions;
using Microsoft.Azure.Cosmos.ChangeFeed.Utils;
using Microsoft.Azure.Cosmos.Core.Trace;

/// <summary>
Expand Down Expand Up @@ -51,9 +52,7 @@ public override async Task<DocumentServiceLease> UpdateLeaseAsync(

try
{
ItemResponse<DocumentServiceLeaseCore> response = await this.container.ReadItemAsync<DocumentServiceLeaseCore>(
itemId, partitionKey).ConfigureAwait(false);
DocumentServiceLeaseCore serverLease = response.Resource;
DocumentServiceLeaseCore serverLease = await this.container.TryGetItemAsync<DocumentServiceLeaseCore>(partitionKey, itemId);

DefaultTrace.TraceInformation(
"Lease with token {0} update failed because the lease with concurrency token '{1}' was updated by host '{2}' with concurrency token '{3}'. Will retry, {4} retry(s) left.",
Expand All @@ -77,17 +76,17 @@ public override async Task<DocumentServiceLease> UpdateLeaseAsync(

private async Task<DocumentServiceLeaseCore> TryReplaceLeaseAsync(
DocumentServiceLeaseCore lease,
Cosmos.PartitionKey? partitionKey,
PartitionKey partitionKey,
string itemId)
{
try
{
ItemRequestOptions itemRequestOptions = this.CreateIfMatchOptions(lease);
ItemResponse<DocumentServiceLeaseCore> response = await this.container.ReplaceItemAsync<DocumentServiceLeaseCore>(
id: itemId,
item: lease,
partitionKey: partitionKey,
requestOptions: itemRequestOptions).ConfigureAwait(false);
ItemResponse<DocumentServiceLeaseCore> response = await this.container.TryReplaceItemAsync<DocumentServiceLeaseCore>(
itemId,
lease,
partitionKey,
itemRequestOptions).ConfigureAwait(false);

return response.Resource;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
namespace Microsoft.Azure.Cosmos.ChangeFeed.Utils
{
using System.Globalization;
using System.IO;
using System.Net;
using System.Threading;
using System.Threading.Tasks;
Expand All @@ -13,49 +14,78 @@ namespace Microsoft.Azure.Cosmos.ChangeFeed.Utils

internal static class CosmosContainerExtensions
{
public static readonly CosmosSerializer DefaultJsonSerializer = new CosmosJsonDotNetSerializer();

public static async Task<T> TryGetItemAsync<T>(
this Container container,
PartitionKey partitionKey,
string itemId)
{
return await container.ReadItemAsync<T>(
using (ResponseMessage responseMessage = await container.ReadItemStreamAsync(
itemId,
partitionKey)
.ConfigureAwait(false);
.ConfigureAwait(false))
{
responseMessage.EnsureSuccessStatusCode();
return CosmosContainerExtensions.DefaultJsonSerializer.FromStream<T>(responseMessage.Content);
}
}

public static async Task<ItemResponse<T>> TryCreateItemAsync<T>(
this Container container,
object partitionKey,
PartitionKey partitionKey,
T item)
{
var response = await container.CreateItemAsync<T>(item).ConfigureAwait(false);
if (response.StatusCode == HttpStatusCode.Conflict)
using (Stream itemStream = CosmosContainerExtensions.DefaultJsonSerializer.ToStream<T>(item))
{
// Ignore-- document already exists.
return null;
using (ResponseMessage response = await container.CreateItemStreamAsync(itemStream, partitionKey).ConfigureAwait(false))
{
if (response.StatusCode == HttpStatusCode.Conflict)
{
// Ignore-- document already exists.
return null;
}

return new ItemResponse<T>(response.StatusCode, response.Headers, CosmosContainerExtensions.DefaultJsonSerializer.FromStream<T>(response.Content), response.Diagnostics);
}
}
}

return response;
public static async Task<ItemResponse<T>> TryReplaceItemAsync<T>(
this Container container,
string itemId,
T item,
PartitionKey partitionKey,
ItemRequestOptions itemRequestOptions)
{
using (Stream itemStream = CosmosContainerExtensions.DefaultJsonSerializer.ToStream<T>(item))
{
using (ResponseMessage response = await container.ReplaceItemStreamAsync(itemStream, itemId, partitionKey, itemRequestOptions).ConfigureAwait(false))
{
response.EnsureSuccessStatusCode();
return new ItemResponse<T>(response.StatusCode, response.Headers, CosmosContainerExtensions.DefaultJsonSerializer.FromStream<T>(response.Content), response.Diagnostics);
}
}
}

public static async Task<T> TryDeleteItemAsync<T>(
public static async Task<bool> TryDeleteItemAsync<T>(
this Container container,
PartitionKey partitionKey,
string itemId,
ItemRequestOptions cosmosItemRequestOptions = null)
{
var response = await container.DeleteItemAsync<T>(itemId, partitionKey, cosmosItemRequestOptions).ConfigureAwait(false);

return response.Resource;
using (ResponseMessage response = await container.DeleteItemStreamAsync(itemId, partitionKey, cosmosItemRequestOptions).ConfigureAwait(false))
{
return response.IsSuccessStatusCode;
}
}

public static async Task<bool> ItemExistsAsync(
this Container container,
PartitionKey partitionKey,
string itemId)
{
var response = await container.ReadItemStreamAsync(
ResponseMessage response = await container.ReadItemStreamAsync(
itemId,
partitionKey)
.ConfigureAwait(false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
namespace Microsoft.Azure.Cosmos.SDK.EmulatorTests.ChangeFeed
{
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
Expand Down Expand Up @@ -162,6 +163,68 @@ public async Task WritesTriggerDelegate_WithInMemoryContainerWithDynamic()
CollectionAssert.AreEqual(expectedIds.ToList(), receivedIds);
}

[TestMethod]
public async Task DoesNotUseUserSerializer()
{
CosmosClient cosmosClient = TestCommon.CreateCosmosClient(builder => builder.WithCustomSerializer(new FailedUserSerializer()));

ManualResetEvent allDocsProcessed = new ManualResetEvent(false);

int processedDocCount = 0;
string accumulator = string.Empty;
ChangeFeedProcessor processor = cosmosClient.GetContainer(this.database.Id, this.Container.Id)
.GetChangeFeedProcessorBuilder("test", (IReadOnlyCollection<TestClass> docs, CancellationToken token) =>
{
processedDocCount += docs.Count();
foreach (TestClass doc in docs)
{
accumulator += doc.id.ToString() + ".";
}

if (processedDocCount == 10)
{
allDocsProcessed.Set();
}

return Task.CompletedTask;
})
.WithInstanceName("random")
.WithLeaseContainer(cosmosClient.GetContainer(this.database.Id, this.LeaseContainer.Id)).Build();

// Start the processor, insert 1 document to generate a checkpoint
await processor.StartAsync();
await Task.Delay(BaseChangeFeedClientHelper.ChangeFeedSetupTime);
foreach (int id in Enumerable.Range(0, 10))
{
await this.Container.CreateItemAsync<TestClass>(new TestClass { id = id.ToString() });
}

bool isStartOk = allDocsProcessed.WaitOne(10 * BaseChangeFeedClientHelper.ChangeFeedSetupTime);
await processor.StopAsync();
Assert.IsTrue(isStartOk, "Timed out waiting for docs to process");
Assert.AreEqual("0.1.2.3.4.5.6.7.8.9.", accumulator);
}

private class FailedUserSerializer : CosmosSerializer
{
private readonly CosmosSerializer cosmosSerializer = new CosmosJsonDotNetSerializer();
public override T FromStream<T>(Stream stream)
{
// Only let changes serialization pass through
if (typeof(T) == typeof(CosmosFeedResponseUtil<TestClass>))
{
return this.cosmosSerializer.FromStream<T>(stream);
}

throw new System.NotImplementedException();
}

public override Stream ToStream<T>(T input)
{
throw new System.NotImplementedException();
}
}

public class TestClass
{
public string id { get; set; }
Expand Down
Loading