Skip to content

Commit

Permalink
Session being updated incorrectly on Gateway mode (#769)
Browse files Browse the repository at this point in the history
* Fixing session

* changelog

* Updating changelog
  • Loading branch information
ealsur authored and kirankumarkolli committed Sep 5, 2019
1 parent 82317af commit 4162ff1
Show file tree
Hide file tree
Showing 3 changed files with 295 additions and 3 deletions.
28 changes: 25 additions & 3 deletions Microsoft.Azure.Cosmos/src/GatewayStoreModel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -89,13 +89,13 @@ public GatewayStoreModel(
(exception.StatusCode == HttpStatusCode.PreconditionFailed || exception.StatusCode == HttpStatusCode.Conflict
|| (exception.StatusCode == HttpStatusCode.NotFound && exception.GetSubStatus() != SubStatusCodes.ReadSessionNotAvailable)))
{
this.CaptureSessionToken(request, exception.Headers);
this.CaptureSessionToken(exception.StatusCode, exception.GetSubStatus(), request, exception.Headers);
}

throw;
}

this.CaptureSessionToken(request, response.Headers);
this.CaptureSessionToken(response.StatusCode, response.SubStatusCode, request, response.Headers);
return response;
}

Expand Down Expand Up @@ -169,8 +169,30 @@ public void Dispose()
GC.SuppressFinalize(this);
}

