Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Hedging: Adds Read hedging PREVIEW contracts #4598

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
41 commits
Select commit Hold shift + click to select a range
d26111e
Update contracts
NaluTripician Jul 22, 2024
3028607
removed unneeded changes
NaluTripician Jul 22, 2024
e444eb2
made other contracts public
NaluTripician Jul 23, 2024
a0c6bc7
update enable method
NaluTripician Jul 23, 2024
ddfc265
comments
NaluTripician Jul 24, 2024
b3d3cc7
Merge branch 'master' into users/nalutripician/hedgingPreviewAPIs
NaluTripician Jul 24, 2024
2bab5b6
contract update
NaluTripician Jul 24, 2024
49d6150
Merge branch 'master' into users/nalutripician/hedgingPreviewAPIs
NaluTripician Jul 30, 2024
bd0a406
Merge branch 'master' into users/nalutripician/hedgingPreviewAPIs
NaluTripician Aug 2, 2024
f40e734
Merge branch 'master' into users/nalutripician/hedgingPreviewAPIs
NaluTripician Aug 9, 2024
02cc8bb
changed eneabled to internal
NaluTripician Aug 12, 2024
f7da9f0
fixed internal method
NaluTripician Aug 12, 2024
da73fa6
Revert "fixed internal method"
NaluTripician Aug 12, 2024
e9b3d93
revert + change methods to internal
NaluTripician Aug 12, 2024
28addfc
fixed internal
NaluTripician Aug 12, 2024
9340015
Merge branch 'master' into users/nalutripician/hedgingPreviewAPIs
NaluTripician Aug 12, 2024
97509fc
changed to factory creation
NaluTripician Aug 13, 2024
1665cdb
disabledstrat fix
NaluTripician Aug 13, 2024
bd4f0f4
fixed test + contracts
NaluTripician Aug 13, 2024
43713ee
Merge branch 'master' into users/nalutripician/hedgingPreviewAPIs
NaluTripician Aug 13, 2024
343b3eb
xml changes
NaluTripician Aug 13, 2024
c2f352b
Merge branch 'users/nalutripician/hedgingPreviewAPIs' of https://gith…
NaluTripician Aug 13, 2024
391c20f
updated comments
NaluTripician Aug 13, 2024
98bc27b
Fixed Tests
NaluTripician Aug 13, 2024
02b94d2
Merge branch 'master' into users/nalutripician/hedgingPreviewAPIs
NaluTripician Aug 14, 2024
2010458
requested changes
NaluTripician Aug 14, 2024
2ceae54
added client options check and one region check
NaluTripician Aug 14, 2024
8fde5bf
fix get
NaluTripician Aug 15, 2024
a779ab8
fixed test
NaluTripician Aug 15, 2024
c9e5023
fixed set
NaluTripician Aug 15, 2024
c046248
fix client options
NaluTripician Aug 15, 2024
6f04933
Merge branch 'master' into users/nalutripician/hedgingPreviewAPIs
NaluTripician Aug 15, 2024
dfb0a54
updatecontracts
NaluTripician Aug 15, 2024
6bb9e31
requested changes
NaluTripician Aug 16, 2024
0e9c7a8
fixed validate method
NaluTripician Aug 16, 2024
0bf56c5
Delete MultiRegionSetupHelpers.cs
NaluTripician Aug 16, 2024
1a24ebc
update contracts
NaluTripician Aug 16, 2024
499fa56
Merge branch 'master' into users/nalutripician/hedgingPreviewAPIs
NaluTripician Aug 21, 2024
5d3fdce
Merge branch 'master' into users/nalutripician/hedgingPreviewAPIs
NaluTripician Aug 23, 2024
0f9ee95
fixed merge
NaluTripician Aug 23, 2024
8ae8813
fixed check
NaluTripician Aug 23, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 40 additions & 2 deletions Microsoft.Azure.Cosmos/src/CosmosClientOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -694,7 +694,35 @@ public Func<HttpClient> HttpClientFactory
/// <summary>
/// Availability Strategy to be used for periods of high latency
NaluTripician marked this conversation as resolved.
Show resolved Hide resolved
/// </summary>
internal AvailabilityStrategy AvailabilityStrategy { get; set; }
/// /// <example>
/// An example on how to set an availability strategy custom serializer.
/// <code language="c#">
/// <![CDATA[
/// CosmosClient client = new CosmosClientBuilder("connection string")
/// .WithApplicationPreferredRegions(
/// new List<string> { "East US", "Central US", "West US" } )
/// .WithAvailabilityStrategy(
/// AvailabilityStrategy.CrossRegionHedgingStrategy(
/// threshold: TimeSpan.FromMilliseconds(500),
/// thresholdStep: TimeSpan.FromMilliseconds(100)
/// ))
/// .Build();
/// ]]>
/// </code>
/// </example>
/// <remarks>
/// The availability strategy in the example is a Cross Region Hedging Strategy.
/// These strategies take two values, a threshold and a threshold step.When a request that is sent
/// out takes longer than the threshold time, the SDK will hedge to the second region in the application preferred regions list.
/// If a response from either the primary request or the first hedged request is not received
/// after the threshold step time, the SDK will hedge to the third region and so on.
/// </remarks>
#if PREVIEW
NaluTripician marked this conversation as resolved.
Show resolved Hide resolved
public
NaluTripician marked this conversation as resolved.
Show resolved Hide resolved
#else
internal
#endif
AvailabilityStrategy AvailabilityStrategy { get; set; }

