Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Additional telemetry tags adds for various Orleans streaming components #8631

Open
wants to merge 6 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
using Orleans.Configuration;
using Orleans.Internal;
using Orleans.Runtime;
using Orleans.Streaming;
using Orleans.Streams.Filtering;

namespace Orleans.Streams
Expand Down Expand Up @@ -439,6 +440,8 @@ private async Task<bool> ReadFromQueue(QueueId myQueueId, IQueueAdapterReceiver
}

var now = DateTime.UtcNow;
System.Diagnostics.TagList? tags = null;

// Try to cleanup the pubsub cache at the cadence of 10 times in the configurable StreamInactivityPeriod.
if ((now - lastTimeCleanedPubSubCache) >= this.options.StreamInactivityPeriod.Divide(StreamInactivityCheckFrequency))
{
Expand Down Expand Up @@ -480,7 +483,13 @@ private async Task<bool> ReadFromQueue(QueueId myQueueId, IQueueAdapterReceiver

queueCache?.AddToCache(multiBatch);
numMessages += multiBatch.Count;
StreamInstruments.PersistentStreamReadMessages.Add(multiBatch.Count);

if (StreamInstruments.PersistentStreamReadMessages.Enabled)
{
tags = StreamInstrumentsTagUtils.InitializeTags(myQueueId, streamProviderName);
StreamInstruments.PersistentStreamReadMessages.Add(multiBatch.Count, tags.Value);
}

if (logger.IsEnabled(LogLevel.Trace))
logger.LogTrace(
(int)ErrorCode.PersistentStreamPullingAgent_11,
Expand Down Expand Up @@ -571,6 +580,7 @@ private void StartInactiveCursors(StreamConsumerCollection streamData, StreamSeq

private async Task RunConsumerCursor(StreamConsumerData consumerData)
{
System.Diagnostics.TagList? tags = null;
try
{
// double check in case of interleaving
Expand Down Expand Up @@ -605,7 +615,13 @@ private async Task RunConsumerCursor(StreamConsumerData consumerData)

try
{
StreamInstruments.PersistentStreamSentMessages.Add(1);
if (StreamInstruments.PersistentStreamSentMessages.Enabled)
{
tags ??= StreamInstrumentsTagUtils.InitializeTags(
consumerData.StreamId, consumerData.SubscriptionId);
StreamInstruments.PersistentStreamSentMessages.Add(1, tags.Value);
}

if (batch != null)
{
StreamHandshakeToken newToken = await AsyncExecutorWithRetries.ExecuteWithRetries(
Expand Down
133 changes: 105 additions & 28 deletions src/Orleans.Streaming/PubSub/PubSubRendezvousGrain.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,9 @@
using Orleans.Runtime;
using Orleans.Serialization.Serializers;
using Orleans.Storage;
using Orleans.Streaming;
using Orleans.Streams.Core;
using TagList = System.Diagnostics.TagList;

#nullable enable
namespace Orleans.Streams
Expand Down Expand Up @@ -51,10 +53,12 @@ public StateStorageBridge<PubSubGrainState> GetStorage(PubSubRendezvousGrain gra
{
if (_logger.IsEnabled(LogLevel.Debug))
{
_logger.LogDebug("Fallback to storage provider {ProviderName}", ProviderConstants.DEFAULT_PUBSUB_PROVIDER_NAME);
_logger.LogDebug("Fallback to storage provider {ProviderName}",
ProviderConstants.DEFAULT_PUBSUB_PROVIDER_NAME);
}

storage = _serviceProvider.GetRequiredKeyedService<IGrainStorage>(ProviderConstants.DEFAULT_PUBSUB_PROVIDER_NAME);
storage = _serviceProvider.GetRequiredKeyedService<IGrainStorage>(ProviderConstants
.DEFAULT_PUBSUB_PROVIDER_NAME);
}

var activatorProvider = _serviceProvider.GetRequiredService<IActivatorProvider>();
Expand All @@ -66,8 +70,8 @@ public StateStorageBridge<PubSubGrainState> GetStorage(PubSubRendezvousGrain gra
[GenerateSerializer]
internal sealed class PubSubGrainState
{
[Id(0)]
public HashSet<PubSubPublisherState> Producers { get; set; } = new HashSet<PubSubPublisherState>();
[Id(0)] public HashSet<PubSubPublisherState> Producers { get; set; } = new HashSet<PubSubPublisherState>();

[Id(1)]
public HashSet<PubSubSubscriptionState> Consumers { get; set; } = new HashSet<PubSubSubscriptionState>();
}
Expand All @@ -83,7 +87,8 @@ internal sealed class PubSubRendezvousGrain : Grain, IPubSubRendezvousGrain, IGr

private PubSubGrainState State => _storage.State;

public PubSubRendezvousGrain(PubSubGrainStateStorageFactory storageFactory, ILogger<PubSubRendezvousGrain> logger)
public PubSubRendezvousGrain(PubSubGrainStateStorageFactory storageFactory,
ILogger<PubSubRendezvousGrain> logger)
{
_storageFactory = storageFactory;
_logger = logger;
Expand All @@ -102,17 +107,29 @@ public override Task OnDeactivateAsync(DeactivationReason reason, CancellationTo
return Task.CompletedTask;
}

public async Task<ISet<PubSubSubscriptionState>> RegisterProducer(QualifiedStreamId streamId, GrainId streamProducer)
public async Task<ISet<PubSubSubscriptionState>> RegisterProducer(QualifiedStreamId streamId,
GrainId streamProducer)
{
StreamInstruments.PubSubProducersAdded.Add(1);
TagList? tags = null;

if (StreamInstruments.PubSubProducersAdded.Enabled)
{
tags = StreamInstrumentsTagUtils.InitializeTags(streamId, streamProducer);
StreamInstruments.PubSubProducersAdded.Add(1, tags.Value);
}

try
{
var publisherState = new PubSubPublisherState(streamId, streamProducer);
State.Producers.Add(publisherState);
LogPubSubCounts("RegisterProducer {0}", streamProducer);
await WriteStateAsync();
StreamInstruments.PubSubProducersTotal.Add(1);

if (StreamInstruments.PubSubProducersTotal.Enabled)
{
tags ??= StreamInstrumentsTagUtils.InitializeTags(streamId, streamProducer);
StreamInstruments.PubSubProducersTotal.Add(1, tags.Value);
}
}
catch (Exception exc)
{
Expand All @@ -127,12 +144,20 @@ public async Task<ISet<PubSubSubscriptionState>> RegisterProducer(QualifiedStrea
DeactivateOnIdle();
throw;
}

return State.Consumers.Where(c => !c.IsFaulted).ToSet();
}

public async Task UnregisterProducer(QualifiedStreamId streamId, GrainId streamProducer)
{
StreamInstruments.PubSubProducersRemoved.Add(1);
TagList? tags = null;

if (StreamInstruments.PubSubProducersRemoved.Enabled)
{
tags = StreamInstrumentsTagUtils.InitializeTags(streamId, streamProducer);
StreamInstruments.PubSubProducersRemoved.Add(1, tags.Value);
}

try
{
int numRemoved = State.Producers.RemoveWhere(s => s.Equals(streamId, streamProducer));
Expand All @@ -145,7 +170,12 @@ public async Task UnregisterProducer(QualifiedStreamId streamId, GrainId streamP
: WriteStateAsync();
await updateStorageTask;
}
StreamInstruments.PubSubProducersTotal.Add(-numRemoved);

if (StreamInstruments.PubSubProducersTotal.Enabled)
{
tags ??= StreamInstrumentsTagUtils.InitializeTags(streamId, streamProducer);
StreamInstruments.PubSubProducersTotal.Add(-numRemoved, tags.Value);
}
}
catch (Exception exc)
{
Expand All @@ -160,6 +190,7 @@ public async Task UnregisterProducer(QualifiedStreamId streamId, GrainId streamP
DeactivateOnIdle();
throw;
}

if (State.Producers.Count == 0 && State.Consumers.Count == 0)
{
DeactivateOnIdle(); // No producers or consumers left now, so flag ourselves to expedite Deactivation
Expand All @@ -172,7 +203,14 @@ public async Task RegisterConsumer(
GrainId streamConsumer,
string filterData)
{
StreamInstruments.PubSubConsumersAdded.Add(1);
TagList? tags = null;

if (StreamInstruments.PubSubConsumersAdded.Enabled)
{
tags = StreamInstrumentsTagUtils.InitializeTags(streamId, streamConsumer);
StreamInstruments.PubSubConsumersAdded.Add(1, tags.Value);
}

var pubSubState = State.Consumers.FirstOrDefault(s => s.Equals(subscriptionId));
if (pubSubState != null && pubSubState.IsFaulted)
throw new FaultedSubscriptionException(subscriptionId, streamId);
Expand All @@ -189,7 +227,12 @@ public async Task RegisterConsumer(

LogPubSubCounts("RegisterConsumer {0}", streamConsumer);
await WriteStateAsync();
StreamInstruments.PubSubConsumersTotal.Add(1);

if (StreamInstruments.PubSubConsumersTotal.Enabled)
{
tags ??= StreamInstrumentsTagUtils.InitializeTags(streamId, streamConsumer);
StreamInstruments.PubSubConsumersTotal.Add(1, tags.Value);
}
}
catch (Exception exc)
{
Expand All @@ -211,7 +254,8 @@ public async Task RegisterConsumer(
return;

if (_logger.IsEnabled(LogLevel.Debug))
_logger.LogDebug("Notifying {ProducerCount} existing producer(s) about new consumer {Consumer}. Producers={Producers}",
_logger.LogDebug(
"Notifying {ProducerCount} existing producer(s) about new consumer {Consumer}. Producers={Producers}",
numProducers, streamConsumer, Utils.EnumerableToString(State.Producers));

// Notify producers about a new streamConsumer.
Expand All @@ -222,7 +266,8 @@ public async Task RegisterConsumer(
{
foreach (PubSubPublisherState producerState in producers)
{
tasks.Add(ExecuteProducerTask(producerState, p => p.AddSubscriber(subscriptionId, streamId, streamConsumer, filterData)));
tasks.Add(ExecuteProducerTask(producerState,
p => p.AddSubscriber(subscriptionId, streamId, streamConsumer, filterData)));
}

Exception? exception = null;
Expand All @@ -239,7 +284,13 @@ public async Task RegisterConsumer(
if (State.Producers.Count != initialProducerCount)
{
await WriteStateAsync();
StreamInstruments.PubSubConsumersTotal.Add(-(initialProducerCount - State.Producers.Count));

if (StreamInstruments.PubSubConsumersTotal.Enabled)
{
tags ??= StreamInstrumentsTagUtils.InitializeTags(streamId, streamConsumer);
StreamInstruments.PubSubConsumersTotal.Add(
-(initialProducerCount - State.Producers.Count), tags.Value);
}
}

if (exception != null)
Expand Down Expand Up @@ -275,7 +326,13 @@ private void RemoveProducer(PubSubPublisherState producer)

public async Task UnregisterConsumer(GuidId subscriptionId, QualifiedStreamId streamId)
{
StreamInstruments.PubSubConsumersRemoved.Add(1);
TagList? tags = null;

if (StreamInstruments.PubSubConsumersRemoved.Enabled)
{
tags = StreamInstrumentsTagUtils.InitializeTags(streamId, subscriptionId);
StreamInstruments.PubSubConsumersRemoved.Add(1, tags.Value);
}

try
{
Expand All @@ -294,9 +351,15 @@ public async Task UnregisterConsumer(GuidId subscriptionId, QualifiedStreamId st
{
await WriteStateAsync();
}

await NotifyProducersOfRemovedSubscription(subscriptionId, streamId);
}
StreamInstruments.PubSubConsumersTotal.Add(-numRemoved);

if (StreamInstruments.PubSubConsumersTotal.Enabled)
{
tags ??= StreamInstrumentsTagUtils.InitializeTags(streamId, subscriptionId);
StreamInstruments.PubSubConsumersTotal.Add(-numRemoved, tags.Value);
}
}
catch (Exception exc)
{
Expand Down Expand Up @@ -345,8 +408,10 @@ private void LogPubSubCounts(string fmt, params object[] args)
numConsumers = State.Consumers.Count;

string when = args != null && args.Length != 0 ? string.Format(fmt, args) : fmt;
_logger.LogDebug("{When}. Now have total of {ProducerCount} producers and {ConsumerCount} consumers. All Consumers = {Consumers}, All Producers = {Producers}",
when, numProducers, numConsumers, Utils.EnumerableToString(State?.Consumers), Utils.EnumerableToString(State?.Producers));
_logger.LogDebug(
"{When}. Now have total of {ProducerCount} producers and {ConsumerCount} consumers. All Consumers = {Consumers}, All Producers = {Producers}",
when, numProducers, numConsumers, Utils.EnumerableToString(State?.Consumers),
Utils.EnumerableToString(State?.Producers));
}
}

Expand Down Expand Up @@ -406,7 +471,6 @@ public Task<List<StreamSubscription>> GetAllSubscriptions(QualifiedStreamId stre
c.Consumer)).ToList();
return Task.FromResult(subscriptions);
}

}

public async Task FaultSubscription(GuidId subscriptionId)
Expand All @@ -416,10 +480,12 @@ public async Task FaultSubscription(GuidId subscriptionId)
{
return;
}

try
{
pubSubState.Fault();
if (_logger.IsEnabled(LogLevel.Debug)) _logger.LogDebug("Setting subscription {SubscriptionId} to a faulted state.", subscriptionId);
if (_logger.IsEnabled(LogLevel.Debug))
_logger.LogDebug("Setting subscription {SubscriptionId} to a faulted state.", subscriptionId);

await WriteStateAsync();
await NotifyProducersOfRemovedSubscription(pubSubState.SubscriptionId, pubSubState.Stream);
Expand All @@ -443,11 +509,15 @@ private async Task NotifyProducersOfRemovedSubscription(GuidId subscriptionId, Q
int numProducersBeforeNotify = State.Producers.Count;
if (numProducersBeforeNotify > 0)
{
if (_logger.IsEnabled(LogLevel.Debug)) _logger.LogDebug("Notifying {ProducerCountBeforeNotify} existing producers about unregistered consumer.", numProducersBeforeNotify);
if (_logger.IsEnabled(LogLevel.Debug))
_logger.LogDebug(
"Notifying {ProducerCountBeforeNotify} existing producers about unregistered consumer.",
numProducersBeforeNotify);

// Notify producers about unregistered consumer.
List<Task> tasks = State.Producers
.Select(producerState => ExecuteProducerTask(producerState, p => p.RemoveSubscriber(subscriptionId, streamId)))
.Select(producerState =>
ExecuteProducerTask(producerState, p => p.RemoveSubscriber(subscriptionId, streamId)))
.ToList();
await Task.WhenAll(tasks);
//if producers got removed
Expand All @@ -462,15 +532,18 @@ private async Task NotifyProducersOfRemovedSubscription(GuidId subscriptionId, Q
/// <returns></returns>
private async Task<bool> TryClearState()
{
if (State.Producers.Count == 0 && State.Consumers.Count == 0) // + we already know that numProducers == 0 from previous if-clause
if (State.Producers.Count == 0 &&
State.Consumers.Count == 0) // + we already know that numProducers == 0 from previous if-clause
{
await ClearStateAsync(); //State contains no producers or consumers, remove it from storage
return true;
}

return false;
}

private async Task ExecuteProducerTask(PubSubPublisherState producer, Func<IStreamProducerExtension, Task> producerTask)
private async Task ExecuteProducerTask(PubSubPublisherState producer,
Func<IStreamProducerExtension, Task> producerTask)
{
try
{
Expand Down Expand Up @@ -504,7 +577,11 @@ private async Task ExecuteProducerTask(PubSubPublisherState producer, Func<IStre
private Task ReadStateAsync() => _storage.ReadStateAsync();
private Task WriteStateAsync() => _storage.WriteStateAsync();
private Task ClearStateAsync() => _storage.ClearStateAsync();
void IGrainMigrationParticipant.OnDehydrate(IDehydrationContext dehydrationContext) => _storage.OnDehydrate(dehydrationContext);
void IGrainMigrationParticipant.OnRehydrate(IRehydrationContext rehydrationContext) => _storage.OnRehydrate(rehydrationContext);

void IGrainMigrationParticipant.OnDehydrate(IDehydrationContext dehydrationContext) =>
_storage.OnDehydrate(dehydrationContext);

void IGrainMigrationParticipant.OnRehydrate(IRehydrationContext rehydrationContext) =>
_storage.OnRehydrate(rehydrationContext);
}
}
}
Loading
Loading