Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Session being updated incorrectly on Gateway mode #769

Merged
merged 3 commits into from
Sep 5, 2019
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 25 additions & 3 deletions Microsoft.Azure.Cosmos/src/GatewayStoreModel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -89,13 +89,13 @@ public GatewayStoreModel(
(exception.StatusCode == HttpStatusCode.PreconditionFailed || exception.StatusCode == HttpStatusCode.Conflict
|| (exception.StatusCode == HttpStatusCode.NotFound && exception.GetSubStatus() != SubStatusCodes.ReadSessionNotAvailable)))
{
this.CaptureSessionToken(request, exception.Headers);
this.CaptureSessionToken(exception.StatusCode, exception.GetSubStatus(), request, exception.Headers);
}

throw;
}

this.CaptureSessionToken(request, response.Headers);
this.CaptureSessionToken(response.StatusCode, response.SubStatusCode, request, response.Headers);
return response;
}

Expand Down Expand Up @@ -169,8 +169,30 @@ public void Dispose()
GC.SuppressFinalize(this);
}

private void CaptureSessionToken(DocumentServiceRequest request, INameValueCollection responseHeaders)
private void CaptureSessionToken(
HttpStatusCode? statusCode,
SubStatusCodes subStatusCode,
DocumentServiceRequest request,
INameValueCollection responseHeaders)
{
// Exceptionless can try to capture session token from CompleteResponse
if (request.IsValidStatusCodeForExceptionlessRetry((int)statusCode, subStatusCode))
{
// Not capturing on master resources
if (ReplicatedResourceClient.IsMasterResource(request.ResourceType))
{
return;
}

// Only capturing on 409, 412, 404 && !1002
if (statusCode != HttpStatusCode.PreconditionFailed
&& statusCode != HttpStatusCode.Conflict
&& (statusCode != HttpStatusCode.NotFound || subStatusCode == SubStatusCodes.ReadSessionNotAvailable))
{
return;
}
}

if (request.ResourceType == ResourceType.Collection && request.OperationType == OperationType.Delete)
{
string resourceId;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,144 @@ public async Task TestErrorResponsesProvideBody()

}

[TestMethod]
// Verify that for known exceptions, session token is updated
public async Task GatewayStoreModel_Exception_UpdateSessionTokenOnKnownException()
{
INameValueCollection headers = new DictionaryNameValueCollection();
headers.Set(HttpConstants.HttpHeaders.SessionToken, "0:1#100#1=20#2=5#3=31");
headers.Set(WFConstants.BackendHeaders.LocalLSN, "10");
await this.GatewayStoreModel_Exception_UpdateSessionTokenOnKnownException(new ConflictException("test", headers, new Uri("http://one.com")));
await this.GatewayStoreModel_Exception_UpdateSessionTokenOnKnownException(new NotFoundException("test", headers, new Uri("http://one.com")));
await this.GatewayStoreModel_Exception_UpdateSessionTokenOnKnownException(new PreconditionFailedException("test", headers, new Uri("http://one.com")));
}

private async Task GatewayStoreModel_Exception_UpdateSessionTokenOnKnownException(Exception ex)
{
const string originalSessionToken = "0:1#100#1=20#2=5#3=30";
const string updatedSessionToken = "0:1#100#1=20#2=5#3=31";

Func<HttpRequestMessage, Task<HttpResponseMessage>> sendFunc = async request =>
{
throw ex;
};

Mock<IDocumentClientInternal> mockDocumentClient = new Mock<IDocumentClientInternal>();
mockDocumentClient.Setup(client => client.ServiceEndpoint).Returns(new Uri("https://foo"));

GlobalEndpointManager endpointManager = new GlobalEndpointManager(mockDocumentClient.Object, new ConnectionPolicy());
SessionContainer sessionContainer = new SessionContainer(string.Empty);
DocumentClientEventSource eventSource = DocumentClientEventSource.Instance;
HttpMessageHandler messageHandler = new MockMessageHandler(sendFunc);
GatewayStoreModel storeModel = new GatewayStoreModel(
endpointManager,
sessionContainer,
TimeSpan.FromSeconds(5),
ConsistencyLevel.Eventual,
eventSource,
null,
new UserAgentContainer(),
ApiType.None,
messageHandler);

INameValueCollection headers = new DictionaryNameValueCollection();
headers.Set(HttpConstants.HttpHeaders.ConsistencyLevel, ConsistencyLevel.Session.ToString());
headers.Set(HttpConstants.HttpHeaders.SessionToken, originalSessionToken);
headers.Set(WFConstants.BackendHeaders.PartitionKeyRangeId, "0");

using (new ActivityScope(Guid.NewGuid()))
{
using (DocumentServiceRequest request = DocumentServiceRequest.Create(
OperationType.Read,
ResourceType.Document,
"dbs/OVJwAA==/colls/OVJwAOcMtA0=/docs/OVJwAOcMtA0BAAAAAAAAAA==/",
AuthorizationTokenType.PrimaryMasterKey,
headers))
{
request.UseStatusCodeFor429 = true;
request.UseStatusCodeForFailures = true;
try
{
DocumentServiceResponse response = await storeModel.ProcessMessageAsync(request);
Assert.Fail("Should had thrown exception");
}
catch (Exception)
{
// Expecting exception
}
Assert.AreEqual(updatedSessionToken, sessionContainer.GetSessionToken("dbs/OVJwAA==/colls/OVJwAOcMtA0="));
}
}
}

[TestMethod]
// Verify that for 429 exceptions, session token is not updated
public async Task GatewayStoreModel_Exception_NotUpdateSessionTokenOnKnownExceptions()
{
INameValueCollection headers = new DictionaryNameValueCollection();
headers.Set(HttpConstants.HttpHeaders.SessionToken, "0:1#100#1=20#2=5#3=30");
headers.Set(WFConstants.BackendHeaders.LocalLSN, "10");
await this.GatewayStoreModel_Exception_NotUpdateSessionTokenOnKnownException(new RequestRateTooLargeException("429", headers, new Uri("http://one.com")));
}

private async Task GatewayStoreModel_Exception_NotUpdateSessionTokenOnKnownException(Exception ex)
{
const string originalSessionToken = "0:1#100#1=20#2=5#3=30";
const string updatedSessionToken = "0:1#100#1=20#2=5#3=31";

Func<HttpRequestMessage, Task<HttpResponseMessage>> sendFunc = async request =>
{
throw ex;
};

Mock<IDocumentClientInternal> mockDocumentClient = new Mock<IDocumentClientInternal>();
mockDocumentClient.Setup(client => client.ServiceEndpoint).Returns(new Uri("https://foo"));

GlobalEndpointManager endpointManager = new GlobalEndpointManager(mockDocumentClient.Object, new ConnectionPolicy());
SessionContainer sessionContainer = new SessionContainer(string.Empty);
DocumentClientEventSource eventSource = DocumentClientEventSource.Instance;
HttpMessageHandler messageHandler = new MockMessageHandler(sendFunc);
GatewayStoreModel storeModel = new GatewayStoreModel(
endpointManager,
sessionContainer,
TimeSpan.FromSeconds(5),
ConsistencyLevel.Eventual,
eventSource,
null,
new UserAgentContainer(),
ApiType.None,
messageHandler);

INameValueCollection headers = new DictionaryNameValueCollection();
headers.Set(HttpConstants.HttpHeaders.ConsistencyLevel, ConsistencyLevel.Session.ToString());
headers.Set(HttpConstants.HttpHeaders.SessionToken, originalSessionToken);
headers.Set(WFConstants.BackendHeaders.PartitionKeyRangeId, "0");

using (new ActivityScope(Guid.NewGuid()))
{
using (DocumentServiceRequest request = DocumentServiceRequest.Create(
OperationType.Read,
ResourceType.Document,
"dbs/OVJwAA==/colls/OVJwAOcMtA0=/docs/OVJwAOcMtA0BAAAAAAAAAA==/",
AuthorizationTokenType.PrimaryMasterKey,
headers))
{
request.UseStatusCodeFor429 = true;
request.UseStatusCodeForFailures = true;
try
{
DocumentServiceResponse response = await storeModel.ProcessMessageAsync(request);
Assert.Fail("Should had thrown exception");
}
catch (Exception)
{
// Expecting exception
}
Assert.AreEqual(string.Empty, sessionContainer.GetSessionToken("dbs/OVJwAA==/colls/OVJwAOcMtA0="));
}
}
}

/// <summary>
/// Tests that empty session token is sent for operations on Session Consistent resources like
/// Databases, Collections, Users, Permissions, PartitionKeyRanges, DatabaseAccounts and Offers
Expand Down Expand Up @@ -279,6 +417,137 @@ public async Task TestSessionTokenAvailability()

}

