From bb31c0af9052633a1c5105171d0c5d5671b32dfc Mon Sep 17 00:00:00 2001 From: Oisin G Date: Tue, 12 Sep 2023 16:18:09 -0400 Subject: [PATCH 1/2] Add telemetry support to various Orleans streaming components This commit introduces telemetry features to several components of the Orleans streaming system. Specifically, a new class, StreamInstrumentsTagUtils, has been created to provide methods for initializing the relevant telemetry tags. This utility is now utilized in numerous places throughout the codebase, including the PubSubRendezvousGrain and PersistentStreamPullingAgent classes, where it facilitates the monitoring of crucial streaming activities. This aids in diagnostics and performance tracking, contributing to ongoing system improvement efforts. --- .../PersistentStreamPullingAgent.cs | 20 ++- .../PubSub/PubSubRendezvousGrain.cs | 133 ++++++++++++++---- .../StreamInstrumentsTagUtils.cs | 43 ++++++ 3 files changed, 166 insertions(+), 30 deletions(-) create mode 100644 src/Orleans.Streaming/StreamInstrumentsTagUtils.cs diff --git a/src/Orleans.Streaming/PersistentStreams/PersistentStreamPullingAgent.cs b/src/Orleans.Streaming/PersistentStreams/PersistentStreamPullingAgent.cs index 011e9fe82d..b55495a8b3 100644 --- a/src/Orleans.Streaming/PersistentStreams/PersistentStreamPullingAgent.cs +++ b/src/Orleans.Streaming/PersistentStreams/PersistentStreamPullingAgent.cs @@ -7,6 +7,7 @@ using Orleans.Configuration; using Orleans.Internal; using Orleans.Runtime; +using Orleans.Streaming; using Orleans.Streams.Filtering; namespace Orleans.Streams @@ -439,6 +440,8 @@ private async Task ReadFromQueue(QueueId myQueueId, IQueueAdapterReceiver } var now = DateTime.UtcNow; + System.Diagnostics.TagList? tags = null; + // Try to cleanup the pubsub cache at the cadence of 10 times in the configurable StreamInactivityPeriod. if ((now - lastTimeCleanedPubSubCache) >= this.options.StreamInactivityPeriod.Divide(StreamInactivityCheckFrequency)) { @@ -480,7 +483,13 @@ private async Task ReadFromQueue(QueueId myQueueId, IQueueAdapterReceiver queueCache?.AddToCache(multiBatch); numMessages += multiBatch.Count; - StreamInstruments.PersistentStreamReadMessages.Add(multiBatch.Count); + + if (StreamInstruments.PersistentStreamReadMessages.Enabled) + { + tags = StreamInstrumentsTagUtils.InitializeTags(myQueueId, streamProviderName); + StreamInstruments.PersistentStreamReadMessages.Add(multiBatch.Count, tags.Value); + } + if (logger.IsEnabled(LogLevel.Trace)) logger.LogTrace( (int)ErrorCode.PersistentStreamPullingAgent_11, @@ -571,6 +580,7 @@ private void StartInactiveCursors(StreamConsumerCollection streamData, StreamSeq private async Task RunConsumerCursor(StreamConsumerData consumerData) { + System.Diagnostics.TagList? tags = null; try { // double check in case of interleaving @@ -605,7 +615,13 @@ private async Task RunConsumerCursor(StreamConsumerData consumerData) try { - StreamInstruments.PersistentStreamSentMessages.Add(1); + if (StreamInstruments.PersistentStreamSentMessages.Enabled) + { + tags ??= StreamInstrumentsTagUtils.InitializeTags( + consumerData.StreamId, consumerData.SubscriptionId); + StreamInstruments.PersistentStreamSentMessages.Add(1, tags.Value); + } + if (batch != null) { StreamHandshakeToken newToken = await AsyncExecutorWithRetries.ExecuteWithRetries( diff --git a/src/Orleans.Streaming/PubSub/PubSubRendezvousGrain.cs b/src/Orleans.Streaming/PubSub/PubSubRendezvousGrain.cs index f38a615a24..a3b16e3915 100644 --- a/src/Orleans.Streaming/PubSub/PubSubRendezvousGrain.cs +++ b/src/Orleans.Streaming/PubSub/PubSubRendezvousGrain.cs @@ -1,5 +1,6 @@ using System; using System.Collections.Generic; +using System.Diagnostics; using System.Linq; using System.Runtime.ExceptionServices; using System.Text; @@ -10,7 +11,9 @@ using Orleans.Providers; using Orleans.Runtime; using Orleans.Storage; +using Orleans.Streaming; using Orleans.Streams.Core; +using TagList = System.Diagnostics.TagList; #nullable enable namespace Orleans.Streams @@ -49,10 +52,12 @@ public StateStorageBridge GetStorage(PubSubRendezvousGrain gra { if (_logger.IsEnabled(LogLevel.Debug)) { - _logger.LogDebug("Fallback to storage provider {ProviderName}", ProviderConstants.DEFAULT_PUBSUB_PROVIDER_NAME); + _logger.LogDebug("Fallback to storage provider {ProviderName}", + ProviderConstants.DEFAULT_PUBSUB_PROVIDER_NAME); } - storage = _serviceProvider.GetRequiredServiceByName(ProviderConstants.DEFAULT_PUBSUB_PROVIDER_NAME); + storage = _serviceProvider.GetRequiredServiceByName(ProviderConstants + .DEFAULT_PUBSUB_PROVIDER_NAME); } return new(nameof(PubSubRendezvousGrain), grain.GrainContext, storage, _loggerFactory); @@ -63,8 +68,8 @@ public StateStorageBridge GetStorage(PubSubRendezvousGrain gra [GenerateSerializer] internal sealed class PubSubGrainState { - [Id(0)] - public HashSet Producers { get; set; } = new HashSet(); + [Id(0)] public HashSet Producers { get; set; } = new HashSet(); + [Id(1)] public HashSet Consumers { get; set; } = new HashSet(); } @@ -80,7 +85,8 @@ internal sealed class PubSubRendezvousGrain : Grain, IPubSubRendezvousGrain, IGr private PubSubGrainState State => _storage.State; - public PubSubRendezvousGrain(PubSubGrainStateStorageFactory storageFactory, ILogger logger) + public PubSubRendezvousGrain(PubSubGrainStateStorageFactory storageFactory, + ILogger logger) { _storageFactory = storageFactory; _logger = logger; @@ -99,9 +105,16 @@ public override Task OnDeactivateAsync(DeactivationReason reason, CancellationTo return Task.CompletedTask; } - public async Task> RegisterProducer(QualifiedStreamId streamId, GrainId streamProducer) + public async Task> RegisterProducer(QualifiedStreamId streamId, + GrainId streamProducer) { - StreamInstruments.PubSubProducersAdded.Add(1); + TagList? tags = null; + + if (StreamInstruments.PubSubProducersAdded.Enabled) + { + tags = StreamInstrumentsTagUtils.InitializeTags(streamId, streamProducer); + StreamInstruments.PubSubProducersAdded.Add(1, tags.Value); + } try { @@ -109,7 +122,12 @@ public async Task> RegisterProducer(QualifiedStrea State.Producers.Add(publisherState); LogPubSubCounts("RegisterProducer {0}", streamProducer); await WriteStateAsync(); - StreamInstruments.PubSubProducersTotal.Add(1); + + if (StreamInstruments.PubSubProducersTotal.Enabled) + { + tags ??= StreamInstrumentsTagUtils.InitializeTags(streamId, streamProducer); + StreamInstruments.PubSubProducersTotal.Add(1, tags.Value); + } } catch (Exception exc) { @@ -124,12 +142,20 @@ public async Task> RegisterProducer(QualifiedStrea DeactivateOnIdle(); throw; } + return State.Consumers.Where(c => !c.IsFaulted).ToSet(); } public async Task UnregisterProducer(QualifiedStreamId streamId, GrainId streamProducer) { - StreamInstruments.PubSubProducersRemoved.Add(1); + TagList? tags = null; + + if (StreamInstruments.PubSubProducersRemoved.Enabled) + { + tags = StreamInstrumentsTagUtils.InitializeTags(streamId, streamProducer); + StreamInstruments.PubSubProducersRemoved.Add(1, tags.Value); + } + try { int numRemoved = State.Producers.RemoveWhere(s => s.Equals(streamId, streamProducer)); @@ -142,7 +168,12 @@ public async Task UnregisterProducer(QualifiedStreamId streamId, GrainId streamP : WriteStateAsync(); await updateStorageTask; } - StreamInstruments.PubSubProducersTotal.Add(-numRemoved); + + if (StreamInstruments.PubSubProducersTotal.Enabled) + { + tags ??= StreamInstrumentsTagUtils.InitializeTags(streamId, streamProducer); + StreamInstruments.PubSubProducersTotal.Add(-numRemoved, tags.Value); + } } catch (Exception exc) { @@ -157,6 +188,7 @@ public async Task UnregisterProducer(QualifiedStreamId streamId, GrainId streamP DeactivateOnIdle(); throw; } + if (State.Producers.Count == 0 && State.Consumers.Count == 0) { DeactivateOnIdle(); // No producers or consumers left now, so flag ourselves to expedite Deactivation @@ -169,7 +201,14 @@ public async Task RegisterConsumer( GrainId streamConsumer, string filterData) { - StreamInstruments.PubSubConsumersAdded.Add(1); + TagList? tags = null; + + if (StreamInstruments.PubSubConsumersAdded.Enabled) + { + tags = StreamInstrumentsTagUtils.InitializeTags(streamId, streamConsumer); + StreamInstruments.PubSubConsumersAdded.Add(1, tags.Value); + } + var pubSubState = State.Consumers.FirstOrDefault(s => s.Equals(subscriptionId)); if (pubSubState != null && pubSubState.IsFaulted) throw new FaultedSubscriptionException(subscriptionId, streamId); @@ -186,7 +225,12 @@ public async Task RegisterConsumer( LogPubSubCounts("RegisterConsumer {0}", streamConsumer); await WriteStateAsync(); - StreamInstruments.PubSubConsumersTotal.Add(1); + + if (StreamInstruments.PubSubConsumersTotal.Enabled) + { + tags ??= StreamInstrumentsTagUtils.InitializeTags(streamId, streamConsumer); + StreamInstruments.PubSubConsumersTotal.Add(1, tags.Value); + } } catch (Exception exc) { @@ -208,7 +252,8 @@ public async Task RegisterConsumer( return; if (_logger.IsEnabled(LogLevel.Debug)) - _logger.LogDebug("Notifying {ProducerCount} existing producer(s) about new consumer {Consumer}. Producers={Producers}", + _logger.LogDebug( + "Notifying {ProducerCount} existing producer(s) about new consumer {Consumer}. Producers={Producers}", numProducers, streamConsumer, Utils.EnumerableToString(State.Producers)); // Notify producers about a new streamConsumer. @@ -219,7 +264,8 @@ public async Task RegisterConsumer( { foreach (PubSubPublisherState producerState in producers) { - tasks.Add(ExecuteProducerTask(producerState, p => p.AddSubscriber(subscriptionId, streamId, streamConsumer, filterData))); + tasks.Add(ExecuteProducerTask(producerState, + p => p.AddSubscriber(subscriptionId, streamId, streamConsumer, filterData))); } Exception? exception = null; @@ -236,7 +282,12 @@ public async Task RegisterConsumer( if (State.Producers.Count != initialProducerCount) { await WriteStateAsync(); - StreamInstruments.PubSubConsumersTotal.Add(-(initialProducerCount - State.Producers.Count)); + + if (StreamInstruments.PubSubConsumersTotal.Enabled) { + tags ??= StreamInstrumentsTagUtils.InitializeTags(streamId, streamConsumer); + StreamInstruments.PubSubConsumersTotal.Add( + -(initialProducerCount - State.Producers.Count), tags.Value); + } } if (exception != null) @@ -272,7 +323,13 @@ private void RemoveProducer(PubSubPublisherState producer) public async Task UnregisterConsumer(GuidId subscriptionId, QualifiedStreamId streamId) { - StreamInstruments.PubSubConsumersRemoved.Add(1); + TagList? tags = null; + + if (StreamInstruments.PubSubConsumersRemoved.Enabled) + { + tags = StreamInstrumentsTagUtils.InitializeTags(streamId, subscriptionId); + StreamInstruments.PubSubConsumersRemoved.Add(1, tags.Value); + } try { @@ -291,9 +348,15 @@ public async Task UnregisterConsumer(GuidId subscriptionId, QualifiedStreamId st { await WriteStateAsync(); } + await NotifyProducersOfRemovedSubscription(subscriptionId, streamId); } - StreamInstruments.PubSubConsumersTotal.Add(-numRemoved); + + if (StreamInstruments.PubSubConsumersTotal.Enabled) + { + tags ??= StreamInstrumentsTagUtils.InitializeTags(streamId, subscriptionId); + StreamInstruments.PubSubConsumersTotal.Add(-numRemoved, tags.Value); + } } catch (Exception exc) { @@ -342,8 +405,10 @@ private void LogPubSubCounts(string fmt, params object[] args) numConsumers = State.Consumers.Count; string when = args != null && args.Length != 0 ? string.Format(fmt, args) : fmt; - _logger.LogDebug("{When}. Now have total of {ProducerCount} producers and {ConsumerCount} consumers. All Consumers = {Consumers}, All Producers = {Producers}", - when, numProducers, numConsumers, Utils.EnumerableToString(State?.Consumers), Utils.EnumerableToString(State?.Producers)); + _logger.LogDebug( + "{When}. Now have total of {ProducerCount} producers and {ConsumerCount} consumers. All Consumers = {Consumers}, All Producers = {Producers}", + when, numProducers, numConsumers, Utils.EnumerableToString(State?.Consumers), + Utils.EnumerableToString(State?.Producers)); } } @@ -403,7 +468,6 @@ public Task> GetAllSubscriptions(QualifiedStreamId stre c.Consumer)).ToList(); return Task.FromResult(subscriptions); } - } public async Task FaultSubscription(GuidId subscriptionId) @@ -413,10 +477,12 @@ public async Task FaultSubscription(GuidId subscriptionId) { return; } + try { pubSubState.Fault(); - if (_logger.IsEnabled(LogLevel.Debug)) _logger.LogDebug("Setting subscription {SubscriptionId} to a faulted state.", subscriptionId); + if (_logger.IsEnabled(LogLevel.Debug)) + _logger.LogDebug("Setting subscription {SubscriptionId} to a faulted state.", subscriptionId); await WriteStateAsync(); await NotifyProducersOfRemovedSubscription(pubSubState.SubscriptionId, pubSubState.Stream); @@ -440,11 +506,15 @@ private async Task NotifyProducersOfRemovedSubscription(GuidId subscriptionId, Q int numProducersBeforeNotify = State.Producers.Count; if (numProducersBeforeNotify > 0) { - if (_logger.IsEnabled(LogLevel.Debug)) _logger.LogDebug("Notifying {ProducerCountBeforeNotify} existing producers about unregistered consumer.", numProducersBeforeNotify); + if (_logger.IsEnabled(LogLevel.Debug)) + _logger.LogDebug( + "Notifying {ProducerCountBeforeNotify} existing producers about unregistered consumer.", + numProducersBeforeNotify); // Notify producers about unregistered consumer. List tasks = State.Producers - .Select(producerState => ExecuteProducerTask(producerState, p => p.RemoveSubscriber(subscriptionId, streamId))) + .Select(producerState => + ExecuteProducerTask(producerState, p => p.RemoveSubscriber(subscriptionId, streamId))) .ToList(); await Task.WhenAll(tasks); //if producers got removed @@ -459,15 +529,18 @@ private async Task NotifyProducersOfRemovedSubscription(GuidId subscriptionId, Q /// private async Task TryClearState() { - if (State.Producers.Count == 0 && State.Consumers.Count == 0) // + we already know that numProducers == 0 from previous if-clause + if (State.Producers.Count == 0 && + State.Consumers.Count == 0) // + we already know that numProducers == 0 from previous if-clause { await ClearStateAsync(); //State contains no producers or consumers, remove it from storage return true; } + return false; } - private async Task ExecuteProducerTask(PubSubPublisherState producer, Func producerTask) + private async Task ExecuteProducerTask(PubSubPublisherState producer, + Func producerTask) { try { @@ -501,7 +574,11 @@ private async Task ExecuteProducerTask(PubSubPublisherState producer, Func _storage.ReadStateAsync(); private Task WriteStateAsync() => _storage.WriteStateAsync(); private Task ClearStateAsync() => _storage.ClearStateAsync(); - void IGrainMigrationParticipant.OnDehydrate(IDehydrationContext dehydrationContext) => _storage.OnDehydrate(dehydrationContext); - void IGrainMigrationParticipant.OnRehydrate(IRehydrationContext rehydrationContext) => _storage.OnRehydrate(rehydrationContext); + + void IGrainMigrationParticipant.OnDehydrate(IDehydrationContext dehydrationContext) => + _storage.OnDehydrate(dehydrationContext); + + void IGrainMigrationParticipant.OnRehydrate(IRehydrationContext rehydrationContext) => + _storage.OnRehydrate(rehydrationContext); } -} +} \ No newline at end of file diff --git a/src/Orleans.Streaming/StreamInstrumentsTagUtils.cs b/src/Orleans.Streaming/StreamInstrumentsTagUtils.cs new file mode 100644 index 0000000000..71ae23ddc3 --- /dev/null +++ b/src/Orleans.Streaming/StreamInstrumentsTagUtils.cs @@ -0,0 +1,43 @@ +using System.Runtime.CompilerServices; +using Orleans.Runtime; +using Orleans.Streams; +using TagList = System.Diagnostics.TagList; + +namespace Orleans.Streaming; + +internal static class TelemetryUtils +{ + private const string STREAM_KEY = "stream"; + private const string STREAM_NAMESPACE = "namespace"; + private const string STREAM_PROVIDER_NAME = "provider"; + private const string STREAM_PRODUCER = "producer"; + private const string SUBSCRIPTION_ID = "subscription"; + private const string QUEUE_ID = "queue"; + + [MethodImpl(MethodImplOptions.AggressiveInlining)] + internal static TagList InitializeTags(QualifiedStreamId streamId, GrainId streamProducer) => + new() + { + { STREAM_PROVIDER_NAME, streamId.ProviderName }, + { STREAM_KEY, streamId.StreamId.GetKeyAsString() }, + { STREAM_NAMESPACE, streamId.StreamId.GetNamespace() }, + { STREAM_PRODUCER, streamProducer.ToString() } + }; + + [MethodImpl(MethodImplOptions.AggressiveInlining)] + internal static TagList InitializeTags(QualifiedStreamId streamId, GuidId subscriptionId) => + new() + { + { SUBSCRIPTION_ID, subscriptionId.Guid }, + { STREAM_KEY, streamId.StreamId.GetKeyAsString() }, + { STREAM_NAMESPACE, streamId.StreamId.GetNamespace() } + }; + + [MethodImpl(MethodImplOptions.AggressiveInlining)] + public static TagList InitializeTags(QueueId queueId, string streamProviderName) => + new() + { + { QUEUE_ID, queueId.ToStringWithHashCode() }, + { STREAM_PROVIDER_NAME, streamProviderName } + }; +} \ No newline at end of file From 77af7d6daac45d81a0c9ed10c649de75b68dac44 Mon Sep 17 00:00:00 2001 From: Oisin G Date: Tue, 12 Sep 2023 16:26:37 -0400 Subject: [PATCH 2/2] somehow hit ctrl+z before last commit --- src/Orleans.Streaming/StreamInstrumentsTagUtils.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Orleans.Streaming/StreamInstrumentsTagUtils.cs b/src/Orleans.Streaming/StreamInstrumentsTagUtils.cs index 71ae23ddc3..507c4b68f2 100644 --- a/src/Orleans.Streaming/StreamInstrumentsTagUtils.cs +++ b/src/Orleans.Streaming/StreamInstrumentsTagUtils.cs @@ -5,7 +5,7 @@ namespace Orleans.Streaming; -internal static class TelemetryUtils +internal static class StreamInstrumentsTagUtils { private const string STREAM_KEY = "stream"; private const string STREAM_NAMESPACE = "namespace";