Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Client Encryption: Add client encryption / decryption for transactional batch requests #1331

Merged
merged 9 commits into from
Apr 6, 2020
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ internal virtual async Task ValidateOperationAsync(
Debug.Assert(BatchAsyncContainerExecutor.ValidateOperationEPK(operation, itemRequestOptions));
}

await operation.MaterializeResourceAsync(this.cosmosClientContext.SerializerCore, cancellationToken);
await operation.EncryptAndMaterializeResourceAsync(this.cosmosClientContext.SerializerCore, cancellationToken);
}

private static IDocumentClientRetryPolicy GetRetryPolicy(RetryOptions retryOptions)
Expand Down Expand Up @@ -251,7 +251,13 @@ private async Task<PartitionKeyRangeBatchExecutionResult> ExecuteAsync(

using (diagnosticsContext.CreateScope("BatchAsyncContainerExecutor.ToResponse"))
{
TransactionalBatchResponse serverResponse = await TransactionalBatchResponse.FromResponseMessageAsync(responseMessage, serverRequest, this.cosmosClientContext.SerializerCore).ConfigureAwait(false);
TransactionalBatchResponse serverResponse = await TransactionalBatchResponse.FromResponseMessageAsync(
responseMessage,
serverRequest,
this.cosmosClientContext.SerializerCore,
shouldPromoteOperationStatus: true,
shouldPerformDecryption: false,
cancellationToken).ConfigureAwait(false);

return new PartitionKeyRangeBatchExecutionResult(serverRequest.PartitionKeyRangeId, serverRequest.Operations, serverResponse);
}
Expand Down
27 changes: 18 additions & 9 deletions Microsoft.Azure.Cosmos/src/Batch/BatchCore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,8 @@ public override TransactionalBatch CreateItem<T>(
operationType: OperationType.Create,
operationIndex: this.operations.Count,
resource: item,
requestOptions: requestOptions));
requestOptions: requestOptions,
containerCore: this.container));

return this;
}
Expand All @@ -64,7 +65,8 @@ public override TransactionalBatch CreateItemStream(
operationType: OperationType.Create,
operationIndex: this.operations.Count,
resourceStream: streamPayload,
requestOptions: requestOptions));
requestOptions: requestOptions,
containerCore: this.container));

return this;
}
Expand All @@ -82,7 +84,8 @@ public override TransactionalBatch ReadItem(
operationType: OperationType.Read,
operationIndex: this.operations.Count,
id: id,
requestOptions: requestOptions));
requestOptions: requestOptions,
containerCore: this.container));

return this;
}
Expand All @@ -100,7 +103,8 @@ public override TransactionalBatch UpsertItem<T>(
operationType: OperationType.Upsert,
operationIndex: this.operations.Count,
resource: item,
requestOptions: requestOptions));
requestOptions: requestOptions,
containerCore: this.container));

return this;
}
Expand All @@ -118,7 +122,8 @@ public override TransactionalBatch UpsertItemStream(
operationType: OperationType.Upsert,
operationIndex: this.operations.Count,
resourceStream: streamPayload,
requestOptions: requestOptions));
requestOptions: requestOptions,
containerCore: this.container));

return this;
}
Expand All @@ -143,7 +148,8 @@ public override TransactionalBatch ReplaceItem<T>(
operationIndex: this.operations.Count,
id: id,
resource: item,
requestOptions: requestOptions));
requestOptions: requestOptions,
containerCore: this.container));

return this;
}
Expand All @@ -168,7 +174,8 @@ public override TransactionalBatch ReplaceItemStream(
operationIndex: this.operations.Count,
id: id,
resourceStream: streamPayload,
requestOptions: requestOptions));
requestOptions: requestOptions,
containerCore: this.container));

return this;
}
Expand All @@ -186,7 +193,8 @@ public override TransactionalBatch DeleteItem(
operationType: OperationType.Delete,
operationIndex: this.operations.Count,
id: id,
requestOptions: requestOptions));
requestOptions: requestOptions,
containerCore: this.container));

return this;
}
Expand Down Expand Up @@ -238,7 +246,8 @@ public virtual TransactionalBatch PatchItemStream(
operationIndex: this.operations.Count,
id: id,
resourceStream: patchStream,
requestOptions: requestOptions));
requestOptions: requestOptions,
containerCore: this.container));

