Skip to content

Commit

Permalink
Upgrade Resiliency: Adds Implementation for Validating the Unhealthy …
Browse files Browse the repository at this point in the history
…Backend Replicas in Direct mode (#3631)

* Code changes to implement replica validation in dotnet v3 sdk.

* Cosmetic changes to add inline code comments.

* Code changes to address review comments.

* Code changes to cover a scenario for async cache.

* Code changes to refactor async non-blocking cache code.

* Code changes to address minor review comments.

---------

Co-authored-by: Kiran Kumar Kolli <[email protected]>
  • Loading branch information
kundadebdatta and kirankumarkolli committed Mar 1, 2023
1 parent 0088c2f commit fd687f5
Show file tree
Hide file tree
Showing 6 changed files with 1,180 additions and 26 deletions.
93 changes: 69 additions & 24 deletions Microsoft.Azure.Cosmos/src/Routing/AsyncCacheNonBlocking.cs
Original file line number Diff line number Diff line change
Expand Up @@ -120,30 +120,11 @@ public async Task<TValue> GetAsync(
throw;
}

try
{
return await initialLazyValue.CreateAndWaitForBackgroundRefreshTaskAsync(
createRefreshTask: singleValueInitFunc);
}
catch (Exception e)
{
if (initialLazyValue.ShouldRemoveFromCacheThreadSafe())
{
DefaultTrace.TraceError(
"AsyncCacheNonBlocking.GetAsync with ForceRefresh Failed. key: {0}, Exception: {1}",
key,
e);

// In some scenarios when a background failure occurs like a 404
// the initial cache value should be removed.
if (this.removeFromCacheOnBackgroundRefreshException(e))
{
this.TryRemove(key);
}
}

throw;
}
return await this.UpdateCacheAndGetValueFromBackgroundTaskAsync(
key: key,
initialValue: initialLazyValue,
callbackDelegate: singleValueInitFunc,
operationName: nameof(GetAsync));
}

// The AsyncLazyWithRefreshTask is lazy and won't create the task until GetValue is called.
Expand Down Expand Up @@ -196,6 +177,70 @@ public bool TryRemove(TKey key)
return this.values.TryRemove(key, out _);
}

/// <summary>
/// Performs an on-demand refresh of the cache entry stored under <paramref name="key"/>.
/// When no entry exists for the key, the method completes without invoking the delegate.
/// </summary>
/// <param name="key">The key whose cached entry should be refreshed.</param>
/// <param name="singleValueInitFunc">The delegate handed to the background refresh task; it takes a
/// <typeparamref name="TValue"/> (presumably the currently cached value - confirm against
/// AsyncLazyWithRefreshTask) and produces the replacement value.</param>
/// <returns>A <see cref="Task"/> that completes once the refresh has finished. Exceptions raised by the
/// refresh are propagated to the caller.</returns>
public async Task RefreshAsync(
    TKey key,
    Func<TValue, Task<TValue>> singleValueInitFunc)
{
    bool isCached = this.values.TryGetValue(key, out AsyncLazyWithRefreshTask<TValue> cachedLazyValue);
    if (!isCached)
    {
        // Nothing is cached under this key, so there is nothing to refresh.
        return;
    }

    await this.UpdateCacheAndGetValueFromBackgroundTaskAsync(
        key: key,
        initialValue: cachedLazyValue,
        callbackDelegate: singleValueInitFunc,
        operationName: nameof(RefreshAsync));
}

/// <summary>
/// Asks <paramref name="initialValue"/> to create and await a background refresh task driven by
/// <paramref name="callbackDelegate"/> and returns the value that task produces. When the refresh fails,
/// the entry may be evicted from the cache and the failure is traced before the exception is rethrown.
/// </summary>
/// <param name="key">The cache key the refresh is being performed for.</param>
/// <param name="initialValue">The <see cref="AsyncLazyWithRefreshTask{T}"/> currently associated with the key.</param>
/// <param name="callbackDelegate">The delegate passed to the background refresh task to compute the new value.</param>
/// <param name="operationName">The name of the calling cache operation; only used in the error trace.</param>
/// <returns>A <see cref="Task{TValue}"/> yielding the refreshed value.</returns>
private async Task<TValue> UpdateCacheAndGetValueFromBackgroundTaskAsync(
    TKey key,
    AsyncLazyWithRefreshTask<TValue> initialValue,
    Func<TValue, Task<TValue>> callbackDelegate,
    string operationName)
{
    try
    {
        return await initialValue.CreateAndWaitForBackgroundRefreshTaskAsync(
            createRefreshTask: callbackDelegate);
    }
    catch (Exception ex)
    {
        if (!initialValue.ShouldRemoveFromCacheThreadSafe())
        {
            // This caller is not the one responsible for eviction/tracing; just surface the failure.
            throw;
        }

        // Some background failures (for example a 404) mean the initial cached value must be dropped.
        // The predicate decides whether that applies; TryRemove only runs when it does.
        bool removed = this.removeFromCacheOnBackgroundRefreshException(ex) && this.TryRemove(key);

        DefaultTrace.TraceError(
            "AsyncCacheNonBlocking Failed. key: {0}, operation: {1}, tryRemoved: {2}, Exception: {3}",
            key,
            operationName,
            removed,
            ex);

        throw;
    }
}