private void CaptureSessionToken(DocumentServiceRequest request, INameValueCollection responseHeaders)
private void CaptureSessionToken(
HttpStatusCode? statusCode,
SubStatusCodes subStatusCode,
DocumentServiceRequest request,
INameValueCollection responseHeaders)
{
// Exceptionless can try to capture session token from CompleteResponse
if (request.IsValidStatusCodeForExceptionlessRetry((int)statusCode, subStatusCode))
{
// Not capturing on master resources
if (ReplicatedResourceClient.IsMasterResource(request.ResourceType))
{
return;
}

// Only capturing on 409, 412, 404 && !1002
if (statusCode != HttpStatusCode.PreconditionFailed
&& statusCode != HttpStatusCode.Conflict
&& (statusCode != HttpStatusCode.NotFound || subStatusCode == SubStatusCodes.ReadSessionNotAvailable))
{
return;
}
}

if (request.ResourceType == ResourceType.Collection && request.OperationType == OperationType.Delete)
{
string resourceId;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,144 @@ public async Task TestErrorResponsesProvideBody()

}

[TestMethod]
// Verify that for known exceptions, session token is updated
public async Task GatewayStoreModel_Exception_UpdateSessionTokenOnKnownException()
{
INameValueCollection headers = new DictionaryNameValueCollection();
headers.Set(HttpConstants.HttpHeaders.SessionToken, "0:1#100#1=20#2=5#3=31");
headers.Set(WFConstants.BackendHeaders.LocalLSN, "10");
await this.GatewayStoreModel_Exception_UpdateSessionTokenOnKnownException(new ConflictException("test", headers, new Uri("http://one.com")));
await this.GatewayStoreModel_Exception_UpdateSessionTokenOnKnownException(new NotFoundException("test", headers, new Uri("http://one.com")));
await this.GatewayStoreModel_Exception_UpdateSessionTokenOnKnownException(new PreconditionFailedException("test", headers, new Uri("http://one.com")));
}

private async Task GatewayStoreModel_Exception_UpdateSessionTokenOnKnownException(Exception ex)
{
const string originalSessionToken = "0:1#100#1=20#2=5#3=30";
const string updatedSessionToken = "0:1#100#1=20#2=5#3=31";

Func<HttpRequestMessage, Task<HttpResponseMessage>> sendFunc = async request =>
{
throw ex;
};

Mock<IDocumentClientInternal> mockDocumentClient = new Mock<IDocumentClientInternal>();
mockDocumentClient.Setup(client => client.ServiceEndpoint).Returns(new Uri("https://foo"));

GlobalEndpointManager endpointManager = new GlobalEndpointManager(mockDocumentClient.Object, new ConnectionPolicy());
SessionContainer sessionContainer = new SessionContainer(string.Empty);
DocumentClientEventSource eventSource = DocumentClientEventSource.Instance;
HttpMessageHandler messageHandler = new MockMessageHandler(sendFunc);
GatewayStoreModel storeModel = new GatewayStoreModel(
endpointManager,
sessionContainer,
TimeSpan.FromSeconds(5),
ConsistencyLevel.Eventual,
eventSource,
null,
new UserAgentContainer(),
ApiType.None,
messageHandler);

INameValueCollection headers = new DictionaryNameValueCollection();
headers.Set(HttpConstants.HttpHeaders.ConsistencyLevel, ConsistencyLevel.Session.ToString());
headers.Set(HttpConstants.HttpHeaders.SessionToken, originalSessionToken);
headers.Set(WFConstants.BackendHeaders.PartitionKeyRangeId, "0");

using (new ActivityScope(Guid.NewGuid()))
{
using (DocumentServiceRequest request = DocumentServiceRequest.Create(
OperationType.Read,
ResourceType.Document,
"dbs/OVJwAA==/colls/OVJwAOcMtA0=/docs/OVJwAOcMtA0BAAAAAAAAAA==/",
AuthorizationTokenType.PrimaryMasterKey,
headers))
{
request.UseStatusCodeFor429 = true;
request.UseStatusCodeForFailures = true;
try
{
DocumentServiceResponse response = await storeModel.ProcessMessageAsync(request);
Assert.Fail("Should had thrown exception");
}
catch (Exception)
{
// Expecting exception
}
Assert.AreEqual(updatedSessionToken, sessionContainer.GetSessionToken("dbs/OVJwAA==/colls/OVJwAOcMtA0="));
}
}
}

[TestMethod]
// Verify that for 429 exceptions, session token is not updated
public async Task GatewayStoreModel_Exception_NotUpdateSessionTokenOnKnownExceptions()
{
INameValueCollection headers = new DictionaryNameValueCollection();
headers.Set(HttpConstants.HttpHeaders.SessionToken, "0:1#100#1=20#2=5#3=30");
headers.Set(WFConstants.BackendHeaders.LocalLSN, "10");
await this.GatewayStoreModel_Exception_NotUpdateSessionTokenOnKnownException(new RequestRateTooLargeException("429", headers, new Uri("http://one.com")));
}

private async Task GatewayStoreModel_Exception_NotUpdateSessionTokenOnKnownException(Exception ex)
{
const string originalSessionToken = "0:1#100#1=20#2=5#3=30";
const string updatedSessionToken = "0:1#100#1=20#2=5#3=31";

Func<HttpRequestMessage, Task<HttpResponseMessage>> sendFunc = async request =>
{
throw ex;
};

Mock<IDocumentClientInternal> mockDocumentClient = new Mock<IDocumentClientInternal>();
mockDocumentClient.Setup(client => client.ServiceEndpoint).Returns(new Uri("https://foo"));

GlobalEndpointManager endpointManager = new GlobalEndpointManager(mockDocumentClient.Object, new ConnectionPolicy());
SessionContainer sessionContainer = new SessionContainer(string.Empty);
DocumentClientEventSource eventSource = DocumentClientEventSource.Instance;
HttpMessageHandler messageHandler = new MockMessageHandler(sendFunc);
GatewayStoreModel storeModel = new GatewayStoreModel(
endpointManager,
sessionContainer,
TimeSpan.FromSeconds(5),
ConsistencyLevel.Eventual,
eventSource,
null,
new UserAgentContainer(),
ApiType.None,
messageHandler);

INameValueCollection headers = new DictionaryNameValueCollection();
headers.Set(HttpConstants.HttpHeaders.ConsistencyLevel, ConsistencyLevel.Session.ToString());
headers.Set(HttpConstants.HttpHeaders.SessionToken, originalSessionToken);
headers.Set(WFConstants.BackendHeaders.PartitionKeyRangeId, "0");

using (new ActivityScope(Guid.NewGuid()))
{
using (DocumentServiceRequest request = DocumentServiceRequest.Create(
OperationType.Read,
ResourceType.Document,
"dbs/OVJwAA==/colls/OVJwAOcMtA0=/docs/OVJwAOcMtA0BAAAAAAAAAA==/",
AuthorizationTokenType.PrimaryMasterKey,
headers))
{
request.UseStatusCodeFor429 = true;
request.UseStatusCodeForFailures = true;
try
{
DocumentServiceResponse response = await storeModel.ProcessMessageAsync(request);
Assert.Fail("Should had thrown exception");
}
catch (Exception)
{
// Expecting exception
}
Assert.AreEqual(string.Empty, sessionContainer.GetSessionToken("dbs/OVJwAA==/colls/OVJwAOcMtA0="));
}
}
}

/// <summary>
/// Tests that empty session token is sent for operations on Session Consistent resources like
/// Databases, Collections, Users, Permissions, PartitionKeyRanges, DatabaseAccounts and Offers
Expand Down Expand Up @@ -279,6 +417,137 @@ public async Task TestSessionTokenAvailability()

}

