Skip to content

Commit

Permalink
Client Encryption : Adds the implementation for new ChangeFeed APIs (#…
Browse files Browse the repository at this point in the history
…2452)

* Implement the new ChangeFeed APIs for Encryption

* Address comments.
  • Loading branch information
anujtoshniwal authored May 12, 2021
1 parent f3692d8 commit 7231619
Show file tree
Hide file tree
Showing 8 changed files with 867 additions and 259 deletions.
165 changes: 117 additions & 48 deletions Microsoft.Azure.Cosmos.Encryption/src/Custom/EncryptionContainer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -774,50 +774,6 @@ public override FeedIterator GetItemQueryStreamIterator(
this.CosmosSerializer);
}

public override ChangeFeedProcessorBuilder GetChangeFeedProcessorBuilder<T>(
string processorName,
ChangesHandler<T> onChangesDelegate)
{
CosmosDiagnosticsContext diagnosticsContext = CosmosDiagnosticsContext.Create(null);
using (diagnosticsContext.CreateScope("GetChangeFeedProcessorBuilder"))
{
return this.container.GetChangeFeedProcessorBuilder(
processorName,
async (IReadOnlyCollection<JObject> documents, CancellationToken cancellationToken) =>
{
List<T> decryptItems = new List<T>(documents.Count);
if (typeof(T) == typeof(DecryptableItem))
{
foreach (JToken value in documents)
{
DecryptableItemCore item = new DecryptableItemCore(
value,
this.Encryptor,
this.CosmosSerializer);
decryptItems.Add((T)(object)item);
}
}
else
{
foreach (JObject document in documents)
{
(JObject decryptedDocument, DecryptionContext _) = await EncryptionProcessor.DecryptAsync(
document,
this.Encryptor,
diagnosticsContext,
cancellationToken);
decryptItems.Add(decryptedDocument.ToObject<T>());
}
}
// Call the original passed in delegate
await onChangesDelegate(decryptItems, cancellationToken);
});
}
}