/// <summary>
/// This is AsyncLazy that has an additional Task that can
/// be used to update the value. This allows concurrent requests
Expand Down
154 changes: 154 additions & 0 deletions Microsoft.Azure.Cosmos/src/Routing/GatewayAddressCache.cs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ internal class GatewayAddressCache : IAddressCache, IDisposable
private readonly bool enableTcpConnectionEndpointRediscovery;

private readonly CosmosHttpClient httpClient;
private readonly bool isReplicaAddressValidationEnabled;

private Tuple<PartitionKeyRangeIdentity, PartitionAddressInformation> masterPartitionAddressCache;
private DateTime suboptimalMasterPartitionTimestamp;
Expand Down Expand Up @@ -84,10 +85,27 @@ public GatewayAddressCache(
GatewayAddressCache.ProtocolString(this.protocol));

this.openConnectionsHandler = openConnectionsHandler;
this.isReplicaAddressValidationEnabled = Helpers.GetEnvironmentVariableAsBool(
name: Constants.EnvironmentVariables.ReplicaConnectivityValidationEnabled,
defaultValue: false);
}

/// <summary>
/// Gets the <see cref="Uri"/> stored in this cache's <c>serviceEndpoint</c> field.
/// </summary>
public Uri ServiceEndpoint
{
    get { return this.serviceEndpoint; }
}