[TestMethod]
// When exceptionless is turned on Session Token should only be updated on known failures
public async Task GatewayStoreModel_Exceptionless_UpdateSessionTokenOnKnownFailedStoreResponses()
{
await this.GatewayStoreModel_Exceptionless_UpdateSessionTokenOnKnownResponses(HttpStatusCode.Conflict);
await this.GatewayStoreModel_Exceptionless_UpdateSessionTokenOnKnownResponses(HttpStatusCode.NotFound, SubStatusCodes.OwnerResourceNotFound);
await this.GatewayStoreModel_Exceptionless_UpdateSessionTokenOnKnownResponses(HttpStatusCode.PreconditionFailed);
}

private async Task GatewayStoreModel_Exceptionless_UpdateSessionTokenOnKnownResponses(HttpStatusCode httpStatusCode, SubStatusCodes subStatusCode = SubStatusCodes.Unknown)
{
const string originalSessionToken = "0:1#100#1=20#2=5#3=30";
const string updatedSessionToken = "0:1#100#1=20#2=5#3=31";

Func<HttpRequestMessage, Task<HttpResponseMessage>> sendFunc = async request =>
{
HttpResponseMessage response = new HttpResponseMessage(httpStatusCode);
response.Headers.Add(HttpConstants.HttpHeaders.SessionToken, updatedSessionToken);
response.Headers.Add(WFConstants.BackendHeaders.SubStatus, subStatusCode.ToString());
return response;
};

Mock<IDocumentClientInternal> mockDocumentClient = new Mock<IDocumentClientInternal>();
mockDocumentClient.Setup(client => client.ServiceEndpoint).Returns(new Uri("https://foo"));

GlobalEndpointManager endpointManager = new GlobalEndpointManager(mockDocumentClient.Object, new ConnectionPolicy());
SessionContainer sessionContainer = new SessionContainer(string.Empty);
DocumentClientEventSource eventSource = DocumentClientEventSource.Instance;
HttpMessageHandler messageHandler = new MockMessageHandler(sendFunc);
GatewayStoreModel storeModel = new GatewayStoreModel(
endpointManager,
sessionContainer,
TimeSpan.FromSeconds(5),
ConsistencyLevel.Eventual,
eventSource,
null,
new UserAgentContainer(),
ApiType.None,
messageHandler);

INameValueCollection headers = new DictionaryNameValueCollection();
headers.Set(HttpConstants.HttpHeaders.ConsistencyLevel, ConsistencyLevel.Session.ToString());
headers.Set(HttpConstants.HttpHeaders.SessionToken, originalSessionToken);
headers.Set(WFConstants.BackendHeaders.PartitionKeyRangeId, "0");

using (new ActivityScope(Guid.NewGuid()))
{
using (DocumentServiceRequest request = DocumentServiceRequest.Create(
OperationType.Read,
ResourceType.Document,
"dbs/OVJwAA==/colls/OVJwAOcMtA0=/docs/OVJwAOcMtA0BAAAAAAAAAA==/",
AuthorizationTokenType.PrimaryMasterKey,
headers))
{
request.UseStatusCodeFor429 = true;
request.UseStatusCodeForFailures = true;
DocumentServiceResponse response = await storeModel.ProcessMessageAsync(request);
Assert.AreEqual(updatedSessionToken, sessionContainer.GetSessionToken("dbs/OVJwAA==/colls/OVJwAOcMtA0="));
}
}
}

