Skip to content

Commit

Permalink
Migrate SB extensions to new SDK (Azure#18330)
Browse files Browse the repository at this point in the history
* Migrate SB extensions to new SDK

* Fixes

* API

* Remove ExceptionHandler API

* PR fb

* Fix test

* add back cleanupentity
  • Loading branch information
JoshLove-msft authored and jongio committed Feb 9, 2021
1 parent 5536e95 commit 9392017
Show file tree
Hide file tree
Showing 61 changed files with 1,202 additions and 987 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -384,7 +384,7 @@ private JObject ConstructConnectionOptions() =>
new JObject
{
{ nameof(EventHubConnectionOptions.TransportType), ConnectionOptions.TransportType.ToString() },
{ nameof(EventHubConnectionOptions.Proxy), ConnectionOptions.Proxy?.ToString()},
{ nameof(EventHubConnectionOptions.Proxy), ConnectionOptions.Proxy?.ToString() ?? string.Empty},
};

private JObject ConstructRetryOptions() =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,13 +36,6 @@ public ServiceBusTriggerAttribute(string topicName, string subscriptionName) { }
}
namespace Microsoft.Azure.WebJobs.ServiceBus
{
public partial class BatchOptions
{
public BatchOptions() { }
public bool AutoComplete { get { throw null; } set { } }
public int MaxMessageCount { get { throw null; } set { } }
public System.TimeSpan OperationTimeout { get { throw null; } set { } }
}
public static partial class Constants
{
public const string AzureWebsiteSku = "WEBSITE_SKU";
Expand All @@ -58,30 +51,35 @@ public enum EntityType
}
public partial class MessageProcessor
{
public MessageProcessor(Microsoft.Azure.ServiceBus.Core.MessageReceiver messageReceiver, Microsoft.Azure.ServiceBus.MessageHandlerOptions messageOptions) { }
public Microsoft.Azure.ServiceBus.MessageHandlerOptions MessageOptions { get { throw null; } }
protected Microsoft.Azure.ServiceBus.Core.MessageReceiver MessageReceiver { get { throw null; } set { } }
public virtual System.Threading.Tasks.Task<bool> BeginProcessingMessageAsync(Microsoft.Azure.ServiceBus.Message message, System.Threading.CancellationToken cancellationToken) { throw null; }
public virtual System.Threading.Tasks.Task CompleteProcessingMessageAsync(Microsoft.Azure.ServiceBus.Message message, Microsoft.Azure.WebJobs.Host.Executors.FunctionResult result, System.Threading.CancellationToken cancellationToken) { throw null; }
public MessageProcessor(Azure.Messaging.ServiceBus.ServiceBusProcessor processor) { }
public virtual System.Threading.Tasks.Task<bool> BeginProcessingMessageAsync(Azure.Messaging.ServiceBus.ServiceBusReceiver receiver, Azure.Messaging.ServiceBus.ServiceBusReceivedMessage message, System.Threading.CancellationToken cancellationToken) { throw null; }
public virtual System.Threading.Tasks.Task CompleteProcessingMessageAsync(Azure.Messaging.ServiceBus.ServiceBusReceiver receiver, Azure.Messaging.ServiceBus.ServiceBusReceivedMessage message, Microsoft.Azure.WebJobs.Host.Executors.FunctionResult result, System.Threading.CancellationToken cancellationToken) { throw null; }
}
public partial class MessagingProvider
{
public MessagingProvider(Microsoft.Extensions.Options.IOptions<Microsoft.Azure.WebJobs.ServiceBus.ServiceBusOptions> serviceBusOptions) { }
public virtual Microsoft.Azure.ServiceBus.ClientEntity CreateClientEntity(string entityPath, string connectionString) { throw null; }
public virtual Azure.Messaging.ServiceBus.ServiceBusReceiver CreateBatchMessageReceiver(string entityPath, string connectionString) { throw null; }
public virtual Azure.Messaging.ServiceBus.ServiceBusClient CreateClient(string connectionString) { throw null; }
public virtual Microsoft.Azure.WebJobs.ServiceBus.MessageProcessor CreateMessageProcessor(string entityPath, string connectionString) { throw null; }
public virtual Microsoft.Azure.ServiceBus.Core.MessageReceiver CreateMessageReceiver(string entityPath, string connectionString) { throw null; }
public virtual Microsoft.Azure.ServiceBus.Core.MessageSender CreateMessageSender(string entityPath, string connectionString) { throw null; }
public virtual Microsoft.Azure.ServiceBus.SessionClient CreateSessionClient(string entityPath, string connectionString) { throw null; }
public virtual Azure.Messaging.ServiceBus.ServiceBusSender CreateMessageSender(string entityPath, string connectionString) { throw null; }
public virtual Azure.Messaging.ServiceBus.ServiceBusProcessor CreateProcessor(string entityPath, string connectionString) { throw null; }
public virtual Microsoft.Azure.WebJobs.ServiceBus.SessionMessageProcessor CreateSessionMessageProcessor(string entityPath, string connectionString) { throw null; }
}
public partial class ServiceBusOptions : Microsoft.Azure.WebJobs.Hosting.IOptionsFormatter
{
public ServiceBusOptions() { }
public Microsoft.Azure.WebJobs.ServiceBus.BatchOptions BatchOptions { get { throw null; } set { } }
public bool AutoCompleteMessages { get { throw null; } set { } }
public string ConnectionString { get { throw null; } set { } }
public Microsoft.Azure.ServiceBus.MessageHandlerOptions MessageHandlerOptions { get { throw null; } set { } }
public System.Func<Azure.Messaging.ServiceBus.ProcessErrorEventArgs, System.Threading.Tasks.Task> ExceptionHandler { get { throw null; } set { } }
public System.TimeSpan MaxAutoLockRenewalDuration { get { throw null; } set { } }
public int MaxConcurrentCalls { get { throw null; } set { } }
public int MaxConcurrentSessions { get { throw null; } set { } }
public int MaxMessages { get { throw null; } set { } }
public System.TimeSpan? MaxWaitTime { get { throw null; } set { } }
public int PrefetchCount { get { throw null; } set { } }
public Microsoft.Azure.ServiceBus.SessionHandlerOptions SessionHandlerOptions { get { throw null; } set { } }
public Azure.Messaging.ServiceBus.ServiceBusRetryOptions RetryOptions { get { throw null; } set { } }
public Azure.Messaging.ServiceBus.ServiceBusTransportType TransportType { get { throw null; } set { } }
public System.Net.IWebProxy WebProxy { get { throw null; } set { } }
public string Format() { throw null; }
}
public partial class ServiceBusWebJobsStartup : Microsoft.Azure.WebJobs.Hosting.IWebJobsStartup
Expand All @@ -91,11 +89,9 @@ public void Configure(Microsoft.Azure.WebJobs.IWebJobsBuilder builder) { }
}
public partial class SessionMessageProcessor
{
public SessionMessageProcessor(Microsoft.Azure.ServiceBus.ClientEntity clientEntity, Microsoft.Azure.ServiceBus.SessionHandlerOptions sessionHandlerOptions) { }
protected Microsoft.Azure.ServiceBus.ClientEntity ClientEntity { get { throw null; } set { } }
public Microsoft.Azure.ServiceBus.SessionHandlerOptions SessionHandlerOptions { get { throw null; } }
public virtual System.Threading.Tasks.Task<bool> BeginProcessingMessageAsync(Microsoft.Azure.ServiceBus.IMessageSession session, Microsoft.Azure.ServiceBus.Message message, System.Threading.CancellationToken cancellationToken) { throw null; }
public virtual System.Threading.Tasks.Task CompleteProcessingMessageAsync(Microsoft.Azure.ServiceBus.IMessageSession session, Microsoft.Azure.ServiceBus.Message message, Microsoft.Azure.WebJobs.Host.Executors.FunctionResult result, System.Threading.CancellationToken cancellationToken) { throw null; }
public SessionMessageProcessor(Azure.Messaging.ServiceBus.ServiceBusSessionProcessor processor) { }
public virtual System.Threading.Tasks.Task<bool> BeginProcessingMessageAsync(Azure.Messaging.ServiceBus.ServiceBusSessionReceiver receiver, Azure.Messaging.ServiceBus.ServiceBusReceivedMessage message, System.Threading.CancellationToken cancellationToken) { throw null; }
public virtual System.Threading.Tasks.Task CompleteProcessingMessageAsync(Azure.Messaging.ServiceBus.ServiceBusSessionReceiver receiver, Azure.Messaging.ServiceBus.ServiceBusReceivedMessage message, Microsoft.Azure.WebJobs.Host.Executors.FunctionResult result, System.Threading.CancellationToken cancellationToken) { throw null; }
}
}
namespace Microsoft.Extensions.Hosting
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
using System.Diagnostics;
using System.Reflection;
using System.Threading.Tasks;
using Microsoft.Azure.ServiceBus;
using Azure.Messaging.ServiceBus;
using Microsoft.Azure.WebJobs.Host.Bindings;
using Microsoft.Azure.WebJobs.Host.Converters;

