diff --git a/src/Orleans.Streaming/PersistentStreams/PersistentStreamPullingAgent.cs b/src/Orleans.Streaming/PersistentStreams/PersistentStreamPullingAgent.cs index 011e9fe82d..b55495a8b3 100644 --- a/src/Orleans.Streaming/PersistentStreams/PersistentStreamPullingAgent.cs +++ b/src/Orleans.Streaming/PersistentStreams/PersistentStreamPullingAgent.cs @@ -7,6 +7,7 @@ using Orleans.Configuration; using Orleans.Internal; using Orleans.Runtime; +using Orleans.Streaming; using Orleans.Streams.Filtering; namespace Orleans.Streams @@ -439,6 +440,8 @@ private async Task ReadFromQueue(QueueId myQueueId, IQueueAdapterReceiver } var now = DateTime.UtcNow; + System.Diagnostics.TagList? tags = null; + // Try to cleanup the pubsub cache at the cadence of 10 times in the configurable StreamInactivityPeriod. if ((now - lastTimeCleanedPubSubCache) >= this.options.StreamInactivityPeriod.Divide(StreamInactivityCheckFrequency)) { @@ -480,7 +483,13 @@ private async Task ReadFromQueue(QueueId myQueueId, IQueueAdapterReceiver queueCache?.AddToCache(multiBatch); numMessages += multiBatch.Count; - StreamInstruments.PersistentStreamReadMessages.Add(multiBatch.Count); + + if (StreamInstruments.PersistentStreamReadMessages.Enabled) + { + tags = StreamInstrumentsTagUtils.InitializeTags(myQueueId, streamProviderName); + StreamInstruments.PersistentStreamReadMessages.Add(multiBatch.Count, tags.Value); + } + if (logger.IsEnabled(LogLevel.Trace)) logger.LogTrace( (int)ErrorCode.PersistentStreamPullingAgent_11, @@ -571,6 +580,7 @@ private void StartInactiveCursors(StreamConsumerCollection streamData, StreamSeq private async Task RunConsumerCursor(StreamConsumerData consumerData) { + System.Diagnostics.TagList? tags = null; try { // double check in case of interleaving @@ -605,7 +615,13 @@ private async Task RunConsumerCursor(StreamConsumerData consumerData) try { - StreamInstruments.PersistentStreamSentMessages.Add(1); + if (StreamInstruments.PersistentStreamSentMessages.Enabled) + { + tags ??= StreamInstrumentsTagUtils.InitializeTags( + consumerData.StreamId, consumerData.SubscriptionId); + StreamInstruments.PersistentStreamSentMessages.Add(1, tags.Value); + } + if (batch != null) { StreamHandshakeToken newToken = await AsyncExecutorWithRetries.ExecuteWithRetries( diff --git a/src/Orleans.Streaming/PubSub/PubSubRendezvousGrain.cs b/src/Orleans.Streaming/PubSub/PubSubRendezvousGrain.cs index 60c7fcd27f..0fd5351bc4 100644 --- a/src/Orleans.Streaming/PubSub/PubSubRendezvousGrain.cs +++ b/src/Orleans.Streaming/PubSub/PubSubRendezvousGrain.cs @@ -12,7 +12,9 @@ using Orleans.Runtime; using Orleans.Serialization.Serializers; using Orleans.Storage; +using Orleans.Streaming; using Orleans.Streams.Core; +using TagList = System.Diagnostics.TagList; #nullable enable namespace Orleans.Streams @@ -51,10 +53,12 @@ public StateStorageBridge GetStorage(PubSubRendezvousGrain gra { if (_logger.IsEnabled(LogLevel.Debug)) { - _logger.LogDebug("Fallback to storage provider {ProviderName}", ProviderConstants.DEFAULT_PUBSUB_PROVIDER_NAME); + _logger.LogDebug("Fallback to storage provider {ProviderName}", + ProviderConstants.DEFAULT_PUBSUB_PROVIDER_NAME); } - storage = _serviceProvider.GetRequiredKeyedService(ProviderConstants.DEFAULT_PUBSUB_PROVIDER_NAME); + storage = _serviceProvider.GetRequiredKeyedService(ProviderConstants + .DEFAULT_PUBSUB_PROVIDER_NAME); } var activatorProvider = _serviceProvider.GetRequiredService(); @@ -66,8 +70,8 @@ public StateStorageBridge GetStorage(PubSubRendezvousGrain gra [GenerateSerializer] internal sealed class PubSubGrainState { - [Id(0)] - public HashSet Producers { get; set; } = new HashSet(); + [Id(0)] public HashSet Producers { get; set; } = new HashSet(); + [Id(1)] public HashSet Consumers { get; set; } = new HashSet(); } @@ -83,7 +87,8 @@ internal sealed class PubSubRendezvousGrain : Grain, IPubSubRendezvousGrain, IGr private PubSubGrainState State => _storage.State; - public PubSubRendezvousGrain(PubSubGrainStateStorageFactory storageFactory, ILogger logger) + public PubSubRendezvousGrain(PubSubGrainStateStorageFactory storageFactory, + ILogger logger) { _storageFactory = storageFactory; _logger = logger; @@ -102,9 +107,16 @@ public override Task OnDeactivateAsync(DeactivationReason reason, CancellationTo return Task.CompletedTask; } - public async Task> RegisterProducer(QualifiedStreamId streamId, GrainId streamProducer) + public async Task> RegisterProducer(QualifiedStreamId streamId, + GrainId streamProducer) { - StreamInstruments.PubSubProducersAdded.Add(1); + TagList? tags = null; + + if (StreamInstruments.PubSubProducersAdded.Enabled) + { + tags = StreamInstrumentsTagUtils.InitializeTags(streamId, streamProducer); + StreamInstruments.PubSubProducersAdded.Add(1, tags.Value); + } try { @@ -112,7 +124,12 @@ public async Task> RegisterProducer(QualifiedStrea State.Producers.Add(publisherState); LogPubSubCounts("RegisterProducer {0}", streamProducer); await WriteStateAsync(); - StreamInstruments.PubSubProducersTotal.Add(1); + + if (StreamInstruments.PubSubProducersTotal.Enabled) + { + tags ??= StreamInstrumentsTagUtils.InitializeTags(streamId, streamProducer); + StreamInstruments.PubSubProducersTotal.Add(1, tags.Value); + } } catch (Exception exc) { @@ -127,12 +144,20 @@ public async Task> RegisterProducer(QualifiedStrea DeactivateOnIdle(); throw; } + return State.Consumers.Where(c => !c.IsFaulted).ToSet(); } public async Task UnregisterProducer(QualifiedStreamId streamId, GrainId streamProducer) { - StreamInstruments.PubSubProducersRemoved.Add(1); + TagList? tags = null; + + if (StreamInstruments.PubSubProducersRemoved.Enabled) + { + tags = StreamInstrumentsTagUtils.InitializeTags(streamId, streamProducer); + StreamInstruments.PubSubProducersRemoved.Add(1, tags.Value); + } + try { int numRemoved = State.Producers.RemoveWhere(s => s.Equals(streamId, streamProducer)); @@ -145,7 +170,12 @@ public async Task UnregisterProducer(QualifiedStreamId streamId, GrainId streamP : WriteStateAsync(); await updateStorageTask; } - StreamInstruments.PubSubProducersTotal.Add(-numRemoved); + + if (StreamInstruments.PubSubProducersTotal.Enabled) + { + tags ??= StreamInstrumentsTagUtils.InitializeTags(streamId, streamProducer); + StreamInstruments.PubSubProducersTotal.Add(-numRemoved, tags.Value); + } } catch (Exception exc) { @@ -160,6 +190,7 @@ public async Task UnregisterProducer(QualifiedStreamId streamId, GrainId streamP DeactivateOnIdle(); throw; } + if (State.Producers.Count == 0 && State.Consumers.Count == 0) { DeactivateOnIdle(); // No producers or consumers left now, so flag ourselves to expedite Deactivation @@ -172,7 +203,14 @@ public async Task RegisterConsumer( GrainId streamConsumer, string filterData) { - StreamInstruments.PubSubConsumersAdded.Add(1); + TagList? tags = null; + + if (StreamInstruments.PubSubConsumersAdded.Enabled) + { + tags = StreamInstrumentsTagUtils.InitializeTags(streamId, streamConsumer); + StreamInstruments.PubSubConsumersAdded.Add(1, tags.Value); + } + var pubSubState = State.Consumers.FirstOrDefault(s => s.Equals(subscriptionId)); if (pubSubState != null && pubSubState.IsFaulted) throw new FaultedSubscriptionException(subscriptionId, streamId); @@ -189,7 +227,12 @@ public async Task RegisterConsumer( LogPubSubCounts("RegisterConsumer {0}", streamConsumer); await WriteStateAsync(); - StreamInstruments.PubSubConsumersTotal.Add(1); + + if (StreamInstruments.PubSubConsumersTotal.Enabled) + { + tags ??= StreamInstrumentsTagUtils.InitializeTags(streamId, streamConsumer); + StreamInstruments.PubSubConsumersTotal.Add(1, tags.Value); + } } catch (Exception exc) { @@ -211,7 +254,8 @@ public async Task RegisterConsumer( return; if (_logger.IsEnabled(LogLevel.Debug)) - _logger.LogDebug("Notifying {ProducerCount} existing producer(s) about new consumer {Consumer}. Producers={Producers}", + _logger.LogDebug( + "Notifying {ProducerCount} existing producer(s) about new consumer {Consumer}. Producers={Producers}", numProducers, streamConsumer, Utils.EnumerableToString(State.Producers)); // Notify producers about a new streamConsumer. @@ -222,7 +266,8 @@ public async Task RegisterConsumer( { foreach (PubSubPublisherState producerState in producers) { - tasks.Add(ExecuteProducerTask(producerState, p => p.AddSubscriber(subscriptionId, streamId, streamConsumer, filterData))); + tasks.Add(ExecuteProducerTask(producerState, + p => p.AddSubscriber(subscriptionId, streamId, streamConsumer, filterData))); } Exception? exception = null; @@ -239,7 +284,13 @@ public async Task RegisterConsumer( if (State.Producers.Count != initialProducerCount) { await WriteStateAsync(); - StreamInstruments.PubSubConsumersTotal.Add(-(initialProducerCount - State.Producers.Count)); + + if (StreamInstruments.PubSubConsumersTotal.Enabled) + { + tags ??= StreamInstrumentsTagUtils.InitializeTags(streamId, streamConsumer); + StreamInstruments.PubSubConsumersTotal.Add( + -(initialProducerCount - State.Producers.Count), tags.Value); + } } if (exception != null) @@ -275,7 +326,13 @@ private void RemoveProducer(PubSubPublisherState producer) public async Task UnregisterConsumer(GuidId subscriptionId, QualifiedStreamId streamId) { - StreamInstruments.PubSubConsumersRemoved.Add(1); + TagList? tags = null; + + if (StreamInstruments.PubSubConsumersRemoved.Enabled) + { + tags = StreamInstrumentsTagUtils.InitializeTags(streamId, subscriptionId); + StreamInstruments.PubSubConsumersRemoved.Add(1, tags.Value); + } try { @@ -294,9 +351,15 @@ public async Task UnregisterConsumer(GuidId subscriptionId, QualifiedStreamId st { await WriteStateAsync(); } + await NotifyProducersOfRemovedSubscription(subscriptionId, streamId); } - StreamInstruments.PubSubConsumersTotal.Add(-numRemoved); + + if (StreamInstruments.PubSubConsumersTotal.Enabled) + { + tags ??= StreamInstrumentsTagUtils.InitializeTags(streamId, subscriptionId); + StreamInstruments.PubSubConsumersTotal.Add(-numRemoved, tags.Value); + } } catch (Exception exc) { @@ -345,8 +408,10 @@ private void LogPubSubCounts(string fmt, params object[] args) numConsumers = State.Consumers.Count; string when = args != null && args.Length != 0 ? string.Format(fmt, args) : fmt; - _logger.LogDebug("{When}. Now have total of {ProducerCount} producers and {ConsumerCount} consumers. All Consumers = {Consumers}, All Producers = {Producers}", - when, numProducers, numConsumers, Utils.EnumerableToString(State?.Consumers), Utils.EnumerableToString(State?.Producers)); + _logger.LogDebug( + "{When}. Now have total of {ProducerCount} producers and {ConsumerCount} consumers. All Consumers = {Consumers}, All Producers = {Producers}", + when, numProducers, numConsumers, Utils.EnumerableToString(State?.Consumers), + Utils.EnumerableToString(State?.Producers)); } } @@ -406,7 +471,6 @@ public Task> GetAllSubscriptions(QualifiedStreamId stre c.Consumer)).ToList(); return Task.FromResult(subscriptions); } - } public async Task FaultSubscription(GuidId subscriptionId) @@ -416,10 +480,12 @@ public async Task FaultSubscription(GuidId subscriptionId) { return; } + try { pubSubState.Fault(); - if (_logger.IsEnabled(LogLevel.Debug)) _logger.LogDebug("Setting subscription {SubscriptionId} to a faulted state.", subscriptionId); + if (_logger.IsEnabled(LogLevel.Debug)) + _logger.LogDebug("Setting subscription {SubscriptionId} to a faulted state.", subscriptionId); await WriteStateAsync(); await NotifyProducersOfRemovedSubscription(pubSubState.SubscriptionId, pubSubState.Stream); @@ -443,11 +509,15 @@ private async Task NotifyProducersOfRemovedSubscription(GuidId subscriptionId, Q int numProducersBeforeNotify = State.Producers.Count; if (numProducersBeforeNotify > 0) { - if (_logger.IsEnabled(LogLevel.Debug)) _logger.LogDebug("Notifying {ProducerCountBeforeNotify} existing producers about unregistered consumer.", numProducersBeforeNotify); + if (_logger.IsEnabled(LogLevel.Debug)) + _logger.LogDebug( + "Notifying {ProducerCountBeforeNotify} existing producers about unregistered consumer.", + numProducersBeforeNotify); // Notify producers about unregistered consumer. List tasks = State.Producers - .Select(producerState => ExecuteProducerTask(producerState, p => p.RemoveSubscriber(subscriptionId, streamId))) + .Select(producerState => + ExecuteProducerTask(producerState, p => p.RemoveSubscriber(subscriptionId, streamId))) .ToList(); await Task.WhenAll(tasks); //if producers got removed @@ -462,15 +532,18 @@ private async Task NotifyProducersOfRemovedSubscription(GuidId subscriptionId, Q /// private async Task TryClearState() { - if (State.Producers.Count == 0 && State.Consumers.Count == 0) // + we already know that numProducers == 0 from previous if-clause + if (State.Producers.Count == 0 && + State.Consumers.Count == 0) // + we already know that numProducers == 0 from previous if-clause { await ClearStateAsync(); //State contains no producers or consumers, remove it from storage return true; } + return false; } - private async Task ExecuteProducerTask(PubSubPublisherState producer, Func producerTask) + private async Task ExecuteProducerTask(PubSubPublisherState producer, + Func producerTask) { try { @@ -504,7 +577,11 @@ private async Task ExecuteProducerTask(PubSubPublisherState producer, Func _storage.ReadStateAsync(); private Task WriteStateAsync() => _storage.WriteStateAsync(); private Task ClearStateAsync() => _storage.ClearStateAsync(); - void IGrainMigrationParticipant.OnDehydrate(IDehydrationContext dehydrationContext) => _storage.OnDehydrate(dehydrationContext); - void IGrainMigrationParticipant.OnRehydrate(IRehydrationContext rehydrationContext) => _storage.OnRehydrate(rehydrationContext); + + void IGrainMigrationParticipant.OnDehydrate(IDehydrationContext dehydrationContext) => + _storage.OnDehydrate(dehydrationContext); + + void IGrainMigrationParticipant.OnRehydrate(IRehydrationContext rehydrationContext) => + _storage.OnRehydrate(rehydrationContext); } -} +} \ No newline at end of file diff --git a/src/Orleans.Streaming/StreamInstrumentsTagUtils.cs b/src/Orleans.Streaming/StreamInstrumentsTagUtils.cs new file mode 100644 index 0000000000..507c4b68f2 --- /dev/null +++ b/src/Orleans.Streaming/StreamInstrumentsTagUtils.cs @@ -0,0 +1,43 @@ +using System.Runtime.CompilerServices; +using Orleans.Runtime; +using Orleans.Streams; +using TagList = System.Diagnostics.TagList; + +namespace Orleans.Streaming; + +internal static class StreamInstrumentsTagUtils +{ + private const string STREAM_KEY = "stream"; + private const string STREAM_NAMESPACE = "namespace"; + private const string STREAM_PROVIDER_NAME = "provider"; + private const string STREAM_PRODUCER = "producer"; + private const string SUBSCRIPTION_ID = "subscription"; + private const string QUEUE_ID = "queue"; + + [MethodImpl(MethodImplOptions.AggressiveInlining)] + internal static TagList InitializeTags(QualifiedStreamId streamId, GrainId streamProducer) => + new() + { + { STREAM_PROVIDER_NAME, streamId.ProviderName }, + { STREAM_KEY, streamId.StreamId.GetKeyAsString() }, + { STREAM_NAMESPACE, streamId.StreamId.GetNamespace() }, + { STREAM_PRODUCER, streamProducer.ToString() } + }; + + [MethodImpl(MethodImplOptions.AggressiveInlining)] + internal static TagList InitializeTags(QualifiedStreamId streamId, GuidId subscriptionId) => + new() + { + { SUBSCRIPTION_ID, subscriptionId.Guid }, + { STREAM_KEY, streamId.StreamId.GetKeyAsString() }, + { STREAM_NAMESPACE, streamId.StreamId.GetNamespace() } + }; + + [MethodImpl(MethodImplOptions.AggressiveInlining)] + public static TagList InitializeTags(QueueId queueId, string streamProviderName) => + new() + { + { QUEUE_ID, queueId.ToStringWithHashCode() }, + { STREAM_PROVIDER_NAME, streamProviderName } + }; +} \ No newline at end of file