diff --git a/airbyte-commons-worker/build.gradle b/airbyte-commons-worker/build.gradle index 8051e8038d0d..337afacd66c2 100644 --- a/airbyte-commons-worker/build.gradle +++ b/airbyte-commons-worker/build.gradle @@ -31,6 +31,7 @@ dependencies { testAnnotationProcessor platform(libs.micronaut.bom) testAnnotationProcessor libs.bundles.micronaut.test.annotation.processor + testAnnotationProcessor libs.jmh.annotations testImplementation libs.bundles.micronaut.test testImplementation 'com.jayway.jsonpath:json-path:2.7.0' @@ -38,6 +39,8 @@ dependencies { testImplementation libs.postgresql testImplementation libs.platform.testcontainers testImplementation libs.platform.testcontainers.postgresql + testImplementation libs.jmh.core + testImplementation libs.jmh.annotations testImplementation project(':airbyte-commons-docker') } diff --git a/airbyte-commons-worker/src/main/java/io/airbyte/workers/general/DefaultReplicationWorker.java b/airbyte-commons-worker/src/main/java/io/airbyte/workers/general/DefaultReplicationWorker.java index 4b669bd9e71e..f699ff07e381 100644 --- a/airbyte-commons-worker/src/main/java/io/airbyte/workers/general/DefaultReplicationWorker.java +++ b/airbyte-commons-worker/src/main/java/io/airbyte/workers/general/DefaultReplicationWorker.java @@ -309,7 +309,7 @@ private static Runnable readFromSrcAndWriteToDstRunnable(final AirbyteSource sou return () -> { MDC.setContextMap(mdc); LOGGER.info("Replication thread started."); - Long recordsRead = 0L; + long recordsRead = 0L; final Map, Integer>> validationErrors = new HashMap<>(); final Map> streamToSelectedFields = new HashMap<>(); if (fieldSelectionEnabled) { diff --git a/airbyte-commons-worker/src/test/java/io/airbyte/workers/general/EmptyAirbyteDestination.java b/airbyte-commons-worker/src/test/java/io/airbyte/workers/general/EmptyAirbyteDestination.java new file mode 100644 index 000000000000..f35c97a3027e --- /dev/null +++ b/airbyte-commons-worker/src/test/java/io/airbyte/workers/general/EmptyAirbyteDestination.java @@ -0,0 +1,56 @@ +/* + * Copyright (c) 2022 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.workers.general; + +import io.airbyte.config.WorkerDestinationConfig; +import io.airbyte.protocol.models.AirbyteMessage; +import io.airbyte.workers.internal.AirbyteDestination; +import java.nio.file.Path; +import java.util.Optional; + +/** + * Empty Airbyte Destination. Does nothing with messages. Intended for performance testing. + */ +public class EmptyAirbyteDestination implements AirbyteDestination { + + @Override + public void start(WorkerDestinationConfig destinationConfig, Path jobRoot) throws Exception { + + } + + @Override + public void accept(AirbyteMessage message) throws Exception { + + } + + @Override + public void notifyEndOfInput() throws Exception { + + } + + @Override + public boolean isFinished() { + return true; + } + + @Override + public int getExitValue() { + return 0; + } + + @Override + public Optional attemptRead() { + return Optional.empty(); + } + + @Override + public void close() throws Exception {} + + @Override + public void cancel() throws Exception { + + } + +} diff --git a/airbyte-commons-worker/src/test/java/io/airbyte/workers/general/LimitedAirbyteSource.java b/airbyte-commons-worker/src/test/java/io/airbyte/workers/general/LimitedAirbyteSource.java new file mode 100644 index 000000000000..6a6ad64540e7 --- /dev/null +++ b/airbyte-commons-worker/src/test/java/io/airbyte/workers/general/LimitedAirbyteSource.java @@ -0,0 +1,57 @@ +/* + * Copyright (c) 2022 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.workers.general; + +import io.airbyte.config.WorkerSourceConfig; +import io.airbyte.protocol.models.AirbyteMessage; +import io.airbyte.workers.internal.AirbyteSource; +import io.airbyte.workers.test_utils.AirbyteMessageUtils; +import java.nio.file.Path; +import java.util.Optional; + +/** + * Basic Airbyte Source that emits {@link LimitedAirbyteSource#TOTAL_RECORDS} before finishing. + * Intended for performance testing. + */ +public class LimitedAirbyteSource implements AirbyteSource { + + private static final int TOTAL_RECORDS = 1_000_000; + + private int currentRecords = 0; + + @Override + public void start(WorkerSourceConfig sourceConfig, Path jobRoot) throws Exception { + + } + + @Override + public boolean isFinished() { + return currentRecords == TOTAL_RECORDS; + } + + @Override + public int getExitValue() { + return 0; + } + + @Override + public Optional attemptRead() { + currentRecords++; + return Optional.of(AirbyteMessageUtils.createRecordMessage("s1", "data", + "This is a fairly long sentence to provide some bytes here. More bytes is better as it helps us measure performance." + + "Random append to prevent dead code generation: " + currentRecords)); + } + + @Override + public void close() throws Exception { + + } + + @Override + public void cancel() throws Exception { + + } + +} diff --git a/airbyte-commons-worker/src/test/java/io/airbyte/workers/general/ReplicationWorkerPerformanceTest.java b/airbyte-commons-worker/src/test/java/io/airbyte/workers/general/ReplicationWorkerPerformanceTest.java new file mode 100644 index 000000000000..13728602785c --- /dev/null +++ b/airbyte-commons-worker/src/test/java/io/airbyte/workers/general/ReplicationWorkerPerformanceTest.java @@ -0,0 +1,105 @@ +/* + * Copyright (c) 2022 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.workers.general; + +import io.airbyte.config.JobSyncConfig.NamespaceDefinitionType; +import io.airbyte.config.ReplicationOutput; +import io.airbyte.config.StandardSyncInput; +import io.airbyte.metrics.lib.NotImplementedMetricClient; +import io.airbyte.protocol.models.AirbyteStream; +import io.airbyte.protocol.models.AirbyteStreamNameNamespacePair; +import io.airbyte.protocol.models.CatalogHelpers; +import io.airbyte.protocol.models.ConfiguredAirbyteCatalog; +import io.airbyte.protocol.models.ConfiguredAirbyteStream; +import io.airbyte.protocol.models.JsonSchemaType; +import io.airbyte.protocol.models.SyncMode; +import io.airbyte.workers.RecordSchemaValidator; +import io.airbyte.workers.WorkerMetricReporter; +import io.airbyte.workers.exception.WorkerException; +import io.airbyte.workers.internal.NamespacingMapper; +import io.airbyte.workers.internal.book_keeping.AirbyteMessageTracker; +import java.io.IOException; +import java.nio.file.Path; +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicReference; +import lombok.extern.slf4j.Slf4j; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.Warmup; + +@Slf4j +public class ReplicationWorkerPerformanceTest { + + /** + * Hook up the DefaultReplicationWorker to a test harness with an insanely quick Source + * {@link LimitedAirbyteSource} and Destination {@link EmptyAirbyteDestination}. + *

+ * Harness uses Java Micro Benchmark to run the E2E sync a configured number of times. It then + * reports a time distribution for the time taken to run the E2E sync. + *

+ * Because the reported time does not explicitly include throughput numbers, throughput logging has + * been added. This class is intended to help devs understand the impact of changes on throughput. + *

+ * To use this, simply run the main method, make yourself a cup of coffee for 5 mins, then look the + * logs. + */ + @Benchmark + // SampleTime = the time taken to run the benchmarked method. Use this because we only care about + // the time taken to sync the entire dataset. + @BenchmarkMode(Mode.SampleTime) + // Warming up the JVM stabilises results however takes longer. Skip this for now since we don't need + // that fine a result. + @Warmup(iterations = 0) + // How many runs to do. + @Fork(1) + // Within each run, how many iterations to do. + @Measurement(iterations = 2) + public void executeOneSync() throws InterruptedException { + final var perSource = new LimitedAirbyteSource(); + final var perDestination = new EmptyAirbyteDestination(); + final var messageTracker = new AirbyteMessageTracker(); + final var metricReporter = new WorkerMetricReporter(new NotImplementedMetricClient(), "test-image:0.01"); + final var dstNamespaceMapper = new NamespacingMapper(NamespaceDefinitionType.DESTINATION, "", ""); + final var validator = new RecordSchemaValidator(Map.of( + new AirbyteStreamNameNamespacePair("s1", null), + CatalogHelpers.fieldsToJsonSchema(io.airbyte.protocol.models.Field.of("data", JsonSchemaType.STRING)))); + + final var worker = new DefaultReplicationWorker("1", 0, + perSource, + dstNamespaceMapper, + perDestination, + messageTracker, + validator, + metricReporter, + false); + final AtomicReference output = new AtomicReference<>(); + final Thread workerThread = new Thread(() -> { + try { + output.set(worker.run(new StandardSyncInput().withCatalog(new ConfiguredAirbyteCatalog() + .withStreams(List.of(new ConfiguredAirbyteStream().withSyncMode(SyncMode.FULL_REFRESH).withStream(new AirbyteStream().withName("s1"))))), + Path.of("/"))); + } catch (final WorkerException e) { + throw new RuntimeException(e); + } + }); + + workerThread.start(); + workerThread.join(); + final var summary = output.get().getReplicationAttemptSummary(); + final var mbRead = summary.getBytesSynced() / 1_000_000; + final var timeTakenSec = (summary.getEndTime() - summary.getStartTime()) / 1000.0; + log.info("MBs read: {}, Time taken sec: {}, MB/s: {}", mbRead, timeTakenSec, mbRead / timeTakenSec); + } + + public static void main(String[] args) throws IOException { + // Run this main class to start benchmarking. + org.openjdk.jmh.Main.main(args); + } + +} diff --git a/airbyte-commons-worker/src/test/java/io/airbyte/workers/general/StubAirbyteMapper.java b/airbyte-commons-worker/src/test/java/io/airbyte/workers/general/StubAirbyteMapper.java new file mode 100644 index 000000000000..d33634e5d9bc --- /dev/null +++ b/airbyte-commons-worker/src/test/java/io/airbyte/workers/general/StubAirbyteMapper.java @@ -0,0 +1,26 @@ +/* + * Copyright (c) 2022 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.workers.general; + +import io.airbyte.protocol.models.AirbyteMessage; +import io.airbyte.protocol.models.ConfiguredAirbyteCatalog; +import io.airbyte.workers.internal.AirbyteMapper; + +/** + * Stub mapper testing what happens without any mapping. + */ +public class StubAirbyteMapper implements AirbyteMapper { + + @Override + public ConfiguredAirbyteCatalog mapCatalog(ConfiguredAirbyteCatalog catalog) { + return null; + } + + @Override + public AirbyteMessage mapMessage(AirbyteMessage message) { + return message; + } + +} diff --git a/deps.toml b/deps.toml index 7a54da9f967f..e3eff5d61cb4 100644 --- a/deps.toml +++ b/deps.toml @@ -15,6 +15,7 @@ fasterxml_version = "2.13.3" flyway = "7.14.0" glassfish_version = "2.31" hikaricp = "5.0.1" +jmh = "1.36" jooq = "3.13.4" junit-jupiter = "5.9.0" log4j = "2.17.2" @@ -68,6 +69,8 @@ jackson-datatype = { module = "com.fasterxml.jackson.datatype:jackson-datatype-j java-dogstatsd-client = { module = "com.datadoghq:java-dogstatsd-client", version = "4.1.0" } javax-databind = { module = "javax.xml.bind:jaxb-api", version = "2.4.0-b180830.0359" } jcl-over-slf4j = { module = "org.slf4j:jcl-over-slf4j", version.ref = "slf4j" } +jmh-core = { module = "org.openjdk.jmh:jmh-core", version.ref = "jmh" } +jmh-annotations = { module = "org.openjdk.jmh:jmh-generator-annprocess", version.ref = "jmh" } jooq = { module = "org.jooq:jooq", version.ref = "jooq" } jooq-codegen = { module = "org.jooq:jooq-codegen", version.ref = "jooq" } jooq-meta = { module = "org.jooq:jooq-meta", version.ref = "jooq" } diff --git a/spotbugs-exclude-filter-file.xml b/spotbugs-exclude-filter-file.xml index c5da06291781..129c0baf73c8 100644 --- a/spotbugs-exclude-filter-file.xml +++ b/spotbugs-exclude-filter-file.xml @@ -8,6 +8,9 @@ + + +