[TestMethod]
// When exceptionless is turned on Session Token should only be updated on known failures
public async Task GatewayStoreModel_Exceptionless_UpdateSessionTokenOnKnownFailedStoreResponses()
{
await this.GatewayStoreModel_Exceptionless_UpdateSessionTokenOnKnownResponses(HttpStatusCode.Conflict);
await this.GatewayStoreModel_Exceptionless_UpdateSessionTokenOnKnownResponses(HttpStatusCode.NotFound, SubStatusCodes.OwnerResourceNotFound);
await this.GatewayStoreModel_Exceptionless_UpdateSessionTokenOnKnownResponses(HttpStatusCode.PreconditionFailed);
}

private async Task GatewayStoreModel_Exceptionless_UpdateSessionTokenOnKnownResponses(HttpStatusCode httpStatusCode, SubStatusCodes subStatusCode = SubStatusCodes.Unknown)
{
const string originalSessionToken = "0:1#100#1=20#2=5#3=30";
const string updatedSessionToken = "0:1#100#1=20#2=5#3=31";

Func<HttpRequestMessage, Task<HttpResponseMessage>> sendFunc = async request =>
{
HttpResponseMessage response = new HttpResponseMessage(httpStatusCode);
response.Headers.Add(HttpConstants.HttpHeaders.SessionToken, updatedSessionToken);
response.Headers.Add(WFConstants.BackendHeaders.SubStatus, subStatusCode.ToString());
return response;
};

Mock<IDocumentClientInternal> mockDocumentClient = new Mock<IDocumentClientInternal>();
mockDocumentClient.Setup(client => client.ServiceEndpoint).Returns(new Uri("https://foo"));

GlobalEndpointManager endpointManager = new GlobalEndpointManager(mockDocumentClient.Object, new ConnectionPolicy());
SessionContainer sessionContainer = new SessionContainer(string.Empty);
DocumentClientEventSource eventSource = DocumentClientEventSource.Instance;
HttpMessageHandler messageHandler = new MockMessageHandler(sendFunc);
GatewayStoreModel storeModel = new GatewayStoreModel(
endpointManager,
sessionContainer,
TimeSpan.FromSeconds(5),
ConsistencyLevel.Eventual,
eventSource,
null,
new UserAgentContainer(),
ApiType.None,
messageHandler);

INameValueCollection headers = new DictionaryNameValueCollection();
headers.Set(HttpConstants.HttpHeaders.ConsistencyLevel, ConsistencyLevel.Session.ToString());
headers.Set(HttpConstants.HttpHeaders.SessionToken, originalSessionToken);
headers.Set(WFConstants.BackendHeaders.PartitionKeyRangeId, "0");

using (new ActivityScope(Guid.NewGuid()))
{
using (DocumentServiceRequest request = DocumentServiceRequest.Create(
OperationType.Read,
ResourceType.Document,
"dbs/OVJwAA==/colls/OVJwAOcMtA0=/docs/OVJwAOcMtA0BAAAAAAAAAA==/",
AuthorizationTokenType.PrimaryMasterKey,
headers))
{
request.UseStatusCodeFor429 = true;
request.UseStatusCodeForFailures = true;
DocumentServiceResponse response = await storeModel.ProcessMessageAsync(request);
Assert.AreEqual(updatedSessionToken, sessionContainer.GetSessionToken("dbs/OVJwAA==/colls/OVJwAOcMtA0="));
}
}
}

[TestMethod]
[Owner("maquaran")]
// Validates that if its a master resource, we don't update the Session Token, even though the status code would be one of the included ones
public async Task GatewayStoreModel_Exceptionless_NotUpdateSessionTokenOnKnownFailedMasterResource()
{
await this.GatewayStoreModel_Exceptionless_NotUpdateSessionTokenOnKnownResponses(ResourceType.Collection, HttpStatusCode.Conflict);
}