/// <summary>
/// Enable partition key level failover
Expand Down Expand Up @@ -887,7 +915,8 @@ internal virtual ConnectionPolicy GetConnectionPolicy(int clientId)
{
this.ValidateDirectTCPSettings();
this.ValidateLimitToEndpointSettings();
this.ValidatePartitionLevelFailoverSettings();
this.ValidatePartitionLevelFailoverSettings();
this.ValidateAvailabilityStrategy();

ConnectionPolicy connectionPolicy = new ConnectionPolicy()
{
Expand Down Expand Up @@ -1064,6 +1093,15 @@ private void ValidatePartitionLevelFailoverSettings()
{
throw new ArgumentException($"{nameof(this.ApplicationPreferredRegions)} is required when {nameof(this.EnablePartitionLevelFailover)} is enabled.");
}
}

private void ValidateAvailabilityStrategy()
{
if (this.AvailabilityStrategy != null
&& this.ApplicationPreferredRegions == null && this.ApplicationRegion == null)
{
throw new ArgumentException($"{nameof(this.ApplicationPreferredRegions)} or {nameof(this.ApplicationRegion)} must be set to use {nameof(this.AvailabilityStrategy)}");
}
}

private void ValidateDirectTCPSettings()
Expand Down
7 changes: 6 additions & 1 deletion Microsoft.Azure.Cosmos/src/Fluent/CosmosClientBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -705,7 +705,12 @@ internal CosmosClientBuilder WithApiType(ApiType apiType)
/// </summary>
/// <param name="strategy"></param>
/// <returns>The CosmosClientBuilder</returns>
internal CosmosClientBuilder WithAvailibilityStrategy(AvailabilityStrategy strategy)
#if PREVIEW
public
#else
internal
#endif
CosmosClientBuilder WithAvailibilityStrategy(AvailabilityStrategy strategy)
{
this.clientOptions.AvailabilityStrategy = strategy;
return this;
Expand Down
13 changes: 10 additions & 3 deletions Microsoft.Azure.Cosmos/src/Handler/RequestInvokerHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ public override async Task<ResponseMessage> SendAsync(
await request.AssertPartitioningDetailsAsync(this.client, cancellationToken, request.Trace);
this.FillMultiMasterContext(request);

AvailabilityStrategy strategy = this.AvailabilityStrategy(request);
AvailabilityStrategyInternal strategy = this.AvailabilityStrategy(request);

ResponseMessage response = strategy != null && strategy.Enabled()
? await strategy.ExecuteAvailabilityStrategyAsync(
Expand All @@ -103,10 +103,17 @@ public override async Task<ResponseMessage> SendAsync(
/// </summary>
/// <param name="request"></param>
/// <returns>whether the request should be a parallel hedging request.</returns>
public AvailabilityStrategy AvailabilityStrategy(RequestMessage request)
public AvailabilityStrategyInternal AvailabilityStrategy(RequestMessage request)
{
return request.RequestOptions?.AvailabilityStrategy
AvailabilityStrategy strategy = request.RequestOptions?.AvailabilityStrategy
?? this.client.ClientOptions.AvailabilityStrategy;

if (strategy == null)
{
return null;
}

return strategy as AvailabilityStrategyInternal;
}

public virtual async Task<ResponseMessage> BaseSendAsync(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,12 @@ public class RequestOptions
/// reduce latency and increase availability. Currently there is one type of availability strategy, parallel request hedging.
/// If there is a globally enabled availability strategy, setting one in the request options will override the global one.
/// </summary>
internal AvailabilityStrategy AvailabilityStrategy { get; set; }
#if PREVIEW
NaluTripician marked this conversation as resolved.
Show resolved Hide resolved
public
#else
internal
#endif
AvailabilityStrategy AvailabilityStrategy { get; set; }

/// <summary>
/// Gets or sets the boolean to use effective partition key routing in the cosmos db request.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,29 +4,46 @@
namespace Microsoft.Azure.Cosmos
{
using System;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.Cosmos.Handlers;

/// <summary>
/// Types of availability strategies supported
/// </summary>
internal abstract class AvailabilityStrategy
NaluTripician marked this conversation as resolved.
Show resolved Hide resolved
#if PREVIEW
public
#else
internal
#endif
abstract class AvailabilityStrategy
NaluTripician marked this conversation as resolved.
Show resolved Hide resolved
{
/// <summary>
/// Execute the availability strategy
/// Default constructor
/// </summary>
/// <param name="sender"></param>
/// <param name="client"></param>
/// <param name="requestMessage"></param>
/// <param name="cancellationToken"></param>
/// <returns>The response from the service after the availability strategy is executed</returns>
public abstract Task<ResponseMessage> ExecuteAvailabilityStrategyAsync(
Func<RequestMessage, CancellationToken, Task<ResponseMessage>> sender,
CosmosClient client,
RequestMessage requestMessage,
CancellationToken cancellationToken);
internal AvailabilityStrategy()
{
}

internal abstract bool Enabled();
/// <summary>
/// Used on a per request level to disable a client level AvailabilityStrategy
/// </summary>
/// <returns>something</returns>
internal static AvailabilityStrategy DisabledStrategy()
NaluTripician marked this conversation as resolved.
Show resolved Hide resolved
{
return new DisabledAvailabilityStrategy();
}

/// <summary>
/// After a request's duration passes a threshold, this strategy will send out
/// hedged request to other regions. The first hedge request will be sent after the threshold.
/// After that, the strategy will send out a request every thresholdStep
/// until the request is completed or regions are exausted
/// </summary>
/// <param name="threshold"> how long before SDK begins hedging</param>
/// <param name="thresholdStep">Period of time between first hedge and next hedging attempts</param>
/// <returns>something</returns>
public static AvailabilityStrategy CrossRegionHedgingStrategy(TimeSpan threshold,
TimeSpan? thresholdStep)
{
return new CrossRegionHedgingAvailabilityStrategy(threshold, thresholdStep);
}
}
}
NaluTripician marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
// ------------------------------------------------------------
// Copyright (c) Microsoft Corporation. All rights reserved.
// ------------------------------------------------------------

namespace Microsoft.Azure.Cosmos
{
using System;
using System.Threading;
using System.Threading.Tasks;

internal abstract class AvailabilityStrategyInternal : AvailabilityStrategy
{
/// <summary>
/// Execute the availability strategy
/// </summary>
/// <param name="sender"></param>
/// <param name="client"></param>
/// <param name="requestMessage"></param>
/// <param name="cancellationToken"></param>
/// <returns>The response from the service after the availability strategy is executed</returns>
internal abstract Task<ResponseMessage> ExecuteAvailabilityStrategyAsync(
Func<RequestMessage, CancellationToken, Task<ResponseMessage>> sender,
CosmosClient client,
RequestMessage requestMessage,
CancellationToken cancellationToken);

/// <summary>
/// Checks to see if the strategy is enabled
/// </summary>
/// <returns>a bool representing if the strategy is enabled</returns>
internal abstract bool Enabled();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,12 @@ namespace Microsoft.Azure.Cosmos
using Microsoft.Azure.Documents;

/// <summary>
/// Parallel hedging availability strategy. Once threshold time is reached,
/// Hedging availability strategy. Once threshold time is reached,
/// the SDK will send out an additional request to a remote region in parallel
/// if the first parallel request or the original has not returned after the step time,
/// additional parallel requests will be sent out there is a response or all regions are exausted.
/// if the first hedging request or the original has not returned after the step time,
/// additional hedged requests will be sent out there is a response or all regions are exausted.
/// </summary>
internal class CrossRegionParallelHedgingAvailabilityStrategy : AvailabilityStrategy
internal class CrossRegionHedgingAvailabilityStrategy : AvailabilityStrategyInternal
{
private const string HedgeContext = "Hedge Context";
private const string ResponseRegion = "Response Region";
Expand All @@ -37,11 +37,11 @@ internal class CrossRegionParallelHedgingAvailabilityStrategy : AvailabilityStra
public TimeSpan ThresholdStep { get; private set; }

/// <summary>
/// Constructor for parallel hedging availability strategy
/// Constructor for hedging availability strategy
/// </summary>
/// <param name="threshold"></param>
/// <param name="thresholdStep"></param>
public CrossRegionParallelHedgingAvailabilityStrategy(
public CrossRegionHedgingAvailabilityStrategy(
TimeSpan threshold,
TimeSpan? thresholdStep)
{
Expand All @@ -59,17 +59,18 @@ public CrossRegionParallelHedgingAvailabilityStrategy(
this.ThresholdStep = thresholdStep ?? TimeSpan.FromMilliseconds(-1);
}

/// <inheritdoc/>
internal override bool Enabled()
{
return true;
}

/// <summary>
/// This method determines if the request should be sent with a parallel hedging availability strategy.
/// This method determines if the request should be sent with a hedging availability strategy.
/// This availability strategy can only be used if the request is a read-only request on a document request.
/// </summary>
/// <param name="request"></param>
/// <returns>whether the request should be a parallel hedging request.</returns>
/// <returns>whether the request should be a hedging request.</returns>
internal bool ShouldHedge(RequestMessage request)
{
//Only use availability strategy for document point operations
Expand All @@ -88,20 +89,21 @@ internal bool ShouldHedge(RequestMessage request)
}

/// <summary>
/// Execute the parallel hedging availability strategy
/// Execute the hedging availability strategy
/// </summary>
/// <param name="sender"></param>
/// <param name="client"></param>
/// <param name="request"></param>
/// <param name="cancellationToken"></param>
/// <returns>The response after executing cross region hedging</returns>
public override async Task<ResponseMessage> ExecuteAvailabilityStrategyAsync(
internal override async Task<ResponseMessage> ExecuteAvailabilityStrategyAsync(
Func<RequestMessage, CancellationToken, Task<ResponseMessage>> sender,
CosmosClient client,
RequestMessage request,
CancellationToken cancellationToken)
{
if (!this.ShouldHedge(request))
if (!this.ShouldHedge(request)
|| client.DocumentClient.GlobalEndpointManager.ReadEndpoints.Count == 1)
{
return await sender(request, cancellationToken);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,9 @@ namespace Microsoft.Azure.Cosmos
/// <summary>
/// A Disabled availability strategy that does not do anything. Used for overriding the default global availability strategy.
/// </summary>
internal class DisabledAvailabilityStrategy : AvailabilityStrategy
internal class DisabledAvailabilityStrategy : AvailabilityStrategyInternal
{
/// <inheritdoc/>
internal override bool Enabled()
{
return false;
Expand All @@ -25,7 +26,7 @@ internal override bool Enabled()
/// <param name="requestMessage"></param>
/// <param name="cancellationToken"></param>
/// <returns>nothing, this will throw.</returns>
public override Task<ResponseMessage> ExecuteAvailabilityStrategyAsync(
internal override Task<ResponseMessage> ExecuteAvailabilityStrategyAsync(
Func<RequestMessage,
CancellationToken,
Task<ResponseMessage>> sender,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,7 @@ public async Task AvailabilityStrategyNoTriggerTest()
{
ConnectionMode = ConnectionMode.Direct,
ApplicationPreferredRegions = new List<string>() { "Central US", "North Central US" },
AvailabilityStrategy = new CrossRegionParallelHedgingAvailabilityStrategy(
AvailabilityStrategy = AvailabilityStrategy.CrossRegionHedgingStrategy(
threshold: TimeSpan.FromMilliseconds(300),
thresholdStep: TimeSpan.FromMilliseconds(50)),
Serializer = this.cosmosSystemTextJsonSerializer
Expand Down Expand Up @@ -272,7 +272,7 @@ public async Task AvailabilityStrategyRequestOptionsTriggerTest()

ItemRequestOptions requestOptions = new ItemRequestOptions
{
AvailabilityStrategy = new CrossRegionParallelHedgingAvailabilityStrategy(
AvailabilityStrategy = new CrossRegionHedgingAvailabilityStrategy(
threshold: TimeSpan.FromMilliseconds(100),
thresholdStep: TimeSpan.FromMilliseconds(50))
};
Expand Down Expand Up @@ -317,7 +317,7 @@ public async Task AvailabilityStrategyDisableOverideTest()
{
ConnectionMode = ConnectionMode.Direct,
ApplicationPreferredRegions = new List<string>() { "Central US", "North Central US" },
AvailabilityStrategy = new CrossRegionParallelHedgingAvailabilityStrategy(
AvailabilityStrategy = AvailabilityStrategy.CrossRegionHedgingStrategy(
threshold: TimeSpan.FromMilliseconds(100),
thresholdStep: TimeSpan.FromMilliseconds(50)),
Serializer = this.cosmosSystemTextJsonSerializer
Expand Down Expand Up @@ -416,7 +416,7 @@ public async Task AvailabilityStrategyAllFaultsTests(string operation, string co
{
ConnectionMode = ConnectionMode.Direct,
ApplicationPreferredRegions = new List<string>() { "Central US", "North Central US" },
AvailabilityStrategy = new CrossRegionParallelHedgingAvailabilityStrategy(
AvailabilityStrategy = AvailabilityStrategy.CrossRegionHedgingStrategy(
threshold: TimeSpan.FromMilliseconds(100),
thresholdStep: TimeSpan.FromMilliseconds(50)),
Serializer = this.cosmosSystemTextJsonSerializer
Expand Down Expand Up @@ -597,7 +597,7 @@ public async Task AvailabilityStrategyStepTests(string operation, string condito
{
ConnectionMode = ConnectionMode.Direct,
ApplicationPreferredRegions = new List<string>() { "Central US", "North Central US", "East US" },
AvailabilityStrategy = new CrossRegionParallelHedgingAvailabilityStrategy(
AvailabilityStrategy = AvailabilityStrategy.CrossRegionHedgingStrategy(
threshold: TimeSpan.FromMilliseconds(100),
thresholdStep: TimeSpan.FromMilliseconds(50)),
Serializer = this.cosmosSystemTextJsonSerializer
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ public async Task HedgeNestingDiagnosticsTest()

ItemRequestOptions requestOptions = new ItemRequestOptions
{
AvailabilityStrategy = new CrossRegionParallelHedgingAvailabilityStrategy(
AvailabilityStrategy = AvailabilityStrategy.CrossRegionHedgingStrategy(
threshold: TimeSpan.FromMilliseconds(100),
thresholdStep: TimeSpan.FromMilliseconds(50))
};
Expand Down
Loading
Loading