public override Task<ThroughputResponse> ReplaceThroughputAsync(
ThroughputProperties throughputProperties,
RequestOptions requestOptions = null,
Expand Down Expand Up @@ -927,32 +883,107 @@ public override Task<ResponseMessage> PatchItemStreamAsync(
throw new NotImplementedException();
}

public override ChangeFeedProcessorBuilder GetChangeFeedProcessorBuilder<T>(
string processorName,
ChangesHandler<T> onChangesDelegate)
{
return this.container.GetChangeFeedProcessorBuilder(
processorName,
async (
IReadOnlyCollection<JObject> documents,
CancellationToken cancellationToken) =>
{
List<T> decryptItems = await this.DecryptChangeFeedDocumentsAsync<T>(
documents,
cancellationToken);
// Call the original passed in delegate
await onChangesDelegate(decryptItems, cancellationToken);
});
}

public override ChangeFeedProcessorBuilder GetChangeFeedProcessorBuilder<T>(
string processorName,
ChangeFeedHandler<T> onChangesDelegate)
{
throw new NotImplementedException();
return this.container.GetChangeFeedProcessorBuilder(
processorName,
async (
ChangeFeedProcessorContext context,
IReadOnlyCollection<JObject> documents,
CancellationToken cancellationToken) =>
{
List<T> decryptItems = await this.DecryptChangeFeedDocumentsAsync<T>(
documents,
cancellationToken);
// Call the original passed in delegate
await onChangesDelegate(context, decryptItems, cancellationToken);
});
}

public override ChangeFeedProcessorBuilder GetChangeFeedProcessorBuilderWithManualCheckpoint<T>(
string processorName,
ChangeFeedHandlerWithManualCheckpoint<T> onChangesDelegate)
{
throw new NotImplementedException();
return this.container.GetChangeFeedProcessorBuilderWithManualCheckpoint(
processorName,
async (
ChangeFeedProcessorContext context,
IReadOnlyCollection<JObject> documents,
Func<Task<(bool isSuccess, Exception error)>> tryCheckpointAsync,
CancellationToken cancellationToken) =>
{
List<T> decryptItems = await this.DecryptChangeFeedDocumentsAsync<T>(
documents,
cancellationToken);
// Call the original passed in delegate
await onChangesDelegate(context, decryptItems, tryCheckpointAsync, cancellationToken);
});
}

public override ChangeFeedProcessorBuilder GetChangeFeedProcessorBuilder(
string processorName,
ChangeFeedStreamHandler onChangesDelegate)
{
throw new NotImplementedException();
return this.container.GetChangeFeedProcessorBuilder(
processorName,
async (
ChangeFeedProcessorContext context,
Stream changes,
CancellationToken cancellationToken) =>
{
Stream decryptedChanges = await EncryptionProcessor.DeserializeAndDecryptResponseAsync(
changes,
this.Encryptor,
cancellationToken);
// Call the original passed in delegate
await onChangesDelegate(context, decryptedChanges, cancellationToken);
});
}

public override ChangeFeedProcessorBuilder GetChangeFeedProcessorBuilderWithManualCheckpoint(
string processorName,
ChangeFeedStreamHandlerWithManualCheckpoint onChangesDelegate)
{
throw new NotImplementedException();
return this.container.GetChangeFeedProcessorBuilderWithManualCheckpoint(
processorName,
async (
ChangeFeedProcessorContext context,
Stream changes,
Func<Task<(bool isSuccess, Exception error)>> tryCheckpointAsync,
CancellationToken cancellationToken) =>
{
Stream decryptedChanges = await EncryptionProcessor.DeserializeAndDecryptResponseAsync(
changes,
this.Encryptor,
cancellationToken);
// Call the original passed in delegate
await onChangesDelegate(context, decryptedChanges, tryCheckpointAsync, cancellationToken);
});
}

public override Task<ResponseMessage> ReadManyItemsStreamAsync(
Expand All @@ -970,5 +1001,43 @@ public override Task<FeedResponse<T>> ReadManyItemsAsync<T>(
{
throw new NotImplementedException();
}

private async Task<List<T>> DecryptChangeFeedDocumentsAsync<T>(
IReadOnlyCollection<JObject> documents,
CancellationToken cancellationToken)
{
List<T> decryptItems = new List<T>(documents.Count);
if (typeof(T) == typeof(DecryptableItem))
{
foreach (JToken value in documents)
{
DecryptableItemCore item = new DecryptableItemCore(
value,
this.Encryptor,
this.CosmosSerializer);

decryptItems.Add((T)(object)item);
}
}
else
{
foreach (JObject document in documents)
{
CosmosDiagnosticsContext diagnosticsContext = CosmosDiagnosticsContext.Create(null);
using (diagnosticsContext.CreateScope("DecryptChangeFeedDocumentsAsync<"))
{
(JObject decryptedDocument, DecryptionContext _) = await EncryptionProcessor.DecryptAsync(
document,
this.Encryptor,
diagnosticsContext,
cancellationToken);

decryptItems.Add(decryptedDocument.ToObject<T>());
}
}
}

return decryptItems;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,9 @@ public override async Task<ResponseMessage> ReadNextAsync(CancellationToken canc

if (responseMessage.IsSuccessStatusCode && responseMessage.Content != null)
{
Stream decryptedContent = await this.DeserializeAndDecryptResponseAsync(
Stream decryptedContent = await EncryptionProcessor.DeserializeAndDecryptResponseAsync(
responseMessage.Content,
diagnosticsContext,
this.encryptor,
cancellationToken);

return new DecryptedResponseMessage(responseMessage, decryptedContent);
Expand Down Expand Up @@ -94,51 +94,5 @@ private List<T> ConvertResponseToDecryptableItems<T>(

return decryptableItems;
}

private async Task<Stream> DeserializeAndDecryptResponseAsync(
Stream content,
CosmosDiagnosticsContext diagnosticsContext,
CancellationToken cancellationToken)
{
JObject contentJObj = EncryptionProcessor.BaseSerializer.FromStream<JObject>(content);
JArray result = new JArray();

if (!(contentJObj.SelectToken(Constants.DocumentsResourcePropertyName) is JArray documents))
{
throw new InvalidOperationException("Feed Response body contract was violated. Feed response did not have an array of Documents");
}

foreach (JToken value in documents)
{
if (!(value is JObject document))
{
result.Add(value);
continue;
}

(JObject decryptedDocument, DecryptionContext _) = await EncryptionProcessor.DecryptAsync(
document,
this.encryptor,
diagnosticsContext,
cancellationToken);

result.Add(decryptedDocument);
}

JObject decryptedResponse = new JObject();
foreach (JProperty property in contentJObj.Properties())
{
if (property.Name.Equals(Constants.DocumentsResourcePropertyName))
{
decryptedResponse.Add(property.Name, (JToken)result);
}
else
{
decryptedResponse.Add(property.Name, property.Value);
}
}

return EncryptionProcessor.BaseSerializer.ToStream(decryptedResponse);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -565,5 +565,55 @@ private enum TypeMarker : byte
Array = 6,
Object = 7,
}

internal static async Task<Stream> DeserializeAndDecryptResponseAsync(
Stream content,
Encryptor encryptor,
CancellationToken cancellationToken)
{
JObject contentJObj = EncryptionProcessor.BaseSerializer.FromStream<JObject>(content);
JArray result = new JArray();

if (!(contentJObj.SelectToken(Constants.DocumentsResourcePropertyName) is JArray documents))
{
throw new InvalidOperationException("Feed Response body contract was violated. Feed response did not have an array of Documents");
}

foreach (JToken value in documents)
{
if (!(value is JObject document))
{
result.Add(value);
continue;
}

CosmosDiagnosticsContext diagnosticsContext = CosmosDiagnosticsContext.Create(null);
using (diagnosticsContext.CreateScope("EncryptionProcessor.DeserializeAndDecryptResponseAsync"))
{
(JObject decryptedDocument, DecryptionContext _) = await EncryptionProcessor.DecryptAsync(
document,
encryptor,
diagnosticsContext,
cancellationToken);

result.Add(decryptedDocument);
}
}

JObject decryptedResponse = new JObject();
foreach (JProperty property in contentJObj.Properties())
{
if (property.Name.Equals(Constants.DocumentsResourcePropertyName))
{
decryptedResponse.Add(property.Name, (JToken)result);
}
else
{
decryptedResponse.Add(property.Name, property.Value);
}
}

return EncryptionProcessor.BaseSerializer.ToStream(decryptedResponse);
}
}
}
Loading

0 comments on commit 7231619

Please sign in to comment.