Skip to content

Commit

Permalink
Move 'new EnvConfigs()' towards the outer shell of the app for improv…
Browse files Browse the repository at this point in the history
…ed configuration control (#6925)

* Moving 'new EnvConfigs()' to the outer shell of the app for increased configuration control.
* Cleaned up singleton behavior to make initialization more predictable and enforced.
* Made the singleton a true singleton; enforced init() call; moved EnvConfigs to top level.
* Refactored to move EnvConfigs to top level, which meant putting port config into the factory.
* Modified KubePodProcess test to avoid reinitializing the factory and thus singleton's ports, each time.
* Solve test init issues based on what needs to be static.
Co-authored-by: Davin Chia <[email protected]>
  • Loading branch information
airbyte-jenny authored Oct 12, 2021
1 parent d5c0499 commit 378e8e0
Show file tree
Hide file tree
Showing 20 changed files with 211 additions and 135 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ public static Path getSchedulerLogsRoot(final Configs configs) {

public static File getServerLogFile(final Configs configs) {
final var logPathBase = getServerLogsRoot(configs);
if (shouldUseLocalLogs(configs)) {
if (shouldUseLocalLogs(configs.getWorkerEnvironment())) {
return logPathBase.resolve(LOG_FILENAME).toFile();
}

Expand All @@ -85,7 +85,7 @@ public static File getServerLogFile(final Configs configs) {

public static File getSchedulerLogFile(final Configs configs) {
final var logPathBase = getSchedulerLogsRoot(configs);
if (shouldUseLocalLogs(configs)) {
if (shouldUseLocalLogs(configs.getWorkerEnvironment())) {
return logPathBase.resolve(LOG_FILENAME).toFile();
}

Expand All @@ -103,7 +103,7 @@ public static List<String> getJobLogFile(final Configs configs, final Path logPa
return Collections.emptyList();
}

if (shouldUseLocalLogs(configs)) {
if (shouldUseLocalLogs(configs.getWorkerEnvironment())) {
return IOs.getTail(LOG_TAIL_SIZE, logPath);
}

Expand All @@ -121,7 +121,7 @@ public static void deleteLogs(final Configs configs, final String logPath) {
return;
}

if (shouldUseLocalLogs(configs)) {
if (shouldUseLocalLogs(configs.getWorkerEnvironment())) {
throw new NotImplementedException("Local log deletes not supported.");
}
final var logConfigs = new LogConfigDelegator(configs);
Expand All @@ -130,8 +130,10 @@ public static void deleteLogs(final Configs configs, final String logPath) {
}

public static void setJobMdc(final Path path) {
final var configs = new EnvConfigs();
if (shouldUseLocalLogs(configs)) {
// setJobMdc is referenced from TemporalAttemptExecution without input parameters, so hard to pass
// this in.
Configs configs = new EnvConfigs();
if (shouldUseLocalLogs(configs.getWorkerEnvironment())) {
LOGGER.debug("Setting docker job mdc");
MDC.put(LogClientSingleton.JOB_LOG_PATH_MDC_KEY, path.resolve(LogClientSingleton.LOG_FILENAME).toString());
} else {
Expand All @@ -144,7 +146,7 @@ public static void setJobMdc(final Path path) {

public static void setWorkspaceMdc(final Path path) {
final var configs = new EnvConfigs();
if (shouldUseLocalLogs(configs)) {
if (shouldUseLocalLogs(configs.getWorkerEnvironment())) {
LOGGER.debug("Setting docker workspace mdc");
MDC.put(LogClientSingleton.WORKSPACE_MDC_KEY, path.toString());
} else {
Expand All @@ -155,8 +157,8 @@ public static void setWorkspaceMdc(final Path path) {
}
}

private static boolean shouldUseLocalLogs(final Configs configs) {
return configs.getWorkerEnvironment().equals(WorkerEnvironment.DOCKER);
private static boolean shouldUseLocalLogs(WorkerEnvironment workerEnvironment) {
return workerEnvironment.equals(WorkerEnvironment.DOCKER);
}

private static void createCloudClientIfNull(final LogConfigs configs) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
package io.airbyte.metrics;

import com.google.common.annotations.VisibleForTesting;
import io.airbyte.config.EnvConfigs;
import io.prometheus.client.Counter;
import io.prometheus.client.Gauge;
import io.prometheus.client.Histogram;
Expand All @@ -21,33 +20,47 @@
/**
* Use the prometheus library to publish prometheus metrics to a specified port. These metrics can
* be consumed by any agent understanding the OpenMetrics format.
*
* <p>
* This class mainly exists to help Airbyte instrument/debug application on Airbyte Cloud. Within
* Airbyte Cloud, the metrics are consumed by a Datadog agent and transformed into Datadog metrics
* as per https://docs.datadoghq.com/integrations/guide/prometheus-metrics/.
*
* <p>
* Open source users are free to turn this on and consume the same metrics.
*/
public class MetricSingleton {

private static final Logger LOGGER = LoggerFactory.getLogger(MetricSingleton.class);
private static boolean PUBLISH = new EnvConfigs().getPublishMetrics();
private static MetricSingleton instance;

private static final Map<String, Gauge> nameToGauge = new HashMap<>();
private static final Map<String, Counter> nameToCounter = new HashMap<>();
private static final Map<String, Histogram> nameToHistogram = new HashMap<>();
private final Map<String, Gauge> nameToGauge = new HashMap<>();
private final Map<String, Counter> nameToCounter = new HashMap<>();
private final Map<String, Histogram> nameToHistogram = new HashMap<>();

private static HTTPServer monitoringDaemon;
private HTTPServer monitoringDaemon;

private MetricSingleton() {}

public static synchronized MetricSingleton getInstance() {
if (instance == null) {
throw new RuntimeException("You must initialize configuration with the initializeMonitoringServiceDaemon() method before getting an instance.");
}
return instance;
}

public void setMonitoringDaemon(HTTPServer monitoringDaemon) {
this.monitoringDaemon = monitoringDaemon;
}

// Gauge. See
// https://docs.datadoghq.com/metrics/agent_metrics_submission/?tab=gauge#monotonic-count.

/**
* Track value at a given timestamp.
*
* @param name of gauge
* @param val to set
*/
public static void setGauge(String name, double val, String description) {
public void setGauge(String name, double val, String description) {
validateNameAndCheckDescriptionExists(name, description, () -> ifPublish(() -> {
if (!nameToGauge.containsKey(name)) {
Gauge gauge = Gauge.build().name(name).help(description).register();
Expand All @@ -63,7 +76,7 @@ public static void setGauge(String name, double val, String description) {
* @param name of gauge
* @param val to increment
*/
public static void incrementGauge(String name, double val, String description) {
public void incrementGauge(String name, double val, String description) {
validateNameAndCheckDescriptionExists(name, description, () -> ifPublish(() -> {
if (nameToGauge.containsKey(name)) {
LOGGER.warn("Overriding existing metric, type: Gauge, name: {}", name);
Expand All @@ -83,7 +96,7 @@ public static void incrementGauge(String name, double val, String description) {
* @param name of gauge
* @param val to decrement
*/
public static void decrementGauge(String name, double val, String description) {
public void decrementGauge(String name, double val, String description) {
validateNameAndCheckDescriptionExists(name, description, () -> ifPublish(() -> {
if (!nameToGauge.containsKey(name)) {
Gauge gauge = Gauge.build().name(name).help(description).register();
Expand All @@ -95,13 +108,14 @@ public static void decrementGauge(String name, double val, String description) {

// Counter - Monotonically Increasing. See
// https://docs.datadoghq.com/metrics/agent_metrics_submission/?tab=count#monotonic-count.

/**
* Increment a monotoically increasing counter.
* Increment a monotonically increasing counter.
*
* @param name of counter
* @param amt to increment
*/
public static void incrementCounter(String name, double amt, String description) {
public void incrementCounter(String name, double amt, String description) {
validateNameAndCheckDescriptionExists(name, description, () -> ifPublish(() -> {
if (!nameToCounter.containsKey(name)) {
Counter counter = Counter.build().name(name).help(description).register();
Expand All @@ -114,14 +128,15 @@ public static void incrementCounter(String name, double amt, String description)

// Histogram. See
// https://docs.datadoghq.com/metrics/agent_metrics_submission/?tab=histogram#monotonic-count.

/**
* Time code execution.
*
* @param name of histogram
* @param runnable to time
* @return duration of code execution.
*/
public static double timeCode(String name, Runnable runnable, String description) {
public double timeCode(String name, Runnable runnable, String description) {
var duration = new AtomicReference<>(0.0);
validateNameAndCheckDescriptionExists(name, description, () -> ifPublish(() -> {
if (!nameToHistogram.containsKey(name)) {
Expand All @@ -139,7 +154,7 @@ public static double timeCode(String name, Runnable runnable, String description
* @param name of the underlying histogram.
* @param time to be recorded.
*/
public static void recordTime(String name, double time, String description) {
public void recordTime(String name, double time, String description) {
validateNameAndCheckDescriptionExists(name, description, () -> ifPublish(() -> {
LOGGER.info("publishing record time, name: {}, time: {}", name, time);

Expand All @@ -151,8 +166,8 @@ public static void recordTime(String name, double time, String description) {
}));
}

private static void ifPublish(Runnable execute) {
if (PUBLISH) {
private void ifPublish(Runnable execute) {
if (monitoringDaemon != null) {
execute.run();
}
}
Expand All @@ -168,33 +183,34 @@ private static void validateNameAndCheckDescriptionExists(String name, String de
}

/**
* Stand up a separate thread to publish metrics to the specified port.
* Stand up a separate thread to publish metrics to the specified port. This method (in lieu of a
* constructor) must be called ahead of recording time, in order to set up the monitoring daemon and
* initialize the isPublish() configuration as true/false.
*
* @param monitorPort to publish metrics to
*/
public static void initializeMonitoringServiceDaemon(String monitorPort, Map<String, String> mdc) {
ifPublish(() -> {
public synchronized static void initializeMonitoringServiceDaemon(String monitorPort, Map<String, String> mdc, boolean publish) {
if (instance != null) {
throw new RuntimeException("You cannot initialize configuration more than once.");
}
instance = new MetricSingleton();
if (publish) {
try {
MDC.setContextMap(mdc);
LOGGER.info("Starting prometheus metric server..");
// The second constructor argument ('true') makes this server start as a separate daemon thread.
// http://prometheus.github.io/client_java/io/prometheus/client/exporter/HTTPServer.html#HTTPServer-int-boolean-
monitoringDaemon = new HTTPServer(Integer.parseInt(monitorPort), true);
instance.setMonitoringDaemon(new HTTPServer(Integer.parseInt(monitorPort), true));
} catch (IOException e) {
LOGGER.error("Error starting up Prometheus publishing server..", e);
}
});
}
}

@VisibleForTesting
public static void closeMonitoringServiceDaemon() {
public void closeMonitoringServiceDaemon() {
monitoringDaemon.close();
LOGGER.info("Stopping monitoring daemon..");
}

@VisibleForTesting
public static void setToPublish() {
PUBLISH = true;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -41,26 +41,25 @@ public static void setUp() throws IOException {
availPort = socket.getLocalPort();
}

MetricSingleton.setToPublish();
MetricSingleton.initializeMonitoringServiceDaemon(String.valueOf(availPort), Map.of());
MetricSingleton.initializeMonitoringServiceDaemon(String.valueOf(availPort), Map.of(), true);
}

@AfterAll
public static void tearDown() {
MetricSingleton.closeMonitoringServiceDaemon();
MetricSingleton.getInstance().closeMonitoringServiceDaemon();
}

@Nested
class Validation {

@Test
public void testNameWithDashFails() {
assertThrows(RuntimeException.class, () -> MetricSingleton.incrementCounter("bad-name", 0.0, "name with dashes are not allowed"));
assertThrows(RuntimeException.class, () -> MetricSingleton.getInstance().incrementCounter("bad-name", 0.0, "name with dashes are not allowed"));
}

@Test
public void testNoDescriptionFails() {
assertThrows(RuntimeException.class, () -> MetricSingleton.incrementCounter("good_name", 0.0, null));
assertThrows(RuntimeException.class, () -> MetricSingleton.getInstance().incrementCounter("good_name", 0.0, null));
}

}
Expand All @@ -70,7 +69,7 @@ public void testCounter() throws InterruptedException, IOException {
var metricName = "test_counter";
var rand = new Random();
for (int i = 0; i < 5; i++) {
MetricSingleton.incrementCounter(metricName, rand.nextDouble() * 2, "testing counter");
MetricSingleton.getInstance().incrementCounter(metricName, rand.nextDouble() * 2, "testing counter");
Thread.sleep(500);
}

Expand All @@ -83,7 +82,7 @@ public void testGauge() throws InterruptedException, IOException {
var metricName = "test_gauge";
var rand = new Random();
for (int i = 0; i < 5; i++) {
MetricSingleton.incrementCounter(metricName, rand.nextDouble() * 2, "testing gauge");
MetricSingleton.getInstance().incrementCounter(metricName, rand.nextDouble() * 2, "testing gauge");
Thread.sleep(500);
}

Expand All @@ -96,7 +95,7 @@ public void testTimer() throws InterruptedException, IOException {
var metricName = "test_timer";
var rand = new Random();
for (int i = 0; i < 5; i++) {
MetricSingleton.recordTime(metricName, rand.nextDouble() * 2, "testing time");
MetricSingleton.getInstance().recordTime(metricName, rand.nextDouble() * 2, "testing time");
Thread.sleep(500);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@

package io.airbyte.scheduler.app;

import io.airbyte.config.EnvConfigs;
import io.airbyte.scheduler.models.Job;
import io.airbyte.scheduler.models.JobStatus;
import io.airbyte.scheduler.persistence.JobNotifier;
Expand All @@ -21,17 +20,18 @@
public class JobRetrier implements Runnable {

private static final Logger LOGGER = LoggerFactory.getLogger(JobRetrier.class);
private static final int MAX_SYNC_JOB_ATTEMPTS = new EnvConfigs().getMaxSyncJobAttempts();;
private static final int RETRY_WAIT_MINUTES = 1;

private final JobPersistence persistence;
private final Supplier<Instant> timeSupplier;
private final JobNotifier jobNotifier;
private final int maxSyncJobAttempts;

public JobRetrier(JobPersistence jobPersistence, Supplier<Instant> timeSupplier, JobNotifier jobNotifier) {
public JobRetrier(JobPersistence jobPersistence, Supplier<Instant> timeSupplier, JobNotifier jobNotifier, int maxSyncJobAttempts) {
this.persistence = jobPersistence;
this.timeSupplier = timeSupplier;
this.jobNotifier = jobNotifier;
this.maxSyncJobAttempts = maxSyncJobAttempts;
}

@Override
Expand Down Expand Up @@ -75,7 +75,7 @@ private List<Job> incompleteJobs() {

private boolean hasReachedMaxAttempt(Job job) {
if (Job.REPLICATION_TYPES.contains(job.getConfigType())) {
return job.getAttemptsCount() >= MAX_SYNC_JOB_ATTEMPTS;
return job.getAttemptsCount() >= maxSyncJobAttempts;
} else {
return job.getAttemptsCount() >= 1;
}
Expand Down
Loading

0 comments on commit 378e8e0

Please sign in to comment.