Skip to content

Commit

Permalink
Code changes to support binary encoding for point operations.
Browse files Browse the repository at this point in the history
Code changes to update STJ Serializer

Adding more tests

Fixing item emulator test.

Minor cosmetic changes.

Adding more tests.

Fixing the cdb to newtonsoft serializer.

Code changes to fix ns reader. Adding more tests.

Minor refactoring.

Optimizing some of the serialization code.

Code changes to change serializer for patch operations.

Modularizating the codebase.

Adding summary for serialization utils.

Code changest to add changes and tests for patch operation.

Adding conversation logic in patch op.

Code changes to add tests for patch operation.

Code changes to refactor binary conversation logic.

Some refactor. Added requred unit tests.

Code changes to orginaze serializer and de-serializer. Modified default json serializer.

Provide option to request binary from item request options.

Code changes to add binary serializer for non stream apis.

Changes in request invocation handler.

remove unnecessary using

Further optimizations.

Code changes to refactor serialization and de-serialization logic.
  • Loading branch information
kundadebdatta committed Sep 18, 2024
1 parent 5948484 commit 2cdf16b
Show file tree
Hide file tree
Showing 15 changed files with 2,275 additions and 1,075 deletions.
11 changes: 11 additions & 0 deletions Microsoft.Azure.Cosmos/src/Handler/RequestInvokerHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,17 @@ public override async Task<ResponseMessage> SendAsync(
request.Headers.Add(HttpConstants.HttpHeaders.Prefer, HttpConstants.HttpHeaderValues.PreferReturnMinimal);
}

if (ConfigurationManager.IsBinaryEncodingEnabled()
&& request.OperationType.IsPointOperation()
&& request.ResourceType == ResourceType.Document)
{
request.Headers.Add(HttpConstants.HttpHeaders.SupportedSerializationFormats, SupportedSerializationFormats.CosmosBinary.ToString());
if (request.Content != null)
{
request.Headers.Add(HttpConstants.HttpHeaders.ContentSerializationFormat, SupportedSerializationFormats.CosmosBinary.ToString());
}
}

