Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Export temporal metrics to datadog #14842

Merged
merged 25 commits into from
Aug 1, 2022
Merged
Show file tree
Hide file tree
Changes from 22 commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
e31eb19
add error code to ManualOperationResult
xiaohansong Jul 12, 2022
ff9b519
fix a bug
xiaohansong Jul 13, 2022
f26a0b2
support temporal metrics
xiaohansong Jul 14, 2022
f872225
metrics in temporal
xiaohansong Jul 14, 2022
4e651ed
use statsd
xiaohansong Jul 14, 2022
f56233a
Merge remote-tracking branch 'origin/master' into xiaohan/goofing
xiaohansong Jul 15, 2022
586aad9
wrap otel config to temporal metric export
xiaohansong Jul 15, 2022
5bf4e67
use http port 4318 for otlp exporter
xiaohansong Jul 18, 2022
33a18f5
simpilfy to support dd only
xiaohansong Jul 18, 2022
85a56d2
use /v1/metrics for endpoint
xiaohansong Jul 18, 2022
43bee0c
use statsd
xiaohansong Jul 19, 2022
1056890
Merge remote-tracking branch 'origin/master' into xiaohan/goofing
xiaohansong Jul 19, 2022
1a113d8
fix
xiaohansong Jul 19, 2022
413cbce
remove unused func
xiaohansong Jul 19, 2022
e29815a
wrap up implementation to export temporal metrics to datadog
xiaohansong Jul 19, 2022
0453ac7
use deps.toml to wrap up the dependency
xiaohansong Jul 19, 2022
9d5b795
Merge branch 'master' into xiaohan/goofing
xiaohansong Jul 19, 2022
3666dd5
move to metric client factory
xiaohansong Jul 27, 2022
1eb22dc
Merge remote-tracking branch 'origin/master' into xiaohan/goofing
xiaohansong Jul 27, 2022
ac97b12
Merge remote-tracking branch 'origin/master' into xiaohan/goofing
xiaohansong Jul 28, 2022
b1d1984
fix pmd error
xiaohansong Jul 28, 2022
acf58da
pmd, comment fix
xiaohansong Jul 28, 2022
1b7fbf4
pr comment fix
xiaohansong Aug 1, 2022
12a422b
Merge remote-tracking branch 'origin/master' into xiaohan/goofing
xiaohansong Aug 1, 2022
066cf5b
Merge branch 'master' into xiaohan/goofing
xiaohansong Aug 1, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions airbyte-metrics/metrics-lib/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ dependencies {
implementation libs.otel.semconv
implementation libs.otel.sdk
implementation libs.otel.sdk.testing
implementation libs.micrometer.statsd
implementation platform(libs.otel.bom)
implementation("io.opentelemetry:opentelemetry-api")
implementation("io.opentelemetry:opentelemetry-sdk")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,10 @@

import io.airbyte.config.Configs;
import io.airbyte.config.EnvConfigs;
import io.micrometer.core.instrument.Clock;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.statsd.StatsdConfig;
import io.micrometer.statsd.StatsdMeterRegistry;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -16,7 +20,7 @@ public class MetricClientFactory {

private static final Logger LOGGER = LoggerFactory.getLogger(MetricClientFactory.class);

private static final String DATADOG_METRIC_CLIENT = "datadog";
static final String DATADOG_METRIC_CLIENT = "datadog";
private static final String OTEL_METRIC_CLIENT = "otel";

private static final Configs configs = new EnvConfigs();
Expand Down Expand Up @@ -67,6 +71,47 @@ public static synchronized void initialize(final MetricEmittingApp metricEmittin
}
}

private static StatsdConfig getDatadogStatsDConfig() {
return new StatsdConfig() {

/**
* @return
*/
@Override
public String host() {
return configs.getDDAgentHost();
}

/**
* @param key Key to lookup in the config.
* @return
*/
@Override
public String get(String key) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why is this returning null?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Using the default settings.

Doesn't mean anything - unless we want to inject customized configs in it, but in this case temporal side wouldn't consume anything anyway.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Gotcha. Can we clean up the javadocs here to make it clear what the methods are used for? e.g. make it clear the get method is supposed to be stubbed out, and the host method is hardcoded.

return null;
}

};
}

/**
*
* Returns a meter registry to be consumed by temporal configs.
*
*/
public static MeterRegistry getMeterRegistry() {

if (DATADOG_METRIC_CLIENT.equals(configs.getMetricClient())) {
StatsdConfig config = getDatadogStatsDConfig();
return new StatsdMeterRegistry(config, Clock.SYSTEM);
}

// To support open telemetry, we need to use a different type of Config. For now we simply return
// null - in this case, we do not register any metric emitting mechanism in temporal and thus
// users will not receive temporal related metrics.
return null;
xiaohansong marked this conversation as resolved.
Show resolved Hide resolved
}

private static DogStatsDMetricClient initializeDatadogMetricClient(
final MetricEmittingApp metricEmittingApp) {
final DogStatsDMetricClient client = new DogStatsDMetricClient();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertNull;

import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
Expand Down Expand Up @@ -43,4 +44,10 @@ void testMetricClientFactoryCreateMultipleTimesThrows() {
});
}

