diff --git a/airbyte-config/models/src/main/java/io/airbyte/config/helpers/LogClientSingleton.java b/airbyte-config/models/src/main/java/io/airbyte/config/helpers/LogClientSingleton.java index dc6c708bc31b..9bbb9b3257e1 100644 --- a/airbyte-config/models/src/main/java/io/airbyte/config/helpers/LogClientSingleton.java +++ b/airbyte-config/models/src/main/java/io/airbyte/config/helpers/LogClientSingleton.java @@ -70,7 +70,7 @@ public static Path getSchedulerLogsRoot(final Configs configs) { public static File getServerLogFile(final Configs configs) { final var logPathBase = getServerLogsRoot(configs); - if (shouldUseLocalLogs(configs)) { + if (shouldUseLocalLogs(configs.getWorkerEnvironment())) { return logPathBase.resolve(LOG_FILENAME).toFile(); } @@ -85,7 +85,7 @@ public static File getServerLogFile(final Configs configs) { public static File getSchedulerLogFile(final Configs configs) { final var logPathBase = getSchedulerLogsRoot(configs); - if (shouldUseLocalLogs(configs)) { + if (shouldUseLocalLogs(configs.getWorkerEnvironment())) { return logPathBase.resolve(LOG_FILENAME).toFile(); } @@ -103,7 +103,7 @@ public static List getJobLogFile(final Configs configs, final Path logPa return Collections.emptyList(); } - if (shouldUseLocalLogs(configs)) { + if (shouldUseLocalLogs(configs.getWorkerEnvironment())) { return IOs.getTail(LOG_TAIL_SIZE, logPath); } @@ -121,7 +121,7 @@ public static void deleteLogs(final Configs configs, final String logPath) { return; } - if (shouldUseLocalLogs(configs)) { + if (shouldUseLocalLogs(configs.getWorkerEnvironment())) { throw new NotImplementedException("Local log deletes not supported."); } final var logConfigs = new LogConfigDelegator(configs); @@ -130,8 +130,10 @@ public static void deleteLogs(final Configs configs, final String logPath) { } public static void setJobMdc(final Path path) { - final var configs = new EnvConfigs(); - if (shouldUseLocalLogs(configs)) { + // setJobMdc is referenced from TemporalAttemptExecution without input parameters, so hard to pass + // this in. + Configs configs = new EnvConfigs(); + if (shouldUseLocalLogs(configs.getWorkerEnvironment())) { LOGGER.debug("Setting docker job mdc"); MDC.put(LogClientSingleton.JOB_LOG_PATH_MDC_KEY, path.resolve(LogClientSingleton.LOG_FILENAME).toString()); } else { @@ -144,7 +146,7 @@ public static void setJobMdc(final Path path) { public static void setWorkspaceMdc(final Path path) { final var configs = new EnvConfigs(); - if (shouldUseLocalLogs(configs)) { + if (shouldUseLocalLogs(configs.getWorkerEnvironment())) { LOGGER.debug("Setting docker workspace mdc"); MDC.put(LogClientSingleton.WORKSPACE_MDC_KEY, path.toString()); } else { @@ -155,8 +157,8 @@ public static void setWorkspaceMdc(final Path path) { } } - private static boolean shouldUseLocalLogs(final Configs configs) { - return configs.getWorkerEnvironment().equals(WorkerEnvironment.DOCKER); + private static boolean shouldUseLocalLogs(WorkerEnvironment workerEnvironment) { + return workerEnvironment.equals(WorkerEnvironment.DOCKER); } private static void createCloudClientIfNull(final LogConfigs configs) { diff --git a/airbyte-metrics/src/main/java/io/airbyte/metrics/MetricSingleton.java b/airbyte-metrics/src/main/java/io/airbyte/metrics/MetricSingleton.java index 0e796518cb68..0c4cbf9252f0 100644 --- a/airbyte-metrics/src/main/java/io/airbyte/metrics/MetricSingleton.java +++ b/airbyte-metrics/src/main/java/io/airbyte/metrics/MetricSingleton.java @@ -5,7 +5,6 @@ package io.airbyte.metrics; import com.google.common.annotations.VisibleForTesting; -import io.airbyte.config.EnvConfigs; import io.prometheus.client.Counter; import io.prometheus.client.Gauge; import io.prometheus.client.Histogram; @@ -21,33 +20,47 @@ /** * Use the prometheus library to publish prometheus metrics to a specified port. These metrics can * be consumed by any agent understanding the OpenMetrics format. - * + *

* This class mainly exists to help Airbyte instrument/debug application on Airbyte Cloud. Within * Airbyte Cloud, the metrics are consumed by a Datadog agent and transformed into Datadog metrics * as per https://docs.datadoghq.com/integrations/guide/prometheus-metrics/. - * + *

* Open source users are free to turn this on and consume the same metrics. */ public class MetricSingleton { private static final Logger LOGGER = LoggerFactory.getLogger(MetricSingleton.class); - private static boolean PUBLISH = new EnvConfigs().getPublishMetrics(); + private static MetricSingleton instance; - private static final Map nameToGauge = new HashMap<>(); - private static final Map nameToCounter = new HashMap<>(); - private static final Map nameToHistogram = new HashMap<>(); + private final Map nameToGauge = new HashMap<>(); + private final Map nameToCounter = new HashMap<>(); + private final Map nameToHistogram = new HashMap<>(); - private static HTTPServer monitoringDaemon; + private HTTPServer monitoringDaemon; + + private MetricSingleton() {} + + public static synchronized MetricSingleton getInstance() { + if (instance == null) { + throw new RuntimeException("You must initialize configuration with the initializeMonitoringServiceDaemon() method before getting an instance."); + } + return instance; + } + + public void setMonitoringDaemon(HTTPServer monitoringDaemon) { + this.monitoringDaemon = monitoringDaemon; + } // Gauge. See // https://docs.datadoghq.com/metrics/agent_metrics_submission/?tab=gauge#monotonic-count. + /** * Track value at a given timestamp. * * @param name of gauge * @param val to set */ - public static void setGauge(String name, double val, String description) { + public void setGauge(String name, double val, String description) { validateNameAndCheckDescriptionExists(name, description, () -> ifPublish(() -> { if (!nameToGauge.containsKey(name)) { Gauge gauge = Gauge.build().name(name).help(description).register(); @@ -63,7 +76,7 @@ public static void setGauge(String name, double val, String description) { * @param name of gauge * @param val to increment */ - public static void incrementGauge(String name, double val, String description) { + public void incrementGauge(String name, double val, String description) { validateNameAndCheckDescriptionExists(name, description, () -> ifPublish(() -> { if (nameToGauge.containsKey(name)) { LOGGER.warn("Overriding existing metric, type: Gauge, name: {}", name); @@ -83,7 +96,7 @@ public static void incrementGauge(String name, double val, String description) { * @param name of gauge * @param val to decrement */ - public static void decrementGauge(String name, double val, String description) { + public void decrementGauge(String name, double val, String description) { validateNameAndCheckDescriptionExists(name, description, () -> ifPublish(() -> { if (!nameToGauge.containsKey(name)) { Gauge gauge = Gauge.build().name(name).help(description).register(); @@ -95,13 +108,14 @@ public static void decrementGauge(String name, double val, String description) { // Counter - Monotonically Increasing. See // https://docs.datadoghq.com/metrics/agent_metrics_submission/?tab=count#monotonic-count. + /** - * Increment a monotoically increasing counter. + * Increment a monotonically increasing counter. * * @param name of counter * @param amt to increment */ - public static void incrementCounter(String name, double amt, String description) { + public void incrementCounter(String name, double amt, String description) { validateNameAndCheckDescriptionExists(name, description, () -> ifPublish(() -> { if (!nameToCounter.containsKey(name)) { Counter counter = Counter.build().name(name).help(description).register(); @@ -114,6 +128,7 @@ public static void incrementCounter(String name, double amt, String description) // Histogram. See // https://docs.datadoghq.com/metrics/agent_metrics_submission/?tab=histogram#monotonic-count. + /** * Time code execution. * @@ -121,7 +136,7 @@ public static void incrementCounter(String name, double amt, String description) * @param runnable to time * @return duration of code execution. */ - public static double timeCode(String name, Runnable runnable, String description) { + public double timeCode(String name, Runnable runnable, String description) { var duration = new AtomicReference<>(0.0); validateNameAndCheckDescriptionExists(name, description, () -> ifPublish(() -> { if (!nameToHistogram.containsKey(name)) { @@ -139,7 +154,7 @@ public static double timeCode(String name, Runnable runnable, String description * @param name of the underlying histogram. * @param time to be recorded. */ - public static void recordTime(String name, double time, String description) { + public void recordTime(String name, double time, String description) { validateNameAndCheckDescriptionExists(name, description, () -> ifPublish(() -> { LOGGER.info("publishing record time, name: {}, time: {}", name, time); @@ -151,8 +166,8 @@ public static void recordTime(String name, double time, String description) { })); } - private static void ifPublish(Runnable execute) { - if (PUBLISH) { + private void ifPublish(Runnable execute) { + if (monitoringDaemon != null) { execute.run(); } } @@ -168,33 +183,34 @@ private static void validateNameAndCheckDescriptionExists(String name, String de } /** - * Stand up a separate thread to publish metrics to the specified port. + * Stand up a separate thread to publish metrics to the specified port. This method (in lieu of a + * constructor) must be called ahead of recording time, in order to set up the monitoring daemon and + * initialize the isPublish() configuration as true/false. * * @param monitorPort to publish metrics to */ - public static void initializeMonitoringServiceDaemon(String monitorPort, Map mdc) { - ifPublish(() -> { + public synchronized static void initializeMonitoringServiceDaemon(String monitorPort, Map mdc, boolean publish) { + if (instance != null) { + throw new RuntimeException("You cannot initialize configuration more than once."); + } + instance = new MetricSingleton(); + if (publish) { try { MDC.setContextMap(mdc); LOGGER.info("Starting prometheus metric server.."); // The second constructor argument ('true') makes this server start as a separate daemon thread. // http://prometheus.github.io/client_java/io/prometheus/client/exporter/HTTPServer.html#HTTPServer-int-boolean- - monitoringDaemon = new HTTPServer(Integer.parseInt(monitorPort), true); + instance.setMonitoringDaemon(new HTTPServer(Integer.parseInt(monitorPort), true)); } catch (IOException e) { LOGGER.error("Error starting up Prometheus publishing server..", e); } - }); + } } @VisibleForTesting - public static void closeMonitoringServiceDaemon() { + public void closeMonitoringServiceDaemon() { monitoringDaemon.close(); LOGGER.info("Stopping monitoring daemon.."); } - @VisibleForTesting - public static void setToPublish() { - PUBLISH = true; - } - } diff --git a/airbyte-metrics/src/test/java/io/airbyte/metrics/MetricSingletonTest.java b/airbyte-metrics/src/test/java/io/airbyte/metrics/MetricSingletonTest.java index efa79554e29f..b44506a0b4b4 100644 --- a/airbyte-metrics/src/test/java/io/airbyte/metrics/MetricSingletonTest.java +++ b/airbyte-metrics/src/test/java/io/airbyte/metrics/MetricSingletonTest.java @@ -41,13 +41,12 @@ public static void setUp() throws IOException { availPort = socket.getLocalPort(); } - MetricSingleton.setToPublish(); - MetricSingleton.initializeMonitoringServiceDaemon(String.valueOf(availPort), Map.of()); + MetricSingleton.initializeMonitoringServiceDaemon(String.valueOf(availPort), Map.of(), true); } @AfterAll public static void tearDown() { - MetricSingleton.closeMonitoringServiceDaemon(); + MetricSingleton.getInstance().closeMonitoringServiceDaemon(); } @Nested @@ -55,12 +54,12 @@ class Validation { @Test public void testNameWithDashFails() { - assertThrows(RuntimeException.class, () -> MetricSingleton.incrementCounter("bad-name", 0.0, "name with dashes are not allowed")); + assertThrows(RuntimeException.class, () -> MetricSingleton.getInstance().incrementCounter("bad-name", 0.0, "name with dashes are not allowed")); } @Test public void testNoDescriptionFails() { - assertThrows(RuntimeException.class, () -> MetricSingleton.incrementCounter("good_name", 0.0, null)); + assertThrows(RuntimeException.class, () -> MetricSingleton.getInstance().incrementCounter("good_name", 0.0, null)); } } @@ -70,7 +69,7 @@ public void testCounter() throws InterruptedException, IOException { var metricName = "test_counter"; var rand = new Random(); for (int i = 0; i < 5; i++) { - MetricSingleton.incrementCounter(metricName, rand.nextDouble() * 2, "testing counter"); + MetricSingleton.getInstance().incrementCounter(metricName, rand.nextDouble() * 2, "testing counter"); Thread.sleep(500); } @@ -83,7 +82,7 @@ public void testGauge() throws InterruptedException, IOException { var metricName = "test_gauge"; var rand = new Random(); for (int i = 0; i < 5; i++) { - MetricSingleton.incrementCounter(metricName, rand.nextDouble() * 2, "testing gauge"); + MetricSingleton.getInstance().incrementCounter(metricName, rand.nextDouble() * 2, "testing gauge"); Thread.sleep(500); } @@ -96,7 +95,7 @@ public void testTimer() throws InterruptedException, IOException { var metricName = "test_timer"; var rand = new Random(); for (int i = 0; i < 5; i++) { - MetricSingleton.recordTime(metricName, rand.nextDouble() * 2, "testing time"); + MetricSingleton.getInstance().recordTime(metricName, rand.nextDouble() * 2, "testing time"); Thread.sleep(500); } diff --git a/airbyte-scheduler/app/src/main/java/io/airbyte/scheduler/app/JobRetrier.java b/airbyte-scheduler/app/src/main/java/io/airbyte/scheduler/app/JobRetrier.java index ce35c86d8011..7dda21d3e80f 100644 --- a/airbyte-scheduler/app/src/main/java/io/airbyte/scheduler/app/JobRetrier.java +++ b/airbyte-scheduler/app/src/main/java/io/airbyte/scheduler/app/JobRetrier.java @@ -4,7 +4,6 @@ package io.airbyte.scheduler.app; -import io.airbyte.config.EnvConfigs; import io.airbyte.scheduler.models.Job; import io.airbyte.scheduler.models.JobStatus; import io.airbyte.scheduler.persistence.JobNotifier; @@ -21,17 +20,18 @@ public class JobRetrier implements Runnable { private static final Logger LOGGER = LoggerFactory.getLogger(JobRetrier.class); - private static final int MAX_SYNC_JOB_ATTEMPTS = new EnvConfigs().getMaxSyncJobAttempts();; private static final int RETRY_WAIT_MINUTES = 1; private final JobPersistence persistence; private final Supplier timeSupplier; private final JobNotifier jobNotifier; + private final int maxSyncJobAttempts; - public JobRetrier(JobPersistence jobPersistence, Supplier timeSupplier, JobNotifier jobNotifier) { + public JobRetrier(JobPersistence jobPersistence, Supplier timeSupplier, JobNotifier jobNotifier, int maxSyncJobAttempts) { this.persistence = jobPersistence; this.timeSupplier = timeSupplier; this.jobNotifier = jobNotifier; + this.maxSyncJobAttempts = maxSyncJobAttempts; } @Override @@ -75,7 +75,7 @@ private List incompleteJobs() { private boolean hasReachedMaxAttempt(Job job) { if (Job.REPLICATION_TYPES.contains(job.getConfigType())) { - return job.getAttemptsCount() >= MAX_SYNC_JOB_ATTEMPTS; + return job.getAttemptsCount() >= maxSyncJobAttempts; } else { return job.getAttemptsCount() >= 1; } diff --git a/airbyte-scheduler/app/src/main/java/io/airbyte/scheduler/app/SchedulerApp.java b/airbyte-scheduler/app/src/main/java/io/airbyte/scheduler/app/SchedulerApp.java index 2ee7c4bfd80c..790148e20c6e 100644 --- a/airbyte-scheduler/app/src/main/java/io/airbyte/scheduler/app/SchedulerApp.java +++ b/airbyte-scheduler/app/src/main/java/io/airbyte/scheduler/app/SchedulerApp.java @@ -59,7 +59,7 @@ * Operations can have thread pools under the hood. An important thread pool to note is that the job * submitter thread pool. This pool does the work of submitting jobs to temporal - the size of this * pool determines the number of concurrent jobs that can be run. This is controlled via the - * {@link #SUBMITTER_NUM_THREADS} variable. + * SUBMITTER_NUM_THREADS variable of EnvConfigs. */ public class SchedulerApp { @@ -68,7 +68,6 @@ public class SchedulerApp { private static final Logger LOGGER = LoggerFactory.getLogger(SchedulerApp.class); private static final long GRACEFUL_SHUTDOWN_SECONDS = 30; - private static final int SUBMITTER_NUM_THREADS = Integer.parseInt(new EnvConfigs().getSubmitterNumThreads()); private static final Duration SCHEDULING_DELAY = Duration.ofSeconds(5); private static final Duration CLEANING_DELAY = Duration.ofHours(2); private static final ThreadFactory THREAD_FACTORY = new ThreadFactoryBuilder().setNameFormat("worker-%d").build(); @@ -79,28 +78,37 @@ public class SchedulerApp { private final JobCleaner jobCleaner; private final JobNotifier jobNotifier; private final TemporalClient temporalClient; + private final int submitterNumThreads; + private final int maxSyncJobAttempts; + private final String airbyteVersionOrWarnings; public SchedulerApp(Path workspaceRoot, JobPersistence jobPersistence, ConfigRepository configRepository, JobCleaner jobCleaner, JobNotifier jobNotifier, - TemporalClient temporalClient) { + TemporalClient temporalClient, + Integer submitterNumThreads, + Integer maxSyncJobAttempts, + String airbyteVersionOrWarnings) { this.workspaceRoot = workspaceRoot; this.jobPersistence = jobPersistence; this.configRepository = configRepository; this.jobCleaner = jobCleaner; this.jobNotifier = jobNotifier; this.temporalClient = temporalClient; + this.submitterNumThreads = submitterNumThreads; + this.maxSyncJobAttempts = maxSyncJobAttempts; + this.airbyteVersionOrWarnings = airbyteVersionOrWarnings; } public void start() throws IOException { - final ExecutorService workerThreadPool = Executors.newFixedThreadPool(SUBMITTER_NUM_THREADS, THREAD_FACTORY); + final ExecutorService workerThreadPool = Executors.newFixedThreadPool(submitterNumThreads, THREAD_FACTORY); final ScheduledExecutorService scheduleJobsPool = Executors.newSingleThreadScheduledExecutor(); final ScheduledExecutorService executeJobsPool = Executors.newSingleThreadScheduledExecutor(); final ScheduledExecutorService cleanupJobsPool = Executors.newSingleThreadScheduledExecutor(); - final TemporalWorkerRunFactory temporalWorkerRunFactory = new TemporalWorkerRunFactory(temporalClient, workspaceRoot); - final JobRetrier jobRetrier = new JobRetrier(jobPersistence, Instant::now, jobNotifier); + final TemporalWorkerRunFactory temporalWorkerRunFactory = new TemporalWorkerRunFactory(temporalClient, workspaceRoot, airbyteVersionOrWarnings); + final JobRetrier jobRetrier = new JobRetrier(jobPersistence, Instant::now, jobNotifier, maxSyncJobAttempts); final TrackingClient trackingClient = TrackingClientSingleton.get(); final JobScheduler jobScheduler = new JobScheduler(jobPersistence, configRepository, trackingClient); final JobSubmitter jobSubmitter = new JobSubmitter( @@ -228,11 +236,12 @@ public static void main(String[] args) throws IOException, InterruptedException final TemporalClient temporalClient = TemporalClient.production(temporalHost, workspaceRoot); final Map mdc = MDC.getCopyOfContextMap(); - MetricSingleton.initializeMonitoringServiceDaemon("8082", mdc); + MetricSingleton.initializeMonitoringServiceDaemon("8082", mdc, configs.getPublishMetrics()); LOGGER.info("Launching scheduler..."); - new SchedulerApp(workspaceRoot, jobPersistence, configRepository, jobCleaner, jobNotifier, temporalClient) - .start(); + new SchedulerApp(workspaceRoot, jobPersistence, configRepository, jobCleaner, jobNotifier, temporalClient, + Integer.parseInt(configs.getSubmitterNumThreads()), configs.getMaxSyncJobAttempts(), configs.getAirbyteVersionOrWarning()) + .start(); } } diff --git a/airbyte-scheduler/app/src/main/java/io/airbyte/scheduler/app/worker_run/TemporalWorkerRunFactory.java b/airbyte-scheduler/app/src/main/java/io/airbyte/scheduler/app/worker_run/TemporalWorkerRunFactory.java index 190993da8995..6f723be2bbb1 100644 --- a/airbyte-scheduler/app/src/main/java/io/airbyte/scheduler/app/worker_run/TemporalWorkerRunFactory.java +++ b/airbyte-scheduler/app/src/main/java/io/airbyte/scheduler/app/worker_run/TemporalWorkerRunFactory.java @@ -29,15 +29,17 @@ public class TemporalWorkerRunFactory { private final TemporalClient temporalClient; private final Path workspaceRoot; + private final String airbyteVersionOrWarnings; - public TemporalWorkerRunFactory(TemporalClient temporalClient, Path workspaceRoot) { + public TemporalWorkerRunFactory(TemporalClient temporalClient, Path workspaceRoot, String airbyteVersionOrWarnings) { this.temporalClient = temporalClient; this.workspaceRoot = workspaceRoot; + this.airbyteVersionOrWarnings = airbyteVersionOrWarnings; } public WorkerRun create(Job job) { final int attemptId = job.getAttemptsCount(); - return WorkerRun.create(workspaceRoot, job.getId(), attemptId, createSupplier(job, attemptId)); + return WorkerRun.create(workspaceRoot, job.getId(), attemptId, createSupplier(job, attemptId), airbyteVersionOrWarnings); } public CheckedSupplier, Exception> createSupplier(Job job, int attemptId) { diff --git a/airbyte-scheduler/app/src/main/java/io/airbyte/scheduler/app/worker_run/WorkerRun.java b/airbyte-scheduler/app/src/main/java/io/airbyte/scheduler/app/worker_run/WorkerRun.java index 8ff3c9979c2e..738285ead839 100644 --- a/airbyte-scheduler/app/src/main/java/io/airbyte/scheduler/app/worker_run/WorkerRun.java +++ b/airbyte-scheduler/app/src/main/java/io/airbyte/scheduler/app/worker_run/WorkerRun.java @@ -5,7 +5,6 @@ package io.airbyte.scheduler.app.worker_run; import io.airbyte.commons.functional.CheckedSupplier; -import io.airbyte.config.EnvConfigs; import io.airbyte.config.JobOutput; import io.airbyte.workers.OutputAndStatus; import io.airbyte.workers.WorkerUtils; @@ -26,22 +25,27 @@ public class WorkerRun implements Callable> { private final Path jobRoot; private final CheckedSupplier, Exception> workerRun; + private final String airbyteVersionOrWarnings; - public static WorkerRun create(Path workspaceRoot, long jobId, int attempt, CheckedSupplier, Exception> workerRun) { + public static WorkerRun create(Path workspaceRoot, + long jobId, + int attempt, + CheckedSupplier, Exception> workerRun, + String airbyteVersionOrWarnings) { final Path jobRoot = WorkerUtils.getJobRoot(workspaceRoot, String.valueOf(jobId), attempt); - return new WorkerRun(jobRoot, workerRun); + return new WorkerRun(jobRoot, workerRun, airbyteVersionOrWarnings); } - public WorkerRun(final Path jobRoot, final CheckedSupplier, Exception> workerRun) { + public WorkerRun(final Path jobRoot, final CheckedSupplier, Exception> workerRun, String airbyteVersionOrWarnings) { this.jobRoot = jobRoot; this.workerRun = workerRun; + this.airbyteVersionOrWarnings = airbyteVersionOrWarnings; } @Override public OutputAndStatus call() throws Exception { - LOGGER.info("Executing worker wrapper. Airbyte version: {}", new EnvConfigs().getAirbyteVersionOrWarning()); + LOGGER.info("Executing worker wrapper. Airbyte version: {}", airbyteVersionOrWarnings); Files.createDirectories(jobRoot); - return workerRun.get(); } diff --git a/airbyte-scheduler/app/src/test/java/io/airbyte/scheduler/app/JobRetrierTest.java b/airbyte-scheduler/app/src/test/java/io/airbyte/scheduler/app/JobRetrierTest.java index acf58d1b6abc..73ad3c4ccd81 100644 --- a/airbyte-scheduler/app/src/test/java/io/airbyte/scheduler/app/JobRetrierTest.java +++ b/airbyte-scheduler/app/src/test/java/io/airbyte/scheduler/app/JobRetrierTest.java @@ -36,7 +36,7 @@ void setup() throws IOException { jobNotifier = mock(JobNotifier.class); persistence = mock(JobPersistence.class); - jobRetrier = new JobRetrier(persistence, () -> NOW, jobNotifier); + jobRetrier = new JobRetrier(persistence, () -> NOW, jobNotifier, 3); incompleteSyncJob = mock(Job.class); when(incompleteSyncJob.getId()).thenReturn(12L); when(incompleteSyncJob.getStatus()).thenReturn(JobStatus.INCOMPLETE); diff --git a/airbyte-scheduler/app/src/test/java/io/airbyte/scheduler/app/worker_run/TemporalWorkerRunFactoryTest.java b/airbyte-scheduler/app/src/test/java/io/airbyte/scheduler/app/worker_run/TemporalWorkerRunFactoryTest.java index 3c619972cfb8..f7f82a9ba638 100644 --- a/airbyte-scheduler/app/src/test/java/io/airbyte/scheduler/app/worker_run/TemporalWorkerRunFactoryTest.java +++ b/airbyte-scheduler/app/src/test/java/io/airbyte/scheduler/app/worker_run/TemporalWorkerRunFactoryTest.java @@ -46,7 +46,7 @@ void setup() throws IOException { Path workspaceRoot = Files.createTempDirectory(Path.of("/tmp"), "temporal_worker_run_test"); jobRoot = workspaceRoot.resolve(String.valueOf(JOB_ID)).resolve(String.valueOf(ATTEMPT_ID)); temporalClient = mock(TemporalClient.class); - workerRunFactory = new TemporalWorkerRunFactory(temporalClient, workspaceRoot); + workerRunFactory = new TemporalWorkerRunFactory(temporalClient, workspaceRoot, "unknown airbyte version"); job = mock(Job.class, RETURNS_DEEP_STUBS); when(job.getId()).thenReturn(JOB_ID); when(job.getAttemptsCount()).thenReturn(ATTEMPT_ID); diff --git a/airbyte-scheduler/app/src/test/java/io/airbyte/scheduler/app/worker_run/WorkerRunTest.java b/airbyte-scheduler/app/src/test/java/io/airbyte/scheduler/app/worker_run/WorkerRunTest.java index 2b2f93ffbdc4..ddfec8bdee3a 100644 --- a/airbyte-scheduler/app/src/test/java/io/airbyte/scheduler/app/worker_run/WorkerRunTest.java +++ b/airbyte-scheduler/app/src/test/java/io/airbyte/scheduler/app/worker_run/WorkerRunTest.java @@ -30,7 +30,7 @@ void setUp() throws IOException { @Test void test() throws Exception { final CheckedSupplier, Exception> supplier = mock(CheckedSupplier.class); - new WorkerRun(path, supplier).call(); + new WorkerRun(path, supplier, "unknown airbyte version").call(); assertTrue(Files.exists(path)); verify(supplier).get(); diff --git a/airbyte-secrets-migration/src/main/java/io/airbyte/secretsmigration/SecretsMigration.java b/airbyte-secrets-migration/src/main/java/io/airbyte/secretsmigration/SecretsMigration.java index 2197cea31971..7dc104f188bd 100644 --- a/airbyte-secrets-migration/src/main/java/io/airbyte/secretsmigration/SecretsMigration.java +++ b/airbyte-secrets-migration/src/main/java/io/airbyte/secretsmigration/SecretsMigration.java @@ -25,13 +25,11 @@ public class SecretsMigration { private static final Path TEST_ROOT = Path.of("/tmp/airbyte_tests"); private static final Logger LOGGER = LoggerFactory.getLogger(SecretsMigration.class); - final Configs configs; final boolean dryRun; final ConfigPersistence readFromPersistence; final ConfigPersistence writeToPersistence; - public SecretsMigration(Configs envConfigs, ConfigPersistence readFromPersistence, ConfigPersistence writeToPersistence, boolean dryRun) { - this.configs = envConfigs; + public SecretsMigration(ConfigPersistence readFromPersistence, ConfigPersistence writeToPersistence, boolean dryRun) { this.readFromPersistence = readFromPersistence; this.writeToPersistence = writeToPersistence; this.dryRun = dryRun; @@ -64,7 +62,7 @@ public static void main(String[] args) throws Exception { configs.getConfigDatabaseUrl()) .getInitialized()).withValidation(); final ConfigPersistence writeToPersistence = new FileSystemConfigPersistence(TEST_ROOT); - final SecretsMigration migration = new SecretsMigration(configs, readFromPersistence, writeToPersistence, false); + final SecretsMigration migration = new SecretsMigration(readFromPersistence, writeToPersistence, false); LOGGER.info("starting: {}", SecretsMigration.class); migration.run(); LOGGER.info("completed: {}", SecretsMigration.class); diff --git a/airbyte-secrets-migration/src/test/java/io/airbyte/secretsmigration/SecretsMigrationTest.java b/airbyte-secrets-migration/src/test/java/io/airbyte/secretsmigration/SecretsMigrationTest.java index 5c57fa8e281d..f05672691d68 100644 --- a/airbyte-secrets-migration/src/test/java/io/airbyte/secretsmigration/SecretsMigrationTest.java +++ b/airbyte-secrets-migration/src/test/java/io/airbyte/secretsmigration/SecretsMigrationTest.java @@ -14,7 +14,6 @@ import io.airbyte.commons.json.Jsons; import io.airbyte.commons.resources.MoreResources; import io.airbyte.config.ConfigSchema; -import io.airbyte.config.EnvConfigs; import io.airbyte.config.SourceConnection; import io.airbyte.config.StandardDestinationDefinition; import io.airbyte.config.StandardSourceDefinition; @@ -65,7 +64,6 @@ class SecretsMigrationTest { protected static PostgreSQLContainer container; protected static Database database; - private static EnvConfigs configs; private static ConfigPersistence readFromConfigPersistence; private static ConfigPersistence writeToConfigPersistence; private static Path rootPath; @@ -84,9 +82,7 @@ static void init() throws IOException { } @BeforeEach - public void setup() throws Exception { - configs = new EnvConfigs(); - } + public void setup() throws Exception {} public static void failOnDifferingConfigurations(Map> leftConfigs, Map> rightConfigs) { // Check that both sets have exactly the same keys. If they don't, we already know we're failing the @@ -145,12 +141,12 @@ public void exportImportTest() throws JsonValidationException, IOException { assertTrue(writeToConfigPersistence.dumpConfigs().isEmpty(), "Write config should be empty before we use it (sanity check), but found keys: " + writeToConfigPersistence.dumpConfigs().keySet().toString()); - final SecretsMigration dryRunMigration = new SecretsMigration(configs, readFromConfigPersistence, writeToConfigPersistence, true); + final SecretsMigration dryRunMigration = new SecretsMigration(readFromConfigPersistence, writeToConfigPersistence, true); dryRunMigration.run(); assertTrue(writeToConfigPersistence.dumpConfigs().isEmpty(), "Dry run should not have modified anything."); // real export-import - final SecretsMigration migration = new SecretsMigration(configs, readFromConfigPersistence, writeToConfigPersistence, false); + final SecretsMigration migration = new SecretsMigration(readFromConfigPersistence, writeToConfigPersistence, false); migration.run(); // verify results diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/DefaultNormalizationWorker.java b/airbyte-workers/src/main/java/io/airbyte/workers/DefaultNormalizationWorker.java index 06a0681de74b..979727c8cea5 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/DefaultNormalizationWorker.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/DefaultNormalizationWorker.java @@ -5,7 +5,6 @@ package io.airbyte.workers; import io.airbyte.config.Configs.WorkerEnvironment; -import io.airbyte.config.EnvConfigs; import io.airbyte.config.NormalizationInput; import io.airbyte.workers.normalization.NormalizationRunner; import java.nio.file.Files; @@ -23,15 +22,18 @@ public class DefaultNormalizationWorker implements NormalizationWorker { private final String jobId; private final int attempt; private final NormalizationRunner normalizationRunner; + private final WorkerEnvironment workerEnvironment; private final AtomicBoolean cancelled; public DefaultNormalizationWorker(final String jobId, final int attempt, - final NormalizationRunner normalizationRunner) { + final NormalizationRunner normalizationRunner, + final WorkerEnvironment workerEnvironment) { this.jobId = jobId; this.attempt = attempt; this.normalizationRunner = normalizationRunner; + this.workerEnvironment = workerEnvironment; this.cancelled = new AtomicBoolean(false); } @@ -46,7 +48,7 @@ public Void run(NormalizationInput input, Path jobRoot) throws WorkerException { Path normalizationRoot = null; // There are no shared volumes on Kube; only create this for Docker. - if (new EnvConfigs().getWorkerEnvironment().equals(WorkerEnvironment.DOCKER)) { + if (workerEnvironment.equals(WorkerEnvironment.DOCKER)) { normalizationRoot = Files.createDirectories(jobRoot.resolve("normalize")); } diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/WorkerApp.java b/airbyte-workers/src/main/java/io/airbyte/workers/WorkerApp.java index 456959ea5b00..53d86dbf5e01 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/WorkerApp.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/WorkerApp.java @@ -5,6 +5,7 @@ package io.airbyte.workers; import io.airbyte.config.Configs; +import io.airbyte.config.Configs.WorkerEnvironment; import io.airbyte.config.EnvConfigs; import io.airbyte.config.MaxWorkersConfig; import io.airbyte.config.helpers.LogClientSingleton; @@ -48,17 +49,20 @@ public class WorkerApp { private final SecretsHydrator secretsHydrator; private final WorkflowServiceStubs temporalService; private final MaxWorkersConfig maxWorkers; + private final WorkerEnvironment workerEnvironment; public WorkerApp(Path workspaceRoot, ProcessFactory processFactory, SecretsHydrator secretsHydrator, WorkflowServiceStubs temporalService, - MaxWorkersConfig maxWorkers) { + MaxWorkersConfig maxWorkers, + WorkerEnvironment workerEnvironment) { this.workspaceRoot = workspaceRoot; this.processFactory = processFactory; this.secretsHydrator = secretsHydrator; this.temporalService = temporalService; this.maxWorkers = maxWorkers; + this.workerEnvironment = workerEnvironment; } public void start() { @@ -94,7 +98,7 @@ public void start() { syncWorker.registerWorkflowImplementationTypes(SyncWorkflow.WorkflowImpl.class); syncWorker.registerActivitiesImplementations( new SyncWorkflow.ReplicationActivityImpl(processFactory, secretsHydrator, workspaceRoot), - new SyncWorkflow.NormalizationActivityImpl(processFactory, secretsHydrator, workspaceRoot), + new SyncWorkflow.NormalizationActivityImpl(processFactory, secretsHydrator, workspaceRoot, workerEnvironment), new SyncWorkflow.DbtTransformationActivityImpl(processFactory, secretsHydrator, workspaceRoot)); factory.start(); @@ -107,7 +111,7 @@ private static ProcessFactory getProcessBuilderFactory(Configs configs) throws I final String localIp = InetAddress.getLocalHost().getHostAddress(); final String kubeHeartbeatUrl = localIp + ":" + KUBE_HEARTBEAT_PORT; LOGGER.info("Using Kubernetes namespace: {}", configs.getKubeNamespace()); - return new KubeProcessFactory(configs.getKubeNamespace(), officialClient, fabricClient, kubeHeartbeatUrl); + return new KubeProcessFactory(configs.getKubeNamespace(), officialClient, fabricClient, kubeHeartbeatUrl, configs.getTemporalWorkerPorts()); } else { return new DockerProcessFactory( configs.getWorkspaceRoot(), @@ -140,7 +144,7 @@ public static void main(String[] args) throws IOException, InterruptedException final WorkflowServiceStubs temporalService = TemporalUtils.createTemporalService(temporalHost); - new WorkerApp(workspaceRoot, processFactory, secretsHydrator, temporalService, configs.getMaxWorkers()).start(); + new WorkerApp(workspaceRoot, processFactory, secretsHydrator, temporalService, configs.getMaxWorkers(), configs.getWorkerEnvironment()).start(); } } diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/process/KubePodProcess.java b/airbyte-workers/src/main/java/io/airbyte/workers/process/KubePodProcess.java index f3b83475c335..efe0e67765f7 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/process/KubePodProcess.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/process/KubePodProcess.java @@ -513,8 +513,8 @@ private void close() { Exceptions.swallow(this.stderrServerSocket::close); Exceptions.swallow(this.executorService::shutdownNow); - KubePortManagerSingleton.offer(stdoutLocalPort); - KubePortManagerSingleton.offer(stderrLocalPort); + KubePortManagerSingleton.getInstance().offer(stdoutLocalPort); + KubePortManagerSingleton.getInstance().offer(stderrLocalPort); LOGGER.debug("Closed {}", podDefinition.getMetadata().getName()); } diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/process/KubePortManagerSingleton.java b/airbyte-workers/src/main/java/io/airbyte/workers/process/KubePortManagerSingleton.java index 4abaa420ce4b..937a8eca9a45 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/process/KubePortManagerSingleton.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/process/KubePortManagerSingleton.java @@ -4,8 +4,6 @@ package io.airbyte.workers.process; -import com.google.common.annotations.VisibleForTesting; -import io.airbyte.config.EnvConfigs; import java.util.Set; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingDeque; @@ -20,36 +18,63 @@ * Although this data structure can do without the wrapper class, this class allows easier testing * via the {@link #getNumAvailablePorts()} function. * - * The singleton pattern clarifies that only one copy of this class is intended to exists per + * The singleton pattern clarifies that only one copy of this class is intended to exist per * scheduler deployment. */ public class KubePortManagerSingleton { private static final Logger LOGGER = LoggerFactory.getLogger(KubePortManagerSingleton.class); + + private static KubePortManagerSingleton instance; + private static final int MAX_PORTS_PER_WORKER = 4; // A sync has two workers. Each worker requires 2 ports. - private static BlockingQueue workerPorts = new LinkedBlockingDeque<>(new EnvConfigs().getTemporalWorkerPorts()); + private BlockingQueue workerPorts; + + private KubePortManagerSingleton(Set ports) { + workerPorts = new LinkedBlockingDeque<>(ports); + } + + /** + * Make sure init(ports) is called once prior to repeatedly using getInstance(). + * + * @return + */ + public static synchronized KubePortManagerSingleton getInstance() { + if (instance == null) { + throw new RuntimeException("Must initialize with init(ports) before using."); + } + return instance; + } + + /** + * Sets up the port range; make sure init(ports) is called once prior to repeatedly using + * getInstance(). + * + * @return + */ + public static synchronized void init(Set ports) { + if (instance != null) { + throw new RuntimeException("Cannot initialize twice!"); + } + instance = new KubePortManagerSingleton(ports); + } - public static Integer take() throws InterruptedException { + public Integer take() throws InterruptedException { return workerPorts.poll(10, TimeUnit.MINUTES); } - public static void offer(Integer port) { + public void offer(Integer port) { if (!workerPorts.contains(port)) { workerPorts.add(port); } } - public static int getNumAvailablePorts() { + public int getNumAvailablePorts() { return workerPorts.size(); } - public static int getSupportedWorkers() { + public int getSupportedWorkers() { return workerPorts.size() / MAX_PORTS_PER_WORKER; } - @VisibleForTesting - protected static void setWorkerPorts(Set ports) { - workerPorts = new LinkedBlockingDeque<>(ports); - } - } diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/process/KubeProcessFactory.java b/airbyte-workers/src/main/java/io/airbyte/workers/process/KubeProcessFactory.java index 7167a6c5e20a..7db56a857848 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/process/KubeProcessFactory.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/process/KubeProcessFactory.java @@ -15,6 +15,7 @@ import java.nio.file.Path; import java.util.HashMap; import java.util.Map; +import java.util.Set; import java.util.regex.Matcher; import java.util.regex.Pattern; import org.apache.commons.lang3.RandomStringUtils; @@ -56,8 +57,9 @@ public class KubeProcessFactory implements ProcessFactory { public KubeProcessFactory(String namespace, ApiClient officialClient, KubernetesClient fabricClient, - String kubeHeartbeatUrl) { - this(namespace, officialClient, fabricClient, kubeHeartbeatUrl, Exceptions.toRuntime(() -> InetAddress.getLocalHost().getHostAddress())); + String kubeHeartbeatUrl, + Set ports) { + this(namespace, officialClient, fabricClient, kubeHeartbeatUrl, Exceptions.toRuntime(() -> InetAddress.getLocalHost().getHostAddress()), ports); } /** @@ -74,12 +76,14 @@ public KubeProcessFactory(String namespace, ApiClient officialClient, KubernetesClient fabricClient, String kubeHeartbeatUrl, - String processRunnerHost) { + String processRunnerHost, + Set ports) { this.namespace = namespace; this.officialClient = officialClient; this.fabricClient = fabricClient; this.kubeHeartbeatUrl = kubeHeartbeatUrl; this.processRunnerHost = processRunnerHost; + KubePortManagerSingleton.init(ports); } @Override @@ -98,10 +102,10 @@ public Process create(String jobId, // used to differentiate source and destination processes with the same id and attempt final String podName = createPodName(imageName, jobId, attempt); - final int stdoutLocalPort = KubePortManagerSingleton.take(); + final int stdoutLocalPort = KubePortManagerSingleton.getInstance().take(); LOGGER.info("{} stdoutLocalPort = {}", podName, stdoutLocalPort); - final int stderrLocalPort = KubePortManagerSingleton.take(); + final int stderrLocalPort = KubePortManagerSingleton.getInstance().take(); LOGGER.info("{} stderrLocalPort = {}", podName, stderrLocalPort); var allLabels = new HashMap<>(customLabels); diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/SyncWorkflow.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/SyncWorkflow.java index 66e97569de2a..05fd7d8b70da 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/SyncWorkflow.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/SyncWorkflow.java @@ -9,6 +9,7 @@ import io.airbyte.commons.json.Jsons; import io.airbyte.config.AirbyteConfigValidator; import io.airbyte.config.ConfigSchema; +import io.airbyte.config.Configs.WorkerEnvironment; import io.airbyte.config.EnvConfigs; import io.airbyte.config.NormalizationInput; import io.airbyte.config.OperatorDbtInput; @@ -253,20 +254,26 @@ class NormalizationActivityImpl implements NormalizationActivity { private final SecretsHydrator secretsHydrator; private final Path workspaceRoot; private final AirbyteConfigValidator validator; + private final WorkerEnvironment workerEnvironment; - public NormalizationActivityImpl(ProcessFactory processFactory, SecretsHydrator secretsHydrator, Path workspaceRoot) { - this(processFactory, secretsHydrator, workspaceRoot, new AirbyteConfigValidator()); + public NormalizationActivityImpl(ProcessFactory processFactory, + SecretsHydrator secretsHydrator, + Path workspaceRoot, + WorkerEnvironment workerEnvironment) { + this(processFactory, secretsHydrator, workspaceRoot, new AirbyteConfigValidator(), workerEnvironment); } @VisibleForTesting NormalizationActivityImpl(ProcessFactory processFactory, SecretsHydrator secretsHydrator, Path workspaceRoot, - AirbyteConfigValidator validator) { + AirbyteConfigValidator validator, + WorkerEnvironment workerEnvironment) { this.processFactory = processFactory; this.secretsHydrator = secretsHydrator; this.workspaceRoot = workspaceRoot; this.validator = validator; + this.workerEnvironment = workerEnvironment; } @Override @@ -299,7 +306,8 @@ private CheckedSupplier, Exception> getWorkerFa Math.toIntExact(jobRunConfig.getAttemptId()), NormalizationRunnerFactory.create( destinationLauncherConfig.getDockerImage(), - processFactory)); + processFactory), + workerEnvironment); } } diff --git a/airbyte-workers/src/test-integration/java/io/airbyte/workers/process/KubePodProcessIntegrationTest.java b/airbyte-workers/src/test-integration/java/io/airbyte/workers/process/KubePodProcessIntegrationTest.java index ee6f7d4b996e..4d206b75dff1 100644 --- a/airbyte-workers/src/test-integration/java/io/airbyte/workers/process/KubePodProcessIntegrationTest.java +++ b/airbyte-workers/src/test-integration/java/io/airbyte/workers/process/KubePodProcessIntegrationTest.java @@ -29,6 +29,7 @@ import java.util.Set; import org.apache.commons.lang.RandomStringUtils; import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -36,27 +37,31 @@ public class KubePodProcessIntegrationTest { private static final boolean IS_MINIKUBE = Boolean.parseBoolean(Optional.ofNullable(System.getenv("IS_MINIKUBE")).orElse("false")); - private List openPorts; - private int heartbeatPort; - private String heartbeatUrl; - private ApiClient officialClient; - private KubernetesClient fabricClient; - private KubeProcessFactory processFactory; + private static List openPorts; + private static int heartbeatPort; + private static String heartbeatUrl; + private static ApiClient officialClient; + private static KubernetesClient fabricClient; + private static KubeProcessFactory processFactory; - private static WorkerHeartbeatServer server; + private WorkerHeartbeatServer server; - @BeforeEach - public void setup() throws Exception { + @BeforeAll + public static void init() throws Exception { openPorts = new ArrayList<>(getOpenPorts(5)); - KubePortManagerSingleton.setWorkerPorts(new HashSet<>(openPorts.subList(1, openPorts.size() - 1))); heartbeatPort = openPorts.get(0); heartbeatUrl = getHost() + ":" + heartbeatPort; officialClient = Config.defaultClient(); fabricClient = new DefaultKubernetesClient(); - processFactory = new KubeProcessFactory("default", officialClient, fabricClient, heartbeatUrl, getHost()); + processFactory = new KubeProcessFactory("default", officialClient, fabricClient, heartbeatUrl, getHost(), + new HashSet<>(openPorts.subList(1, openPorts.size() - 1))); + } + + @BeforeEach + public void setup() throws Exception { server = new WorkerHeartbeatServer(heartbeatPort); server.startBackground(); } @@ -69,20 +74,20 @@ public void teardown() throws Exception { @Test public void testSuccessfulSpawning() throws Exception { // start a finite process - var availablePortsBefore = KubePortManagerSingleton.getNumAvailablePorts(); + var availablePortsBefore = KubePortManagerSingleton.getInstance().getNumAvailablePorts(); final Process process = getProcess("echo hi; sleep 1; echo hi2"); process.waitFor(); // the pod should be dead and in a good state assertFalse(process.isAlive()); - assertEquals(availablePortsBefore, KubePortManagerSingleton.getNumAvailablePorts()); + assertEquals(availablePortsBefore, KubePortManagerSingleton.getInstance().getNumAvailablePorts()); assertEquals(0, process.exitValue()); } @Test public void testSuccessfulSpawningWithQuotes() throws Exception { // start a finite process - var availablePortsBefore = KubePortManagerSingleton.getNumAvailablePorts(); + var availablePortsBefore = KubePortManagerSingleton.getInstance().getNumAvailablePorts(); final Process process = getProcess("echo \"h\\\"i\"; sleep 1; echo hi2"); var output = new String(process.getInputStream().readAllBytes()); assertEquals("h\"i\nhi2\n", output); @@ -90,53 +95,53 @@ public void testSuccessfulSpawningWithQuotes() throws Exception { // the pod should be dead and in a good state assertFalse(process.isAlive()); - assertEquals(availablePortsBefore, KubePortManagerSingleton.getNumAvailablePorts()); + assertEquals(availablePortsBefore, KubePortManagerSingleton.getInstance().getNumAvailablePorts()); assertEquals(0, process.exitValue()); } @Test public void testPipeInEntrypoint() throws Exception { // start a process that has a pipe in the entrypoint - var availablePortsBefore = KubePortManagerSingleton.getNumAvailablePorts(); + var availablePortsBefore = KubePortManagerSingleton.getInstance().getNumAvailablePorts(); final Process process = getProcess("echo hi | cat"); process.waitFor(); // the pod should be dead and in a good state assertFalse(process.isAlive()); - assertEquals(availablePortsBefore, KubePortManagerSingleton.getNumAvailablePorts()); + assertEquals(availablePortsBefore, KubePortManagerSingleton.getInstance().getNumAvailablePorts()); assertEquals(0, process.exitValue()); } @Test public void testExitCodeRetrieval() throws Exception { // start a process that requests - var availablePortsBefore = KubePortManagerSingleton.getNumAvailablePorts(); + var availablePortsBefore = KubePortManagerSingleton.getInstance().getNumAvailablePorts(); final Process process = getProcess("exit 10"); process.waitFor(); // the pod should be dead with the correct error code assertFalse(process.isAlive()); - assertEquals(availablePortsBefore, KubePortManagerSingleton.getNumAvailablePorts()); + assertEquals(availablePortsBefore, KubePortManagerSingleton.getInstance().getNumAvailablePorts()); assertEquals(10, process.exitValue()); } @Test public void testMissingEntrypoint() throws WorkerException, InterruptedException { // start a process with an entrypoint that doesn't exist - var availablePortsBefore = KubePortManagerSingleton.getNumAvailablePorts(); + var availablePortsBefore = KubePortManagerSingleton.getInstance().getNumAvailablePorts(); final Process process = getProcess(null); process.waitFor(); // the pod should be dead and in an error state assertFalse(process.isAlive()); - assertEquals(availablePortsBefore, KubePortManagerSingleton.getNumAvailablePorts()); + assertEquals(availablePortsBefore, KubePortManagerSingleton.getInstance().getNumAvailablePorts()); assertEquals(127, process.exitValue()); } @Test public void testKillingWithoutHeartbeat() throws Exception { // start an infinite process - var availablePortsBefore = KubePortManagerSingleton.getNumAvailablePorts(); + var availablePortsBefore = KubePortManagerSingleton.getInstance().getNumAvailablePorts(); final Process process = getProcess("while true; do echo hi; sleep 1; done"); // kill the heartbeat server @@ -147,7 +152,7 @@ public void testKillingWithoutHeartbeat() throws Exception { // the pod should be dead and in an error state assertFalse(process.isAlive()); - assertEquals(availablePortsBefore, KubePortManagerSingleton.getNumAvailablePorts()); + assertEquals(availablePortsBefore, KubePortManagerSingleton.getInstance().getNumAvailablePorts()); assertNotEquals(0, process.exitValue()); } diff --git a/airbyte-workers/src/test/java/io/airbyte/workers/DefaultNormalizationWorkerTest.java b/airbyte-workers/src/test/java/io/airbyte/workers/DefaultNormalizationWorkerTest.java index e02e93ee5c3c..a8a950e86c8f 100644 --- a/airbyte-workers/src/test/java/io/airbyte/workers/DefaultNormalizationWorkerTest.java +++ b/airbyte-workers/src/test/java/io/airbyte/workers/DefaultNormalizationWorkerTest.java @@ -8,6 +8,7 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import io.airbyte.config.Configs.WorkerEnvironment; import io.airbyte.config.NormalizationInput; import io.airbyte.config.StandardSync; import io.airbyte.config.StandardSyncInput; @@ -53,7 +54,8 @@ void setup() throws Exception { @Test void test() throws Exception { - final DefaultNormalizationWorker normalizationWorker = new DefaultNormalizationWorker(JOB_ID, JOB_ATTEMPT, normalizationRunner); + final DefaultNormalizationWorker normalizationWorker = + new DefaultNormalizationWorker(JOB_ID, JOB_ATTEMPT, normalizationRunner, WorkerEnvironment.DOCKER); normalizationWorker.run(normalizationInput, jobRoot);