[TestMethod]
[Owner("maquaran")]
// When exceptionless is turned on Session Token should only be updated on known failures
public async Task GatewayStoreModel_Exceptionless_NotUpdateSessionTokenOnKnownFailedStoreResponses()
{
await this.GatewayStoreModel_Exceptionless_NotUpdateSessionTokenOnKnownResponses(ResourceType.Document, (HttpStatusCode)429);
}

private async Task GatewayStoreModel_Exceptionless_NotUpdateSessionTokenOnKnownResponses(ResourceType resourceType, HttpStatusCode httpStatusCode, SubStatusCodes subStatusCode = SubStatusCodes.Unknown)
{
const string originalSessionToken = "0:1#100#1=20#2=5#3=30";
const string updatedSessionToken = "0:1#100#1=20#2=5#3=31";

Func<HttpRequestMessage, Task<HttpResponseMessage>> sendFunc = async request =>
{
HttpResponseMessage response = new HttpResponseMessage(httpStatusCode);
response.Headers.Add(HttpConstants.HttpHeaders.SessionToken, updatedSessionToken);
response.Headers.Add(WFConstants.BackendHeaders.SubStatus, subStatusCode.ToString());
return response;
};

Mock<IDocumentClientInternal> mockDocumentClient = new Mock<IDocumentClientInternal>();
mockDocumentClient.Setup(client => client.ServiceEndpoint).Returns(new Uri("https://foo"));

GlobalEndpointManager endpointManager = new GlobalEndpointManager(mockDocumentClient.Object, new ConnectionPolicy());
SessionContainer sessionContainer = new SessionContainer(string.Empty);
DocumentClientEventSource eventSource = DocumentClientEventSource.Instance;
HttpMessageHandler messageHandler = new MockMessageHandler(sendFunc);
GatewayStoreModel storeModel = new GatewayStoreModel(
endpointManager,
sessionContainer,
TimeSpan.FromSeconds(5),
ConsistencyLevel.Eventual,
eventSource,
null,
new UserAgentContainer(),
ApiType.None,
messageHandler);

INameValueCollection headers = new DictionaryNameValueCollection();
headers.Set(HttpConstants.HttpHeaders.ConsistencyLevel, ConsistencyLevel.Session.ToString());
headers.Set(HttpConstants.HttpHeaders.SessionToken, originalSessionToken);
headers.Set(WFConstants.BackendHeaders.PartitionKeyRangeId, "0");

using (new ActivityScope(Guid.NewGuid()))
{
using (DocumentServiceRequest request = DocumentServiceRequest.Create(
OperationType.Read,
resourceType,
"dbs/OVJwAA==/colls/OVJwAOcMtA0=/docs/OVJwAOcMtA0BAAAAAAAAAA==/",
AuthorizationTokenType.PrimaryMasterKey,
headers))
{
request.UseStatusCodeFor429 = true;
request.UseStatusCodeForFailures = true;
DocumentServiceResponse response = await storeModel.ProcessMessageAsync(request);
Assert.AreEqual(string.Empty, sessionContainer.GetSessionToken("dbs/OVJwAA==/colls/OVJwAOcMtA0="));
}
}
}

private class MockMessageHandler : HttpMessageHandler
{
private readonly Func<HttpRequestMessage, Task<HttpResponseMessage>> sendFunc;
Expand Down
1 change: 1 addition & 0 deletions changelog.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- [#705](https://github.com/Azure/azure-cosmos-dotnet-v3/pull/705) User agent suffix gets truncated
- [#753](https://github.com/Azure/azure-cosmos-dotnet-v3/pull/753) Reason was not being propagated for Conflict exceptions
- [#756](https://github.com/Azure/azure-cosmos-dotnet-v3/pull/756) Change Feed Processor with WithStartTime would execute the delegate the first time with no items.
- [#769](https://github.com/Azure/azure-cosmos-dotnet-v3/pull/769) Session Consistency + Gateway mode session-token bug fix: Under few rare non-success cases session token might be in-correct

## [3.1.1](https://www.nuget.org/packages/Microsoft.Azure.Cosmos/3.1.1) - 2019-08-12

Expand Down

0 comments on commit 4162ff1

Please sign in to comment.