Skip to content

Commit

Permalink
[Service Bus] Emulator connection string support (#41750)
Browse files Browse the repository at this point in the history
* [Service Bus] Emulator connection string support

The focus of these changes is to add support for the connection string
format used by the emulator currently in development.  This is intended
to support internal testing on very early alpha builds.
  • Loading branch information
jsquire committed Feb 7, 2024
1 parent 901b158 commit 7c1b0c3
Show file tree
Hide file tree
Showing 11 changed files with 366 additions and 55 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ internal class AmqpClient : TransportClient
/// <param name="host">The fully qualified host name for the Service Bus namespace. This is likely to be similar to <c>{yournamespace}.servicebus.windows.net</c>.</param>
/// <param name="credential">The Azure managed identity credential to use for authorization. Access controls may be specified by the Service Bus namespace or the requested Service Bus entity, depending on Azure configuration.</param>
/// <param name="options">A set of options to apply when configuring the client.</param>
/// <param name="useTls"><c>true</c> if the client should secure the connection using TLS; otherwise, <c>false</c>.</param>
///
/// <remarks>
/// As an internal type, this class performs only basic sanity checks against its arguments. It
Expand All @@ -92,7 +93,8 @@ internal class AmqpClient : TransportClient
internal AmqpClient(
string host,
ServiceBusTokenCredential credential,
ServiceBusClientOptions options)
ServiceBusClientOptions options,
bool useTls)
{
Argument.AssertNotNullOrEmpty(host, nameof(host));
Argument.AssertNotNull(credential, nameof(credential));
Expand All @@ -102,14 +104,15 @@ internal AmqpClient(

ServiceEndpoint = new UriBuilder
{
Scheme = options.TransportType.GetUriScheme(),
Scheme = options.TransportType.GetUriScheme(useTls),
Host = host
}.Uri;

ConnectionEndpoint = (options.CustomEndpointAddress == null) ? ServiceEndpoint : new UriBuilder
{
Scheme = ServiceEndpoint.Scheme,
Host = options.CustomEndpointAddress.Host
Host = options.CustomEndpointAddress.Host,
Port = options.CustomEndpointAddress.IsDefaultPort ? -1 : options.CustomEndpointAddress.Port
}.Uri;

Credential = credential;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,10 @@ internal class AmqpConnectionScope : TransportConnectionScope
private const string WebSocketsPathSuffix = "/$servicebus/websocket/";

/// <summary>The URI scheme to apply when using web sockets for service communication.</summary>
private const string WebSocketsUriScheme = "wss";
private const string WebSocketsSecureUriScheme = "wss";

/// <summary>The URI scheme to apply when using web sockets for service communication.</summary>
private const string WebSocketsInsecureUriScheme = "ws";

/// <summary>The seed to use for initializing random number generated for a given thread-specific instance.</summary>
private static int s_randomSeed = Environment.TickCount;
Expand Down Expand Up @@ -461,13 +464,12 @@ protected virtual async Task<AmqpConnection> CreateAndOpenConnectionAsync(
TimeSpan timeout)
{
var serviceHostName = serviceEndpoint.Host;
var connectionHostName = connectionEndpoint.Host;
AmqpSettings amqpSettings = CreateAmpqSettings(AmqpVersion);
AmqpConnectionSettings connectionSetings = CreateAmqpConnectionSettings(serviceHostName, scopeIdentifier, _connectionIdleTimeoutMilliseconds);

TransportSettings transportSettings = transportType.IsWebSocketTransport()
? CreateTransportSettingsForWebSockets(connectionHostName, proxy)
: CreateTransportSettingsforTcp(connectionHostName, connectionEndpoint.Port);
? CreateTransportSettingsForWebSockets(connectionEndpoint, proxy)
: CreateTransportSettingsforTcp(connectionEndpoint);

// Create and open the connection, respecting the timeout constraint
// that was received.
Expand Down Expand Up @@ -1312,49 +1314,59 @@ private static AmqpSettings CreateAmpqSettings(Version amqpVersion)
/// Creates the transport settings for use with TCP.
/// </summary>
///
/// <param name="hostName">The host name of the Service Bus service endpoint.</param>
/// <param name="port">The port to use for connecting to the endpoint.</param>
/// <param name="connectionEndpoint">The Event Hubs service endpoint to connect to.</param>
///
/// <returns>The settings to use for transport.</returns>
private static TransportSettings CreateTransportSettingsforTcp(
string hostName,
int port)
private static TransportSettings CreateTransportSettingsforTcp(Uri connectionEndpoint)
{
var useTls = ShouldUseTls(connectionEndpoint.Scheme);
var port = connectionEndpoint.Port < 0 ? (useTls ? AmqpConstants.DefaultSecurePort : AmqpConstants.DefaultPort) : connectionEndpoint.Port;

// Allow the host to control the size of the transport buffers for sending and
// receiving by setting the value to -1. This results in much improved throughput
// across platforms, as different Linux distros have different needs to
// maximize efficiency, with Window having its own needs as well.
var tcpSettings = new TcpTransportSettings
{
Host = hostName,
Port = port < 0 ? AmqpConstants.DefaultSecurePort : port,
Host = connectionEndpoint.Host,
Port = port,
ReceiveBufferSize = -1,
SendBufferSize = -1
};

return new TlsTransportSettings(tcpSettings)
// If TLS is explicitly disabled, then use the TCP settings as-is. Otherwise,
// wrap them for TLS usage.

return useTls switch
{
TargetHost = hostName,
false => tcpSettings,

_ => new TlsTransportSettings(tcpSettings)
{
TargetHost = connectionEndpoint.Host
}
};
}

/// <summary>
/// Creates the transport settings for use with web sockets.
/// </summary>
///
/// <param name="hostName">The host name of the Service Bus service endpoint.</param>
/// <param name="connectionEndpoint">The Event Hubs service endpoint to connect to.</param>
/// <param name="proxy">The proxy to use for connecting to the endpoint.</param>
///
/// <returns>The settings to use for transport.</returns>
private static TransportSettings CreateTransportSettingsForWebSockets(
string hostName,
Uri connectionEndpoint,
IWebProxy proxy)
{
var uriBuilder = new UriBuilder(hostName)
var useTls = ShouldUseTls(connectionEndpoint.Scheme);

var uriBuilder = new UriBuilder(connectionEndpoint.Host)
{
Path = WebSocketsPathSuffix,
Scheme = WebSocketsUriScheme,
Port = -1
Scheme = useTls ? WebSocketsSecureUriScheme : WebSocketsInsecureUriScheme,
Port = connectionEndpoint.Port < 0 ? -1 : connectionEndpoint.Port
};

return new WebSocketTransportSettings
Expand Down Expand Up @@ -1394,6 +1406,22 @@ private static AmqpConnectionSettings CreateAmqpConnectionSettings(
return connectionSettings;
}

/// <summary>
/// Determines if the specified URL scheme should use TLS when creating an AMQP connection.
/// </summary>
///
/// <param name="urlScheme">The URL scheme to consider.</param>
///
/// <returns><c>true</c> if the connection should use TLS; otherwise, <c>false</c>.</returns>
///
private static bool ShouldUseTls(string urlScheme) => urlScheme switch
{
"ws" => false,
"amqp" => false,
"http" => false,
_ => true
};

/// <summary>
/// Validates the transport associated with the scope, throwing an argument exception
/// if it is unknown in this context.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,24 +12,32 @@ namespace Azure.Messaging.ServiceBus.Core
///
internal static class TransportTypeExtensions
{
/// <summary>The URI scheme used for an AMQP-based connection.</summary>
private const string AmqpUriScheme = "amqps";
/// <summary>The URI scheme used for a TLS-secured AMQP-based connection.</summary>
private const string AmqpTlsUriScheme = "amqps";

/// <summary>The URI scheme used for an insecure AMQP-based connection.</summary>
private const string AmqpInsecureUriScheme = "amqp";

/// <summary>
/// Determines the URI scheme to be used for the given connection type.
/// </summary>
///
/// <param name="instance">The instance that this method was invoked on.</param>
/// <param name="useTls"><c>true</c> if the scheme should be for a TLS-secured connection; otherwise, <c>false</c>.</param>
///
/// <returns>The scheme that should be used for the given connection type when forming an associated URI.</returns>
///
public static string GetUriScheme(this ServiceBusTransportType instance)
public static string GetUriScheme(this ServiceBusTransportType instance, bool useTls = true)
{
switch (instance)
{
case ServiceBusTransportType.AmqpTcp when useTls:
case ServiceBusTransportType.AmqpWebSockets when useTls:
return AmqpTlsUriScheme;

case ServiceBusTransportType.AmqpTcp:
case ServiceBusTransportType.AmqpWebSockets:
return AmqpUriScheme;
return AmqpInsecureUriScheme;

default:
throw new ArgumentException(string.Format(CultureInfo.CurrentCulture, Resources.InvalidTransportType, instance.ToString(), nameof(instance)));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,17 @@ internal ServiceBusConnection(
var connectionStringProperties = ServiceBusConnectionStringProperties.Parse(connectionString);
ValidateConnectionStringProperties(connectionStringProperties, nameof(connectionString));

// If the emulator is in use, then unset TLS and set the endpoint as a custom endpoint
// address, unless one was explicitly provided.

var useTls = true;

if (connectionStringProperties.UseDevelopmentEmulator)
{
useTls = false;
options.CustomEndpointAddress ??= connectionStringProperties.Endpoint;
}

FullyQualifiedNamespace = connectionStringProperties.Endpoint.Host;
TransportType = options.TransportType;
EntityPath = connectionStringProperties.EntityPath;
Expand All @@ -108,7 +119,7 @@ internal ServiceBusConnection(
var sharedCredential = new SharedAccessCredential(sharedAccessSignature);
var tokenCredential = new ServiceBusTokenCredential(sharedCredential);
#pragma warning disable CA2214 // Do not call overridable methods in constructors. This internal method is virtual for testing purposes.
InnerClient = CreateTransportClient(tokenCredential, options);
InnerClient = CreateTransportClient(tokenCredential, options, useTls);
#pragma warning restore CA2214 // Do not call overridable methods in constructors
}

Expand Down Expand Up @@ -171,7 +182,7 @@ internal ServiceBusConnection(
RetryOptions = options.RetryOptions;

#pragma warning disable CA2214 // Do not call overridable methods in constructors. This internal method is virtual for testing purposes.
InnerClient = CreateTransportClient(tokenCredential, options);
InnerClient = CreateTransportClient(tokenCredential, options, useTls: true);
#pragma warning restore CA2214 // Do not call overridable methods in constructors
}

Expand Down Expand Up @@ -263,7 +274,8 @@ internal virtual TransportRuleManager CreateTransportRuleManager(
/// </summary>
///
/// <param name="credential">The Azure managed identity credential to use for authorization.</param>
/// <param name="options"></param>
/// <param name="options">The set of options to use for the client.</param>
/// <param name="useTls"><c>true</c> if the client should secure the connection using TLS; otherwise, <c>false</c>.</param>
///
/// <returns>A client generalization specific to the specified protocol/transport to which operations may be delegated.</returns>
///
Expand All @@ -277,13 +289,14 @@ internal virtual TransportRuleManager CreateTransportRuleManager(
///
internal virtual TransportClient CreateTransportClient(
ServiceBusTokenCredential credential,
ServiceBusClientOptions options)
ServiceBusClientOptions options,
bool useTls = true)
{
switch (TransportType)
{
case ServiceBusTransportType.AmqpTcp:
case ServiceBusTransportType.AmqpWebSockets:
return new AmqpClient(FullyQualifiedNamespace, credential, options);
return new AmqpClient(FullyQualifiedNamespace, credential, options, useTls);

default:
throw new ArgumentException(string.Format(CultureInfo.CurrentCulture, Resources.InvalidTransportType, options.TransportType.ToString()), nameof(options));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,9 @@ public class ServiceBusConnectionStringProperties
/// <summary>The token that identifies the value of a shared access signature.</summary>
private const string SharedAccessSignatureToken = "SharedAccessSignature";

/// <summary>The token that identifies the intent to use a local emulator for development.</summary>
private const string DevelopmentEmulatorToken = "UseDevelopmentEmulator";

/// <summary>The formatted protocol used by an Service Bus endpoint. </summary>
private static readonly string ServiceBusEndpointScheme = $"{ ServiceBusEndpointSchemeName }{ Uri.SchemeDelimiter }";

Expand Down Expand Up @@ -85,6 +88,15 @@ public class ServiceBusConnectionStringProperties
///
public string SharedAccessSignature { get; internal set; }

/// <summary>
/// Indicates whether or not the connection string indicates that the
/// local development emulator is being used.
/// </summary>
///
/// <value><c>true</c> if the emulator is being used; otherwise, <c>false</c>.</value>
///
internal bool UseDevelopmentEmulator { get; set; }

/// <summary>
/// Determines whether the specified <see cref="System.Object" /> is equal to this instance.
/// </summary>
Expand Down Expand Up @@ -202,6 +214,15 @@ internal string ToConnectionString()
.Append(TokenValuePairDelimiter);
}

if (UseDevelopmentEmulator)
{
builder
.Append(DevelopmentEmulatorToken)
.Append(TokenValueSeparator)
.Append("true")
.Append(TokenValuePairDelimiter);
}

return builder.ToString();
}

Expand Down Expand Up @@ -278,10 +299,29 @@ public static ServiceBusConnectionStringProperties Parse(string connectionString

if (string.Compare(EndpointToken, token, StringComparison.OrdinalIgnoreCase) == 0)
{
var endpointBuilder = new UriBuilder(value)
// If this is an absolute URI, then it may have a custom port specified, which we
// want to preserve. If no scheme was specified, the URI is considered relative and
// the default port should be used.

if (!Uri.TryCreate(value, UriKind.Absolute, out var endpointUri))
{
Scheme = ServiceBusEndpointScheme,
Port = -1
endpointUri = null;
}

var endpointBuilder = endpointUri switch
{
null => new UriBuilder(value)
{
Scheme = ServiceBusEndpointSchemeName,
Port = -1
},

_ => new UriBuilder()
{
Scheme = ServiceBusEndpointSchemeName,
Host = endpointUri.Host,
Port = endpointUri.IsDefaultPort ? -1 : endpointUri.Port,
}
};

if ((string.Compare(endpointBuilder.Scheme, ServiceBusEndpointSchemeName, StringComparison.OrdinalIgnoreCase) != 0)
Expand All @@ -308,6 +348,16 @@ public static ServiceBusConnectionStringProperties Parse(string connectionString
{
parsedValues.SharedAccessSignature = value;
}
else if (string.Compare(DevelopmentEmulatorToken, token, StringComparison.OrdinalIgnoreCase) == 0)
{
// Do not enforce a value for the development emulator token. If a valid boolean, use it.
// Otherwise, leave the default value of false.

if (bool.TryParse(value, out var useEmulator))
{
parsedValues.UseDevelopmentEmulator = useEmulator;
}
}
}
else if ((slice.Length != 1) || (slice[0] != TokenValuePairDelimiter))
{
Expand All @@ -321,6 +371,13 @@ public static ServiceBusConnectionStringProperties Parse(string connectionString
lastPosition = currentPosition;
}

// Enforce that the development emulator can only be used for local development.

if ((parsedValues.UseDevelopmentEmulator) && (!parsedValues.Endpoint.IsLoopback))
{
throw new ArgumentException("The Service Bus emulator is only available locally. The endpoint must reference to the local host.", connectionString);
}

return parsedValues;
}
}
Expand Down
Loading

0 comments on commit 7c1b0c3

Please sign in to comment.