Skip to content

Commit

Permalink
Modify ElasticSearch Instrumentation to take advantage of sampling de…
Browse files Browse the repository at this point in the history
…cision (#95)
  • Loading branch information
utpilla authored Mar 24, 2021
1 parent 826d77b commit 8906aae
Show file tree
Hide file tree
Showing 3 changed files with 121 additions and 48 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -60,68 +60,73 @@ public ElasticsearchRequestPipelineDiagnosticListener(ElasticsearchClientInstrum

public override void OnStartActivity(Activity activity, object payload)
{
var uri = this.uriFetcher.Fetch(payload);
// By this time, samplers have already run and
// activity.IsAllDataRequested populated accordingly.

if (uri == null)
if (Sdk.SuppressInstrumentation)
{
ElasticsearchInstrumentationEventSource.Log.NullPayload(nameof(ElasticsearchRequestPipelineDiagnosticListener), nameof(this.OnStartActivity));
return;
}

ActivityInstrumentationHelper.SetActivitySourceProperty(activity, ActivitySource);
ActivityInstrumentationHelper.SetKindProperty(activity, ActivityKind.Client);
if (activity.IsAllDataRequested)
{
var uri = this.uriFetcher.Fetch(payload);

var method = this.methodFetcher.Fetch(payload);
activity.DisplayName = this.GetDisplayName(activity, method);
if (uri == null)
{
ElasticsearchInstrumentationEventSource.Log.NullPayload(nameof(ElasticsearchRequestPipelineDiagnosticListener), nameof(this.OnStartActivity));
return;
}

if (this.options.SuppressDownstreamInstrumentation)
{
SuppressInstrumentationScope.Enter();
}
ActivityInstrumentationHelper.SetActivitySourceProperty(activity, ActivitySource);
ActivityInstrumentationHelper.SetKindProperty(activity, ActivityKind.Client);

if (!activity.IsAllDataRequested)
{
return;
}
var method = this.methodFetcher.Fetch(payload);

var elasticIndex = this.GetElasticIndex(uri);
activity.DisplayName = this.GetDisplayName(activity, method, elasticIndex);
activity.SetTag(SemanticConventions.AttributeDbSystem, DatabaseSystemName);
if (this.options.SuppressDownstreamInstrumentation)
{
SuppressInstrumentationScope.Enter();
}

if (elasticIndex != null)
{
activity.SetTag(SemanticConventions.AttributeDbName, elasticIndex);
}
var elasticIndex = this.GetElasticIndex(uri);
activity.DisplayName = this.GetDisplayName(activity, method, elasticIndex);
activity.SetTag(SemanticConventions.AttributeDbSystem, DatabaseSystemName);

var uriHostNameType = Uri.CheckHostName(uri.Host);
if (uriHostNameType == UriHostNameType.IPv4 || uriHostNameType == UriHostNameType.IPv6)
{
activity.SetTag(SemanticConventions.AttributeNetPeerIp, uri.Host);
}
else
{
activity.SetTag(SemanticConventions.AttributeNetPeerName, uri.Host);
}
if (elasticIndex != null)
{
activity.SetTag(SemanticConventions.AttributeDbName, elasticIndex);
}

if (uri.Port > 0)
{
activity.SetTag(SemanticConventions.AttributeNetPeerPort, uri.Port);
}
var uriHostNameType = Uri.CheckHostName(uri.Host);
if (uriHostNameType == UriHostNameType.IPv4 || uriHostNameType == UriHostNameType.IPv6)
{
activity.SetTag(SemanticConventions.AttributeNetPeerIp, uri.Host);
}
else
{
activity.SetTag(SemanticConventions.AttributeNetPeerName, uri.Host);
}

if (method != null)
{
activity.SetTag(AttributeDbMethod, method.ToString());
}
if (uri.Port > 0)
{
activity.SetTag(SemanticConventions.AttributeNetPeerPort, uri.Port);
}

activity.SetTag(SemanticConventions.AttributeDbUrl, uri.OriginalString);
if (method != null)
{
activity.SetTag(AttributeDbMethod, method.ToString());
}

try
{
this.options.Enrich?.Invoke(activity, "OnStartActivity", payload);
}
catch (Exception ex)
{
ElasticsearchInstrumentationEventSource.Log.EnrichmentException(ex);
activity.SetTag(SemanticConventions.AttributeDbUrl, uri.OriginalString);

try
{
this.options.Enrich?.Invoke(activity, "OnStartActivity", payload);
}
catch (Exception ex)
{
ElasticsearchInstrumentationEventSource.Log.EnrichmentException(ex);
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
<IncludeSharedInstrumentationSource>true</IncludeSharedInstrumentationSource>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="OpenTelemetry" Version="1.0.2-alpha.0.29" />
<PackageReference Include="OpenTelemetry" Version="1.1.0-beta1" />
<PackageReference Include="System.Text.Json" Version="5.0.0" />
</ItemGroup>
</Project>
Original file line number Diff line number Diff line change
Expand Up @@ -729,5 +729,73 @@ public async Task CanCaptureCatRequest()

// Assert.Equal(expectedResource, searchActivity.GetResource());
}

[Fact]
public async Task DoesNotCaptureWhenInstrumentationIsSuppressed()
{
var expectedResource = ResourceBuilder.CreateDefault().AddService("test-service");
var processor = new Mock<BaseProcessor<Activity>>();

var parent = new Activity("parent").Start();

var client = new ElasticClient(new ConnectionSettings(new InMemoryConnection()).DefaultIndex("customer"));

using (Sdk.CreateTracerProviderBuilder()
.SetSampler(new AlwaysOnSampler())
.AddElasticsearchClientInstrumentation()
.SetResourceBuilder(expectedResource)
.AddProcessor(processor.Object)
.Build())
{
using var scope = SuppressInstrumentationScope.Begin();
var getResponse = await client.GetAsync<Customer>("123");
Assert.NotNull(getResponse);
Assert.True(getResponse.ApiCall.Success);
Assert.NotEmpty(getResponse.ApiCall.AuditTrail);

var failed = getResponse.ApiCall.AuditTrail.Where(a => a.Event == AuditEvent.BadResponse);
Assert.Empty(failed);
}

// Since instrumentation is suppressed, activity is not emitted
Assert.Equal(3, processor.Invocations.Count); // SetParentProvider + OnShutdown + Dispose

// Processor.OnStart and Processor.OnEnd are not called
Assert.DoesNotContain(processor.Invocations, invo => invo.Method.Name == nameof(processor.Object.OnStart));
Assert.DoesNotContain(processor.Invocations, invo => invo.Method.Name == nameof(processor.Object.OnEnd));
}

[Theory]
[InlineData(SamplingDecision.Drop, false)]
[InlineData(SamplingDecision.RecordOnly, true)]
[InlineData(SamplingDecision.RecordAndSample, true)]
public async Task CapturesBasedOnSamplingDecision(SamplingDecision samplingDecision, bool isActivityExpected)
{
var expectedResource = ResourceBuilder.CreateDefault().AddService("test-service");
var processor = new Mock<BaseProcessor<Activity>>();

var parent = new Activity("parent").Start();

var client = new ElasticClient(new ConnectionSettings(new InMemoryConnection()).DefaultIndex("customer"));

using (Sdk.CreateTracerProviderBuilder()
.SetSampler(new TestSampler() { SamplingAction = (samplingParameters) => new SamplingResult(samplingDecision) })
.AddElasticsearchClientInstrumentation()
.SetResourceBuilder(expectedResource)
.AddProcessor(processor.Object)
.Build())
{
var getResponse = await client.GetAsync<Customer>("123");
Assert.NotNull(getResponse);
Assert.True(getResponse.ApiCall.Success);
Assert.NotEmpty(getResponse.ApiCall.AuditTrail);

var failed = getResponse.ApiCall.AuditTrail.Where(a => a.Event == AuditEvent.BadResponse);
Assert.Empty(failed);
}

Assert.Equal(isActivityExpected, processor.Invocations.Any(invo => invo.Method.Name == nameof(processor.Object.OnStart)));
Assert.Equal(isActivityExpected, processor.Invocations.Any(invo => invo.Method.Name == nameof(processor.Object.OnEnd)));
}
}
}

0 comments on commit 8906aae

Please sign in to comment.