/// <summary>
/// Gets the address information from the gateway and sets them into the async non blocking cache for later lookup.
/// Additionally attempts to establish Rntbd connections to the backend replicas based on `shouldOpenRntbdChannels`
/// boolean flag.
/// </summary>
/// <param name="databaseName">A string containing the database name.</param>
/// <param name="collection">An instance of <see cref="ContainerProperties"/> containing the collection properties.</param>
/// <param name="partitionKeyRangeIdentities">A read only list containing the partition key range identities.</param>
/// <param name="shouldOpenRntbdChannels">A boolean flag indicating whether Rntbd connections are required to be established
/// to the backend replica nodes. For cosmos client initialization and cache warmups, the Rntbd connections need to be
/// opened deterministically to the backend replicas to reduce latency, thus the <paramref name="shouldOpenRntbdChannels"/>
/// should be set to `true` during cosmos client initialization and cache warmups. The OpenAsync flow from DocumentClient
/// doesn't require the connections to be opened deterministically thus should set the parameter to `false`.</param>
/// <param name="cancellationToken">An instance of <see cref="CancellationToken"/>.</param>
public async Task OpenConnectionsAsync(
string databaseName,
ContainerProperties collection,
Expand Down Expand Up @@ -161,6 +179,10 @@ public async Task OpenConnectionsAsync(
new PartitionKeyRangeIdentity(collection.ResourceId, addressInfo.Item1.PartitionKeyRangeId),
addressInfo.Item2);

// The `shouldOpenRntbdChannels` boolean flag indicates whether the SDK should establish Rntbd connections to the
// backend replica nodes. For the `CosmosClient.CreateAndInitializeAsync()` flow, the flag should be passed as
// `true` so that the Rntbd connections to the backend replicas could be established deterministically. For any
// other flow, the flag should be passed as `false`.
if (this.openConnectionsHandler != null && shouldOpenRntbdChannels)
{
await this.openConnectionsHandler
Expand All @@ -178,6 +200,7 @@ public void SetOpenConnectionsHandler(IOpenConnectionsHandler openConnectionsHan
this.openConnectionsHandler = openConnectionsHandler;
}

/// <inheritdoc/>
public async Task<PartitionAddressInformation> TryGetAddressesAsync(
DocumentServiceRequest request,
PartitionKeyRangeIdentity partitionKeyRangeIdentity,
Expand Down Expand Up @@ -229,6 +252,7 @@ public async Task<PartitionAddressInformation> TryGetAddressesAsync(
return this.GetAddressesForRangeIdAsync(
request,
cachedAddresses: currentCachedValue,
partitionKeyRangeIdentity.CollectionRid,
partitionKeyRangeIdentity.PartitionKeyRangeId,
forceRefresh: forceRefreshPartitionAddresses);
Expand Down Expand Up @@ -259,6 +283,7 @@ public async Task<PartitionAddressInformation> TryGetAddressesAsync(
key: partitionKeyRangeIdentity,
singleValueInitFunc: (_) => this.GetAddressesForRangeIdAsync(
request,
cachedAddresses: null,
partitionKeyRangeIdentity.CollectionRid,
partitionKeyRangeIdentity.PartitionKeyRangeId,
forceRefresh: false),
Expand All @@ -278,6 +303,27 @@ public async Task<PartitionAddressInformation> TryGetAddressesAsync(
this.suboptimalServerPartitionTimestamps.TryAdd(partitionKeyRangeIdentity, DateTime.UtcNow);
}

// Refresh the cache on-demand if some address has remained unhealthy for long enough (more than 1 minute)
// and its status needs to be revalidated. This does not rely on a 410 to force refresh the addresses because,
// when an address is marked as unhealthy, the address enumerator deprioritizes it and moves it to the
// end of the transport uris list. Therefore, it could happen that no request lands on the unhealthy address for
// an extended period of time, so a 410 (Gone Exception) may never trigger the forceRefresh workflow for
// that particular replica.
if (addresses
.Get(Protocol.Tcp)
.ReplicaTransportAddressUris
.Any(x => x.ShouldRefreshHealthStatus()))
{
Task refreshAddressesInBackgroundTask = Task.Run(async () => await this.serverPartitionAddressCache.RefreshAsync(
key: partitionKeyRangeIdentity,
singleValueInitFunc: (currentCachedValue) => this.GetAddressesForRangeIdAsync(
request,
cachedAddresses: currentCachedValue,
partitionKeyRangeIdentity.CollectionRid,
partitionKeyRangeIdentity.PartitionKeyRangeId,
forceRefresh: true)));
}

return addresses;
}
catch (DocumentClientException ex)
Expand Down Expand Up @@ -384,6 +430,7 @@ public async Task<PartitionAddressInformation> UpdateAsync(
key: partitionKeyRangeIdentity,
singleValueInitFunc: (_) => this.GetAddressesForRangeIdAsync(
null,
cachedAddresses: null,
partitionKeyRangeIdentity.CollectionRid,
partitionKeyRangeIdentity.PartitionKeyRangeId,
forceRefresh: true),
Expand Down Expand Up @@ -444,6 +491,7 @@ private async Task<Tuple<PartitionKeyRangeIdentity, PartitionAddressInformation>

private async Task<PartitionAddressInformation> GetAddressesForRangeIdAsync(
DocumentServiceRequest request,
PartitionAddressInformation cachedAddresses,
string collectionRid,
string partitionKeyRangeId,
bool forceRefresh)
Expand Down Expand Up @@ -475,6 +523,32 @@ await this.GetServerAddressesViaGatewayAsync(request, collectionRid, new[] { par
throw new PartitionKeyRangeGoneException(errorMessage) { ResourceAddress = collectionRid };
}

if (this.isReplicaAddressValidationEnabled)
{
// The purpose of this step is to merge the new transport addresses with the old one. What this means is -
// 1. If a newly returned address from gateway is already a part of the cache, then restore the health state
// of the new address with that of the cached one.
// 2. If a newly returned address from gateway doesn't exist in the cache, then keep using the new address
// with `Unknown` (initial) status.
PartitionAddressInformation mergedAddresses = GatewayAddressCache.MergeAddresses(result.Item2, cachedAddresses);
IReadOnlyList<TransportAddressUri> transportAddressUris = mergedAddresses.Get(Protocol.Tcp)?.ReplicaTransportAddressUris;

// If cachedAddresses are null, that would mean that the returned address from gateway would remain in Unknown
// status and there is no cached state that could transition them into Unhealthy.
if (cachedAddresses != null)
{
foreach (TransportAddressUri address in transportAddressUris)
{
// The main purpose for this step is to move address health status from Unhealthy to UnhealthyPending.
address.SetRefreshedIfUnhealthy();
}
}

this.ValidateUnhealthyPendingReplicas(transportAddressUris);

return mergedAddresses;
}

return result.Item2;
}
}
Expand Down Expand Up @@ -760,6 +834,86 @@ await this.GetServerAddressesViaGatewayAsync(
}
}

/// <summary>
/// Validates the replicas that are currently in the <c>UnhealthyPending</c> state by attempting to open
/// Rntbd connections to them on a background task. The connection attempt is expected to eventually move
/// each replica to healthy when it succeeds, or back to unhealthy otherwise.
/// </summary>
/// <param name="addresses">A read-only list of <see cref="TransportAddressUri"/> to inspect; only the entries
/// whose health status is <c>UnhealthyPending</c> are validated.</param>
/// <exception cref="ArgumentNullException">Thrown when <paramref name="addresses"/> is <c>null</c>.</exception>
private void ValidateUnhealthyPendingReplicas(
    IReadOnlyList<TransportAddressUri> addresses)
{
    if (addresses == null)
    {
        throw new ArgumentNullException(nameof(addresses));
    }

    // The handler is assigned outside of construction (see SetOpenConnectionsHandler) and may still be null.
    // Capture it once so the background task cannot dereference a null or swapped instance.
    IOpenConnectionsHandler connectionsHandler = this.openConnectionsHandler;
    if (connectionsHandler == null)
    {
        return;
    }

    // Materialize the filter exactly once, on the calling thread. A deferred query would be re-evaluated
    // inside the background task, by which time the health states may already have changed.
    List<TransportAddressUri> addressesNeedToValidation = addresses
        .Where(address => address
            .GetCurrentHealthState()
            .GetHealthStatus() == TransportAddressHealthState.HealthStatus.UnhealthyPending)
        .ToList();

    if (addressesNeedToValidation.Count == 0)
    {
        return;
    }

    // Intentionally not awaited: validation runs in the background and must not block the address lookup.
    _ = Task.Run(async () => await connectionsHandler.TryOpenRntbdChannelsAsync(
        addresses: addressesNeedToValidation));
}

/// <summary>
/// Merges the addresses newly returned by the gateway service with the cached addresses. For every new
/// address whose string form matches a cached address, the health status and the per-status timestamps of
/// the cached address are copied onto the new address. New addresses without a cached counterpart are left
/// untouched and keep whatever health state they already carry.
/// </summary>
/// <param name="newAddresses">The <see cref="PartitionAddressInformation"/> containing the latest
/// addresses returned from the gateway.</param>
/// <param name="cachedAddresses">The <see cref="PartitionAddressInformation"/> containing the cached
/// addresses from the async non blocking cache. May be <c>null</c>, in which case nothing is merged.</param>
/// <returns>The <paramref name="newAddresses"/> instance, with cached health state carried over where applicable.</returns>
/// <exception cref="ArgumentNullException">Thrown when <paramref name="newAddresses"/> is <c>null</c>.</exception>
private static PartitionAddressInformation MergeAddresses(
    PartitionAddressInformation newAddresses,
    PartitionAddressInformation cachedAddresses)
{
    if (newAddresses == null)
    {
        throw new ArgumentNullException(nameof(newAddresses));
    }

    if (cachedAddresses == null)
    {
        return newAddresses;
    }

    PerProtocolPartitionAddressInformation currentAddressInfo = newAddresses.Get(Protocol.Tcp);
    PerProtocolPartitionAddressInformation cachedAddressInfo = cachedAddresses.Get(Protocol.Tcp);

    // Without Tcp address information on either side there is no health state to carry over.
    if (currentAddressInfo == null || cachedAddressInfo == null)
    {
        return newAddresses;
    }

    // Index the cached addresses by their string form for constant-time lookup.
    Dictionary<string, TransportAddressUri> cachedAddressDict = new ();

    foreach (TransportAddressUri cachedUri in cachedAddressInfo.ReplicaTransportAddressUris)
    {
        cachedAddressDict[cachedUri.ToString()] = cachedUri;
    }

    foreach (TransportAddressUri transportAddressUri in currentAddressInfo.ReplicaTransportAddressUris)
    {
        if (!cachedAddressDict.TryGetValue(transportAddressUri.ToString(), out TransportAddressUri cachedTransportAddressUri))
        {
            continue;
        }

        // Read the cached health state once so the status and all timestamps come from the same
        // snapshot, even if the cached address is updated concurrently.
        TransportAddressHealthState cachedHealthState = cachedTransportAddressUri.GetCurrentHealthState();

        transportAddressUri.ResetHealthStatus(
            status: cachedHealthState.GetHealthStatus(),
            lastUnknownTimestamp: cachedHealthState.GetLastKnownTimestampByHealthStatus(
                healthStatus: TransportAddressHealthState.HealthStatus.Unknown),
            lastUnhealthyPendingTimestamp: cachedHealthState.GetLastKnownTimestampByHealthStatus(
                healthStatus: TransportAddressHealthState.HealthStatus.UnhealthyPending),
            lastUnhealthyTimestamp: cachedHealthState.GetLastKnownTimestampByHealthStatus(
                healthStatus: TransportAddressHealthState.HealthStatus.Unhealthy));
    }

    return newAddresses;
}

protected virtual void Dispose(bool disposing)
{
if (this.disposedValue)
Expand Down
Loading

0 comments on commit fd687f5

Please sign in to comment.