Expand Down Expand Up @@ -53,9 +53,9 @@ private static IArgumentBinding<ServiceBusEntity> CreateBindingGeneric<TItem>()

private class AsyncCollectorArgumentBinding<TItem> : IArgumentBinding<ServiceBusEntity>
{
private readonly IConverter<TItem, Message> _converter;
private readonly IConverter<TItem, ServiceBusMessage> _converter;

public AsyncCollectorArgumentBinding(IConverter<TItem, Message> converter)
public AsyncCollectorArgumentBinding(IConverter<TItem, ServiceBusMessage> converter)
{
_converter = converter;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ public Task<IValueProvider> BindAsync(ServiceBusEntity value, ValueBindingContex
}

IValueProvider provider = new NonNullConverterValueBinder<byte[]>(value,
new ByteArrayToBrokeredMessageConverter(), context.FunctionInstanceId);
new ByteArrayToMessageConverter(), context.FunctionInstanceId);

return Task.FromResult(provider);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,21 +3,21 @@

using System;
using System.Diagnostics.CodeAnalysis;
using Microsoft.Azure.ServiceBus;
using Azure.Messaging.ServiceBus;

namespace Microsoft.Azure.WebJobs.ServiceBus.Bindings
{
internal class ByteArrayToBrokeredMessageConverter : IConverter<byte[], Message>
internal class ByteArrayToMessageConverter : IConverter<byte[], ServiceBusMessage>
{
[SuppressMessage("Microsoft.Reliability", "CA2000:Dispose objects before losing scope")]
public Message Convert(byte[] input)
public ServiceBusMessage Convert(byte[] input)
{
if (input == null)
{
throw new InvalidOperationException("A brokered message cannot contain a null byte array instance.");
}

return new Message(input)
return new ServiceBusMessage(input)
{
ContentType = ContentTypes.ApplicationOctetStream
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@
using System.Diagnostics;
using System.Reflection;
using System.Threading.Tasks;
using Azure.Messaging.ServiceBus;
using Microsoft.Azure.WebJobs.Host.Bindings;
using Microsoft.Azure.WebJobs.Host.Converters;
using Microsoft.Azure.ServiceBus;

namespace Microsoft.Azure.WebJobs.ServiceBus.Bindings
{
Expand Down Expand Up @@ -53,9 +53,9 @@ private static IArgumentBinding<ServiceBusEntity> CreateBindingGeneric<TItem>()

private class CollectorArgumentBinding<TItem> : IArgumentBinding<ServiceBusEntity>
{
private readonly IConverter<TItem, Message> _converter;
private readonly IConverter<TItem, ServiceBusMessage> _converter;

public CollectorArgumentBinding(IConverter<TItem, Message> converter)
public CollectorArgumentBinding(IConverter<TItem, ServiceBusMessage> converter)
{
_converter = converter;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ public Task<object> GetValueAsync()

public string ToInvokeString()
{
return _entity.MessageSender.Path;
return _entity.MessageSender.EntityPath;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,19 +4,19 @@
using System;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.ServiceBus;
using Microsoft.Azure.WebJobs.Host.Bindings;
using System.Diagnostics;
using Azure.Messaging.ServiceBus;

namespace Microsoft.Azure.WebJobs.ServiceBus.Bindings
{
internal class ConverterValueBinder<TInput> : IOrderedValueBinder
{
private readonly ServiceBusEntity _entity;
private readonly IConverter<TInput, Message> _converter;
private readonly IConverter<TInput, ServiceBusMessage> _converter;
private readonly Guid _functionInstanceId;

public ConverterValueBinder(ServiceBusEntity entity, IConverter<TInput, Message> converter,
public ConverterValueBinder(ServiceBusEntity entity, IConverter<TInput, ServiceBusMessage> converter,
Guid functionInstanceId)
{
_entity = entity;
Expand All @@ -41,12 +41,12 @@ public Task<object> GetValueAsync()

public string ToInvokeString()
{
return _entity.MessageSender.Path;
return _entity.MessageSender.EntityPath;
}

public Task SetValueAsync(object value, CancellationToken cancellationToken)
{
Message message = _converter.Convert((TInput)value);
ServiceBusMessage message = _converter.Convert((TInput)value);
Debug.Assert(message != null);
return _entity.SendAndCreateEntityIfNotExistsAsync(message, _functionInstanceId, cancellationToken);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
using System;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.ServiceBus;
using Azure.Messaging.ServiceBus;
using Microsoft.Azure.WebJobs.Host.Bindings;

namespace Microsoft.Azure.WebJobs.ServiceBus.Bindings
Expand All @@ -13,7 +13,7 @@ internal class MessageArgumentBinding : IArgumentBinding<ServiceBusEntity>
{
public Type ValueType
{
get { return typeof(Message); }
get { return typeof(ServiceBusMessage); }
}

public Task<IValueProvider> BindAsync(ServiceBusEntity value, ValueBindingContext context)
Expand Down Expand Up @@ -46,7 +46,7 @@ public BindStepOrder StepOrder

public Type Type
{
get { return typeof(Message); }
get { return typeof(ServiceBusMessage); }
}

public Task<object> GetValueAsync()
Expand All @@ -56,7 +56,7 @@ public Task<object> GetValueAsync()

public string ToInvokeString()
{
return _entity.MessageSender.Path;
return _entity.MessageSender.EntityPath;
}

/// <summary>
Expand Down Expand Up @@ -91,7 +91,7 @@ public async Task SetValueAsync(object value, CancellationToken cancellationToke
return;
}

var message = (Message)value;
var message = (ServiceBusMessage)value;

await _entity.SendAndCreateEntityIfNotExistsAsync(message, _functionInstanceId, cancellationToken).ConfigureAwait(false);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
// Licensed under the MIT License. See License.txt in the project root for license information.

using System.Reflection;
using Microsoft.Azure.ServiceBus;
using Azure.Messaging.ServiceBus;
using Microsoft.Azure.WebJobs.Host.Bindings;

namespace Microsoft.Azure.WebJobs.ServiceBus.Bindings
Expand All @@ -11,7 +11,7 @@ internal class MessageArgumentBindingProvider : IQueueArgumentBindingProvider
{
public IArgumentBinding<ServiceBusEntity> TryCreate(ParameterInfo parameter)
{
if (!parameter.IsOut || parameter.ParameterType != typeof(Message).MakeByRefType())
if (!parameter.IsOut || parameter.ParameterType != typeof(ServiceBusMessage).MakeByRefType())
{
return null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,25 +4,25 @@
using System;
using System.Collections;
using Microsoft.Azure.WebJobs.Host.Converters;
using Microsoft.Azure.ServiceBus;
using Azure.Messaging.ServiceBus;

namespace Microsoft.Azure.WebJobs.ServiceBus.Bindings
{
internal static class MessageConverterFactory
{
internal static IConverter<TInput, Message> Create<TInput>()
internal static IConverter<TInput, ServiceBusMessage> Create<TInput>()
{
if (typeof(TInput) == typeof(Message))
if (typeof(TInput) == typeof(ServiceBusMessage))
{
return (IConverter<TInput, Message>)new IdentityConverter<TInput>();
return (IConverter<TInput, ServiceBusMessage>)new IdentityConverter<TInput>();
}
else if (typeof(TInput) == typeof(string))
{
return (IConverter<TInput, Message>)new StringToBrokeredMessageConverter();
return (IConverter<TInput, ServiceBusMessage>)new StringToBrokeredMessageConverter();
}
else if (typeof(TInput) == typeof(byte[]))
{
return (IConverter<TInput, Message>)new ByteArrayToBrokeredMessageConverter();
return (IConverter<TInput, ServiceBusMessage>)new ByteArrayToMessageConverter();
}
else
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
using System.Reflection;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.ServiceBus.Core;
using Azure.Messaging.ServiceBus;
using Microsoft.Azure.WebJobs.Host.Bindings;

namespace Microsoft.Azure.WebJobs.ServiceBus.Bindings
Expand All @@ -14,7 +14,7 @@ internal class MessageSenderArgumentBindingProvider : IQueueArgumentBindingProvi
{
public IArgumentBinding<ServiceBusEntity> TryCreate(ParameterInfo parameter)
{
if (parameter.ParameterType != typeof(MessageSender))
if (parameter.ParameterType != typeof(ServiceBusSender))
{
return null;
}
Expand All @@ -26,7 +26,7 @@ internal class MessageSenderArgumentBinding : IArgumentBinding<ServiceBusEntity>
{
public Type ValueType
{
get { return typeof(MessageSender); }
get { return typeof(ServiceBusSender); }
}

public Task<IValueProvider> BindAsync(ServiceBusEntity value, ValueBindingContext context)
Expand All @@ -43,9 +43,9 @@ public Task<IValueProvider> BindAsync(ServiceBusEntity value, ValueBindingContex

private class MessageSenderValueBinder : IValueBinder
{
private readonly MessageSender _messageSender;
private readonly ServiceBusSender _messageSender;

public MessageSenderValueBinder(MessageSender messageSender)
public MessageSenderValueBinder(ServiceBusSender messageSender)
{
_messageSender = messageSender;
}
Expand All @@ -57,7 +57,7 @@ public static BindStepOrder StepOrder

public Type Type
{
get { return typeof(MessageSender); }
get { return typeof(ServiceBusSender); }
}

public Task<object> GetValueAsync()
Expand All @@ -67,7 +67,7 @@ public Task<object> GetValueAsync()

public string ToInvokeString()
{
return _messageSender.Path;
return _messageSender.EntityPath;
}

public Task SetValueAsync(object value, CancellationToken cancellationToken)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,17 @@
using System;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.ServiceBus;
using Azure.Messaging.ServiceBus;

namespace Microsoft.Azure.WebJobs.ServiceBus.Bindings
{
internal class MessageSenderAsyncCollector<T> : IAsyncCollector<T>
{
private readonly ServiceBusEntity _entity;
private readonly IConverter<T, Message> _converter;
private readonly IConverter<T, ServiceBusMessage> _converter;
private readonly Guid _functionInstanceId;

public MessageSenderAsyncCollector(ServiceBusEntity entity, IConverter<T, Message> converter,
public MessageSenderAsyncCollector(ServiceBusEntity entity, IConverter<T, ServiceBusMessage> converter,
Guid functionInstanceId)
{
if (entity == null)
Expand All @@ -34,7 +34,7 @@ public MessageSenderAsyncCollector(ServiceBusEntity entity, IConverter<T, Messag

public Task AddAsync(T item, CancellationToken cancellationToken)
{
Message message = _converter.Convert(item);
ServiceBusMessage message = _converter.Convert(item);

if (message == null)
{
Expand Down
Loading

0 comments on commit 9392017

Please sign in to comment.