diff --git a/Microsoft.Azure.Cosmos/src/ConnectionPolicy.cs b/Microsoft.Azure.Cosmos/src/ConnectionPolicy.cs index 66c827127c..ec592844d6 100644 --- a/Microsoft.Azure.Cosmos/src/ConnectionPolicy.cs +++ b/Microsoft.Azure.Cosmos/src/ConnectionPolicy.cs @@ -379,6 +379,18 @@ public int? MaxTcpConnectionsPerEndpoint set; } + /// + /// (Direct/TCP) Controls the client port reuse policy used by the transport stack. + /// + /// + /// The default value is PortReuseMode.ReuseUnicastPort. + /// + public PortReuseMode? PortReuseMode + { + get; + set; + } + /// /// (Direct/TCP) This is an advanced setting that controls the number of TCP connections that will be opened eagerly to each Cosmos DB back-end. /// diff --git a/Microsoft.Azure.Cosmos/src/CosmosClientOptions.cs b/Microsoft.Azure.Cosmos/src/CosmosClientOptions.cs index 515ae3401e..97820b08a2 100644 --- a/Microsoft.Azure.Cosmos/src/CosmosClientOptions.cs +++ b/Microsoft.Azure.Cosmos/src/CosmosClientOptions.cs @@ -62,6 +62,7 @@ public class CosmosClientOptions private TimeSpan? openTcpConnectionTimeout; private int? maxRequestsPerTcpConnection; private int? maxTcpConnectionsPerEndpoint; + private PortReuseMode? portReuseMode; private IWebProxy webProxy; /// @@ -274,6 +275,27 @@ public int? MaxTcpConnectionsPerEndpoint } } + /// + /// (Direct/TCP) Controls the client port reuse policy used by the transport stack. + /// + /// + /// The default value is PortReuseMode.ReuseUnicastPort. + /// + /// + /// ReuseUnicastPort and PrivatePortPool are not mutually exclusive. + /// When PrivatePortPool is enabled, the client first tries to reuse a port it already has. + /// It falls back to allocating a new port if the initial attempts failed. If this fails, too, the client then falls back to ReuseUnicastPort. + /// + public PortReuseMode? PortReuseMode + { + get => this.portReuseMode; + set + { + this.portReuseMode = value; + this.ValidateDirectTCPSettings(); + } + } + /// /// (Gateway/Https) Get or set the proxy information used for web requests. /// @@ -522,7 +544,8 @@ internal ConnectionPolicy GetConnectionPolicy() OpenTcpConnectionTimeout = this.OpenTcpConnectionTimeout, MaxRequestsPerTcpConnection = this.MaxRequestsPerTcpConnection, MaxTcpConnectionsPerEndpoint = this.MaxTcpConnectionsPerEndpoint, - EnableEndpointDiscovery = !this.LimitToEndpoint + EnableEndpointDiscovery = !this.LimitToEndpoint, + PortReuseMode = this.portReuseMode }; if (this.ApplicationRegion != null) @@ -650,6 +673,10 @@ private void ValidateDirectTCPSettings() { settingName = nameof(this.MaxTcpConnectionsPerEndpoint); } + else if (this.PortReuseMode.HasValue) + { + settingName = nameof(this.PortReuseMode); + } } if (!string.IsNullOrEmpty(settingName)) diff --git a/Microsoft.Azure.Cosmos/src/DocumentClient.cs b/Microsoft.Azure.Cosmos/src/DocumentClient.cs index beba18517f..ceeefbc491 100644 --- a/Microsoft.Azure.Cosmos/src/DocumentClient.cs +++ b/Microsoft.Azure.Cosmos/src/DocumentClient.cs @@ -84,18 +84,20 @@ internal partial class DocumentClient : IDisposable, IAuthorizationTokenProvider private const string MaxRequestsPerChannelConfig = "CosmosDbMaxRequestsPerTcpChannel"; private const string TcpPartitionCount = "CosmosDbTcpPartitionCount"; private const string MaxChannelsPerHostConfig = "CosmosDbMaxTcpChannelsPerHost"; + private const string RntbdPortReuseMode = "CosmosDbTcpPortReusePolicy"; private const string RntbdReceiveHangDetectionTimeConfig = "CosmosDbTcpReceiveHangDetectionTimeSeconds"; private const string RntbdSendHangDetectionTimeConfig = "CosmosDbTcpSendHangDetectionTimeSeconds"; private const string EnableCpuMonitorConfig = "CosmosDbEnableCpuMonitor"; - - ////The MAC signature found in the HTTP request is not the same as the computed signature.Server used following string to sign - ////The input authorization token can't serve the request. Please check that the expected payload is built as per the protocol, and check the key being used. Server used the following payload to sign - private const string MacSignatureString = "to sign"; + + ////The MAC signature found in the HTTP request is not the same as the computed signature.Server used following string to sign + ////The input authorization token can't serve the request. Please check that the expected payload is built as per the protocol, and check the key being used. Server used the following payload to sign + private const string MacSignatureString = "to sign"; private const int MaxConcurrentConnectionOpenRequestsPerProcessor = 25; private const int DefaultMaxRequestsPerRntbdChannel = 30; private const int DefaultRntbdPartitionCount = 1; private const int DefaultMaxRntbdChannelsPerHost = ushort.MaxValue; + private const PortReuseMode DefaultRntbdPortReuseMode = PortReuseMode.ReuseUnicastPort; private const int DefaultRntbdReceiveHangDetectionTimeSeconds = 65; private const int DefaultRntbdSendHangDetectionTimeSeconds = 10; private const bool DefaultEnableCpuMonitor = true; @@ -111,6 +113,7 @@ internal partial class DocumentClient : IDisposable, IAuthorizationTokenProvider private int maxRequestsPerRntbdChannel = DefaultMaxRequestsPerRntbdChannel; private int rntbdPartitionCount = DefaultRntbdPartitionCount; private int maxRntbdChannels = DefaultMaxRntbdChannelsPerHost; + private PortReuseMode rntbdPortReuseMode = DefaultRntbdPortReuseMode; private int rntbdReceiveHangDetectionTimeSeconds = DefaultRntbdReceiveHangDetectionTimeSeconds; private int rntbdSendHangDetectionTimeSeconds = DefaultRntbdSendHangDetectionTimeSeconds; private bool enableCpuMonitor = DefaultEnableCpuMonitor; @@ -141,9 +144,9 @@ internal partial class DocumentClient : IDisposable, IAuthorizationTokenProvider // Flag that indicates whether store client factory must be disposed whenever client is disposed. // Setting this flag to false will result in store client factory not being disposed when client is disposed. // This flag is used to allow shared store client factory survive disposition of a document client while other clients continue using it. - private bool isStoreClientFactoryCreatedInternally; - - //Id counter. + private bool isStoreClientFactoryCreatedInternally; + + //Id counter. private static int idCounter; //Trace Id. private int traceId; @@ -154,14 +157,14 @@ internal partial class DocumentClient : IDisposable, IAuthorizationTokenProvider private readonly bool hasAuthKeyResourceToken; private readonly string authKeyResourceToken = string.Empty; - private DocumentClientEventSource eventSource; + private DocumentClientEventSource eventSource; internal Task initializeTask; private JsonSerializerSettings serializerSettings; private event EventHandler sendingRequest; private event EventHandler receivedResponse; - private Func transportClientHandlerFactory; - + private Func transportClientHandlerFactory; + /// /// Initializes a new instance of the class using the /// specified Azure Cosmos DB service endpoint, key, and connection policy for the Azure Cosmos DB service. @@ -434,8 +437,8 @@ internal DocumentClient(Uri serviceEndpoint, serviceEndpoint: serviceEndpoint, connectionPolicy: connectionPolicy, desiredConsistencyLevel: desiredConsistencyLevel, - handler: handler, - sessionContainer: sessionContainer, + handler: handler, + sessionContainer: sessionContainer, enableCpuMonitor: enableCpuMonitor, storeClientFactory: storeClientFactory); } @@ -793,147 +796,157 @@ internal virtual void Initialize(Uri serviceEndpoint, throw new ArgumentNullException("serviceEndpoint"); } - DefaultTrace.InitEventListener(); - + DefaultTrace.InitEventListener(); + #if !(NETSTANDARD15 || NETSTANDARD16) #if NETSTANDARD20 // GetEntryAssembly returns null when loaded from native netstandard2.0 - if (System.Reflection.Assembly.GetEntryAssembly() != null) - { -#endif - // For tests we want to allow stronger consistency during construction or per call - string allowOverrideStrongerConsistencyConfig = System.Configuration.ConfigurationManager.AppSettings[DocumentClient.AllowOverrideStrongerConsistency]; - if (!string.IsNullOrEmpty(allowOverrideStrongerConsistencyConfig)) - { - if (!bool.TryParse(allowOverrideStrongerConsistencyConfig, out this.allowOverrideStrongerConsistency)) - { - this.allowOverrideStrongerConsistency = false; - } - } - - // We might want to override the defaults sometime - string maxConcurrentConnectionOpenRequestsOverrideString = System.Configuration.ConfigurationManager.AppSettings[DocumentClient.MaxConcurrentConnectionOpenConfig]; - if (!string.IsNullOrEmpty(maxConcurrentConnectionOpenRequestsOverrideString)) - { - int maxConcurrentConnectionOpenRequestOverrideInt = 0; - if (Int32.TryParse(maxConcurrentConnectionOpenRequestsOverrideString, out maxConcurrentConnectionOpenRequestOverrideInt)) - { - this.maxConcurrentConnectionOpenRequests = maxConcurrentConnectionOpenRequestOverrideInt; - } - } - - string openConnectionTimeoutInSecondsOverrideString = System.Configuration.ConfigurationManager.AppSettings[DocumentClient.OpenConnectionTimeoutInSecondsConfig]; - if (!string.IsNullOrEmpty(openConnectionTimeoutInSecondsOverrideString)) - { - int openConnectionTimeoutInSecondsOverrideInt = 0; - if (Int32.TryParse(openConnectionTimeoutInSecondsOverrideString, out openConnectionTimeoutInSecondsOverrideInt)) - { - this.openConnectionTimeoutInSeconds = openConnectionTimeoutInSecondsOverrideInt; - } - } - - string idleConnectionTimeoutInSecondsOverrideString = System.Configuration.ConfigurationManager.AppSettings[DocumentClient.IdleConnectionTimeoutInSecondsConfig]; - if (!string.IsNullOrEmpty(idleConnectionTimeoutInSecondsOverrideString)) - { - int idleConnectionTimeoutInSecondsOverrideInt = 0; - if (Int32.TryParse(idleConnectionTimeoutInSecondsOverrideString, out idleConnectionTimeoutInSecondsOverrideInt)) - { - this.idleConnectionTimeoutInSeconds = idleConnectionTimeoutInSecondsOverrideInt; - } - } - - string transportTimerPoolGranularityInSecondsOverrideString = System.Configuration.ConfigurationManager.AppSettings[DocumentClient.TransportTimerPoolGranularityInSecondsConfig]; - if (!string.IsNullOrEmpty(transportTimerPoolGranularityInSecondsOverrideString)) - { - int timerPoolGranularityInSecondsOverrideInt = 0; - if (Int32.TryParse(transportTimerPoolGranularityInSecondsOverrideString, out timerPoolGranularityInSecondsOverrideInt)) - { - // timeoutgranularity specified should be greater than min(5 seconds) - if (timerPoolGranularityInSecondsOverrideInt > this.timerPoolGranularityInSeconds) - { - this.timerPoolGranularityInSeconds = timerPoolGranularityInSecondsOverrideInt; - } - } - } - - string enableRntbdChannelOverrideString = System.Configuration.ConfigurationManager.AppSettings[DocumentClient.EnableTcpChannelConfig]; - if (!string.IsNullOrEmpty(enableRntbdChannelOverrideString)) - { - bool enableRntbdChannel = false; - if (bool.TryParse(enableRntbdChannelOverrideString, out enableRntbdChannel)) - { - this.enableRntbdChannel = enableRntbdChannel; - } - } - - string maxRequestsPerRntbdChannelOverrideString = System.Configuration.ConfigurationManager.AppSettings[DocumentClient.MaxRequestsPerChannelConfig]; - if (!string.IsNullOrEmpty(maxRequestsPerRntbdChannelOverrideString)) - { - int maxRequestsPerChannel = DocumentClient.DefaultMaxRequestsPerRntbdChannel; - if (int.TryParse(maxRequestsPerRntbdChannelOverrideString, out maxRequestsPerChannel)) - { - this.maxRequestsPerRntbdChannel = maxRequestsPerChannel; - } - } - - string rntbdPartitionCountOverrideString = System.Configuration.ConfigurationManager.AppSettings[DocumentClient.TcpPartitionCount]; - if (!string.IsNullOrEmpty(rntbdPartitionCountOverrideString)) - { - int rntbdPartitionCount = DocumentClient.DefaultRntbdPartitionCount; - if (int.TryParse(rntbdPartitionCountOverrideString, out rntbdPartitionCount)) - { - this.rntbdPartitionCount = rntbdPartitionCount; - } - } - - string maxRntbdChannelsOverrideString = System.Configuration.ConfigurationManager.AppSettings[DocumentClient.MaxChannelsPerHostConfig]; - if (!string.IsNullOrEmpty(maxRntbdChannelsOverrideString)) - { - int maxRntbdChannels = DefaultMaxRntbdChannelsPerHost; - if (int.TryParse(maxRntbdChannelsOverrideString, out maxRntbdChannels)) - { - this.maxRntbdChannels = maxRntbdChannels; - } - } - - string rntbdReceiveHangDetectionTimeSecondsString = System.Configuration.ConfigurationManager.AppSettings[DocumentClient.RntbdReceiveHangDetectionTimeConfig]; - if (!string.IsNullOrEmpty(rntbdReceiveHangDetectionTimeSecondsString)) - { - int rntbdReceiveHangDetectionTimeSeconds = DefaultRntbdReceiveHangDetectionTimeSeconds; - if (int.TryParse(rntbdReceiveHangDetectionTimeSecondsString, out rntbdReceiveHangDetectionTimeSeconds)) - { - this.rntbdReceiveHangDetectionTimeSeconds = rntbdReceiveHangDetectionTimeSeconds; - } - } - - string rntbdSendHangDetectionTimeSecondsString = System.Configuration.ConfigurationManager.AppSettings[DocumentClient.RntbdSendHangDetectionTimeConfig]; - if (!string.IsNullOrEmpty(rntbdSendHangDetectionTimeSecondsString)) - { - int rntbdSendHangDetectionTimeSeconds = DefaultRntbdSendHangDetectionTimeSeconds; - if (int.TryParse(rntbdSendHangDetectionTimeSecondsString, out rntbdSendHangDetectionTimeSeconds)) - { - this.rntbdSendHangDetectionTimeSeconds = rntbdSendHangDetectionTimeSeconds; - } - } - - if (enableCpuMonitor.HasValue) - { - this.enableCpuMonitor = enableCpuMonitor.Value; - } - else - { - string enableCpuMonitorString = System.Configuration.ConfigurationManager.AppSettings[DocumentClient.EnableCpuMonitorConfig]; - if (!string.IsNullOrEmpty(enableCpuMonitorString)) - { - bool enableCpuMonitorFlag = DefaultEnableCpuMonitor; - if (bool.TryParse(enableCpuMonitorString, out enableCpuMonitorFlag)) - { - this.enableCpuMonitor = enableCpuMonitorFlag; - } - } - } -#if NETSTANDARD20 - } + if (System.Reflection.Assembly.GetEntryAssembly() != null) + { +#endif + // For tests we want to allow stronger consistency during construction or per call + string allowOverrideStrongerConsistencyConfig = System.Configuration.ConfigurationManager.AppSettings[DocumentClient.AllowOverrideStrongerConsistency]; + if (!string.IsNullOrEmpty(allowOverrideStrongerConsistencyConfig)) + { + if (!bool.TryParse(allowOverrideStrongerConsistencyConfig, out this.allowOverrideStrongerConsistency)) + { + this.allowOverrideStrongerConsistency = false; + } + } + + // We might want to override the defaults sometime + string maxConcurrentConnectionOpenRequestsOverrideString = System.Configuration.ConfigurationManager.AppSettings[DocumentClient.MaxConcurrentConnectionOpenConfig]; + if (!string.IsNullOrEmpty(maxConcurrentConnectionOpenRequestsOverrideString)) + { + int maxConcurrentConnectionOpenRequestOverrideInt = 0; + if (Int32.TryParse(maxConcurrentConnectionOpenRequestsOverrideString, out maxConcurrentConnectionOpenRequestOverrideInt)) + { + this.maxConcurrentConnectionOpenRequests = maxConcurrentConnectionOpenRequestOverrideInt; + } + } + + string openConnectionTimeoutInSecondsOverrideString = System.Configuration.ConfigurationManager.AppSettings[DocumentClient.OpenConnectionTimeoutInSecondsConfig]; + if (!string.IsNullOrEmpty(openConnectionTimeoutInSecondsOverrideString)) + { + int openConnectionTimeoutInSecondsOverrideInt = 0; + if (Int32.TryParse(openConnectionTimeoutInSecondsOverrideString, out openConnectionTimeoutInSecondsOverrideInt)) + { + this.openConnectionTimeoutInSeconds = openConnectionTimeoutInSecondsOverrideInt; + } + } + + string idleConnectionTimeoutInSecondsOverrideString = System.Configuration.ConfigurationManager.AppSettings[DocumentClient.IdleConnectionTimeoutInSecondsConfig]; + if (!string.IsNullOrEmpty(idleConnectionTimeoutInSecondsOverrideString)) + { + int idleConnectionTimeoutInSecondsOverrideInt = 0; + if (Int32.TryParse(idleConnectionTimeoutInSecondsOverrideString, out idleConnectionTimeoutInSecondsOverrideInt)) + { + this.idleConnectionTimeoutInSeconds = idleConnectionTimeoutInSecondsOverrideInt; + } + } + + string transportTimerPoolGranularityInSecondsOverrideString = System.Configuration.ConfigurationManager.AppSettings[DocumentClient.TransportTimerPoolGranularityInSecondsConfig]; + if (!string.IsNullOrEmpty(transportTimerPoolGranularityInSecondsOverrideString)) + { + int timerPoolGranularityInSecondsOverrideInt = 0; + if (Int32.TryParse(transportTimerPoolGranularityInSecondsOverrideString, out timerPoolGranularityInSecondsOverrideInt)) + { + // timeoutgranularity specified should be greater than min(5 seconds) + if (timerPoolGranularityInSecondsOverrideInt > this.timerPoolGranularityInSeconds) + { + this.timerPoolGranularityInSeconds = timerPoolGranularityInSecondsOverrideInt; + } + } + } + + string enableRntbdChannelOverrideString = System.Configuration.ConfigurationManager.AppSettings[DocumentClient.EnableTcpChannelConfig]; + if (!string.IsNullOrEmpty(enableRntbdChannelOverrideString)) + { + bool enableRntbdChannel = false; + if (bool.TryParse(enableRntbdChannelOverrideString, out enableRntbdChannel)) + { + this.enableRntbdChannel = enableRntbdChannel; + } + } + + string maxRequestsPerRntbdChannelOverrideString = System.Configuration.ConfigurationManager.AppSettings[DocumentClient.MaxRequestsPerChannelConfig]; + if (!string.IsNullOrEmpty(maxRequestsPerRntbdChannelOverrideString)) + { + int maxRequestsPerChannel = DocumentClient.DefaultMaxRequestsPerRntbdChannel; + if (int.TryParse(maxRequestsPerRntbdChannelOverrideString, out maxRequestsPerChannel)) + { + this.maxRequestsPerRntbdChannel = maxRequestsPerChannel; + } + } + + string rntbdPartitionCountOverrideString = System.Configuration.ConfigurationManager.AppSettings[DocumentClient.TcpPartitionCount]; + if (!string.IsNullOrEmpty(rntbdPartitionCountOverrideString)) + { + int rntbdPartitionCount = DocumentClient.DefaultRntbdPartitionCount; + if (int.TryParse(rntbdPartitionCountOverrideString, out rntbdPartitionCount)) + { + this.rntbdPartitionCount = rntbdPartitionCount; + } + } + + string maxRntbdChannelsOverrideString = System.Configuration.ConfigurationManager.AppSettings[DocumentClient.MaxChannelsPerHostConfig]; + if (!string.IsNullOrEmpty(maxRntbdChannelsOverrideString)) + { + int maxRntbdChannels = DefaultMaxRntbdChannelsPerHost; + if (int.TryParse(maxRntbdChannelsOverrideString, out maxRntbdChannels)) + { + this.maxRntbdChannels = maxRntbdChannels; + } + } + + string rntbdPortReuseModeOverrideString = System.Configuration.ConfigurationManager.AppSettings[DocumentClient.RntbdPortReuseMode]; + if (!string.IsNullOrEmpty(rntbdPortReuseModeOverrideString)) + { + PortReuseMode portReuseMode = DefaultRntbdPortReuseMode; + if (Enum.TryParse(rntbdPortReuseModeOverrideString, out portReuseMode)) + { + this.rntbdPortReuseMode = portReuseMode; + } + } + + string rntbdReceiveHangDetectionTimeSecondsString = System.Configuration.ConfigurationManager.AppSettings[DocumentClient.RntbdReceiveHangDetectionTimeConfig]; + if (!string.IsNullOrEmpty(rntbdReceiveHangDetectionTimeSecondsString)) + { + int rntbdReceiveHangDetectionTimeSeconds = DefaultRntbdReceiveHangDetectionTimeSeconds; + if (int.TryParse(rntbdReceiveHangDetectionTimeSecondsString, out rntbdReceiveHangDetectionTimeSeconds)) + { + this.rntbdReceiveHangDetectionTimeSeconds = rntbdReceiveHangDetectionTimeSeconds; + } + } + + string rntbdSendHangDetectionTimeSecondsString = System.Configuration.ConfigurationManager.AppSettings[DocumentClient.RntbdSendHangDetectionTimeConfig]; + if (!string.IsNullOrEmpty(rntbdSendHangDetectionTimeSecondsString)) + { + int rntbdSendHangDetectionTimeSeconds = DefaultRntbdSendHangDetectionTimeSeconds; + if (int.TryParse(rntbdSendHangDetectionTimeSecondsString, out rntbdSendHangDetectionTimeSeconds)) + { + this.rntbdSendHangDetectionTimeSeconds = rntbdSendHangDetectionTimeSeconds; + } + } + + if (enableCpuMonitor.HasValue) + { + this.enableCpuMonitor = enableCpuMonitor.Value; + } + else + { + string enableCpuMonitorString = System.Configuration.ConfigurationManager.AppSettings[DocumentClient.EnableCpuMonitorConfig]; + if (!string.IsNullOrEmpty(enableCpuMonitorString)) + { + bool enableCpuMonitorFlag = DefaultEnableCpuMonitor; + if (bool.TryParse(enableCpuMonitorString, out enableCpuMonitorFlag)) + { + this.enableCpuMonitor = enableCpuMonitorFlag; + } + } + } +#if NETSTANDARD20 + } #endif #endif @@ -964,6 +977,11 @@ internal virtual void Initialize(Uri serviceEndpoint, { this.maxRntbdChannels = this.ConnectionPolicy.MaxTcpConnectionsPerEndpoint.Value; } + + if (this.ConnectionPolicy.PortReuseMode.HasValue) + { + this.rntbdPortReuseMode = this.ConnectionPolicy.PortReuseMode.Value; + } } this.ServiceEndpoint = serviceEndpoint.OriginalString.EndsWith("/", StringComparison.Ordinal) ? serviceEndpoint : new Uri(serviceEndpoint.OriginalString + "/"); @@ -1421,62 +1439,62 @@ internal virtual async Task GetDefaultConsistencyLevelAsync() return Task.FromResult(this.desiredConsistencyLevel); } - internal async Task ProcessRequestAsync( - string verb, - DocumentServiceRequest request, - IDocumentClientRetryPolicy retryPolicyInstance, - CancellationToken cancellationToken, - string testAuthorization = null) // Only for unit-tests - { - if (request == null) - { - throw new ArgumentNullException(nameof(request)); - } - - if (verb == null) - { - throw new ArgumentNullException(nameof(verb)); - } - - string payload; - string authorization = ((IAuthorizationTokenProvider)this).GetUserAuthorizationToken( - request.ResourceAddress, - PathsHelper.GetResourcePath(request.ResourceType), - verb, - request.Headers, - AuthorizationTokenType.PrimaryMasterKey, - out payload); - - // Unit-test hook - if (testAuthorization != null) - { - payload = testAuthorization; - authorization = testAuthorization; - } - request.Headers[HttpConstants.HttpHeaders.Authorization] = authorization; - - try - { - return await this.ProcessRequestAsync(request, retryPolicyInstance, cancellationToken); - } - catch (DocumentClientException dce) - { - if (payload != null - && dce.Message != null - && dce.StatusCode.HasValue - && dce.StatusCode.Value == HttpStatusCode.Unauthorized - && dce.Message.Contains(DocumentClient.MacSignatureString) - && !dce.Message.Contains(payload)) - { - DefaultTrace.TraceError("Un-expected authorization payload mis-match. Actual {0} service expected {1}", payload, dce.Message); - } - - throw; - } - } - - internal async Task ProcessRequestAsync( - DocumentServiceRequest request, + internal async Task ProcessRequestAsync( + string verb, + DocumentServiceRequest request, + IDocumentClientRetryPolicy retryPolicyInstance, + CancellationToken cancellationToken, + string testAuthorization = null) // Only for unit-tests + { + if (request == null) + { + throw new ArgumentNullException(nameof(request)); + } + + if (verb == null) + { + throw new ArgumentNullException(nameof(verb)); + } + + string payload; + string authorization = ((IAuthorizationTokenProvider)this).GetUserAuthorizationToken( + request.ResourceAddress, + PathsHelper.GetResourcePath(request.ResourceType), + verb, + request.Headers, + AuthorizationTokenType.PrimaryMasterKey, + out payload); + + // Unit-test hook + if (testAuthorization != null) + { + payload = testAuthorization; + authorization = testAuthorization; + } + request.Headers[HttpConstants.HttpHeaders.Authorization] = authorization; + + try + { + return await this.ProcessRequestAsync(request, retryPolicyInstance, cancellationToken); + } + catch (DocumentClientException dce) + { + if (payload != null + && dce.Message != null + && dce.StatusCode.HasValue + && dce.StatusCode.Value == HttpStatusCode.Unauthorized + && dce.Message.Contains(DocumentClient.MacSignatureString) + && !dce.Message.Contains(payload)) + { + DefaultTrace.TraceError("Un-expected authorization payload mis-match. Actual {0} service expected {1}", payload, dce.Message); + } + + throw; + } + } + + internal async Task ProcessRequestAsync( + DocumentServiceRequest request, IDocumentClientRetryPolicy retryPolicyInstance, CancellationToken cancellationToken) { @@ -3118,12 +3136,12 @@ private Task> ReplaceDocumentPrivateAsync(string docu { IDocumentClientRetryPolicy retryPolicyInstance = this.ResetSessionTokenRetryPolicy.GetRequestPolicy(); return TaskHelper.InlineIfPossible(() => this.ReplaceDocumentPrivateAsync( - this.GetLinkForRouting(document), - document, - options, - retryPolicyInstance, - cancellationToken), - retryPolicyInstance, + this.GetLinkForRouting(document), + document, + options, + retryPolicyInstance, + cancellationToken), + retryPolicyInstance, cancellationToken); } @@ -3434,7 +3452,7 @@ private async Task> ReplaceOfferPrivateAsync(Offer offer AuthorizationTokenType.PrimaryMasterKey)) { return new ResourceResponse( - await this.UpdateAsync(request, retryPolicyInstance), + await this.UpdateAsync(request, retryPolicyInstance), OfferTypeResolver.ResponseOfferTypeResolver); } } @@ -4127,8 +4145,8 @@ public Task> ReadConflictAsync(string conflictLink, D private async Task> ReadConflictPrivateAsync(string conflictLink, Documents.Client.RequestOptions options, IDocumentClientRetryPolicy retryPolicyInstance) { - await this.EnsureValidClientAsync(); - + await this.EnsureValidClientAsync(); + if (string.IsNullOrEmpty(conflictLink)) { throw new ArgumentNullException("conflictLink"); @@ -4857,12 +4875,12 @@ private async Task> ReadDocumentFeedInlineAsync(st DocumentFeedResponse response = await this.CreateDocumentFeedReader(documentsLink, options).ExecuteNextAsync(cancellationToken); return new DocumentFeedResponse( - response.Cast(), - response.Count, - response.Headers, - response.UseETagAsContinuation, - response.QueryMetrics, - response.RequestStatistics, + response.Cast(), + response.Count, + response.Headers, + response.UseETagAsContinuation, + response.QueryMetrics, + response.RequestStatistics, responseLengthBytes: response.ResponseLengthBytes); } @@ -5194,10 +5212,10 @@ public Task> ExecuteStoredProcedureAsync IDocumentClientRetryPolicy retryPolicyInstance = this.ResetSessionTokenRetryPolicy.GetRequestPolicy(); return TaskHelper.InlineIfPossible( () => this.ExecuteStoredProcedurePrivateAsync( - storedProcedureLink, - options, + storedProcedureLink, + options, retryPolicyInstance, - default(CancellationToken), + default(CancellationToken), procedureParams), retryPolicyInstance); } @@ -5236,20 +5254,20 @@ public Task> ExecuteStoredProcedureAsync IDocumentClientRetryPolicy retryPolicyInstance = this.ResetSessionTokenRetryPolicy.GetRequestPolicy(); return TaskHelper.InlineIfPossible( () => this.ExecuteStoredProcedurePrivateAsync( - storedProcedureLink, - options, - retryPolicyInstance, - cancellationToken, + storedProcedureLink, + options, + retryPolicyInstance, + cancellationToken, procedureParams), - retryPolicyInstance, + retryPolicyInstance, cancellationToken); } private async Task> ExecuteStoredProcedurePrivateAsync( string storedProcedureLink, Documents.Client.RequestOptions options, - IDocumentClientRetryPolicy retryPolicyInstance, - CancellationToken cancellationToken, + IDocumentClientRetryPolicy retryPolicyInstance, + CancellationToken cancellationToken, params dynamic[] procedureParams) { await this.EnsureValidClientAsync(); @@ -5295,9 +5313,9 @@ await this.AddPartitionKeyInformationAsync( request.SerializerSettings = this.GetSerializerSettingsForRequest(options); return new StoredProcedureResponse(await this.ExecuteProcedureAsync( - request, - retryPolicyInstance, - cancellationToken), + request, + retryPolicyInstance, + cancellationToken), this.GetSerializerSettingsForRequest(options)); } } @@ -6105,32 +6123,32 @@ string IAuthorizationTokenProvider.GetUserAuthorizationToken( } } } - - Task IAuthorizationTokenProvider.AddSystemAuthorizationHeaderAsync( - DocumentServiceRequest request, - string federationId, - string verb, - string resourceId) - { + + Task IAuthorizationTokenProvider.AddSystemAuthorizationHeaderAsync( + DocumentServiceRequest request, + string federationId, + string verb, + string resourceId) + { request.Headers[HttpConstants.HttpHeaders.XDate] = DateTime.UtcNow.ToString("r", CultureInfo.InvariantCulture); - + request.Headers[HttpConstants.HttpHeaders.Authorization] = ((IAuthorizationTokenProvider)this).GetUserAuthorizationToken( resourceId ?? request.ResourceAddress, PathsHelper.GetResourcePath(request.ResourceType), verb, request.Headers, request.RequestAuthorizationTokenType, - payload: out _); - - return Task.FromResult(0); + payload: out _); + + return Task.FromResult(0); } #endregion #region Core Implementation internal Task CreateAsync( - DocumentServiceRequest request, - IDocumentClientRetryPolicy retryPolicy, + DocumentServiceRequest request, + IDocumentClientRetryPolicy retryPolicy, CancellationToken cancellationToken = default(CancellationToken)) { if (request == null) @@ -6138,7 +6156,7 @@ internal Task CreateAsync( throw new ArgumentNullException("request"); } - return this.ProcessRequestAsync(HttpConstants.HttpMethods.Post, request, retryPolicy, cancellationToken); + return this.ProcessRequestAsync(HttpConstants.HttpMethods.Post, request, retryPolicy, cancellationToken); } internal Task UpdateAsync( @@ -6151,7 +6169,7 @@ internal Task UpdateAsync( throw new ArgumentNullException("request"); } - return this.ProcessRequestAsync(HttpConstants.HttpMethods.Put, request, retryPolicy, cancellationToken); + return this.ProcessRequestAsync(HttpConstants.HttpMethods.Put, request, retryPolicy, cancellationToken); } internal Task ReadAsync( @@ -6164,7 +6182,7 @@ internal Task ReadAsync( throw new ArgumentNullException("request"); } - return this.ProcessRequestAsync(HttpConstants.HttpMethods.Get, request, retryPolicy, cancellationToken); + return this.ProcessRequestAsync(HttpConstants.HttpMethods.Get, request, retryPolicy, cancellationToken); } internal Task ReadFeedAsync( @@ -6177,7 +6195,7 @@ internal Task ReadFeedAsync( throw new ArgumentNullException("request"); } - return this.ProcessRequestAsync(HttpConstants.HttpMethods.Get, request, retryPolicy, cancellationToken); + return this.ProcessRequestAsync(HttpConstants.HttpMethods.Get, request, retryPolicy, cancellationToken); } internal Task DeleteAsync( @@ -6190,7 +6208,7 @@ internal Task DeleteAsync( throw new ArgumentNullException("request"); } - return this.ProcessRequestAsync(HttpConstants.HttpMethods.Delete, request, retryPolicy, cancellationToken); + return this.ProcessRequestAsync(HttpConstants.HttpMethods.Delete, request, retryPolicy, cancellationToken); } internal Task ExecuteProcedureAsync( @@ -6203,7 +6221,7 @@ internal Task ExecuteProcedureAsync( throw new ArgumentNullException("request"); } - return this.ProcessRequestAsync(HttpConstants.HttpMethods.Post, request, retryPolicy, cancellationToken); + return this.ProcessRequestAsync(HttpConstants.HttpMethods.Post, request, retryPolicy, cancellationToken); } internal Task ExecuteQueryAsync( @@ -6216,7 +6234,7 @@ internal Task ExecuteQueryAsync( throw new ArgumentNullException("request"); } - return this.ProcessRequestAsync(HttpConstants.HttpMethods.Post, request, retryPolicy, cancellationToken); + return this.ProcessRequestAsync(HttpConstants.HttpMethods.Post, request, retryPolicy, cancellationToken); } internal Task UpsertAsync( @@ -6230,7 +6248,7 @@ internal Task UpsertAsync( } request.Headers[HttpConstants.HttpHeaders.IsUpsert] = bool.TrueString; - return this.ProcessRequestAsync(HttpConstants.HttpMethods.Post, request, retryPolicy, cancellationToken); + return this.ProcessRequestAsync(HttpConstants.HttpMethods.Post, request, retryPolicy, cancellationToken); } #endregion @@ -6385,8 +6403,8 @@ internal IStoreModel GetStoreProxy(DocumentServiceRequest request) { return this.StoreModel; } - } - + } + /// /// The preferred link used in replace operation in SDK. /// @@ -6446,7 +6464,8 @@ private void InitializeDirectConnectivity(IStoreClientFactory storeClientFactory receiveHangDetectionTimeSeconds: this.rntbdReceiveHangDetectionTimeSeconds, sendHangDetectionTimeSeconds: this.rntbdSendHangDetectionTimeSeconds, enableCpuMonitor: this.enableCpuMonitor, - retryWithConfiguration: this.ConnectionPolicy.RetryOptions?.GetRetryWithConfiguration()); + retryWithConfiguration: this.ConnectionPolicy.RetryOptions?.GetRetryWithConfiguration(), + rntbdPortReuseMode: (Documents.PortReuseMode)this.rntbdPortReuseMode); if (this.transportClientHandlerFactory != null) { @@ -6562,7 +6581,7 @@ internal void ValidateResource(Documents.Resource resource) } internal void ValidateResource(string resourceId) - { + { if (!string.IsNullOrEmpty(resourceId)) { int match = resourceId.IndexOfAny(new char[] { '/', '\\', '?', '#' }); diff --git a/Microsoft.Azure.Cosmos/src/Fluent/CosmosClientBuilder.cs b/Microsoft.Azure.Cosmos/src/Fluent/CosmosClientBuilder.cs index eb70d570ae..59fb4d6992 100644 --- a/Microsoft.Azure.Cosmos/src/Fluent/CosmosClientBuilder.cs +++ b/Microsoft.Azure.Cosmos/src/Fluent/CosmosClientBuilder.cs @@ -230,6 +230,10 @@ public CosmosClientBuilder WithConnectionModeDirect() /// Together with MaxRequestsPerTcpConnection, this setting limits the number of requests that are simultaneously sent to a single Cosmos DB back-end(MaxRequestsPerTcpConnection x MaxTcpConnectionPerEndpoint). /// The default value is 65,535. Value must be greater than or equal to 16. /// + /// + /// (Direct/TCP) Controls the client port reuse policy used by the transport stack. + /// The default value is PortReuseMode.ReuseUnicastPort. + /// /// /// For more information, see Connection policy: Use direct connection mode. /// @@ -238,12 +242,14 @@ public CosmosClientBuilder WithConnectionModeDirect() internal CosmosClientBuilder WithConnectionModeDirect(TimeSpan? idleTcpConnectionTimeout = null, TimeSpan? openTcpConnectionTimeout = null, int? maxRequestsPerTcpConnection = null, - int? maxTcpConnectionsPerEndpoint = null) + int? maxTcpConnectionsPerEndpoint = null, + Cosmos.PortReuseMode? portReuseMode = null) { this.clientOptions.IdleTcpConnectionTimeout = idleTcpConnectionTimeout; this.clientOptions.OpenTcpConnectionTimeout = openTcpConnectionTimeout; this.clientOptions.MaxRequestsPerTcpConnection = maxRequestsPerTcpConnection; this.clientOptions.MaxTcpConnectionsPerEndpoint = maxTcpConnectionsPerEndpoint; + this.clientOptions.PortReuseMode = portReuseMode; this.clientOptions.ConnectionMode = ConnectionMode.Direct; this.clientOptions.ConnectionProtocol = Protocol.Tcp; diff --git a/Microsoft.Azure.Cosmos/src/PortReuseMode.cs b/Microsoft.Azure.Cosmos/src/PortReuseMode.cs new file mode 100644 index 0000000000..184e21427d --- /dev/null +++ b/Microsoft.Azure.Cosmos/src/PortReuseMode.cs @@ -0,0 +1,28 @@ +//------------------------------------------------------------ +// Copyright (c) Microsoft Corporation. All rights reserved. +//------------------------------------------------------------ +namespace Microsoft.Azure.Cosmos +{ + /// + /// Port reuse policy options used by the transport stack + /// + public enum PortReuseMode + { + /// + /// Windows Server 2016 and newer: Uses the SO_REUSE_UNICASTPORT option if the operating system has automatic client port reuse enabled. + /// Older versions of Windows, Linux, other: Uses default socket options. + /// + /// + /// see also + /// https://docs.microsoft.com/en-us/windows/win32/winsock/sol-socket-socket-options + /// https://docs.microsoft.com/en-us/powershell/module/nettcpip/set-nettcpsetting?view=win10-ps + /// https://support.microsoft.com/en-us/help/3149157/reliability-and-scalability-improvements-in-tcp-ip-for-windows-8-1-and + /// + ReuseUnicastPort = 0, + /// + /// Windows: Tracks client ports used by the Cosmos DB client and reuses them. Ports are reused at DocumentClient scope. + /// Linux: Uses default socket options. + /// + PrivatePortPool = 1, + } +} diff --git a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/CosmosClientOptionsUnitTests.cs b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/CosmosClientOptionsUnitTests.cs index 3400a62a6f..1a99366d34 100644 --- a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/CosmosClientOptionsUnitTests.cs +++ b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/CosmosClientOptionsUnitTests.cs @@ -6,6 +6,7 @@ namespace Microsoft.Azure.Cosmos.Tests { using System; using System.IO; + using System.Linq; using System.Net; using System.Net.Http; using Microsoft.Azure.Cosmos.Client.Core.Tests; @@ -44,6 +45,7 @@ public void VerifyCosmosConfigurationPropertiesGetUpdated() TimeSpan openTcpConnectionTimeout = new TimeSpan(0, 0, 5); int maxRequestsPerTcpConnection = 30; int maxTcpConnectionsPerEndpoint = 65535; + Cosmos.PortReuseMode portReuseMode = Cosmos.PortReuseMode.PrivatePortPool; IWebProxy webProxy = new TestWebProxy(); CosmosClientBuilder cosmosClientBuilder = new CosmosClientBuilder( @@ -131,7 +133,8 @@ public void VerifyCosmosConfigurationPropertiesGetUpdated() idleTcpConnectionTimeout, openTcpConnectionTimeout, maxRequestsPerTcpConnection, - maxTcpConnectionsPerEndpoint + maxTcpConnectionsPerEndpoint, + portReuseMode ); cosmosClient = cosmosClientBuilder.Build(new MockDocumentClient()); @@ -142,6 +145,7 @@ public void VerifyCosmosConfigurationPropertiesGetUpdated() Assert.AreEqual(openTcpConnectionTimeout, clientOptions.OpenTcpConnectionTimeout); Assert.AreEqual(maxRequestsPerTcpConnection, clientOptions.MaxRequestsPerTcpConnection); Assert.AreEqual(maxTcpConnectionsPerEndpoint, clientOptions.MaxTcpConnectionsPerEndpoint); + Assert.AreEqual(portReuseMode, clientOptions.PortReuseMode); //Verify GetConnectionPolicy returns the correct values policy = clientOptions.GetConnectionPolicy(); @@ -149,6 +153,21 @@ public void VerifyCosmosConfigurationPropertiesGetUpdated() Assert.AreEqual(openTcpConnectionTimeout, policy.OpenTcpConnectionTimeout); Assert.AreEqual(maxRequestsPerTcpConnection, policy.MaxRequestsPerTcpConnection); Assert.AreEqual(maxTcpConnectionsPerEndpoint, policy.MaxTcpConnectionsPerEndpoint); + Assert.AreEqual(portReuseMode, policy.PortReuseMode); + } + + [TestMethod] + public void VerifyPortReuseModeIsSyncedWithDirect() + { + CollectionAssert.AreEqual( + Enum.GetNames(typeof(PortReuseMode)).OrderBy(x => x).ToArray(), + Enum.GetNames(typeof(Cosmos.PortReuseMode)).OrderBy(x => x).ToArray() + ); + + CollectionAssert.AreEqual( + Enum.GetValues(typeof(PortReuseMode)).Cast().ToArray(), + Enum.GetValues(typeof(Cosmos.PortReuseMode)).Cast().ToArray() + ); } [TestMethod] diff --git a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/DotNetSDKAPI.json b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/DotNetSDKAPI.json index c95212fb4d..b870165aa4 100644 --- a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/DotNetSDKAPI.json +++ b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/DotNetSDKAPI.json @@ -1346,6 +1346,16 @@ ], "MethodInfo": "System.Nullable`1[Microsoft.Azure.Cosmos.ConsistencyLevel] get_ConsistencyLevel()" }, + "System.Nullable`1[Microsoft.Azure.Cosmos.PortReuseMode] get_PortReuseMode()": { + "Type": "Method", + "Attributes": [], + "MethodInfo": "System.Nullable`1[Microsoft.Azure.Cosmos.PortReuseMode] get_PortReuseMode()" + }, + "System.Nullable`1[Microsoft.Azure.Cosmos.PortReuseMode] PortReuseMode": { + "Type": "Property", + "Attributes": [], + "MethodInfo": null + }, "System.Nullable`1[System.Int32] get_MaxRequestsPerTcpConnection()": { "Type": "Method", "Attributes": [], @@ -1530,6 +1540,11 @@ "Attributes": [], "MethodInfo": "Void set_OpenTcpConnectionTimeout(System.Nullable`1[System.TimeSpan])" }, + "Void set_PortReuseMode(System.Nullable`1[Microsoft.Azure.Cosmos.PortReuseMode])": { + "Type": "Method", + "Attributes": [], + "MethodInfo": "Void set_PortReuseMode(System.Nullable`1[Microsoft.Azure.Cosmos.PortReuseMode])" + }, "Void set_RequestTimeout(System.TimeSpan)[System.Runtime.CompilerServices.CompilerGeneratedAttribute()]": { "Type": "Method", "Attributes": [ @@ -3737,6 +3752,27 @@ }, "NestedTypes": {} }, + "PortReuseMode": { + "Subclasses": {}, + "Members": { + "Int32 value__": { + "Type": "Field", + "Attributes": [], + "MethodInfo": null + }, + "Microsoft.Azure.Cosmos.PortReuseMode PrivatePortPool": { + "Type": "Field", + "Attributes": [], + "MethodInfo": null + }, + "Microsoft.Azure.Cosmos.PortReuseMode ReuseUnicastPort": { + "Type": "Field", + "Attributes": [], + "MethodInfo": null + } + }, + "NestedTypes": {} + }, "QueryDefinition": { "Subclasses": {}, "Members": { diff --git a/changelog.md b/changelog.md index a1615dc66a..6ff5852b60 100644 --- a/changelog.md +++ b/changelog.md @@ -9,7 +9,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Added -- [#995](https://github.com/Azure/azure-cosmos-dotnet-v3/pull/995) Included session token in diagnostics +- [#995](https://github.com/Azure/azure-cosmos-dotnet-v3/pull/995) Included session token in diagnostics. +- [#1000](https://github.com/Azure/azure-cosmos-dotnet-v3/pull/1000) Add PortReuseMode to CosmosClientOptions. ### Fixed