[TestMethod]
[Owner("maquaran")]
// Validates that if its a master resource, we don't update the Session Token, even though the status code would be one of the included ones
public async Task GatewayStoreModel_Exceptionless_NotUpdateSessionTokenOnKnownFailedMasterResource()
{
await this.GatewayStoreModel_Exceptionless_NotUpdateSessionTokenOnKnownResponses(ResourceType.Collection, HttpStatusCode.Conflict);
}

[TestMethod]
[Owner("maquaran")]
// When exceptionless is turned on Session Token should only be updated on known failures
public async Task GatewayStoreModel_Exceptionless_NotUpdateSessionTokenOnKnownFailedStoreResponses()
{
await this.GatewayStoreModel_Exceptionless_NotUpdateSessionTokenOnKnownResponses(ResourceType.Document, (HttpStatusCode)429);
}

private async Task GatewayStoreModel_Exceptionless_NotUpdateSessionTokenOnKnownResponses(ResourceType resourceType, HttpStatusCode httpStatusCode, SubStatusCodes subStatusCode = SubStatusCodes.Unknown)
{
const string originalSessionToken = "0:1#100#1=20#2=5#3=30";
const string updatedSessionToken = "0:1#100#1=20#2=5#3=31";

Func<HttpRequestMessage, Task<HttpResponseMessage>> sendFunc = async request =>
{
HttpResponseMessage response = new HttpResponseMessage(httpStatusCode);
response.Headers.Add(HttpConstants.HttpHeaders.SessionToken, updatedSessionToken);
response.Headers.Add(WFConstants.BackendHeaders.SubStatus, subStatusCode.ToString());
return response;
};

Mock<IDocumentClientInternal> mockDocumentClient = new Mock<IDocumentClientInternal>();
mockDocumentClient.Setup(client => client.ServiceEndpoint).Returns(new Uri("https://foo"));

GlobalEndpointManager endpointManager = new GlobalEndpointManager(mockDocumentClient.Object, new ConnectionPolicy());
SessionContainer sessionContainer = new SessionContainer(string.Empty);
DocumentClientEventSource eventSource = DocumentClientEventSource.Instance;
HttpMessageHandler messageHandler = new MockMessageHandler(sendFunc);
GatewayStoreModel storeModel = new GatewayStoreModel(
endpointManager,
sessionContainer,
TimeSpan.FromSeconds(5),
ConsistencyLevel.Eventual,
eventSource,
null,
new UserAgentContainer(),
ApiType.None,
messageHandler);

INameValueCollection headers = new DictionaryNameValueCollection();
headers.Set(HttpConstants.HttpHeaders.ConsistencyLevel, ConsistencyLevel.Session.ToString());
headers.Set(HttpConstants.HttpHeaders.SessionToken, originalSessionToken);
headers.Set(WFConstants.BackendHeaders.PartitionKeyRangeId, "0");

using (new ActivityScope(Guid.NewGuid()))
{
using (DocumentServiceRequest request = DocumentServiceRequest.Create(
OperationType.Read,
resourceType,
"dbs/OVJwAA==/colls/OVJwAOcMtA0=/docs/OVJwAOcMtA0BAAAAAAAAAA==/",
AuthorizationTokenType.PrimaryMasterKey,
headers))
{
request.UseStatusCodeFor429 = true;
request.UseStatusCodeForFailures = true;
DocumentServiceResponse response = await storeModel.ProcessMessageAsync(request);
Assert.AreEqual(string.Empty, sessionContainer.GetSessionToken("dbs/OVJwAA==/colls/OVJwAOcMtA0="));
}
}
}

private class MockMessageHandler : HttpMessageHandler
{
private readonly Func<HttpRequestMessage, Task<HttpResponseMessage>> sendFunc;
Expand Down
1 change: 1 addition & 0 deletions changelog.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- [#705](https://github.com/Azure/azure-cosmos-dotnet-v3/pull/705) User agent suffix gets truncated
- [#753](https://github.com/Azure/azure-cosmos-dotnet-v3/pull/753) Reason was not being propagated for Conflict exceptions
- [#756](https://github.com/Azure/azure-cosmos-dotnet-v3/pull/756) Change Feed Processor with WithStartTime would execute the delegate the first time with no items.
- [#769](https://github.com/Azure/azure-cosmos-dotnet-v3/pull/769) Gateway mode was updating Session Token incorrectly for some response types.
ealsur marked this conversation as resolved.
Show resolved Hide resolved

## [3.1.1](https://www.nuget.org/packages/Microsoft.Azure.Cosmos/3.1.1) - 2019-08-12

Expand Down