await this.ValidateAndSetConsistencyLevelAsync(request);
this.SetPriorityLevel(request);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ public override byte[] ReadAsBytes()
public override DateTime? ReadAsDateTime()
{
this.Read();
if (this.jsonReader.CurrentTokenType == JsonTokenType.EndArray)
if (this.jsonReader.CurrentTokenType == JsonTokenType.Null || this.jsonReader.CurrentTokenType == JsonTokenType.EndArray)
{
return null;
}
Expand All @@ -211,7 +211,7 @@ public override byte[] ReadAsBytes()
public override DateTimeOffset? ReadAsDateTimeOffset()
{
this.Read();
if (this.jsonReader.CurrentTokenType == JsonTokenType.EndArray)
if (this.jsonReader.CurrentTokenType == JsonTokenType.Null || this.jsonReader.CurrentTokenType == JsonTokenType.EndArray)
{
return null;
}
Expand Down Expand Up @@ -260,7 +260,7 @@ public override byte[] ReadAsBytes()
public override string ReadAsString()
{
this.Read();
if (this.jsonReader.CurrentTokenType == JsonTokenType.EndArray)
if (this.jsonReader.CurrentTokenType == JsonTokenType.Null || this.jsonReader.CurrentTokenType == JsonTokenType.EndArray)
{
return null;
}
Expand All @@ -278,7 +278,7 @@ public override string ReadAsString()
private double? ReadNumberValue()
{
this.Read();
if (this.jsonReader.CurrentTokenType == JsonTokenType.EndArray)
if (this.jsonReader.CurrentTokenType == JsonTokenType.Null || this.jsonReader.CurrentTokenType == JsonTokenType.EndArray)
{
return null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -266,7 +266,8 @@ public override void WriteValue(bool value)
/// <param name="value">The <see cref="short"/> value to write.</param>
public override void WriteValue(short value)
{
base.WriteValue((long)value);
base.WriteValue((Int16)value);
this.jsonWriter.WriteInt16Value(value);
}

/// <summary>
Expand Down
20 changes: 20 additions & 0 deletions Microsoft.Azure.Cosmos/src/RequestOptions/ItemRequestOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,26 @@ public ConsistencyLevel? ConsistencyLevel
/// </remarks>
public DedicatedGatewayRequestOptions DedicatedGatewayRequestOptions { get; set; }

/// <summary>
/// Gets or sets the boolean to enable binary response for point operations like Create, Upsert, Read, Patch, and Replace.
/// Setting this option to true will cause the response to be in binary format, which can reduce networking and CPU load
/// by not sending the resource back over the network and serializing it on the client.
/// </summary>
/// <example>
/// <code language="c#">
/// <![CDATA[
/// ItemRequestOptions requestOptions = new ItemRequestOptions() { EnableBinaryResponseOnPointOperations = true };
/// ItemResponse itemResponse = await this.container.CreateItemAsync<ToDoActivity>(tests, new PartitionKey(test.status), requestOptions);
/// Assert.AreEqual(HttpStatusCode.Created, itemResponse.StatusCode);
/// Assert.IsNotNull(itemResponse.Resource);
/// ]]>
/// </code>
/// </example>
/// <remarks>
/// This is optimal for workloads where the returned resource can be processed in binary format.
/// </remarks>
internal bool EnableBinaryResponseOnPointOperations { get; set; }

/// <summary>
/// Fill the CosmosRequestMessage headers with the set properties
/// </summary>
Expand Down
121 changes: 92 additions & 29 deletions Microsoft.Azure.Cosmos/src/Resource/Container/ContainerCore.Items.cs
Original file line number Diff line number Diff line change
Expand Up @@ -51,15 +51,17 @@ public async Task<ResponseMessage> CreateItemStreamAsync(
ITrace trace,
ItemRequestOptions requestOptions = null,
CancellationToken cancellationToken = default)
{
{
return await this.ProcessItemStreamAsync(
partitionKey: partitionKey,
itemId: null,
streamPayload: streamPayload,
operationType: OperationType.Create,
requestOptions: requestOptions,
trace: trace,
cancellationToken: cancellationToken);
cancellationToken: cancellationToken,
targetRequestSerializationFormat: ContainerCore.GetTargetRequestSerializationFormat(),
targetResponseSerializationFormat: JsonSerializationFormat.Text);
}

public async Task<ItemResponse<T>> CreateItemAsync<T>(
Expand All @@ -79,7 +81,8 @@ public async Task<ItemResponse<T>> CreateItemAsync<T>(
itemId: null,
item: item,
operationType: OperationType.Create,
requestOptions: requestOptions,
requestOptions: requestOptions,
targetRequestSerializationFormat: ContainerCore.GetTargetRequestSerializationFormat(),
trace: trace,
cancellationToken: cancellationToken);

Expand All @@ -92,15 +95,17 @@ public async Task<ResponseMessage> ReadItemStreamAsync(
ITrace trace,
ItemRequestOptions requestOptions = null,
CancellationToken cancellationToken = default)
{
{
return await this.ProcessItemStreamAsync(
partitionKey: partitionKey,
itemId: id,
streamPayload: null,
operationType: OperationType.Read,
requestOptions: requestOptions,
trace: trace,
cancellationToken: cancellationToken);
cancellationToken: cancellationToken,
targetRequestSerializationFormat: ContainerCore.GetTargetRequestSerializationFormat(),
targetResponseSerializationFormat: JsonSerializationFormat.Text);
}

public async Task<ItemResponse<T>> ReadItemAsync<T>(
Expand All @@ -115,7 +120,8 @@ public async Task<ItemResponse<T>> ReadItemAsync<T>(
itemId: id,
streamPayload: null,
operationType: OperationType.Read,
requestOptions: requestOptions,
requestOptions: requestOptions,
targetRequestSerializationFormat: ContainerCore.GetTargetRequestSerializationFormat(),
trace: trace,
cancellationToken: cancellationToken);

Expand All @@ -136,7 +142,9 @@ public async Task<ResponseMessage> UpsertItemStreamAsync(
operationType: OperationType.Upsert,
requestOptions: requestOptions,
trace: trace,
cancellationToken: cancellationToken);
cancellationToken: cancellationToken,
targetRequestSerializationFormat: ContainerCore.GetTargetRequestSerializationFormat(),
targetResponseSerializationFormat: JsonSerializationFormat.Text);
}

public async Task<ItemResponse<T>> UpsertItemAsync<T>(
Expand All @@ -156,7 +164,8 @@ public async Task<ItemResponse<T>> UpsertItemAsync<T>(
itemId: null,
item: item,
operationType: OperationType.Upsert,
requestOptions: requestOptions,
requestOptions: requestOptions,
targetRequestSerializationFormat: ContainerCore.GetTargetRequestSerializationFormat(),
trace: trace,
cancellationToken: cancellationToken);

Expand All @@ -178,7 +187,9 @@ public async Task<ResponseMessage> ReplaceItemStreamAsync(
operationType: OperationType.Replace,
requestOptions: requestOptions,
trace: trace,
cancellationToken: cancellationToken);
cancellationToken: cancellationToken,
targetRequestSerializationFormat: ContainerCore.GetTargetRequestSerializationFormat(),
targetResponseSerializationFormat: JsonSerializationFormat.Text);
}

public async Task<ItemResponse<T>> ReplaceItemAsync<T>(
Expand All @@ -204,7 +215,8 @@ public async Task<ItemResponse<T>> ReplaceItemAsync<T>(
itemId: id,
item: item,
operationType: OperationType.Replace,
requestOptions: requestOptions,
requestOptions: requestOptions,
targetRequestSerializationFormat: ContainerCore.GetTargetRequestSerializationFormat(),
trace: trace,
cancellationToken: cancellationToken);

Expand All @@ -217,15 +229,17 @@ public async Task<ResponseMessage> DeleteItemStreamAsync(
ITrace trace,
ItemRequestOptions requestOptions = null,
CancellationToken cancellationToken = default)
{
{
return await this.ProcessItemStreamAsync(
partitionKey: partitionKey,
itemId: id,
streamPayload: null,
operationType: OperationType.Delete,
requestOptions: requestOptions,
trace: trace,
cancellationToken: cancellationToken);
cancellationToken: cancellationToken,
targetRequestSerializationFormat: ContainerCore.GetTargetRequestSerializationFormat(),
targetResponseSerializationFormat: JsonSerializationFormat.Text);
}

public async Task<ItemResponse<T>> DeleteItemAsync<T>(
Expand All @@ -240,7 +254,8 @@ public async Task<ItemResponse<T>> DeleteItemAsync<T>(
itemId: id,
streamPayload: null,
operationType: OperationType.Delete,
requestOptions: requestOptions,
requestOptions: requestOptions,
targetRequestSerializationFormat: ContainerCore.GetTargetRequestSerializationFormat(),
trace: trace,
cancellationToken: cancellationToken);

Expand Down Expand Up @@ -828,7 +843,8 @@ private async Task<ResponseMessage> ExtractPartitionKeyAndProcessItemStreamAsync
string itemId,
T item,
OperationType operationType,
ItemRequestOptions requestOptions,
ItemRequestOptions requestOptions,
JsonSerializationFormat? targetRequestSerializationFormat,
ITrace trace,
CancellationToken cancellationToken)
{
Expand All @@ -840,7 +856,7 @@ private async Task<ResponseMessage> ExtractPartitionKeyAndProcessItemStreamAsync
Stream itemStream;
using (trace.StartChild("ItemSerialize"))
{
itemStream = this.ClientContext.SerializerCore.ToStream<T>(item);
itemStream = this.ClientContext.SerializerCore.ToStream<T>(item);
}

// User specified PK value, no need to extract it
Expand All @@ -853,7 +869,8 @@ private async Task<ResponseMessage> ExtractPartitionKeyAndProcessItemStreamAsync
operationType,
requestOptions,
trace: trace,
cancellationToken: cancellationToken);
cancellationToken: cancellationToken,
targetRequestSerializationFormat: targetRequestSerializationFormat);
}

PartitionKeyMismatchRetryPolicy requestRetryPolicy = null;
Expand All @@ -866,7 +883,8 @@ private async Task<ResponseMessage> ExtractPartitionKeyAndProcessItemStreamAsync
itemId,
itemStream,
operationType,
requestOptions,
requestOptions,
targetRequestSerializationFormat: ContainerCore.GetTargetRequestSerializationFormat(),
trace: trace,
cancellationToken: cancellationToken);

Expand Down Expand Up @@ -897,7 +915,9 @@ private async Task<ResponseMessage> ProcessItemStreamAsync(
OperationType operationType,
ItemRequestOptions requestOptions,
ITrace trace,
CancellationToken cancellationToken)
CancellationToken cancellationToken,
JsonSerializationFormat? targetRequestSerializationFormat,
JsonSerializationFormat? targetResponseSerializationFormat = default)
{
if (trace == null)
{
Expand All @@ -910,8 +930,18 @@ private async Task<ResponseMessage> ProcessItemStreamAsync(
}

ContainerInternal.ValidatePartitionKey(partitionKey, requestOptions);
string resourceUri = this.GetResourceUri(requestOptions, operationType, itemId);

string resourceUri = this.GetResourceUri(requestOptions, operationType, itemId);

// Convert Text to Binary Stream.
if (CosmosSerializationUtil.TrySerializeStreamToTargetFormat(
expectedSerializationFormat: JsonSerializationFormat.Text,
targetSerializationFormat: targetRequestSerializationFormat,
inputStream: streamPayload,
outputStream: out Stream outputRequestStream))
{
streamPayload = outputRequestStream;
}

ResponseMessage responseMessage = await this.ClientContext.ProcessResourceOperationStreamAsync(
resourceUri: resourceUri,
resourceType: ResourceType.Document,
Expand All @@ -923,9 +953,20 @@ private async Task<ResponseMessage> ProcessItemStreamAsync(
streamPayload: streamPayload,
requestEnricher: null,
trace: trace,
cancellationToken: cancellationToken);

return responseMessage;
cancellationToken: cancellationToken);

// Convert Binary Stream to Text.
if ((requestOptions == null || !requestOptions.EnableBinaryResponseOnPointOperations)
&& CosmosSerializationUtil.TrySerializeStreamToTargetFormat(
expectedSerializationFormat: JsonSerializationFormat.Binary,
targetSerializationFormat: targetResponseSerializationFormat,
inputStream: responseMessage?.Content,
outputStream: out Stream outputResponseStream))
{
responseMessage.Content = outputResponseStream;
}

return responseMessage;
}

public override async Task<PartitionKey> GetPartitionKeyValueFromStreamAsync(
Expand Down Expand Up @@ -1120,13 +1161,14 @@ public async Task<ItemResponse<T>> PatchItemAsync<T>(
return this.ClientContext.ResponseFactory.CreateItemResponse<T>(responseMessage);
}

public Task<ResponseMessage> PatchItemStreamAsync(
public async Task<ResponseMessage> PatchItemStreamAsync(
string id,
PartitionKey partitionKey,
IReadOnlyList<PatchOperation> patchOperations,
ITrace trace,
PatchItemRequestOptions requestOptions = null,
CancellationToken cancellationToken = default)
CancellationToken cancellationToken = default,
JsonSerializationFormat? targetResponseSerializationFormat = default)
{
if (trace == null)
{
Expand Down Expand Up @@ -1159,8 +1201,8 @@ public Task<ResponseMessage> PatchItemStreamAsync(
{
patchOperationsStream = this.ClientContext.SerializerCore.ToStream(new PatchSpec(patchOperations, requestOptions));
}

return this.ClientContext.ProcessResourceOperationStreamAsync(

ResponseMessage responseMessage = await this.ClientContext.ProcessResourceOperationStreamAsync(
resourceUri: this.GetResourceUri(
requestOptions,
OperationType.Patch,
Expand All @@ -1174,7 +1216,19 @@ public Task<ResponseMessage> PatchItemStreamAsync(
streamPayload: patchOperationsStream,
requestEnricher: null,
trace: trace,
cancellationToken: cancellationToken);
cancellationToken: cancellationToken);

if ((requestOptions == null || !requestOptions.EnableBinaryResponseOnPointOperations)
&& CosmosSerializationUtil.TrySerializeStreamToTargetFormat(
expectedSerializationFormat: JsonSerializationFormat.Binary,
targetSerializationFormat: targetResponseSerializationFormat,
inputStream: responseMessage?.Content,
outputStream: out Stream outputResponseStream))
{
responseMessage.Content = outputResponseStream;
}

return responseMessage;
}

public Task<ResponseMessage> PatchItemStreamAsync(
Expand Down Expand Up @@ -1217,7 +1271,9 @@ public Task<ResponseMessage> PatchItemStreamAsync(
operationType: OperationType.Patch,
requestOptions: requestOptions,
trace: trace,
cancellationToken: cancellationToken);
cancellationToken: cancellationToken,
targetRequestSerializationFormat: ContainerCore.GetTargetRequestSerializationFormat(),
targetResponseSerializationFormat: JsonSerializationFormat.Text);
}

public override ChangeFeedProcessorBuilder GetChangeFeedProcessorBuilderWithAllVersionsAndDeletes<T>(
Expand Down Expand Up @@ -1252,6 +1308,13 @@ private ChangeFeedProcessorBuilder GetChangeFeedProcessorBuilderPrivate(
container: this,
changeFeedProcessor: changeFeedProcessor,
applyBuilderConfiguration: changeFeedProcessor.ApplyBuildConfiguration).WithChangeFeedMode(mode);
}

private static JsonSerializationFormat GetTargetRequestSerializationFormat()
{
return ConfigurationManager.IsBinaryEncodingEnabled()
? JsonSerializationFormat.Binary
: JsonSerializationFormat.Text;
}
}
}
Loading

0 comments on commit 2cdf16b

Please sign in to comment.