Skip to content

Commit

Permalink
Add links on batch send operation in eventhubs (#6773)
Browse files Browse the repository at this point in the history
  • Loading branch information
samvaity authored Dec 16, 2019
1 parent b52329a commit 7bec67b
Show file tree
Hide file tree
Showing 13 changed files with 223 additions and 40 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,20 @@ public Context extractContext(String diagnosticId, Context context) {
return local;
}

/**
* For each tracer plugged into the SDK a new context containing the span builder is returned.
*
* @param context Additional metadata containing the span name for creating the span builer.
*/
public Context getSharedSpanBuilder(Context context) {
Context local = Objects.requireNonNull(context, "'context' cannot be null.");
String spanName = getSpanName(ProcessKind.SEND);
for (Tracer tracer : tracers) {
local = tracer.getSharedSpanBuilder(spanName, local);
}
return local;
}

private void end(String statusMessage, Throwable throwable, Context context) {
for (Tracer tracer : tracers) {
tracer.end(statusMessage, throwable, context);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@

import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.ArgumentMatchers.isNull;
import static org.mockito.Mockito.never;
Expand Down Expand Up @@ -194,4 +195,40 @@ public void addSpanLinks() {
// Act
assertThrows(NullPointerException.class, () -> tracerProvider.addSpanLinks(null));
}

@Test
public void getSpanBuilderReturnsUpdatedContext() {
// Arrange
final String spanBuilderKey = "spanBuilder-key";
final String spanBuilderValue = "spanBuilder-value";

final String spanBuilderKey1 = "spanBuilder-key1";
final String spanBuilderValue1 = "spanBuilder-value1";
final Context startingContext = Context.NONE;

when(tracer.getSharedSpanBuilder(anyString(), any())).thenAnswer(
invocation -> {
Context passed = invocation.getArgument(1, Context.class);
return passed.addData(spanBuilderKey, spanBuilderValue);
}
);
when(tracer2.getSharedSpanBuilder(anyString(), any())).thenAnswer(
invocation -> {
Context passed = invocation.getArgument(1, Context.class);
return passed.addData(spanBuilderKey1, spanBuilderValue1);
}
);

// Act
final Context updatedContext = tracerProvider.getSharedSpanBuilder(startingContext);

// Assert
final Optional<Object> spanBuilderData = updatedContext.getData(spanBuilderKey);
Assertions.assertTrue(spanBuilderData.isPresent());
Assertions.assertEquals(spanBuilderValue, spanBuilderData.get());

final Optional<Object> spanBuilderData1 = updatedContext.getData(spanBuilderKey1);
Assertions.assertTrue(spanBuilderData1.isPresent());
Assertions.assertEquals(spanBuilderValue1, spanBuilderData1.get());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,13 @@ public Context extractContext(String diagnosticId, Context context) {
return AmqpPropagationFormatUtil.extractContext(diagnosticId, context);
}

@Override
public Context getSharedSpanBuilder(String spanName, Context context) {
throw logger.logExceptionAsError(
new UnsupportedOperationException("This operation is not supported in OpenCensus"));
// Remove OpenCensus support for tracing in https://github.com/Azure/azure-sdk-for-java/issues/6781
}

/**
* Starts a new child {@link Span} with parent being the remote and uses the {@link Span} is in the current Context,
* to return an object that represents that scope.
Expand Down
4 changes: 2 additions & 2 deletions sdk/core/azure-core-tracing-opentelemetry/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
# Release History
## Version 1.0.0-beta.2 (2020-01-03)
- Add eventhub properties to attributes of processing spans.
- Add `EventHubs.*` properties to attributes of processing spans.
- Remove `Azure` prefix from convenience layer span names.
- Add links for batch send operation in Event Hubs client library.

This package's
[documentation](https://github.com/Azure/azure-sdk-for-java/blob/azure-core-tracing-opentelemetry_1.0.0-beta.2/sdk/core/azure-core-tracing-opentelemetry/README.md)
Expand All @@ -19,4 +20,3 @@ This package's
[documentation](https://github.com/Azure/azure-sdk-for-java/blob/azure-core-tracing-opentelemetry_1.0.0-beta.1/sdk/core/azure-core-tracing-opentelemetry/README.md)
and
[samples](https://github.com/Azure/azure-sdk-for-java/blob/azure-core-tracing-opentelemetry_1.0.0-beta.1/sdk/core/azure-core-tracing-opentelemetry/src/samples).

Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,11 @@ public Context start(String spanName, Context context, ProcessKind processKind)

switch (processKind) {
case SEND:
spanBuilder = getSpanBuilder(spanName, context);
// use previously created span builder from the LINK process.
spanBuilder = getOrDefault(context, SPAN_BUILDER_KEY, null, Builder.class);
if (spanBuilder == null) {
return Context.NONE;
}
span = spanBuilder.setSpanKind(Span.Kind.PRODUCER).startSpan();
if (span.isRecording()) {
// If span is sampled in, add additional request attributes
Expand Down Expand Up @@ -172,6 +176,11 @@ public Context extractContext(String diagnosticId, Context context) {
return AmqpPropagationFormatUtil.extractContext(diagnosticId, context);
}

@Override
public Context getSharedSpanBuilder(String spanName, Context context) {
return context.addData(SPAN_BUILDER_KEY, getSpanBuilder(spanName, context));
}

/**
* Starts a new child {@link Span} with parent being the remote and uses the {@link Span} is in the current Context,
* to return an object that represents that scope.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ public class Sample {
private static final Logger LOGGER = getLogger("Sample");
private static final Tracer TRACER;
private static final TracerSdkFactory TRACER_SDK_FACTORY;
private static final Duration OPERATION_TIMEOUT = Duration.ofSeconds(30);

static {
TRACER_SDK_FACTORY = configureOpenTelemetryAndLoggingExporter();
Expand All @@ -80,35 +81,66 @@ public class Sample {
}

private static void doClientWork() {
EventHubProducerClient producer = new EventHubClientBuilder()
.connectionString(connectionString)
.buildProducerClient();
EventHubProducerAsyncClient producer = new EventHubClientBuilder()
.connectionString(CONNECTION_STRING)
.buildAsyncProducerClient();

Span span = TRACER.spanBuilder("user-parent-span").startSpan();
try (final Scope scope = TRACER.withSpan(span)) {
final EventData event1 = new EventData("1".getBytes(UTF_8));
event1.addContext(PARENT_SPAN_KEY, span);

final EventData event2 = new EventData("2".getBytes(UTF_8));
event2.addContext(PARENT_SPAN_KEY, span);
String firstPartition = producer.getPartitionIds().blockFirst(OPERATION_TIMEOUT);

final byte[] body = "EventData Sample 1".getBytes(UTF_8);
final byte[] body2 = "EventData Sample 2".getBytes(UTF_8);

final List<EventData> telemetryEvents = Arrays.asList(event1, event2);
// We will publish three events based on simple sentences.
Flux<EventData> data = Flux.just(
new EventData(body).addContext(PARENT_SPAN_KEY, TRACER.getCurrentSpan()),
new EventData(body2).addContext(PARENT_SPAN_KEY, TRACER.getCurrentSpan()));

// Create a batch to send the events.
final CreateBatchOptions options = new CreateBatchOptions()
.setPartitionKey("telemetry")
.setPartitionId(firstPartition)
.setMaximumSizeInBytes(256);
EventDataBatch currentBatch = producer.createBatch(options);

// For each telemetry event, we try to add it to the current batch.
for (EventData event : telemetryEvents) {
if (!currentBatch.tryAdd(event)) {
producer.send(currentBatch);
currentBatch = producer.createBatch(options);

final AtomicReference<EventDataBatch> currentBatch = new AtomicReference<>(
producer.createBatch(options).block(OPERATION_TIMEOUT));

data.flatMap(event -> {
final EventDataBatch batch = currentBatch.get();
if (batch.tryAdd(event)) {
return Mono.empty();
}
}
} finally {
span.end();
producer.close();

// The batch is full, so we create a new batch and send the batch. Mono.when completes when both operations
// have completed.
return Mono.when(
producer.send(batch),
producer.createBatch(options).map(newBatch -> {
currentBatch.set(newBatch);

// Add that event that we couldn't before.
if (!newBatch.tryAdd(event)) {
throw Exceptions.propagate(new IllegalArgumentException(String.format(
"Event is too large for an empty batch. Max size: %s. Event: %s",
newBatch.getMaxSizeInBytes(), event.getBodyAsString())));
}

return newBatch;
}));
}).then()
.doFinally(signal -> {
final EventDataBatch batch = currentBatch.getAndSet(null);
if (batch != null) {
producer.send(batch).block(OPERATION_TIMEOUT);
}
})
.subscribe(unused -> System.out.println("Complete"),
error -> System.out.println("Error sending events: " + error),
() -> {
span.end();
producer.close();
System.out.println("Completed sending events.");
});
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,6 @@ public void startSpanParentContextFlowTest() {
final ReadableSpan recordEventsSpan =
(ReadableSpan) updatedContext.getData(PARENT_SPAN_KEY).get();
assertEquals(Span.Kind.INTERNAL, recordEventsSpan.toSpanData().getKind());
assertEquals(true, true);
}

@Test
Expand All @@ -117,9 +116,11 @@ public void startSpanTestNoUserParent() {
public void startSpanProcessKindSend() {
// Arrange
final SpanId parentSpanId = parentSpan.getContext().getSpanId();
// Start user parent span.
final Span.Builder spanBuilder = tracer.spanBuilder(METHOD_NAME);
// Add additional metadata to spans for SEND
final Context traceContext = tracingContext.addData(ENTITY_PATH_KEY, ENTITY_PATH_VALUE)
.addData(HOST_NAME_KEY, HOSTNAME_VALUE);
.addData(HOST_NAME_KEY, HOSTNAME_VALUE).addData(SPAN_BUILDER_KEY, spanBuilder);

// Act
final Context updatedContext = openTelemetryTracer.start(METHOD_NAME, traceContext, ProcessKind.SEND);
Expand Down Expand Up @@ -181,6 +182,14 @@ public void startSpanProcessKindProcess() {
verifySpanAttributes(attributeMap);
}

@Test
public void getSpanBuilderTest() {
// Act
final Context updatedContext = openTelemetryTracer.getSharedSpanBuilder(METHOD_NAME, Context.NONE);

assertTrue(updatedContext.getData(SPAN_BUILDER_KEY).isPresent());
}

@Test
public void startProcessSpanWithRemoteParent() {
// Arrange
Expand Down Expand Up @@ -277,7 +286,7 @@ public void endSpanTestThrowableResponseCode() {
}

private static void assertSpanWithExplicitParent(Context updatedContext, SpanId parentSpanId) {
assertNotNull(updatedContext.getData(PARENT_SPAN_KEY));
assertNotNull(updatedContext.getData(PARENT_SPAN_KEY).get());

// verify instance created of opentelemetry-sdk (test impl), span implementation
assertTrue(updatedContext.getData(PARENT_SPAN_KEY).get() instanceof ReadableSpan);
Expand All @@ -292,7 +301,7 @@ private static void assertSpanWithExplicitParent(Context updatedContext, SpanId
}

private static void assertSpanWithRemoteParent(Context updatedContext, SpanId parentSpanId) {
assertNotNull(updatedContext.getData(PARENT_SPAN_KEY));
assertNotNull(updatedContext.getData(PARENT_SPAN_KEY).get());

// verify instance created of opentelemetry-sdk (test impl), span implementation
assertTrue(updatedContext.getData(PARENT_SPAN_KEY).get() instanceof ReadableSpan);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -204,4 +204,23 @@ public interface Tracer {
* @throws NullPointerException if {@code diagnosticId} or {@code context} is {@code null}.
*/
Context extractContext(String diagnosticId, Context context);

/**
* Returns a span builder with the provided name in {@link Context}.
*
* <p><strong>Code samples</strong></p>
*
* <p>Returns a builder with the provided span name.</p>
* {@codesnippet com.azure.core.util.tracing.getSharedSpanBuilder#string-context}
*
* @param spanName Name to give the span for the created builder.
* @param context Additional metadata that is passed through the call stack.
*
* @return The updated {@link Context} object containing the span builder.
* @throws NullPointerException if {@code context} or {@code spanName} is {@code null}.
*/
default Context getSharedSpanBuilder(String spanName, Context context) {
// no-op
return Context.NONE;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import static com.azure.core.util.tracing.Tracer.ENTITY_PATH_KEY;
import static com.azure.core.util.tracing.Tracer.HOST_NAME_KEY;
import static com.azure.core.util.tracing.Tracer.PARENT_SPAN_KEY;
import static com.azure.core.util.tracing.Tracer.SPAN_BUILDER_KEY;

/**
* Contains code snippets when generating javadocs through doclets for {@link Tracer}.
Expand Down Expand Up @@ -120,6 +121,18 @@ public void extractContext() {
// END: com.azure.core.util.tracing.extractContext#string-context
}

/**
* Code snippet for {@link Tracer#getSharedSpanBuilder(String, Context)}
*/
public void getSharedSpanBuilder() {
// BEGIN: com.azure.core.util.tracing.getSpanBuilder#string-context
// Returns a span builder with the provided name
String methodName = "message-span";
Context spanContext = tracer.getSharedSpanBuilder(methodName, Context.NONE);
System.out.printf("Span context of the current tracing span: %s%n", spanContext.getData(SPAN_BUILDER_KEY).get());
// END: com.azure.core.util.tracing.getSpanBuilder#string-context
}

//Noop Tracer
private static final class TracerImplementation implements Tracer {
@Override
Expand Down Expand Up @@ -161,5 +174,10 @@ public void addLink(Context context) {
public Context extractContext(String diagnosticId, Context context) {
return null;
}

@Override
public Context getSharedSpanBuilder(String spanName, Context context) {
return null;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,7 @@ Context getContext() {
* @throws NullPointerException if {@code key} or {@code value} is null.
* @return The updated {@link EventData}.
*/
EventData addContext(String key, Object value) {
public EventData addContext(String key, Object value) {
Objects.requireNonNull(key, "The 'key' parameter cannot be null.");
Objects.requireNonNull(value, "The 'value' parameter cannot be null.");
this.context = context.addData(key, value);
Expand Down
Loading

0 comments on commit 7bec67b

Please sign in to comment.