diff --git a/.github/workflows/publish-nuget-packages.yaml b/.github/workflows/publish-nuget-packages.yaml index 3aa5a45..5dc1f34 100644 --- a/.github/workflows/publish-nuget-packages.yaml +++ b/.github/workflows/publish-nuget-packages.yaml @@ -11,6 +11,6 @@ jobs: - uses: actions/checkout@v1 - uses: actions/setup-dotnet@v1 with: - dotnet-version: '3.0.100' + dotnet-version: '3.1.101' - run: dotnet pack prometheus-net.Contrib.sln --include-symbols -c "Release" --output "build/" - run: dotnet nuget push "build/*.symbols.nupkg" -k ${{ secrets.NUGET_API_KEY }} -s "https://api.nuget.org/v3/index.json" -n true diff --git a/.github/workflows/run-tests.yaml b/.github/workflows/run-tests.yaml index 96a37ec..2bf2e22 100644 --- a/.github/workflows/run-tests.yaml +++ b/.github/workflows/run-tests.yaml @@ -14,7 +14,6 @@ jobs: - uses: actions/checkout@v1 - uses: actions/setup-dotnet@v1 with: - dotnet-version: 3.0.100 + dotnet-version: '3.1.101' - name: dotnet test run: dotnet test -c "Release" - diff --git a/Directory.Build.props b/Directory.Build.props index 19ee783..150e7af 100644 --- a/Directory.Build.props +++ b/Directory.Build.props @@ -8,5 +8,7 @@ alexvaluyskiy true 0.7.0 + prometheus metrics + Exposes .NET core diagnostic listeners and counters diff --git a/prometheus-net.Contrib.sln b/prometheus-net.Contrib.sln index 40e9786..65657b6 100644 --- a/prometheus-net.Contrib.sln +++ b/prometheus-net.Contrib.sln @@ -23,7 +23,10 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "prometheus-net.Esquio", "sr EndProject Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Solution Items", "Solution Items", "{AC951721-6E15-4E0A-A15A-E268AA6DDAEA}" ProjectSection(SolutionItems) = preProject + Directory.Build.props = Directory.Build.props + .github\workflows\publish-nuget-packages.yaml = .github\workflows\publish-nuget-packages.yaml README.md = README.md + .github\workflows\run-tests.yaml = .github\workflows\run-tests.yaml EndProjectSection EndProject Global diff --git a/samples/WebApp/Controllers/TestController.cs b/samples/WebApp/Controllers/TestController.cs index 04d4ce6..b8a9b2c 100644 --- a/samples/WebApp/Controllers/TestController.cs +++ b/samples/WebApp/Controllers/TestController.cs @@ -2,8 +2,9 @@ using System.Net.Http; using System.Threading.Tasks; using MassTransit; +using MassTransit.Courier; using Microsoft.AspNetCore.Mvc; -using WebApp.Consumers; +using WebApp.MassTransit; namespace WebApp.Controllers { @@ -11,17 +12,17 @@ namespace WebApp.Controllers [Route("[controller]")] public class TestController : ControllerBase { - private readonly IPublishEndpoint endpoint; + private readonly IBus bus; - public TestController(IPublishEndpoint endpoint) + public TestController(IBus bus) { - this.endpoint = endpoint; + this.bus = bus; } [HttpGet("send")] public async Task TestSend() { - await endpoint.Publish(new TestCommand + await bus.Publish(new TestCommand { Id = Guid.NewGuid() }); @@ -31,5 +32,36 @@ await endpoint.Publish(new TestCommand return Ok(); } + + [HttpGet("saga")] + public async Task TestSaga() + { + var orderId = Guid.NewGuid(); + + await bus.Publish(new + { + OrderId = orderId + }); + + await bus.Publish(new + { + OrderId = orderId + }); + + return Ok(); + } + + [HttpGet("courier")] + public async Task TestCourier() + { + var builder = new RoutingSlipBuilder(NewId.NextGuid()); + builder.AddActivity("DownloadImage", new Uri("queue:test_courier_execute")); + builder.AddActivity("FilterImage", new Uri("queue:test_courier_execute")); + var routingSlip = builder.Build(); + + await bus.Execute(routingSlip); + + return Ok(); + } } } diff --git a/samples/WebApp/MassTransit/OrderStateMachine.cs b/samples/WebApp/MassTransit/OrderStateMachine.cs new file mode 100644 index 0000000..a391120 --- /dev/null +++ b/samples/WebApp/MassTransit/OrderStateMachine.cs @@ -0,0 +1,52 @@ +using System; +using Automatonymous; + +namespace WebApp.MassTransit +{ + public class OrderStateMachine : MassTransitStateMachine + { + public OrderStateMachine() + { + InstanceState(x => x.CurrentState); + + Event(() => SubmitOrder, x => x.CorrelateById(context => context.Message.OrderId)); + Event(() => OrderAccepted, x => x.CorrelateById(context => context.Message.OrderId)); + + Initially( + When(SubmitOrder) + .TransitionTo(Submitted)); + + During(Submitted, + When(OrderAccepted) + .Then(context => + { + + }) + .TransitionTo(Accepted) + .Finalize()); + } + + public Event SubmitOrder { get; private set; } + public Event OrderAccepted { get; private set; } + + + public State Submitted { get; private set; } + public State Accepted { get; private set; } + } + + public class OrderState : SagaStateMachineInstance + { + public Guid CorrelationId { get; set; } + public string CurrentState { get; set; } + } + + public interface SubmitOrder + { + Guid OrderId { get; } + } + + public interface OrderAccepted + { + Guid OrderId { get; } + } +} diff --git a/samples/WebApp/Consumers/TestCommand.cs b/samples/WebApp/MassTransit/TestCommand.cs similarity index 77% rename from samples/WebApp/Consumers/TestCommand.cs rename to samples/WebApp/MassTransit/TestCommand.cs index 7f651a6..8bde987 100644 --- a/samples/WebApp/Consumers/TestCommand.cs +++ b/samples/WebApp/MassTransit/TestCommand.cs @@ -1,6 +1,6 @@ using System; -namespace WebApp.Consumers +namespace WebApp.MassTransit { public class TestCommand { diff --git a/samples/WebApp/Consumers/TestConsumer.cs b/samples/WebApp/MassTransit/TestConsumer.cs similarity index 76% rename from samples/WebApp/Consumers/TestConsumer.cs rename to samples/WebApp/MassTransit/TestConsumer.cs index c35fdfa..f5e68a2 100644 --- a/samples/WebApp/Consumers/TestConsumer.cs +++ b/samples/WebApp/MassTransit/TestConsumer.cs @@ -1,14 +1,11 @@ -using MassTransit; -using StackExchange.Redis; -using System; -using Microsoft.Data.SqlClient; +using System; using System.Threading.Tasks; -using WebApp; -using WebApp.Data; +using MassTransit; +using Microsoft.Data.SqlClient; using Microsoft.EntityFrameworkCore; -using System.Transactions; +using WebApp.Data; -namespace WebApp.Consumers +namespace WebApp.MassTransit { public class TestConsumer : IConsumer { @@ -23,9 +20,6 @@ public TestConsumer(SqlConnection connection, TestContext testContext) public async Task Consume(ConsumeContext context) { - //var database = Startup.connection.GetDatabase(); - //database.StringGet("test1"); - await testContext.TestEntities.ToListAsync(); var command = connection.CreateCommand(); diff --git a/samples/WebApp/MassTransit/TestCourier.cs b/samples/WebApp/MassTransit/TestCourier.cs new file mode 100644 index 0000000..1d874c7 --- /dev/null +++ b/samples/WebApp/MassTransit/TestCourier.cs @@ -0,0 +1,48 @@ +using System; +using System.Threading.Tasks; +using MassTransit.Courier; + +namespace WebApp.MassTransit +{ + public class DownloadImageActivity : IActivity + { + public async Task Execute(ExecuteContext context) + { + return default; + } + + public async Task Compensate(CompensateContext context) + { + return default; + } + } + + public class FilterImageActivity : IActivity + { + public async Task Execute(ExecuteContext context) + { + return default; + } + + public async Task Compensate(CompensateContext context) + { + return default; + } + } + + public class FilterImageLog + { + } + + public class FilterImageArguments + { + } + + public class DownloadImageLog + { + } + + public class DownloadImageArguments + { + } +} diff --git a/samples/WebApp/Startup.cs b/samples/WebApp/Startup.cs index b33df1e..be6ece5 100644 --- a/samples/WebApp/Startup.cs +++ b/samples/WebApp/Startup.cs @@ -4,24 +4,21 @@ using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Hosting; using MassTransit.AspNetCoreIntegration; -using StackExchange.Redis; using Microsoft.Data.SqlClient; using Prometheus; using MassTransit; -using WebApp.Consumers; -using MassTransit.ActiveMqTransport; -using MassTransit.ActiveMqTransport.Configurators; using System; +using GreenPipes; +using MassTransit.Saga; using Microsoft.EntityFrameworkCore; +using Microsoft.Extensions.Diagnostics.HealthChecks; using WebApp.Data; -using Microsoft.AspNetCore.Routing; +using WebApp.MassTransit; namespace WebApp { public class Startup { - public static ConnectionMultiplexer connection = ConnectionMultiplexer.Connect(""); - public Startup(IConfiguration configuration) { Configuration = configuration; @@ -38,21 +35,46 @@ public void ConfigureServices(IServiceCollection services) options.UseSqlServer(Configuration.GetConnectionString("DefaultConnection")); }); + services.AddSingleton>(provider => new InMemorySagaRepository()); + services.AddMassTransit( - provider => Bus.Factory.CreateUsingActiveMq(factoryConfigurator => + provider => Bus.Factory.CreateUsingRabbitMq(factoryConfigurator => { - var host = factoryConfigurator.Host(new ConfigurationHostSettings(new Uri("activemq://localhost:61616"))); + factoryConfigurator.Host(new Uri("amqp://admin:admin@localhost:5672/")); + factoryConfigurator.ReceiveEndpoint("test_events", receiveEndpointConfigurator => { receiveEndpointConfigurator.Consumer(provider); }); + + factoryConfigurator.ReceiveEndpoint("test_saga", receiveEndpointConfigurator => + { + receiveEndpointConfigurator.StateMachineSaga(provider); + }); + + factoryConfigurator.ReceiveEndpoint("test_courier_execute", receiveEndpointConfigurator => + { + var compnsateUri = new Uri("queue:test_courier_compensate"); + receiveEndpointConfigurator.ExecuteActivityHost(compnsateUri, provider); + receiveEndpointConfigurator.ExecuteActivityHost(compnsateUri, provider); + }); + + factoryConfigurator.ReceiveEndpoint("test_courier_compensate", receiveEndpointConfigurator => + { + receiveEndpointConfigurator.CompensateActivityHost(provider); + }); }), config => { config.AddConsumer(); - }); + config.AddSagaStateMachine(); + + config.AddActivity(); + config.AddActivity(); + }, options => options.FailureStatus = HealthStatus.Unhealthy); services.AddPrometheusAspNetCoreMetrics(); + services.AddPrometheusMassTransitMetrics(); services.AddSingleton(provider => { diff --git a/samples/WebApp/WebApp.csproj b/samples/WebApp/WebApp.csproj index 0002cb3..d80ff9c 100644 --- a/samples/WebApp/WebApp.csproj +++ b/samples/WebApp/WebApp.csproj @@ -1,18 +1,20 @@  - netcoreapp3.0 + netcoreapp3.1 - - + + - - + + + all + runtime; build; native; contentfiles; analyzers; buildtransitive + - diff --git a/samples/WebApp/docker-compose.yml b/samples/WebApp/docker-compose.yml index addf81b..bfcd0f5 100644 --- a/samples/WebApp/docker-compose.yml +++ b/samples/WebApp/docker-compose.yml @@ -9,19 +9,13 @@ services: ports: - 11433:1433 - redis: - image: redis:5 - command: ["redis-server", "--appendonly", "yes"] - ports: - - 6379:6379 - - activemq: - image: rmohr/activemq:latest + rabbit: + image: "rabbitmq:3.8.2-management" environment: - - "ACTIVEMQ_ADMIN_LOGIN=admin" - - "ACTIVEMQ_ADMIN_PASSWORD=admin" - - "ACTIVEMQ_LOGGER_LOGLEVEL=TRACE" + RABBITMQ_ERLANG_COOKIE: "SWQOKODSQALRPCLNMEQG" + RABBITMQ_DEFAULT_USER: "admin" + RABBITMQ_DEFAULT_PASS: "admin" + RABBITMQ_DEFAULT_VHOST: "/" ports: - - 8161:8161 - - 61616:61616 - - 61613:61613 \ No newline at end of file + - "15672:15672" + - "5672:5672" \ No newline at end of file diff --git a/src/prometheus-net.Contrib/Core/DiagnosticSourceListener.cs b/src/prometheus-net.Contrib/Core/DiagnosticSourceListener.cs index 3589f82..d63e518 100644 --- a/src/prometheus-net.Contrib/Core/DiagnosticSourceListener.cs +++ b/src/prometheus-net.Contrib/Core/DiagnosticSourceListener.cs @@ -1,7 +1,6 @@ using System; using System.Collections.Generic; using System.Diagnostics; -using System.Text; namespace Prometheus.Contrib.Core { @@ -24,9 +23,6 @@ public void OnError(Exception error) public void OnNext(KeyValuePair value) { - //if (Activity.Current == null) - // return; - try { if (value.Key.EndsWith("Start")) @@ -38,7 +34,7 @@ public void OnNext(KeyValuePair value) else handler.OnCustom(value.Key, Activity.Current, value.Value); } - catch (Exception ex) + catch { } } diff --git a/src/prometheus-net.Contrib/Core/DiagnosticSourceSubscriber.cs b/src/prometheus-net.Contrib/Core/DiagnosticSourceSubscriber.cs index 3d30b85..b1de0d5 100644 --- a/src/prometheus-net.Contrib/Core/DiagnosticSourceSubscriber.cs +++ b/src/prometheus-net.Contrib/Core/DiagnosticSourceSubscriber.cs @@ -9,10 +9,9 @@ public class DiagnosticSourceSubscriber : IDisposable, IObserver handlerFactory; private readonly Func diagnosticSourceFilter; - //private readonly Func isEnabledFilter; private long disposed; private IDisposable allSourcesSubscription; - private List listenerSubscriptions; + private readonly List listenerSubscriptions; public DiagnosticSourceSubscriber( Func handlerFactory, diff --git a/src/prometheus-net.Contrib/EventListeners/CountersEventListener.cs b/src/prometheus-net.Contrib/EventListeners/CountersEventListener.cs index 9b1b7cc..7ee1b7a 100644 --- a/src/prometheus-net.Contrib/EventListeners/CountersEventListener.cs +++ b/src/prometheus-net.Contrib/EventListeners/CountersEventListener.cs @@ -7,7 +7,7 @@ namespace Prometheus.Contrib.EventListeners { public class CountersEventListener : EventListener { - private Dictionary counterAdapters = new Dictionary() + private readonly Dictionary counterAdapters = new Dictionary { ["System.Runtime"] = new PrometheusRuntimeCounterAdapter(), ["Microsoft.AspNetCore.Hosting"] = new PrometheusAspNetCoreCounterAdapter(), @@ -16,7 +16,7 @@ public class CountersEventListener : EventListener ["Grpc.Net.Client"] = new PrometheusGrpcServerCounterAdapter() }; - private IDictionary eventArguments = new Dictionary + private readonly IDictionary eventArguments = new Dictionary { ["EventCounterIntervalSec"] = "10" }; diff --git a/src/prometheus-net.Contrib/Healthchecks/PrometheusHealthcheckPublisher.cs b/src/prometheus-net.Contrib/Healthchecks/PrometheusHealthcheckPublisher.cs index 42ca216..176608a 100644 --- a/src/prometheus-net.Contrib/Healthchecks/PrometheusHealthcheckPublisher.cs +++ b/src/prometheus-net.Contrib/Healthchecks/PrometheusHealthcheckPublisher.cs @@ -2,7 +2,6 @@ using System.Threading; using System.Threading.Tasks; using Microsoft.Extensions.Diagnostics.HealthChecks; -using Prometheus; namespace Prometheus.Contrib.Healthchecks { diff --git a/src/prometheus-net.Contrib/prometheus-net.Contrib.csproj b/src/prometheus-net.Contrib/prometheus-net.Contrib.csproj index ccbce75..1a16b4c 100644 --- a/src/prometheus-net.Contrib/prometheus-net.Contrib.csproj +++ b/src/prometheus-net.Contrib/prometheus-net.Contrib.csproj @@ -5,7 +5,6 @@ Prometheus.Contrib prometheus-net.Contrib prometheus-net.Contrib - prometheus metrics diff --git a/src/prometheus-net.EntityFramework/prometheus-net.EntityFramework.csproj b/src/prometheus-net.EntityFramework/prometheus-net.EntityFramework.csproj index 593af58..32813e1 100644 --- a/src/prometheus-net.EntityFramework/prometheus-net.EntityFramework.csproj +++ b/src/prometheus-net.EntityFramework/prometheus-net.EntityFramework.csproj @@ -7,7 +7,7 @@ - + diff --git a/src/prometheus-net.MassTransit/DiagnosticServiceCollectionExtensions.cs b/src/prometheus-net.MassTransit/DiagnosticServiceCollectionExtensions.cs index 36ff07b..d10672e 100644 --- a/src/prometheus-net.MassTransit/DiagnosticServiceCollectionExtensions.cs +++ b/src/prometheus-net.MassTransit/DiagnosticServiceCollectionExtensions.cs @@ -1,5 +1,7 @@ +using MassTransit; using Prometheus.Contrib.Core; using Prometheus.MassTransit.Diagnostics; +using Prometheus.MassTransit.Observers; namespace Microsoft.Extensions.DependencyInjection { @@ -14,5 +16,12 @@ public static void AddPrometheusMassTransitMetrics(this IServiceCollection servi services.AddSingleton(aspNetCoreListenerHandler); } + + public static void ConnectPrometheusObservers(this IBusControl busControl) + { + busControl.ConnectPublishObserver(new PrometheusPublishObserver()); + busControl.ConnectReceiveObserver(new PrometheusConsumeObserver()); + busControl.ConnectSendObserver(new PrometheusSendObserver()); + } } } diff --git a/src/prometheus-net.MassTransit/Diagnostics/MassTransitListenerHandler.cs b/src/prometheus-net.MassTransit/Diagnostics/MassTransitListenerHandler.cs index 51ae466..40b935a 100644 --- a/src/prometheus-net.MassTransit/Diagnostics/MassTransitListenerHandler.cs +++ b/src/prometheus-net.MassTransit/Diagnostics/MassTransitListenerHandler.cs @@ -1,5 +1,6 @@ using System.Diagnostics; -using System.Threading; +using System.Linq; +using MassTransit.Logging; using Prometheus.Contrib.Core; namespace Prometheus.MassTransit.Diagnostics @@ -8,85 +9,163 @@ public class MassTransitListenerHandler : DiagnosticListenerHandler { private static class PrometheusCounters { - private static string[] DefaultLabelNames = new[] { "message" }; - private static string[] ErrorLabelNames = new[] { "exception" }; - public static readonly Histogram SendMessageCount = Metrics.CreateHistogram( - "masstransit_total_sent_messages", - "Total sent messages.", - new HistogramConfiguration { LabelNames = DefaultLabelNames }); + "masstransit_messages_sent_total", + "Total sent messages."); public static readonly Counter SendMessageErrors = Metrics.CreateCounter( - "masstransit_total_sent_messages_errors", + "masstransit_messages_sent_errors_total", "Total sent messages errors", - new CounterConfiguration { LabelNames = ErrorLabelNames }); + new CounterConfiguration { LabelNames = new[] { "exception" } }); + + public static readonly Histogram ReceiveMessageCount = Metrics.CreateHistogram( + "masstransit_messages_received_total", + "The time to receive a message, in seconds.", + new HistogramConfiguration { LabelNames = new[] { "message" } }); + + public static readonly Counter ReceiveMessageError = Metrics.CreateCounter( + "masstransit_messages_received_errors_total", + "The number of message processing failures.", + new CounterConfiguration { LabelNames = new[] { "exception" } }); public static readonly Histogram ConsumeMessageCount = Metrics.CreateHistogram( - "masstransit_total_consumed_messages", + "masstransit_messages_consumed_total", "The time to consume a message, in seconds.", - new HistogramConfiguration { LabelNames = DefaultLabelNames }); - - public static readonly Histogram ConsumeCriticalDuration = Metrics.CreateHistogram( - "masstransit_critical_time_seconds", - "The time between when message is sent and when it is consumed, in seconds.", - new HistogramConfiguration { LabelNames = DefaultLabelNames }); + new HistogramConfiguration { LabelNames = new[] { "consumer" } }); public static readonly Counter ConsumeMessageError = Metrics.CreateCounter( - "masstransit_total_consumed_messages_errors", + "masstransit_messages_consumed_errors_total", "The number of message processing failures.", - new CounterConfiguration { LabelNames = ErrorLabelNames }); - } + new CounterConfiguration { LabelNames = new[] { "exception" } }); + + public static readonly Counter SagaRaisedEvents = Metrics.CreateCounter( + "masstransit_saga_raised_events_total", + "The time to receive a message, in seconds.", + new CounterConfiguration { LabelNames = new[] { "saga", "from", "to" } }); + + public static readonly Counter SagaSentEvents = Metrics.CreateCounter( + "masstransit_saga_sent_events_total", + "The time to receive a message, in seconds.", + new CounterConfiguration { LabelNames = new[] { "saga" } }); - private readonly PropertyFetcher messageTypeFetcher = new PropertyFetcher("MessageType"); - private readonly PropertyFetcher consumerTypeFetcher = new PropertyFetcher("ConsumerType"); - private readonly PropertyFetcher exchangeFetcher = new PropertyFetcher("Exchange"); + public static readonly Counter SagaSentQuery = Metrics.CreateCounter( + "masstransit_saga_sent_queries_total", + "The time to receive a message, in seconds."); - private AsyncLocal messageTypeContext = new AsyncLocal(); - private AsyncLocal exchangeContext = new AsyncLocal(); + public static readonly Counter CourierExecute = Metrics.CreateCounter( + "masstransit_courier_executed_total", + "The time to receive a message, in seconds.", + new CounterConfiguration { LabelNames = new[] { "activity" } }); + + public static readonly Counter CourierCompensate = Metrics.CreateCounter( + "masstransit_courier_compensate_total", + "The time to receive a message, in seconds.", + new CounterConfiguration { LabelNames = new[] { "activity" } }); + } public MassTransitListenerHandler(string sourceName) : base(sourceName) { } public override void OnStartActivity(Activity activity, object payload) + { + } + + public override void OnStopActivity(Activity activity, object payload) { switch (activity.OperationName) { - case "Transport.Send": + case OperationName.Transport.Send: { - var messageType = exchangeFetcher.Fetch(payload); - exchangeContext.Value = messageType.ToString(); + PrometheusCounters.SendMessageCount.Observe(activity.Duration.TotalSeconds); } break; - case "Transport.Receive": - break; - case "Consumer.Consume": + case OperationName.Transport.Receive: { - var messageType = messageTypeFetcher.Fetch(payload); - messageTypeContext.Value = messageType.ToString(); + var messageType = activity.Tags + .Where(c => c.Key == DiagnosticHeaders.MessageTypes) + .Select(c => c.Value) + .FirstOrDefault(); + + PrometheusCounters.ReceiveMessageCount + .WithLabels(messageType) + .Observe(activity.Duration.TotalSeconds); } break; - } - } - - public override void OnStopActivity(Activity activity, object payload) - { - switch (activity.OperationName) - { - case "Transport.Send": + case OperationName.Consumer.Consume: { - PrometheusCounters.SendMessageCount - .WithLabels(exchangeContext.Value) + var consumerType = activity.Tags + .Where(c => c.Key == DiagnosticHeaders.ConsumerType) + .Select(c => c.Value) + .FirstOrDefault(); + + PrometheusCounters.ConsumeMessageCount + .WithLabels(consumerType) .Observe(activity.Duration.TotalSeconds); } break; - case "Consumer.Consume": + case OperationName.Consumer.Handle: { PrometheusCounters.ConsumeMessageCount - .WithLabels(messageTypeContext.Value) .Observe(activity.Duration.TotalSeconds); + } + break; - // TODO: calculate critical time + case OperationName.Saga.Send: + { + var sagaType = activity.Tags.Where(c => c.Key == DiagnosticHeaders.SagaType).Select(c => c.Value).FirstOrDefault(); + + PrometheusCounters.SagaSentEvents + .WithLabels(sagaType) + .Inc(); + } + break; + case OperationName.Saga.SendQuery: + { + PrometheusCounters.SagaSentQuery.Inc(); + } + break; + case OperationName.Saga.RaiseEvent: + { + var sagaType = activity.Tags.Where(c => c.Key == DiagnosticHeaders.SagaType).Select(c => c.Value).FirstOrDefault(); + var beginState = activity.Tags.Where(c => c.Key == DiagnosticHeaders.BeginState).Select(c => c.Value).FirstOrDefault(); + var endState = activity.Tags.Where(c => c.Key == DiagnosticHeaders.EndState).Select(c => c.Value).FirstOrDefault(); + + PrometheusCounters.SagaRaisedEvents + .WithLabels(sagaType, beginState, endState) + .Inc(); + } + break; + + case OperationName.Saga.Initiate: + { + } + break; + case OperationName.Saga.Orchestrate: + { + } + break; + case OperationName.Saga.Observe: + { + } + break; + + case OperationName.Courier.Execute: + { + var activityType = activity.Tags.Where(c => c.Key == DiagnosticHeaders.ActivityType).Select(c => c.Value).FirstOrDefault(); + + PrometheusCounters.CourierExecute + .WithLabels(activityType) + .Inc(); + } + break; + case OperationName.Courier.Compensate: + { + var activityType = activity.Tags.Where(c => c.Key == DiagnosticHeaders.ActivityType).Select(c => c.Value).FirstOrDefault(); + + PrometheusCounters.CourierCompensate + .WithLabels(activityType) + .Inc(); } break; } @@ -96,10 +175,13 @@ public override void OnException(Activity activity, object payload) { switch (activity.OperationName) { - case "Transport.Send": + case OperationName.Transport.Send: PrometheusCounters.SendMessageErrors.Inc(); break; - case "Consumer.Consume": + case OperationName.Transport.Receive: + PrometheusCounters.ReceiveMessageError.Inc(); + break; + case OperationName.Consumer.Consume: PrometheusCounters.ConsumeMessageError.Inc(); break; } diff --git a/src/prometheus-net.MassTransit/Observers/PrometheusConsumeObserver.cs b/src/prometheus-net.MassTransit/Observers/PrometheusConsumeObserver.cs new file mode 100644 index 0000000..19213bd --- /dev/null +++ b/src/prometheus-net.MassTransit/Observers/PrometheusConsumeObserver.cs @@ -0,0 +1,52 @@ +using System; +using System.Threading.Tasks; +using MassTransit; + +namespace Prometheus.MassTransit.Observers +{ + public class PrometheusConsumeObserver : IReceiveObserver + { + private static readonly Histogram ConsumeTimer = Metrics.CreateHistogram( + "masstransit_messages_consumed_total", + "The time to consume a message, in seconds.", + new HistogramConfiguration { LabelNames = new[] { "message" } }); + + private static readonly Histogram CriticalTimer = Metrics.CreateHistogram( + "masstransit_critical_time_seconds", + "The time between when message is sent and when it is consumed, in seconds.", + new HistogramConfiguration { LabelNames = new[] { "message" } }); + + private static readonly Counter ErrorCounter = Metrics.CreateCounter( + "masstransit_messages_consumed_errors_total", + "The number of message processing failures.", + new CounterConfiguration { LabelNames = new[] { "exception" } }); + + public Task PreReceive(ReceiveContext context) => Task.CompletedTask; + public Task PostReceive(ReceiveContext context) => Task.CompletedTask; + public Task ReceiveFault(ReceiveContext context, Exception exception) => Task.CompletedTask; + + public Task PostConsume(ConsumeContext context, TimeSpan duration, string consumerType) where T : class + { + var messageType = typeof(T).ToString(); + + ConsumeTimer.WithLabels(messageType).Observe(duration.TotalSeconds); + + if (context.SentTime != null) + { + CriticalTimer.WithLabels(messageType) + .Observe((DateTime.UtcNow - context.SentTime.Value).TotalSeconds); + } + + return Task.CompletedTask; + } + + public Task ConsumeFault(ConsumeContext context, TimeSpan duration, string consumerType, Exception exception) where T : class + { + var messageType = typeof(T).ToString(); + + ErrorCounter.WithLabels(messageType).Inc(); + + return Task.CompletedTask; + } + } +} diff --git a/src/prometheus-net.MassTransit/Observers/PrometheusPublishObserver.cs b/src/prometheus-net.MassTransit/Observers/PrometheusPublishObserver.cs new file mode 100644 index 0000000..60ecfb2 --- /dev/null +++ b/src/prometheus-net.MassTransit/Observers/PrometheusPublishObserver.cs @@ -0,0 +1,44 @@ +using System; +using System.Threading.Tasks; +using MassTransit; + +namespace Prometheus.MassTransit.Observers +{ + public class PrometheusPublishObserver : IPublishObserver + { + private static readonly Counter SentMessagesCount = Metrics.CreateCounter( + "masstransit_messages_sent_total", + "Total published messages.", + new CounterConfiguration { LabelNames = new[] { "operation", "message" } }); + + private static readonly Counter SentMessagesErrors = Metrics.CreateCounter( + "masstransit_messages_sent_errors_total", + "Total published messages errors", + new CounterConfiguration { LabelNames = new[] { "operation", "exception" } }); + + public Task PrePublish(PublishContext context) + where T : class + { + return Task.CompletedTask; + } + + public Task PostPublish(PublishContext context) + where T : class + { + SentMessagesCount + .WithLabels("Publish", context.Message.GetType().ToString()) + .Inc(); + return Task.CompletedTask; + } + + public Task PublishFault(PublishContext context, Exception exception) + where T : class + { + SentMessagesErrors + .WithLabels("Publish", exception.GetType().Name) + .Inc(); + + return Task.CompletedTask; + } + } +} diff --git a/src/prometheus-net.MassTransit/Observers/PrometheusSendObserver.cs b/src/prometheus-net.MassTransit/Observers/PrometheusSendObserver.cs new file mode 100644 index 0000000..d764c61 --- /dev/null +++ b/src/prometheus-net.MassTransit/Observers/PrometheusSendObserver.cs @@ -0,0 +1,43 @@ +using System; +using System.Threading.Tasks; +using MassTransit; + +namespace Prometheus.MassTransit.Observers +{ + public class PrometheusSendObserver : ISendObserver + { + private static readonly Counter SentMessagesCount = Metrics.CreateCounter( + "masstransit_messages_sent_total", + "Total published messages.", + new CounterConfiguration { LabelNames = new[] { "operation", "message" } }); + + private static readonly Counter SentMessagesErrors = Metrics.CreateCounter( + "masstransit_messages_sent_errors_total", + "Total published messages errors", + new CounterConfiguration { LabelNames = new[] { "operation", "exception" } }); + + public Task PreSend(SendContext context) + where T : class + { + return Task.CompletedTask; + } + + public Task PostSend(SendContext context) + where T : class + { + SentMessagesCount + .WithLabels("Send", context.Message.GetType().FullName) + .Inc(); + return Task.CompletedTask; + } + + public Task SendFault(SendContext context, Exception exception) + where T : class + { + SentMessagesErrors + .WithLabels("Send", exception.GetType().Name) + .Inc(); + return Task.CompletedTask; + } + } +} diff --git a/src/prometheus-net.MassTransit/prometheus-net.MassTransit.csproj b/src/prometheus-net.MassTransit/prometheus-net.MassTransit.csproj index a72a064..2cb9258 100644 --- a/src/prometheus-net.MassTransit/prometheus-net.MassTransit.csproj +++ b/src/prometheus-net.MassTransit/prometheus-net.MassTransit.csproj @@ -1,4 +1,4 @@ - + netcoreapp3.0 @@ -7,7 +7,7 @@ - +