@Test
@DisplayName("Should not return null if metric client not specified;")
void testMicroMeterRegistryRuturnsNullForEmptyClientConfig() {
assertNull(MetricClientFactory.getMeterRegistry());
}

}
1 change: 1 addition & 0 deletions airbyte-workers/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ dependencies {
implementation 'org.eclipse.jetty:jetty-servlet:9.4.31.v20200723'
implementation 'org.quartz-scheduler:quartz:2.3.2'
implementation libs.flyway.core
implementation libs.micrometer.statsd

implementation project(':airbyte-analytics')
implementation project(':airbyte-commons-docker')
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -233,8 +233,10 @@ private void registerSync(final WorkerFactory factory) {
final PersistStateActivityImpl persistStateActivity = new PersistStateActivityImpl(statePersistence, featureFlags);

final Worker syncWorker = factory.newWorker(TemporalJobType.SYNC.name(), getWorkerOptions(maxWorkers.getMaxSyncWorkers()));

syncWorker.registerWorkflowImplementationTypes(SyncWorkflowImpl.class);
syncWorker.registerActivitiesImplementations(replicationActivity, normalizationActivity, dbtTransformationActivity, persistStateActivity);

}

private void registerDiscover(final WorkerFactory factory) {
Expand Down Expand Up @@ -427,6 +429,7 @@ private static void launchWorkerApp(final Configs configs, final DSLContext conf
new OAuthConfigSupplier(configRepository, trackingClient));

final WorkflowServiceStubs temporalService = TemporalUtils.createTemporalService();

final WorkflowClient workflowClient = TemporalUtils.createWorkflowClient(temporalService, TemporalUtils.getNamespace());
final StreamResetPersistence streamResetPersistence = new StreamResetPersistence(configDatabase);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,15 @@
package io.airbyte.workers.temporal;

import com.google.common.annotations.VisibleForTesting;
import com.uber.m3.tally.RootScopeBuilder;
import com.uber.m3.tally.Scope;
import com.uber.m3.tally.StatsReporter;
import io.airbyte.commons.lang.Exceptions;
import io.airbyte.config.Configs;
import io.airbyte.config.EnvConfigs;
import io.airbyte.metrics.lib.MetricClientFactory;
import io.airbyte.scheduler.models.JobRunConfig;
import io.micrometer.core.instrument.MeterRegistry;
import io.temporal.activity.ActivityExecutionContext;
import io.temporal.api.common.v1.WorkflowExecution;
import io.temporal.api.namespace.v1.NamespaceConfig;
Expand All @@ -21,6 +26,7 @@
import io.temporal.client.WorkflowOptions;
import io.temporal.client.WorkflowStub;
import io.temporal.common.RetryOptions;
import io.temporal.common.reporter.MicrometerClientStatsReporter;
import io.temporal.serviceclient.SimpleSslContextBuilder;
import io.temporal.serviceclient.WorkflowServiceStubs;
import io.temporal.serviceclient.WorkflowServiceStubsOptions;
Expand Down Expand Up @@ -90,15 +96,25 @@ public static WorkflowServiceStubs createTemporalService() {
private static WorkflowServiceStubsOptions getCloudTemporalOptions() {
final InputStream clientCert = new ByteArrayInputStream(configs.getTemporalCloudClientCert().getBytes(StandardCharsets.UTF_8));
final InputStream clientKey = new ByteArrayInputStream(configs.getTemporalCloudClientKey().getBytes(StandardCharsets.UTF_8));
WorkflowServiceStubsOptions.Builder optionBuilder;
try {
return WorkflowServiceStubsOptions.newBuilder()
optionBuilder = WorkflowServiceStubsOptions.newBuilder()
.setSslContext(SimpleSslContextBuilder.forPKCS8(clientCert, clientKey).build())
.setTarget(configs.getTemporalCloudHost())
.build();
.setTarget(configs.getTemporalCloudHost());
} catch (final SSLException e) {
log.error("SSL Exception occurred attempting to establish Temporal Cloud options.");
throw new RuntimeException(e);
}

MeterRegistry registry = MetricClientFactory.getMeterRegistry();
if (registry != null) {
StatsReporter reporter = new MicrometerClientStatsReporter(registry);
Scope scope = new RootScopeBuilder()
.reporter(reporter)
.reportEvery(com.uber.m3.util.Duration.ofSeconds(10));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: this should be a constant

optionBuilder.setMetricsScope(scope);
}
return optionBuilder.build();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

for code readability, i think this should be moved into a private method called configureTemporalMetricEmission. In this way, any reader will immediately know what this is for. What do you think?

}

@VisibleForTesting
Expand Down
2 changes: 2 additions & 0 deletions deps.toml
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,8 @@ otel-bom = {module = "io.opentelemetry:opentelemetry-bom", version = "1.14.0"}
otel-semconv = {module = "io.opentelemetry:opentelemetry-semconv", version = "1.14.0-alpha"}
otel-sdk = {module = "io.opentelemetry:opentelemetry-sdk-metrics", version = "1.14.0"}
otel-sdk-testing = {module = "io.opentelemetry:opentelemetry-sdk-metrics-testing", version = "1.13.0-alpha"}
micrometer-statsd = {module = "io.micrometer:micrometer-registry-statsd", version = "1.9.2"}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good use of this pattern


quartz-scheduler = {module="org.quartz-scheduler:quartz", version = "2.3.2"}

[bundles]
Expand Down