return this;
}
Expand Down
10 changes: 9 additions & 1 deletion Microsoft.Azure.Cosmos/src/Batch/BatchExecutor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,11 @@ public async Task<TransactionalBatchResponse> ExecuteAsync(CancellationToken can
{
BatchExecUtils.EnsureValid(this.inputOperations, this.batchOptions);

foreach (ItemBatchOperation operation in this.inputOperations)
{
operation.DiagnosticsContext = this.diagnosticsContext;
}

PartitionKey? serverRequestPartitionKey = this.partitionKey;
if (this.batchOptions != null && this.batchOptions.IsEffectivePartitionKeyRouting)
{
Expand Down Expand Up @@ -103,7 +108,10 @@ private async Task<TransactionalBatchResponse> ExecuteServerRequestAsync(
return await TransactionalBatchResponse.FromResponseMessageAsync(
responseMessage,
serverRequest,
this.clientContext.SerializerCore);
this.clientContext.SerializerCore,
shouldPromoteOperationStatus: true,
shouldPerformDecryption: true,
cancellationToken);
}
}
}
Expand Down
32 changes: 24 additions & 8 deletions Microsoft.Azure.Cosmos/src/Batch/ItemBatchOperation.cs
Original file line number Diff line number Diff line change
Expand Up @@ -46,12 +46,14 @@ public ItemBatchOperation(
public ItemBatchOperation(
OperationType operationType,
int operationIndex,
ContainerCore containerCore,
string id = null,
Stream resourceStream = null,
TransactionalBatchItemRequestOptions requestOptions = null)
{
this.OperationType = operationType;
this.OperationIndex = operationIndex;
this.ContainerCore = containerCore;
this.Id = id;
this.ResourceStream = resourceStream;
this.RequestOptions = requestOptions;
Expand All @@ -70,7 +72,9 @@ public ItemBatchOperation(

public int OperationIndex { get; internal set; }

internal CosmosDiagnosticsContext DiagnosticsContext { get; }
internal ContainerCore ContainerCore { get; }

internal CosmosDiagnosticsContext DiagnosticsContext { get; set; }

internal string PartitionKeyJson { get; set; }

Expand Down Expand Up @@ -308,15 +312,26 @@ internal int GetApproximateSerializedLength()
}

/// <summary>
/// Materializes the operation's resource into a Memory{byte} wrapping a byte array.
/// Encrypts (if encryption options are set) and materializes the operation's resource into a Memory{byte} wrapping a byte array.
/// </summary>
/// <param name="serializerCore">Serializer to serialize user provided objects to JSON.</param>
/// <param name="cancellationToken"><see cref="CancellationToken"/> for cancellation.</param>
internal virtual async Task MaterializeResourceAsync(CosmosSerializerCore serializerCore, CancellationToken cancellationToken)
internal virtual async Task EncryptAndMaterializeResourceAsync(CosmosSerializerCore serializerCore, CancellationToken cancellationToken)
{
if (this.body.IsEmpty && this.ResourceStream != null)
{
this.body = await BatchExecUtils.StreamToMemoryAsync(this.ResourceStream, cancellationToken);
Stream stream = this.ResourceStream;
if (this.ContainerCore != null && this.RequestOptions?.EncryptionOptions != null)
{
stream = await this.ContainerCore.ClientContext.EncryptItemAsync(
ealsur marked this conversation as resolved.
Show resolved Hide resolved
stream,
this.RequestOptions.EncryptionOptions,
(DatabaseCore)this.ContainerCore.Database,
this.DiagnosticsContext,
cancellationToken);
}

this.body = await BatchExecUtils.StreamToMemoryAsync(stream, cancellationToken);
}
}

Expand Down Expand Up @@ -372,9 +387,10 @@ public ItemBatchOperation(
OperationType operationType,
int operationIndex,
T resource,
ContainerCore containerCore,
string id = null,
TransactionalBatchItemRequestOptions requestOptions = null)
: base(operationType, operationIndex, id: id, requestOptions: requestOptions)
: base(operationType, operationIndex, containerCore: containerCore, id: id, requestOptions: requestOptions)
{
this.Resource = resource;
}
Expand All @@ -386,15 +402,15 @@ public ItemBatchOperation(
/// </summary>
/// <param name="serializerCore">Serializer to serialize user provided objects to JSON.</param>
/// <param name="cancellationToken"><see cref="CancellationToken"/> for cancellation.</param>
internal override Task MaterializeResourceAsync(CosmosSerializerCore serializerCore, CancellationToken cancellationToken)
internal override Task EncryptAndMaterializeResourceAsync(CosmosSerializerCore serializerCore, CancellationToken cancellationToken)
{
if (this.body.IsEmpty && this.Resource != null)
{
this.ResourceStream = serializerCore.ToStream(this.Resource);
return base.MaterializeResourceAsync(serializerCore, cancellationToken);
return base.EncryptAndMaterializeResourceAsync(serializerCore, cancellationToken);
}

return Task.FromResult(true);
return Task.CompletedTask;
abhijitpai marked this conversation as resolved.
Show resolved Hide resolved
}
}
}
2 changes: 1 addition & 1 deletion Microsoft.Azure.Cosmos/src/Batch/ServerBatchRequest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ protected async Task<ArraySegment<ItemBatchOperation>> CreateBodyStreamAsync(
break;
}

await operation.MaterializeResourceAsync(this.serializerCore, cancellationToken);
await operation.EncryptAndMaterializeResourceAsync(this.serializerCore, cancellationToken);
materializedCount++;

previousOperationIndex = operation.OperationIndex;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,18 @@ public class TransactionalBatchItemRequestOptions : RequestOptions
/// The indexing directive to use with a request.
/// </value>
/// <seealso cref="Microsoft.Azure.Cosmos.IndexingPolicy"/>
/// <seealso cref="IndexingDirective"/>
public IndexingDirective? IndexingDirective { get; set; }

/// <summary>
/// Options to encrypt properties of the item.
/// </summary>
#if PREVIEW
public
#else
internal
#endif
EncryptionOptions EncryptionOptions { get; set; }

internal static TransactionalBatchItemRequestOptions FromItemRequestOptions(ItemRequestOptions itemRequestOptions)
{
if (itemRequestOptions == null)
Expand Down
18 changes: 17 additions & 1 deletion Microsoft.Azure.Cosmos/src/Batch/TransactionalBatchResponse.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ namespace Microsoft.Azure.Cosmos
using System.Collections.Generic;
using System.IO;
using System.Net;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.Cosmos.Serialization.HybridRow;
using Microsoft.Azure.Cosmos.Serialization.HybridRow.RecordIO;
Expand Down Expand Up @@ -217,7 +218,9 @@ internal static async Task<TransactionalBatchResponse> FromResponseMessageAsync(
ResponseMessage responseMessage,
ServerBatchRequest serverRequest,
CosmosSerializerCore serializer,
bool shouldPromoteOperationStatus = true)
bool shouldPromoteOperationStatus,
bool shouldPerformDecryption,
CancellationToken cancellationToken)
{
using (responseMessage)
{
Expand Down Expand Up @@ -307,6 +310,19 @@ internal static async Task<TransactionalBatchResponse> FromResponseMessageAsync(

response.CreateAndPopulateResults(serverRequest.Operations, retryAfterMilliseconds);
}
else if (shouldPerformDecryption)
j82w marked this conversation as resolved.
Show resolved Hide resolved
{
for (int index = 0; index < serverRequest.Operations.Count; index++)
{
ContainerCore containerCore = serverRequest.Operations[index].ContainerCore;
TransactionalBatchOperationResult result = response.results[index];
result.ResourceStream = await containerCore.ClientContext.DecryptItemAsync(
result.ResourceStream,
(DatabaseCore)containerCore.Database,
responseMessage.DiagnosticsContext,
cancellationToken);
}
}

return response;
}
Expand Down
56 changes: 55 additions & 1 deletion Microsoft.Azure.Cosmos/src/Resource/ClientContextCore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
namespace Microsoft.Azure.Cosmos
{
using System;
using System.Diagnostics;
using System.IO;
using System.Net.Http;
using System.Text;
Expand Down Expand Up @@ -323,6 +324,59 @@ internal override BatchAsyncContainerExecutor GetExecutorForContainer(ContainerC
return this.batchExecutorCache.GetExecutorForContainer(container, this);
}

internal override async Task<Stream> EncryptItemAsync(
Stream input,
EncryptionOptions encryptionOptions,
DatabaseCore database,
CosmosDiagnosticsContext diagnosticsContext,
CancellationToken cancellationToken)
{
if (input == null)
{
throw new ArgumentException(ClientResources.InvalidRequestWithEncryptionOptions);
}

Debug.Assert(encryptionOptions != null);
Debug.Assert(database != null);
Debug.Assert(diagnosticsContext != null);

using (diagnosticsContext.CreateScope("Encrypt"))
{
return await this.EncryptionProcessor.EncryptAsync(
input,
encryptionOptions,
database,
this.ClientOptions.EncryptionKeyWrapProvider,
diagnosticsContext,
cancellationToken);
}
}

internal override async Task<Stream> DecryptItemAsync(
Stream input,
DatabaseCore database,
CosmosDiagnosticsContext diagnosticsContext,
CancellationToken cancellationToken)
{
if (input == null || this.ClientOptions.EncryptionKeyWrapProvider == null)
{
return input;
}

Debug.Assert(database != null);
Debug.Assert(diagnosticsContext != null);

using (diagnosticsContext.CreateScope("Decrypt"))
{
return await this.EncryptionProcessor.DecryptAsync(
input,
database,
this.ClientOptions.EncryptionKeyWrapProvider,
diagnosticsContext,
cancellationToken);
}
}

public override void Dispose()
{
this.Dispose(true);
Expand Down Expand Up @@ -394,7 +448,7 @@ private bool IsBulkOperationSupported(

private static HttpClientHandler CreateHttpClientHandler(CosmosClientOptions clientOptions)
{
if (clientOptions == null || (clientOptions.WebProxy == null))
if (clientOptions == null || clientOptions.WebProxy == null)
{
return null;
}
Expand Down
Loading