From b276b57e92aa5d7115645a9d11fadae751558ab9 Mon Sep 17 00:00:00 2001 From: Davin Chia Date: Fri, 30 Dec 2022 17:55:22 -0800 Subject: [PATCH 01/14] Checkpoint: Use Java 19 features to test orchestrator speed. --- .../general/DefaultReplicationWorker.java | 104 +++++++++--------- .../general/DefaultReplicationWorkerTest.java | 5 +- .../general/EmptyAirbyteDestination.java | 51 +++++++++ .../workers/general/LimitedAirbyteSource.java | 46 ++++++++ .../ReplicationWorkerPerformanceTest.java | 58 ++++++++++ .../workers/general/StubAirbyteMapper.java | 18 +++ .../server/converters/JobConverter.java | 4 + build.gradle | 11 +- 8 files changed, 244 insertions(+), 53 deletions(-) create mode 100644 airbyte-commons-worker/src/test/java/io/airbyte/workers/general/EmptyAirbyteDestination.java create mode 100644 airbyte-commons-worker/src/test/java/io/airbyte/workers/general/LimitedAirbyteSource.java create mode 100644 airbyte-commons-worker/src/test/java/io/airbyte/workers/general/ReplicationWorkerPerformanceTest.java create mode 100644 airbyte-commons-worker/src/test/java/io/airbyte/workers/general/StubAirbyteMapper.java diff --git a/airbyte-commons-worker/src/main/java/io/airbyte/workers/general/DefaultReplicationWorker.java b/airbyte-commons-worker/src/main/java/io/airbyte/workers/general/DefaultReplicationWorker.java index 1d8034479239..e3d9d3d5777f 100644 --- a/airbyte-commons-worker/src/main/java/io/airbyte/workers/general/DefaultReplicationWorker.java +++ b/airbyte-commons-worker/src/main/java/io/airbyte/workers/general/DefaultReplicationWorker.java @@ -34,7 +34,6 @@ import io.airbyte.workers.RecordSchemaValidator; import io.airbyte.workers.WorkerMetricReporter; import io.airbyte.workers.WorkerUtils; -import io.airbyte.workers.exception.RecordSchemaValidationException; import io.airbyte.workers.exception.WorkerException; import io.airbyte.workers.helper.FailureHelper; import io.airbyte.workers.helper.ThreadedTimeTracker; @@ -57,7 +56,6 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; -import java.util.stream.Stream; import org.apache.commons.io.FileUtils; import org.apache.commons.lang3.tuple.ImmutablePair; import org.slf4j.Logger; @@ -115,7 +113,9 @@ public DefaultReplicationWorker(final String jobId, this.mapper = mapper; this.destination = destination; this.messageTracker = messageTracker; + // use a virtual thread here? this.executors = Executors.newFixedThreadPool(2); +// this.executors = Executors.newVirtualThreadPerTaskExecutor(); this.recordSchemaValidator = recordSchemaValidator; this.metricReporter = metricReporter; this.fieldSelectionEnabled = fieldSelectionEnabled; @@ -308,52 +308,56 @@ private static Runnable readFromSrcAndWriteToDstRunnable(final AirbyteSource sou return () -> { MDC.setContextMap(mdc); LOGGER.info("Replication thread started."); - Long recordsRead = 0L; + AtomicReference recordsRead = new AtomicReference<>(0L); final Map, Integer>> validationErrors = new HashMap<>(); final Map> streamToSelectedFields = new HashMap<>(); if (fieldSelectionEnabled) { populatedStreamToSelectedFields(catalog, streamToSelectedFields); } try { + // can this while be handled by a virtual thread too? while (!cancelled.get() && !source.isFinished()) { - final Optional messageOptional; - try { - messageOptional = source.attemptRead(); - } catch (final Exception e) { - throw new SourceException("Source process read attempt failed", e); - } - - if (messageOptional.isPresent()) { - final AirbyteMessage airbyteMessage = messageOptional.get(); - if (fieldSelectionEnabled) { - filterSelectedFields(streamToSelectedFields, airbyteMessage); + // everything in here can be given to a virtual thread + Thread.ofVirtual().start(() -> { + final Optional messageOptional; + try { + messageOptional = source.attemptRead(); + } catch (final Exception e) { + throw new SourceException("Source process read attempt failed", e); } - validateSchema(recordSchemaValidator, validationErrors, airbyteMessage); - final AirbyteMessage message = mapper.mapMessage(airbyteMessage); - messageTracker.acceptFromSource(message); + if (messageOptional.isPresent()) { + final AirbyteMessage airbyteMessage = messageOptional.get(); + if (fieldSelectionEnabled) { + filterSelectedFields(streamToSelectedFields, airbyteMessage); + } + validateSchema(recordSchemaValidator, validationErrors, airbyteMessage); + final AirbyteMessage message = mapper.mapMessage(airbyteMessage); - try { - if (message.getType() == Type.RECORD || message.getType() == Type.STATE) { - destination.accept(message); + messageTracker.acceptFromSource(message); + + try { + if (message.getType() == Type.RECORD || message.getType() == Type.STATE) { + destination.accept(message); + } + } catch (final Exception e) { + throw new DestinationException("Destination process message delivery failed", e); } - } catch (final Exception e) { - throw new DestinationException("Destination process message delivery failed", e); - } - recordsRead += 1; + recordsRead.updateAndGet(v -> v + 1); - if (recordsRead % 1000 == 0) { - LOGGER.info("Records read: {} ({})", recordsRead, FileUtils.byteCountToDisplaySize(messageTracker.getTotalBytesEmitted())); - } - } else { - LOGGER.info("Source has no more messages, closing connection."); - try { - source.close(); - } catch (final Exception e) { - throw new SourceException("Source cannot be stopped!", e); + if (recordsRead.get() % 1000 == 0) { + LOGGER.info("Records read: {} ({})", recordsRead, FileUtils.byteCountToDisplaySize(messageTracker.getTotalBytesEmitted())); + } + } else { + LOGGER.info("Source has no more messages, closing connection."); + try { + source.close(); + } catch (final Exception e) { + throw new SourceException("Source cannot be stopped!", e); + } } - } + }); } timeHolder.trackSourceReadEndTime(); LOGGER.info("Total records read: {} ({})", recordsRead, FileUtils.byteCountToDisplaySize(messageTracker.getTotalBytesEmitted())); @@ -557,22 +561,22 @@ private static void validateSchema(final RecordSchemaValidator recordSchemaValid // avoid noise by validating only if the stream has less than 10 records with validation errors final boolean streamHasLessThenTenErrs = validationErrors.get(messageStream) == null || validationErrors.get(messageStream).getRight() < 10; - if (streamHasLessThenTenErrs) { - try { - recordSchemaValidator.validateSchema(record, messageStream); - } catch (final RecordSchemaValidationException e) { - final ImmutablePair, Integer> exceptionWithCount = validationErrors.get(messageStream); - if (exceptionWithCount == null) { - validationErrors.put(messageStream, new ImmutablePair<>(e.errorMessages, 1)); - } else { - final Integer currentCount = exceptionWithCount.getRight(); - final Set currentErrorMessages = exceptionWithCount.getLeft(); - final Set updatedErrorMessages = Stream.concat(currentErrorMessages.stream(), e.errorMessages.stream()).collect(Collectors.toSet()); - validationErrors.put(messageStream, new ImmutablePair<>(updatedErrorMessages, currentCount + 1)); - } - } - - } +// if (streamHasLessThenTenErrs) { +// try { +// recordSchemaValidator.validateSchema(record, messageStream); +// } catch (final RecordSchemaValidationException e) { +// final ImmutablePair, Integer> exceptionWithCount = validationErrors.get(messageStream); +// if (exceptionWithCount == null) { +// validationErrors.put(messageStream, new ImmutablePair<>(e.errorMessages, 1)); +// } else { +// final Integer currentCount = exceptionWithCount.getRight(); +// final Set currentErrorMessages = exceptionWithCount.getLeft(); +// final Set updatedErrorMessages = Stream.concat(currentErrorMessages.stream(), e.errorMessages.stream()).collect(Collectors.toSet()); +// validationErrors.put(messageStream, new ImmutablePair<>(updatedErrorMessages, currentCount + 1)); +// } +// } +// +// } } /** diff --git a/airbyte-commons-worker/src/test/java/io/airbyte/workers/general/DefaultReplicationWorkerTest.java b/airbyte-commons-worker/src/test/java/io/airbyte/workers/general/DefaultReplicationWorkerTest.java index d3b874f9e4ac..e4e92ec1ae09 100644 --- a/airbyte-commons-worker/src/test/java/io/airbyte/workers/general/DefaultReplicationWorkerTest.java +++ b/airbyte-commons-worker/src/test/java/io/airbyte/workers/general/DefaultReplicationWorkerTest.java @@ -535,7 +535,10 @@ void testCancellation() throws InterruptedException { sleep(100); } - worker.cancel(); + LOGGER.info("total records emitted: {}, total bytes emitted: {}", messageTracker.getTotalRecordsEmitted(), messageTracker.getTotalBytesEmitted()); + + +// worker.cancel(); Assertions.assertTimeout(Duration.ofSeconds(5), (Executable) workerThread::join); assertNotNull(output.get()); assertEquals(output.get().getState().getState(), STATE_MESSAGE.getState().getData()); diff --git a/airbyte-commons-worker/src/test/java/io/airbyte/workers/general/EmptyAirbyteDestination.java b/airbyte-commons-worker/src/test/java/io/airbyte/workers/general/EmptyAirbyteDestination.java new file mode 100644 index 000000000000..b36878f6b5e6 --- /dev/null +++ b/airbyte-commons-worker/src/test/java/io/airbyte/workers/general/EmptyAirbyteDestination.java @@ -0,0 +1,51 @@ +package io.airbyte.workers.general; + +import io.airbyte.config.WorkerDestinationConfig; +import io.airbyte.protocol.models.AirbyteMessage; +import io.airbyte.workers.internal.AirbyteDestination; +import java.nio.file.Path; +import java.util.Optional; + +/** + * Empty Airbyte Destination. Does nothing with messages. Intended for performance testing. + */ +public class EmptyAirbyteDestination implements AirbyteDestination { + + @Override + public void start(WorkerDestinationConfig destinationConfig, Path jobRoot) throws Exception { + + } + + @Override + public void accept(AirbyteMessage message) throws Exception { + + } + + @Override + public void notifyEndOfInput() throws Exception { + + } + + @Override + public boolean isFinished() { + return true; + } + + @Override + public int getExitValue() { + return 0; + } + + @Override + public Optional attemptRead() { + return Optional.empty(); + } + + @Override + public void close() throws Exception {} + + @Override + public void cancel() throws Exception { + + } +} diff --git a/airbyte-commons-worker/src/test/java/io/airbyte/workers/general/LimitedAirbyteSource.java b/airbyte-commons-worker/src/test/java/io/airbyte/workers/general/LimitedAirbyteSource.java new file mode 100644 index 000000000000..b84019d10d4f --- /dev/null +++ b/airbyte-commons-worker/src/test/java/io/airbyte/workers/general/LimitedAirbyteSource.java @@ -0,0 +1,46 @@ +package io.airbyte.workers.general; + +import io.airbyte.config.WorkerSourceConfig; +import io.airbyte.protocol.models.AirbyteMessage; +import io.airbyte.workers.internal.AirbyteSource; +import io.airbyte.workers.test_utils.AirbyteMessageUtils; +import java.nio.file.Path; +import java.util.Optional; + +public class LimitedAirbyteSource implements AirbyteSource { + private static final int TOTAL_RECORDS = 10_000_000; + + private int currentRecords = 0; + + @Override + public void start(WorkerSourceConfig sourceConfig, Path jobRoot) throws Exception { + + } + + @Override + public boolean isFinished() { + return currentRecords == TOTAL_RECORDS; + } + + @Override + public int getExitValue() { + return 0; + } + + @Override + public Optional attemptRead() { + currentRecords++; + return Optional.of(AirbyteMessageUtils.createRecordMessage("test stream", "data", + "This is a fairly long sentence to provide some bytes here. More bytes is better as it helps us measure performance.")); + } + + @Override + public void close() throws Exception { + + } + + @Override + public void cancel() throws Exception { + + } +} diff --git a/airbyte-commons-worker/src/test/java/io/airbyte/workers/general/ReplicationWorkerPerformanceTest.java b/airbyte-commons-worker/src/test/java/io/airbyte/workers/general/ReplicationWorkerPerformanceTest.java new file mode 100644 index 000000000000..94a9442caf98 --- /dev/null +++ b/airbyte-commons-worker/src/test/java/io/airbyte/workers/general/ReplicationWorkerPerformanceTest.java @@ -0,0 +1,58 @@ +package io.airbyte.workers.general; + +import io.airbyte.config.ReplicationOutput; +import io.airbyte.config.StandardSyncInput; +import io.airbyte.metrics.lib.NotImplementedMetricClient; +import io.airbyte.protocol.models.AirbyteStream; +import io.airbyte.protocol.models.ConfiguredAirbyteCatalog; +import io.airbyte.protocol.models.ConfiguredAirbyteStream; +import io.airbyte.protocol.models.SyncMode; +import io.airbyte.workers.RecordSchemaValidator; +import io.airbyte.workers.WorkerMetricReporter; +import io.airbyte.workers.exception.WorkerException; +import io.airbyte.workers.internal.book_keeping.AirbyteMessageTracker; +import java.nio.file.Path; +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicReference; +import lombok.extern.slf4j.Slf4j; + +@Slf4j +public class ReplicationWorkerPerformanceTest { + + public static void main(String[] args) throws WorkerException, InterruptedException { + var perSource = new LimitedAirbyteSource(); + var perDestination = new EmptyAirbyteDestination(); + var messageTracker = new AirbyteMessageTracker(); + var metricReporter = new WorkerMetricReporter(new NotImplementedMetricClient(), "test-image:0.01"); + var mapper = new StubAirbyteMapper(); + var validator = new RecordSchemaValidator(Map.of()); + + var worker = new DefaultReplicationWorker("1", 0, + perSource, + mapper, + perDestination, + messageTracker, + validator, + metricReporter, + false + ); + AtomicReference output = new AtomicReference<>(); + final Thread workerThread = new Thread(() -> { + try { + output.set(worker.run(new StandardSyncInput().withCatalog(new ConfiguredAirbyteCatalog() + .withStreams(List.of(new ConfiguredAirbyteStream().withSyncMode(SyncMode.FULL_REFRESH).withStream(new AirbyteStream().withName("s1"))))), + Path.of("/"))); + } catch (final WorkerException e) { + throw new RuntimeException(e); + } + }); + + workerThread.start(); + workerThread.join(); + var summary = output.get().getReplicationAttemptSummary(); + var mbRead = summary.getBytesSynced()/1_000_000; + var timeTakenSec = (summary.getEndTime() - summary.getStartTime())/1000.0; + log.info("MBs read: {}, Time taken sec: {}, MB/s: {}", mbRead, timeTakenSec, mbRead/timeTakenSec); + } +} diff --git a/airbyte-commons-worker/src/test/java/io/airbyte/workers/general/StubAirbyteMapper.java b/airbyte-commons-worker/src/test/java/io/airbyte/workers/general/StubAirbyteMapper.java new file mode 100644 index 000000000000..0f2e6b955c25 --- /dev/null +++ b/airbyte-commons-worker/src/test/java/io/airbyte/workers/general/StubAirbyteMapper.java @@ -0,0 +1,18 @@ +package io.airbyte.workers.general; + +import io.airbyte.protocol.models.AirbyteMessage; +import io.airbyte.protocol.models.ConfiguredAirbyteCatalog; +import io.airbyte.workers.internal.AirbyteMapper; + +public class StubAirbyteMapper implements AirbyteMapper { + + @Override + public ConfiguredAirbyteCatalog mapCatalog(ConfiguredAirbyteCatalog catalog) { + return null; + } + + @Override + public AirbyteMessage mapMessage(AirbyteMessage message) { + return message; + } +} diff --git a/airbyte-server/src/main/java/io/airbyte/server/converters/JobConverter.java b/airbyte-server/src/main/java/io/airbyte/server/converters/JobConverter.java index 52c28f3640f1..4db78619e152 100644 --- a/airbyte-server/src/main/java/io/airbyte/server/converters/JobConverter.java +++ b/airbyte-server/src/main/java/io/airbyte/server/converters/JobConverter.java @@ -48,6 +48,7 @@ import java.nio.file.Path; import java.util.List; import java.util.Optional; +import java.util.concurrent.Executors; import java.util.stream.Collectors; import lombok.extern.slf4j.Slf4j; @@ -251,4 +252,7 @@ public static AttemptNormalizationStatusRead convertAttemptNormalizationStatus( .hasNormalizationFailed(databaseStatus.normalizationFailed()); } + public static void main(String[] args) { + + } } diff --git a/build.gradle b/build.gradle index e0442234e8c5..72d433b40dbe 100644 --- a/build.gradle +++ b/build.gradle @@ -283,8 +283,15 @@ subprojects { subproj -> apply plugin: 'com.github.spotbugs' apply plugin: 'pmd' - sourceCompatibility = JavaVersion.VERSION_17 - targetCompatibility = JavaVersion.VERSION_17 + sourceCompatibility = JavaVersion.VERSION_19 + targetCompatibility = JavaVersion.VERSION_19 + + tasks.withType(JavaCompile) { + options.compilerArgs += '--enable-preview' + } + tasks.withType(Test) { + jvmArgs += "--enable-preview" + } repositories { mavenCentral() From 097ed19531cc356838dc1fd701fd43104b851c40 Mon Sep 17 00:00:00 2001 From: Davin Chia Date: Mon, 2 Jan 2023 17:20:32 -0800 Subject: [PATCH 02/14] Remove virtual threads. --- .../io/airbyte/workers/general/DefaultReplicationWorker.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/airbyte-commons-worker/src/main/java/io/airbyte/workers/general/DefaultReplicationWorker.java b/airbyte-commons-worker/src/main/java/io/airbyte/workers/general/DefaultReplicationWorker.java index e3d9d3d5777f..60227e9db505 100644 --- a/airbyte-commons-worker/src/main/java/io/airbyte/workers/general/DefaultReplicationWorker.java +++ b/airbyte-commons-worker/src/main/java/io/airbyte/workers/general/DefaultReplicationWorker.java @@ -314,11 +314,12 @@ private static Runnable readFromSrcAndWriteToDstRunnable(final AirbyteSource sou if (fieldSelectionEnabled) { populatedStreamToSelectedFields(catalog, streamToSelectedFields); } + var virtualExecutor = Executors.newVirtualThreadPerTaskExecutor(); try { // can this while be handled by a virtual thread too? while (!cancelled.get() && !source.isFinished()) { // everything in here can be given to a virtual thread - Thread.ofVirtual().start(() -> { +// virtualExecutor.submit(() -> { final Optional messageOptional; try { messageOptional = source.attemptRead(); @@ -357,7 +358,7 @@ private static Runnable readFromSrcAndWriteToDstRunnable(final AirbyteSource sou throw new SourceException("Source cannot be stopped!", e); } } - }); +// }); } timeHolder.trackSourceReadEndTime(); LOGGER.info("Total records read: {} ({})", recordsRead, FileUtils.byteCountToDisplaySize(messageTracker.getTotalBytesEmitted())); From bec9d04c6a5cb3af191d84b1aad1846b8e767828 Mon Sep 17 00:00:00 2001 From: Davin Chia Date: Tue, 3 Jan 2023 18:31:52 -0800 Subject: [PATCH 03/14] Revert Java 19 changes. Get things working again. --- .../general/DefaultReplicationWorker.java | 36 ++++++++++--------- .../general/DefaultReplicationWorkerTest.java | 2 +- .../server/converters/JobConverter.java | 5 +-- build.gradle | 18 +++++----- 4 files changed, 30 insertions(+), 31 deletions(-) diff --git a/airbyte-commons-worker/src/main/java/io/airbyte/workers/general/DefaultReplicationWorker.java b/airbyte-commons-worker/src/main/java/io/airbyte/workers/general/DefaultReplicationWorker.java index 138f285dc56d..04bf02e85457 100644 --- a/airbyte-commons-worker/src/main/java/io/airbyte/workers/general/DefaultReplicationWorker.java +++ b/airbyte-commons-worker/src/main/java/io/airbyte/workers/general/DefaultReplicationWorker.java @@ -34,6 +34,7 @@ import io.airbyte.workers.RecordSchemaValidator; import io.airbyte.workers.WorkerMetricReporter; import io.airbyte.workers.WorkerUtils; +import io.airbyte.workers.exception.RecordSchemaValidationException; import io.airbyte.workers.exception.WorkerException; import io.airbyte.workers.helper.FailureHelper; import io.airbyte.workers.helper.ThreadedTimeTracker; @@ -56,6 +57,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; +import java.util.stream.Stream; import org.apache.commons.io.FileUtils; import org.apache.commons.lang3.tuple.ImmutablePair; import org.slf4j.Logger; @@ -315,7 +317,7 @@ private static Runnable readFromSrcAndWriteToDstRunnable(final AirbyteSource sou if (fieldSelectionEnabled) { populatedStreamToSelectedFields(catalog, streamToSelectedFields); } - var virtualExecutor = Executors.newVirtualThreadPerTaskExecutor(); +// var virtualExecutor = Executors.newVirtualThreadPerTaskExecutor(); try { // can this while be handled by a virtual thread too? while (!cancelled.get() && !source.isFinished()) { @@ -563,22 +565,22 @@ private static void validateSchema(final RecordSchemaValidator recordSchemaValid // avoid noise by validating only if the stream has less than 10 records with validation errors final boolean streamHasLessThenTenErrs = validationErrors.get(messageStream) == null || validationErrors.get(messageStream).getRight() < 10; -// if (streamHasLessThenTenErrs) { -// try { -// recordSchemaValidator.validateSchema(record, messageStream); -// } catch (final RecordSchemaValidationException e) { -// final ImmutablePair, Integer> exceptionWithCount = validationErrors.get(messageStream); -// if (exceptionWithCount == null) { -// validationErrors.put(messageStream, new ImmutablePair<>(e.errorMessages, 1)); -// } else { -// final Integer currentCount = exceptionWithCount.getRight(); -// final Set currentErrorMessages = exceptionWithCount.getLeft(); -// final Set updatedErrorMessages = Stream.concat(currentErrorMessages.stream(), e.errorMessages.stream()).collect(Collectors.toSet()); -// validationErrors.put(messageStream, new ImmutablePair<>(updatedErrorMessages, currentCount + 1)); -// } -// } -// -// } + if (streamHasLessThenTenErrs) { + try { + recordSchemaValidator.validateSchema(record, messageStream); + } catch (final RecordSchemaValidationException e) { + final ImmutablePair, Integer> exceptionWithCount = validationErrors.get(messageStream); + if (exceptionWithCount == null) { + validationErrors.put(messageStream, new ImmutablePair<>(e.errorMessages, 1)); + } else { + final Integer currentCount = exceptionWithCount.getRight(); + final Set currentErrorMessages = exceptionWithCount.getLeft(); + final Set updatedErrorMessages = Stream.concat(currentErrorMessages.stream(), e.errorMessages.stream()).collect(Collectors.toSet()); + validationErrors.put(messageStream, new ImmutablePair<>(updatedErrorMessages, currentCount + 1)); + } + } + + } } /** diff --git a/airbyte-commons-worker/src/test/java/io/airbyte/workers/general/DefaultReplicationWorkerTest.java b/airbyte-commons-worker/src/test/java/io/airbyte/workers/general/DefaultReplicationWorkerTest.java index e4e92ec1ae09..c5d9ae64fa39 100644 --- a/airbyte-commons-worker/src/test/java/io/airbyte/workers/general/DefaultReplicationWorkerTest.java +++ b/airbyte-commons-worker/src/test/java/io/airbyte/workers/general/DefaultReplicationWorkerTest.java @@ -538,7 +538,7 @@ void testCancellation() throws InterruptedException { LOGGER.info("total records emitted: {}, total bytes emitted: {}", messageTracker.getTotalRecordsEmitted(), messageTracker.getTotalBytesEmitted()); -// worker.cancel(); + worker.cancel(); Assertions.assertTimeout(Duration.ofSeconds(5), (Executable) workerThread::join); assertNotNull(output.get()); assertEquals(output.get().getState().getState(), STATE_MESSAGE.getState().getData()); diff --git a/airbyte-server/src/main/java/io/airbyte/server/converters/JobConverter.java b/airbyte-server/src/main/java/io/airbyte/server/converters/JobConverter.java index 4db78619e152..ecf85595209d 100644 --- a/airbyte-server/src/main/java/io/airbyte/server/converters/JobConverter.java +++ b/airbyte-server/src/main/java/io/airbyte/server/converters/JobConverter.java @@ -251,8 +251,5 @@ public static AttemptNormalizationStatusRead convertAttemptNormalizationStatus( .recordsCommitted(databaseStatus.recordsCommitted().orElse(0L)) .hasNormalizationFailed(databaseStatus.normalizationFailed()); } - - public static void main(String[] args) { - - } + } diff --git a/build.gradle b/build.gradle index c53fba73724d..ce7b125f466c 100644 --- a/build.gradle +++ b/build.gradle @@ -282,15 +282,15 @@ subprojects { subproj -> apply plugin: 'com.github.spotbugs' apply plugin: 'pmd' - sourceCompatibility = JavaVersion.VERSION_19 - targetCompatibility = JavaVersion.VERSION_19 - - tasks.withType(JavaCompile) { - options.compilerArgs += '--enable-preview' - } - tasks.withType(Test) { - jvmArgs += "--enable-preview" - } + sourceCompatibility = JavaVersion.VERSION_17 + targetCompatibility = JavaVersion.VERSION_17 + +// tasks.withType(JavaCompile) { +// options.compilerArgs += '--enable-preview' +// } +// tasks.withType(Test) { +// jvmArgs += "--enable-preview" +// } repositories { mavenCentral() From a05ab50a27891fd2ec54c7e29ad5585556e602a2 Mon Sep 17 00:00:00 2001 From: Davin Chia Date: Tue, 3 Jan 2023 19:22:37 -0800 Subject: [PATCH 04/14] Remove testing changes. --- .../workers/general/DefaultReplicationWorker.java | 12 +++--------- .../workers/general/LimitedAirbyteSource.java | 3 ++- build.gradle | 7 ------- 3 files changed, 5 insertions(+), 17 deletions(-) diff --git a/airbyte-commons-worker/src/main/java/io/airbyte/workers/general/DefaultReplicationWorker.java b/airbyte-commons-worker/src/main/java/io/airbyte/workers/general/DefaultReplicationWorker.java index 04bf02e85457..e296b1267b3d 100644 --- a/airbyte-commons-worker/src/main/java/io/airbyte/workers/general/DefaultReplicationWorker.java +++ b/airbyte-commons-worker/src/main/java/io/airbyte/workers/general/DefaultReplicationWorker.java @@ -115,9 +115,7 @@ public DefaultReplicationWorker(final String jobId, this.mapper = mapper; this.destination = destination; this.messageTracker = messageTracker; - // use a virtual thread here? this.executors = Executors.newFixedThreadPool(2); -// this.executors = Executors.newVirtualThreadPerTaskExecutor(); this.recordSchemaValidator = recordSchemaValidator; this.metricReporter = metricReporter; this.fieldSelectionEnabled = fieldSelectionEnabled; @@ -311,18 +309,15 @@ private static Runnable readFromSrcAndWriteToDstRunnable(final AirbyteSource sou return () -> { MDC.setContextMap(mdc); LOGGER.info("Replication thread started."); - AtomicReference recordsRead = new AtomicReference<>(0L); + long recordsRead = 0L; final Map, Integer>> validationErrors = new HashMap<>(); final Map> streamToSelectedFields = new HashMap<>(); if (fieldSelectionEnabled) { populatedStreamToSelectedFields(catalog, streamToSelectedFields); } -// var virtualExecutor = Executors.newVirtualThreadPerTaskExecutor(); try { // can this while be handled by a virtual thread too? while (!cancelled.get() && !source.isFinished()) { - // everything in here can be given to a virtual thread -// virtualExecutor.submit(() -> { final Optional messageOptional; try { messageOptional = source.attemptRead(); @@ -348,9 +343,9 @@ private static Runnable readFromSrcAndWriteToDstRunnable(final AirbyteSource sou throw new DestinationException("Destination process message delivery failed", e); } - recordsRead.updateAndGet(v -> v + 1); + recordsRead += 1; - if (recordsRead.get() % 1000 == 0) { + if (recordsRead % 1000 == 0) { LOGGER.info("Records read: {} ({})", recordsRead, FileUtils.byteCountToDisplaySize(messageTracker.getTotalBytesEmitted())); } } else { @@ -361,7 +356,6 @@ private static Runnable readFromSrcAndWriteToDstRunnable(final AirbyteSource sou throw new SourceException("Source cannot be stopped!", e); } } -// }); } timeHolder.trackSourceReadEndTime(); LOGGER.info("Total records read: {} ({})", recordsRead, FileUtils.byteCountToDisplaySize(messageTracker.getTotalBytesEmitted())); diff --git a/airbyte-commons-worker/src/test/java/io/airbyte/workers/general/LimitedAirbyteSource.java b/airbyte-commons-worker/src/test/java/io/airbyte/workers/general/LimitedAirbyteSource.java index b84019d10d4f..0d6fdc9b7abb 100644 --- a/airbyte-commons-worker/src/test/java/io/airbyte/workers/general/LimitedAirbyteSource.java +++ b/airbyte-commons-worker/src/test/java/io/airbyte/workers/general/LimitedAirbyteSource.java @@ -31,7 +31,8 @@ public int getExitValue() { public Optional attemptRead() { currentRecords++; return Optional.of(AirbyteMessageUtils.createRecordMessage("test stream", "data", - "This is a fairly long sentence to provide some bytes here. More bytes is better as it helps us measure performance.")); + "This is a fairly long sentence to provide some bytes here. More bytes is better as it helps us measure performance." + + "Random append to prevent dead code generation: " + currentRecords)); } @Override diff --git a/build.gradle b/build.gradle index ce7b125f466c..6b1eeff05dd6 100644 --- a/build.gradle +++ b/build.gradle @@ -285,13 +285,6 @@ subprojects { subproj -> sourceCompatibility = JavaVersion.VERSION_17 targetCompatibility = JavaVersion.VERSION_17 -// tasks.withType(JavaCompile) { -// options.compilerArgs += '--enable-preview' -// } -// tasks.withType(Test) { -// jvmArgs += "--enable-preview" -// } - repositories { mavenCentral() maven { From c508e43829ede4e94a9f1c6ef5da47837152c865 Mon Sep 17 00:00:00 2001 From: Davin Chia Date: Tue, 3 Jan 2023 19:23:48 -0800 Subject: [PATCH 05/14] Format. --- .../general/DefaultReplicationWorker.java | 62 +++++++++---------- .../general/DefaultReplicationWorkerTest.java | 1 - .../general/EmptyAirbyteDestination.java | 5 ++ .../workers/general/LimitedAirbyteSource.java | 6 ++ .../ReplicationWorkerPerformanceTest.java | 16 +++-- .../workers/general/StubAirbyteMapper.java | 5 ++ .../server/converters/JobConverter.java | 3 +- 7 files changed, 58 insertions(+), 40 deletions(-) diff --git a/airbyte-commons-worker/src/main/java/io/airbyte/workers/general/DefaultReplicationWorker.java b/airbyte-commons-worker/src/main/java/io/airbyte/workers/general/DefaultReplicationWorker.java index e296b1267b3d..daaf983ece65 100644 --- a/airbyte-commons-worker/src/main/java/io/airbyte/workers/general/DefaultReplicationWorker.java +++ b/airbyte-commons-worker/src/main/java/io/airbyte/workers/general/DefaultReplicationWorker.java @@ -318,44 +318,44 @@ private static Runnable readFromSrcAndWriteToDstRunnable(final AirbyteSource sou try { // can this while be handled by a virtual thread too? while (!cancelled.get() && !source.isFinished()) { - final Optional messageOptional; - try { - messageOptional = source.attemptRead(); - } catch (final Exception e) { - throw new SourceException("Source process read attempt failed", e); - } + final Optional messageOptional; + try { + messageOptional = source.attemptRead(); + } catch (final Exception e) { + throw new SourceException("Source process read attempt failed", e); + } - if (messageOptional.isPresent()) { - final AirbyteMessage airbyteMessage = messageOptional.get(); - if (fieldSelectionEnabled) { - filterSelectedFields(streamToSelectedFields, airbyteMessage); - } - validateSchema(recordSchemaValidator, validationErrors, airbyteMessage); - final AirbyteMessage message = mapper.mapMessage(airbyteMessage); + if (messageOptional.isPresent()) { + final AirbyteMessage airbyteMessage = messageOptional.get(); + if (fieldSelectionEnabled) { + filterSelectedFields(streamToSelectedFields, airbyteMessage); + } + validateSchema(recordSchemaValidator, validationErrors, airbyteMessage); + final AirbyteMessage message = mapper.mapMessage(airbyteMessage); - messageTracker.acceptFromSource(message); + messageTracker.acceptFromSource(message); - try { - if (message.getType() == Type.RECORD || message.getType() == Type.STATE) { - destination.accept(message); - } - } catch (final Exception e) { - throw new DestinationException("Destination process message delivery failed", e); + try { + if (message.getType() == Type.RECORD || message.getType() == Type.STATE) { + destination.accept(message); } + } catch (final Exception e) { + throw new DestinationException("Destination process message delivery failed", e); + } - recordsRead += 1; + recordsRead += 1; - if (recordsRead % 1000 == 0) { - LOGGER.info("Records read: {} ({})", recordsRead, FileUtils.byteCountToDisplaySize(messageTracker.getTotalBytesEmitted())); - } - } else { - LOGGER.info("Source has no more messages, closing connection."); - try { - source.close(); - } catch (final Exception e) { - throw new SourceException("Source cannot be stopped!", e); - } + if (recordsRead % 1000 == 0) { + LOGGER.info("Records read: {} ({})", recordsRead, FileUtils.byteCountToDisplaySize(messageTracker.getTotalBytesEmitted())); } + } else { + LOGGER.info("Source has no more messages, closing connection."); + try { + source.close(); + } catch (final Exception e) { + throw new SourceException("Source cannot be stopped!", e); + } + } } timeHolder.trackSourceReadEndTime(); LOGGER.info("Total records read: {} ({})", recordsRead, FileUtils.byteCountToDisplaySize(messageTracker.getTotalBytesEmitted())); diff --git a/airbyte-commons-worker/src/test/java/io/airbyte/workers/general/DefaultReplicationWorkerTest.java b/airbyte-commons-worker/src/test/java/io/airbyte/workers/general/DefaultReplicationWorkerTest.java index c5d9ae64fa39..d94ef0aeae88 100644 --- a/airbyte-commons-worker/src/test/java/io/airbyte/workers/general/DefaultReplicationWorkerTest.java +++ b/airbyte-commons-worker/src/test/java/io/airbyte/workers/general/DefaultReplicationWorkerTest.java @@ -537,7 +537,6 @@ void testCancellation() throws InterruptedException { LOGGER.info("total records emitted: {}, total bytes emitted: {}", messageTracker.getTotalRecordsEmitted(), messageTracker.getTotalBytesEmitted()); - worker.cancel(); Assertions.assertTimeout(Duration.ofSeconds(5), (Executable) workerThread::join); assertNotNull(output.get()); diff --git a/airbyte-commons-worker/src/test/java/io/airbyte/workers/general/EmptyAirbyteDestination.java b/airbyte-commons-worker/src/test/java/io/airbyte/workers/general/EmptyAirbyteDestination.java index b36878f6b5e6..f35c97a3027e 100644 --- a/airbyte-commons-worker/src/test/java/io/airbyte/workers/general/EmptyAirbyteDestination.java +++ b/airbyte-commons-worker/src/test/java/io/airbyte/workers/general/EmptyAirbyteDestination.java @@ -1,3 +1,7 @@ +/* + * Copyright (c) 2022 Airbyte, Inc., all rights reserved. + */ + package io.airbyte.workers.general; import io.airbyte.config.WorkerDestinationConfig; @@ -48,4 +52,5 @@ public void close() throws Exception {} public void cancel() throws Exception { } + } diff --git a/airbyte-commons-worker/src/test/java/io/airbyte/workers/general/LimitedAirbyteSource.java b/airbyte-commons-worker/src/test/java/io/airbyte/workers/general/LimitedAirbyteSource.java index 0d6fdc9b7abb..ac45dbc8d208 100644 --- a/airbyte-commons-worker/src/test/java/io/airbyte/workers/general/LimitedAirbyteSource.java +++ b/airbyte-commons-worker/src/test/java/io/airbyte/workers/general/LimitedAirbyteSource.java @@ -1,3 +1,7 @@ +/* + * Copyright (c) 2022 Airbyte, Inc., all rights reserved. + */ + package io.airbyte.workers.general; import io.airbyte.config.WorkerSourceConfig; @@ -8,6 +12,7 @@ import java.util.Optional; public class LimitedAirbyteSource implements AirbyteSource { + private static final int TOTAL_RECORDS = 10_000_000; private int currentRecords = 0; @@ -44,4 +49,5 @@ public void close() throws Exception { public void cancel() throws Exception { } + } diff --git a/airbyte-commons-worker/src/test/java/io/airbyte/workers/general/ReplicationWorkerPerformanceTest.java b/airbyte-commons-worker/src/test/java/io/airbyte/workers/general/ReplicationWorkerPerformanceTest.java index 94a9442caf98..744dae14cf95 100644 --- a/airbyte-commons-worker/src/test/java/io/airbyte/workers/general/ReplicationWorkerPerformanceTest.java +++ b/airbyte-commons-worker/src/test/java/io/airbyte/workers/general/ReplicationWorkerPerformanceTest.java @@ -1,3 +1,7 @@ +/* + * Copyright (c) 2022 Airbyte, Inc., all rights reserved. + */ + package io.airbyte.workers.general; import io.airbyte.config.ReplicationOutput; @@ -35,13 +39,12 @@ public static void main(String[] args) throws WorkerException, InterruptedExcept messageTracker, validator, metricReporter, - false - ); + false); AtomicReference output = new AtomicReference<>(); final Thread workerThread = new Thread(() -> { try { output.set(worker.run(new StandardSyncInput().withCatalog(new ConfiguredAirbyteCatalog() - .withStreams(List.of(new ConfiguredAirbyteStream().withSyncMode(SyncMode.FULL_REFRESH).withStream(new AirbyteStream().withName("s1"))))), + .withStreams(List.of(new ConfiguredAirbyteStream().withSyncMode(SyncMode.FULL_REFRESH).withStream(new AirbyteStream().withName("s1"))))), Path.of("/"))); } catch (final WorkerException e) { throw new RuntimeException(e); @@ -51,8 +54,9 @@ public static void main(String[] args) throws WorkerException, InterruptedExcept workerThread.start(); workerThread.join(); var summary = output.get().getReplicationAttemptSummary(); - var mbRead = summary.getBytesSynced()/1_000_000; - var timeTakenSec = (summary.getEndTime() - summary.getStartTime())/1000.0; - log.info("MBs read: {}, Time taken sec: {}, MB/s: {}", mbRead, timeTakenSec, mbRead/timeTakenSec); + var mbRead = summary.getBytesSynced() / 1_000_000; + var timeTakenSec = (summary.getEndTime() - summary.getStartTime()) / 1000.0; + log.info("MBs read: {}, Time taken sec: {}, MB/s: {}", mbRead, timeTakenSec, mbRead / timeTakenSec); } + } diff --git a/airbyte-commons-worker/src/test/java/io/airbyte/workers/general/StubAirbyteMapper.java b/airbyte-commons-worker/src/test/java/io/airbyte/workers/general/StubAirbyteMapper.java index 0f2e6b955c25..2666b64dfa8c 100644 --- a/airbyte-commons-worker/src/test/java/io/airbyte/workers/general/StubAirbyteMapper.java +++ b/airbyte-commons-worker/src/test/java/io/airbyte/workers/general/StubAirbyteMapper.java @@ -1,3 +1,7 @@ +/* + * Copyright (c) 2022 Airbyte, Inc., all rights reserved. + */ + package io.airbyte.workers.general; import io.airbyte.protocol.models.AirbyteMessage; @@ -15,4 +19,5 @@ public ConfiguredAirbyteCatalog mapCatalog(ConfiguredAirbyteCatalog catalog) { public AirbyteMessage mapMessage(AirbyteMessage message) { return message; } + } diff --git a/airbyte-server/src/main/java/io/airbyte/server/converters/JobConverter.java b/airbyte-server/src/main/java/io/airbyte/server/converters/JobConverter.java index ecf85595209d..52c28f3640f1 100644 --- a/airbyte-server/src/main/java/io/airbyte/server/converters/JobConverter.java +++ b/airbyte-server/src/main/java/io/airbyte/server/converters/JobConverter.java @@ -48,7 +48,6 @@ import java.nio.file.Path; import java.util.List; import java.util.Optional; -import java.util.concurrent.Executors; import java.util.stream.Collectors; import lombok.extern.slf4j.Slf4j; @@ -251,5 +250,5 @@ public static AttemptNormalizationStatusRead convertAttemptNormalizationStatus( .recordsCommitted(databaseStatus.recordsCommitted().orElse(0L)) .hasNormalizationFailed(databaseStatus.normalizationFailed()); } - + } From 799eaaa5a5ffba095cc08b88c14a5aa50d3a3594 Mon Sep 17 00:00:00 2001 From: Davin Chia Date: Tue, 3 Jan 2023 21:07:10 -0800 Subject: [PATCH 06/14] Add dependencies for JMH testing. --- airbyte-commons-worker/build.gradle | 2 ++ 1 file changed, 2 insertions(+) diff --git a/airbyte-commons-worker/build.gradle b/airbyte-commons-worker/build.gradle index 8051e8038d0d..d7ea7489ed04 100644 --- a/airbyte-commons-worker/build.gradle +++ b/airbyte-commons-worker/build.gradle @@ -38,6 +38,8 @@ dependencies { testImplementation libs.postgresql testImplementation libs.platform.testcontainers testImplementation libs.platform.testcontainers.postgresql + testImplementation 'org.openjdk.jmh:jmh-core:1.36' + testImplementation 'org.openjdk.jmh:jmh-generator-annprocess:1.36' testImplementation project(':airbyte-commons-docker') } From 58f50c76323a0ddceb14058c7338772bad42d85c Mon Sep 17 00:00:00 2001 From: Davin Chia Date: Tue, 3 Jan 2023 21:37:23 -0800 Subject: [PATCH 07/14] Checkpoint: JMH setup and run. However not 100% sure this works now. --- airbyte-commons-worker/build.gradle | 1 + .../general/DefaultReplicationWorker.java | 87 +++++++++++-------- .../workers/general/LimitedAirbyteSource.java | 4 +- .../ReplicationWorkerPerformanceTest.java | 18 +++- 4 files changed, 68 insertions(+), 42 deletions(-) diff --git a/airbyte-commons-worker/build.gradle b/airbyte-commons-worker/build.gradle index d7ea7489ed04..79388cd199d0 100644 --- a/airbyte-commons-worker/build.gradle +++ b/airbyte-commons-worker/build.gradle @@ -31,6 +31,7 @@ dependencies { testAnnotationProcessor platform(libs.micronaut.bom) testAnnotationProcessor libs.bundles.micronaut.test.annotation.processor + testAnnotationProcessor 'org.openjdk.jmh:jmh-generator-annprocess:1.36' testImplementation libs.bundles.micronaut.test testImplementation 'com.jayway.jsonpath:json-path:2.7.0' diff --git a/airbyte-commons-worker/src/main/java/io/airbyte/workers/general/DefaultReplicationWorker.java b/airbyte-commons-worker/src/main/java/io/airbyte/workers/general/DefaultReplicationWorker.java index daaf983ece65..45b1d7d958c3 100644 --- a/airbyte-commons-worker/src/main/java/io/airbyte/workers/general/DefaultReplicationWorker.java +++ b/airbyte-commons-worker/src/main/java/io/airbyte/workers/general/DefaultReplicationWorker.java @@ -318,44 +318,9 @@ private static Runnable readFromSrcAndWriteToDstRunnable(final AirbyteSource sou try { // can this while be handled by a virtual thread too? while (!cancelled.get() && !source.isFinished()) { - final Optional messageOptional; - try { - messageOptional = source.attemptRead(); - } catch (final Exception e) { - throw new SourceException("Source process read attempt failed", e); - } - - if (messageOptional.isPresent()) { - final AirbyteMessage airbyteMessage = messageOptional.get(); - if (fieldSelectionEnabled) { - filterSelectedFields(streamToSelectedFields, airbyteMessage); - } - validateSchema(recordSchemaValidator, validationErrors, airbyteMessage); - final AirbyteMessage message = mapper.mapMessage(airbyteMessage); - - messageTracker.acceptFromSource(message); - - try { - if (message.getType() == Type.RECORD || message.getType() == Type.STATE) { - destination.accept(message); - } - } catch (final Exception e) { - throw new DestinationException("Destination process message delivery failed", e); - } - - recordsRead += 1; - - if (recordsRead % 1000 == 0) { - LOGGER.info("Records read: {} ({})", recordsRead, FileUtils.byteCountToDisplaySize(messageTracker.getTotalBytesEmitted())); - } - } else { - LOGGER.info("Source has no more messages, closing connection."); - try { - source.close(); - } catch (final Exception e) { - throw new SourceException("Source cannot be stopped!", e); - } - } + recordsRead = getRecordsRead(source, destination, mapper, messageTracker, recordSchemaValidator, fieldSelectionEnabled, recordsRead, + validationErrors, + streamToSelectedFields); } timeHolder.trackSourceReadEndTime(); LOGGER.info("Total records read: {} ({})", recordsRead, FileUtils.byteCountToDisplaySize(messageTracker.getTotalBytesEmitted())); @@ -393,6 +358,52 @@ private static Runnable readFromSrcAndWriteToDstRunnable(final AirbyteSource sou }; } +// @Benchmark + public static long getRecordsRead(AirbyteSource source, AirbyteDestination destination, AirbyteMapper mapper, MessageTracker messageTracker, + RecordSchemaValidator recordSchemaValidator, boolean fieldSelectionEnabled, long recordsRead, + Map, Integer>> validationErrors, + Map> streamToSelectedFields) { + final Optional messageOptional; + try { + messageOptional = source.attemptRead(); + } catch (final Exception e) { + throw new SourceException("Source process read attempt failed", e); + } + + if (messageOptional.isPresent()) { + final AirbyteMessage airbyteMessage = messageOptional.get(); + if (fieldSelectionEnabled) { + filterSelectedFields(streamToSelectedFields, airbyteMessage); + } + validateSchema(recordSchemaValidator, validationErrors, airbyteMessage); + final AirbyteMessage message = mapper.mapMessage(airbyteMessage); + + messageTracker.acceptFromSource(message); + + try { + if (message.getType() == Type.RECORD || message.getType() == Type.STATE) { + destination.accept(message); + } + } catch (final Exception e) { + throw new DestinationException("Destination process message delivery failed", e); + } + + recordsRead += 1; + + if (recordsRead % 1000 == 0) { + LOGGER.info("Records read: {} ({})", recordsRead, FileUtils.byteCountToDisplaySize(messageTracker.getTotalBytesEmitted())); + } + } else { + LOGGER.info("Source has no more messages, closing connection."); + try { + source.close(); + } catch (final Exception e) { + throw new SourceException("Source cannot be stopped!", e); + } + } + return recordsRead; + } + private ReplicationOutput getReplicationOutput(final StandardSyncInput syncInput, final WorkerDestinationConfig destinationConfig, final AtomicReference replicationRunnableFailureRef, diff --git a/airbyte-commons-worker/src/test/java/io/airbyte/workers/general/LimitedAirbyteSource.java b/airbyte-commons-worker/src/test/java/io/airbyte/workers/general/LimitedAirbyteSource.java index ac45dbc8d208..08e44e1af175 100644 --- a/airbyte-commons-worker/src/test/java/io/airbyte/workers/general/LimitedAirbyteSource.java +++ b/airbyte-commons-worker/src/test/java/io/airbyte/workers/general/LimitedAirbyteSource.java @@ -13,7 +13,7 @@ public class LimitedAirbyteSource implements AirbyteSource { - private static final int TOTAL_RECORDS = 10_000_000; + private static final int TOTAL_RECORDS = 3_00_000; private int currentRecords = 0; @@ -35,7 +35,7 @@ public int getExitValue() { @Override public Optional attemptRead() { currentRecords++; - return Optional.of(AirbyteMessageUtils.createRecordMessage("test stream", "data", + return Optional.of(AirbyteMessageUtils.createRecordMessage("s1", "data", "This is a fairly long sentence to provide some bytes here. More bytes is better as it helps us measure performance." + "Random append to prevent dead code generation: " + currentRecords)); } diff --git a/airbyte-commons-worker/src/test/java/io/airbyte/workers/general/ReplicationWorkerPerformanceTest.java b/airbyte-commons-worker/src/test/java/io/airbyte/workers/general/ReplicationWorkerPerformanceTest.java index 744dae14cf95..7a7a44aac72a 100644 --- a/airbyte-commons-worker/src/test/java/io/airbyte/workers/general/ReplicationWorkerPerformanceTest.java +++ b/airbyte-commons-worker/src/test/java/io/airbyte/workers/general/ReplicationWorkerPerformanceTest.java @@ -8,29 +8,39 @@ import io.airbyte.config.StandardSyncInput; import io.airbyte.metrics.lib.NotImplementedMetricClient; import io.airbyte.protocol.models.AirbyteStream; +import io.airbyte.protocol.models.AirbyteStreamNameNamespacePair; +import io.airbyte.protocol.models.CatalogHelpers; import io.airbyte.protocol.models.ConfiguredAirbyteCatalog; import io.airbyte.protocol.models.ConfiguredAirbyteStream; +import io.airbyte.protocol.models.JsonSchemaType; import io.airbyte.protocol.models.SyncMode; import io.airbyte.workers.RecordSchemaValidator; import io.airbyte.workers.WorkerMetricReporter; import io.airbyte.workers.exception.WorkerException; import io.airbyte.workers.internal.book_keeping.AirbyteMessageTracker; +import java.io.IOException; import java.nio.file.Path; import java.util.List; import java.util.Map; import java.util.concurrent.atomic.AtomicReference; import lombok.extern.slf4j.Slf4j; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Mode; @Slf4j public class ReplicationWorkerPerformanceTest { - public static void main(String[] args) throws WorkerException, InterruptedException { + @Benchmark @BenchmarkMode(Mode.SampleTime) + public void benchmarkOrchestrator() throws InterruptedException { var perSource = new LimitedAirbyteSource(); var perDestination = new EmptyAirbyteDestination(); var messageTracker = new AirbyteMessageTracker(); var metricReporter = new WorkerMetricReporter(new NotImplementedMetricClient(), "test-image:0.01"); var mapper = new StubAirbyteMapper(); - var validator = new RecordSchemaValidator(Map.of()); + var validator = new RecordSchemaValidator(Map.of( + new AirbyteStreamNameNamespacePair("s1", null), + CatalogHelpers.fieldsToJsonSchema(io.airbyte.protocol.models.Field.of("data", JsonSchemaType.STRING)))); var worker = new DefaultReplicationWorker("1", 0, perSource, @@ -59,4 +69,8 @@ public static void main(String[] args) throws WorkerException, InterruptedExcept log.info("MBs read: {}, Time taken sec: {}, MB/s: {}", mbRead, timeTakenSec, mbRead / timeTakenSec); } + public static void main(String[] args) throws IOException { + org.openjdk.jmh.Main.main(args); + } + } From 46a03fa51b1c857c4541914eb1e56f7dd9f9cb3c Mon Sep 17 00:00:00 2001 From: Davin Chia Date: Wed, 4 Jan 2023 11:33:18 -0800 Subject: [PATCH 08/14] Test custom annotations. --- .../general/DefaultReplicationWorker.java | 1 - .../workers/general/AirbyteBenchmark.java | 20 +++++++++++++++++++ .../workers/general/LimitedAirbyteSource.java | 2 +- .../ReplicationWorkerPerformanceTest.java | 5 ++--- 4 files changed, 23 insertions(+), 5 deletions(-) create mode 100644 airbyte-commons-worker/src/test/java/io/airbyte/workers/general/AirbyteBenchmark.java diff --git a/airbyte-commons-worker/src/main/java/io/airbyte/workers/general/DefaultReplicationWorker.java b/airbyte-commons-worker/src/main/java/io/airbyte/workers/general/DefaultReplicationWorker.java index 45b1d7d958c3..d0f084493949 100644 --- a/airbyte-commons-worker/src/main/java/io/airbyte/workers/general/DefaultReplicationWorker.java +++ b/airbyte-commons-worker/src/main/java/io/airbyte/workers/general/DefaultReplicationWorker.java @@ -358,7 +358,6 @@ private static Runnable readFromSrcAndWriteToDstRunnable(final AirbyteSource sou }; } -// @Benchmark public static long getRecordsRead(AirbyteSource source, AirbyteDestination destination, AirbyteMapper mapper, MessageTracker messageTracker, RecordSchemaValidator recordSchemaValidator, boolean fieldSelectionEnabled, long recordsRead, Map, Integer>> validationErrors, diff --git a/airbyte-commons-worker/src/test/java/io/airbyte/workers/general/AirbyteBenchmark.java b/airbyte-commons-worker/src/test/java/io/airbyte/workers/general/AirbyteBenchmark.java new file mode 100644 index 000000000000..4f592d723319 --- /dev/null +++ b/airbyte-commons-worker/src/test/java/io/airbyte/workers/general/AirbyteBenchmark.java @@ -0,0 +1,20 @@ +package io.airbyte.workers.general; + +import java.lang.annotation.ElementType; +import java.lang.annotation.Inherited; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.Warmup; + +@Target({ElementType.ANNOTATION_TYPE, ElementType.METHOD, ElementType.FIELD, ElementType.TYPE, ElementType.PARAMETER}) +@Retention(RetentionPolicy.RUNTIME) +@BenchmarkMode(Mode.SampleTime) +@Measurement(iterations = 2) +@Warmup(iterations = 0) +@Fork(value = 1) +public @interface AirbyteBenchmark {} diff --git a/airbyte-commons-worker/src/test/java/io/airbyte/workers/general/LimitedAirbyteSource.java b/airbyte-commons-worker/src/test/java/io/airbyte/workers/general/LimitedAirbyteSource.java index 08e44e1af175..cb892ee65947 100644 --- a/airbyte-commons-worker/src/test/java/io/airbyte/workers/general/LimitedAirbyteSource.java +++ b/airbyte-commons-worker/src/test/java/io/airbyte/workers/general/LimitedAirbyteSource.java @@ -13,7 +13,7 @@ public class LimitedAirbyteSource implements AirbyteSource { - private static final int TOTAL_RECORDS = 3_00_000; + private static final int TOTAL_RECORDS = 1_000_000; private int currentRecords = 0; diff --git a/airbyte-commons-worker/src/test/java/io/airbyte/workers/general/ReplicationWorkerPerformanceTest.java b/airbyte-commons-worker/src/test/java/io/airbyte/workers/general/ReplicationWorkerPerformanceTest.java index 7a7a44aac72a..f6ab918fed05 100644 --- a/airbyte-commons-worker/src/test/java/io/airbyte/workers/general/ReplicationWorkerPerformanceTest.java +++ b/airbyte-commons-worker/src/test/java/io/airbyte/workers/general/ReplicationWorkerPerformanceTest.java @@ -25,13 +25,12 @@ import java.util.concurrent.atomic.AtomicReference; import lombok.extern.slf4j.Slf4j; import org.openjdk.jmh.annotations.Benchmark; -import org.openjdk.jmh.annotations.BenchmarkMode; -import org.openjdk.jmh.annotations.Mode; @Slf4j public class ReplicationWorkerPerformanceTest { - @Benchmark @BenchmarkMode(Mode.SampleTime) + // write better comment + @Benchmark @AirbyteBenchmark public void benchmarkOrchestrator() throws InterruptedException { var perSource = new LimitedAirbyteSource(); var perDestination = new EmptyAirbyteDestination(); From 0004daad207266d38924714634e49f81801c9dd8 Mon Sep 17 00:00:00 2001 From: Davin Chia Date: Wed, 4 Jan 2023 14:06:42 -0800 Subject: [PATCH 09/14] Update comments. --- .../general/DefaultReplicationWorker.java | 86 ++++++++----------- .../workers/general/AirbyteBenchmark.java | 20 ----- .../workers/general/LimitedAirbyteSource.java | 4 + .../ReplicationWorkerPerformanceTest.java | 59 +++++++++---- .../workers/general/StubAirbyteMapper.java | 3 + 5 files changed, 89 insertions(+), 83 deletions(-) delete mode 100644 airbyte-commons-worker/src/test/java/io/airbyte/workers/general/AirbyteBenchmark.java diff --git a/airbyte-commons-worker/src/main/java/io/airbyte/workers/general/DefaultReplicationWorker.java b/airbyte-commons-worker/src/main/java/io/airbyte/workers/general/DefaultReplicationWorker.java index d0f084493949..daaf983ece65 100644 --- a/airbyte-commons-worker/src/main/java/io/airbyte/workers/general/DefaultReplicationWorker.java +++ b/airbyte-commons-worker/src/main/java/io/airbyte/workers/general/DefaultReplicationWorker.java @@ -318,9 +318,44 @@ private static Runnable readFromSrcAndWriteToDstRunnable(final AirbyteSource sou try { // can this while be handled by a virtual thread too? while (!cancelled.get() && !source.isFinished()) { - recordsRead = getRecordsRead(source, destination, mapper, messageTracker, recordSchemaValidator, fieldSelectionEnabled, recordsRead, - validationErrors, - streamToSelectedFields); + final Optional messageOptional; + try { + messageOptional = source.attemptRead(); + } catch (final Exception e) { + throw new SourceException("Source process read attempt failed", e); + } + + if (messageOptional.isPresent()) { + final AirbyteMessage airbyteMessage = messageOptional.get(); + if (fieldSelectionEnabled) { + filterSelectedFields(streamToSelectedFields, airbyteMessage); + } + validateSchema(recordSchemaValidator, validationErrors, airbyteMessage); + final AirbyteMessage message = mapper.mapMessage(airbyteMessage); + + messageTracker.acceptFromSource(message); + + try { + if (message.getType() == Type.RECORD || message.getType() == Type.STATE) { + destination.accept(message); + } + } catch (final Exception e) { + throw new DestinationException("Destination process message delivery failed", e); + } + + recordsRead += 1; + + if (recordsRead % 1000 == 0) { + LOGGER.info("Records read: {} ({})", recordsRead, FileUtils.byteCountToDisplaySize(messageTracker.getTotalBytesEmitted())); + } + } else { + LOGGER.info("Source has no more messages, closing connection."); + try { + source.close(); + } catch (final Exception e) { + throw new SourceException("Source cannot be stopped!", e); + } + } } timeHolder.trackSourceReadEndTime(); LOGGER.info("Total records read: {} ({})", recordsRead, FileUtils.byteCountToDisplaySize(messageTracker.getTotalBytesEmitted())); @@ -358,51 +393,6 @@ private static Runnable readFromSrcAndWriteToDstRunnable(final AirbyteSource sou }; } - public static long getRecordsRead(AirbyteSource source, AirbyteDestination destination, AirbyteMapper mapper, MessageTracker messageTracker, - RecordSchemaValidator recordSchemaValidator, boolean fieldSelectionEnabled, long recordsRead, - Map, Integer>> validationErrors, - Map> streamToSelectedFields) { - final Optional messageOptional; - try { - messageOptional = source.attemptRead(); - } catch (final Exception e) { - throw new SourceException("Source process read attempt failed", e); - } - - if (messageOptional.isPresent()) { - final AirbyteMessage airbyteMessage = messageOptional.get(); - if (fieldSelectionEnabled) { - filterSelectedFields(streamToSelectedFields, airbyteMessage); - } - validateSchema(recordSchemaValidator, validationErrors, airbyteMessage); - final AirbyteMessage message = mapper.mapMessage(airbyteMessage); - - messageTracker.acceptFromSource(message); - - try { - if (message.getType() == Type.RECORD || message.getType() == Type.STATE) { - destination.accept(message); - } - } catch (final Exception e) { - throw new DestinationException("Destination process message delivery failed", e); - } - - recordsRead += 1; - - if (recordsRead % 1000 == 0) { - LOGGER.info("Records read: {} ({})", recordsRead, FileUtils.byteCountToDisplaySize(messageTracker.getTotalBytesEmitted())); - } - } else { - LOGGER.info("Source has no more messages, closing connection."); - try { - source.close(); - } catch (final Exception e) { - throw new SourceException("Source cannot be stopped!", e); - } - } - return recordsRead; - } - private ReplicationOutput getReplicationOutput(final StandardSyncInput syncInput, final WorkerDestinationConfig destinationConfig, final AtomicReference replicationRunnableFailureRef, diff --git a/airbyte-commons-worker/src/test/java/io/airbyte/workers/general/AirbyteBenchmark.java b/airbyte-commons-worker/src/test/java/io/airbyte/workers/general/AirbyteBenchmark.java deleted file mode 100644 index 4f592d723319..000000000000 --- a/airbyte-commons-worker/src/test/java/io/airbyte/workers/general/AirbyteBenchmark.java +++ /dev/null @@ -1,20 +0,0 @@ -package io.airbyte.workers.general; - -import java.lang.annotation.ElementType; -import java.lang.annotation.Inherited; -import java.lang.annotation.Retention; -import java.lang.annotation.RetentionPolicy; -import java.lang.annotation.Target; -import org.openjdk.jmh.annotations.BenchmarkMode; -import org.openjdk.jmh.annotations.Fork; -import org.openjdk.jmh.annotations.Measurement; -import org.openjdk.jmh.annotations.Mode; -import org.openjdk.jmh.annotations.Warmup; - -@Target({ElementType.ANNOTATION_TYPE, ElementType.METHOD, ElementType.FIELD, ElementType.TYPE, ElementType.PARAMETER}) -@Retention(RetentionPolicy.RUNTIME) -@BenchmarkMode(Mode.SampleTime) -@Measurement(iterations = 2) -@Warmup(iterations = 0) -@Fork(value = 1) -public @interface AirbyteBenchmark {} diff --git a/airbyte-commons-worker/src/test/java/io/airbyte/workers/general/LimitedAirbyteSource.java b/airbyte-commons-worker/src/test/java/io/airbyte/workers/general/LimitedAirbyteSource.java index cb892ee65947..6a6ad64540e7 100644 --- a/airbyte-commons-worker/src/test/java/io/airbyte/workers/general/LimitedAirbyteSource.java +++ b/airbyte-commons-worker/src/test/java/io/airbyte/workers/general/LimitedAirbyteSource.java @@ -11,6 +11,10 @@ import java.nio.file.Path; import java.util.Optional; +/** + * Basic Airbyte Source that emits {@link LimitedAirbyteSource#TOTAL_RECORDS} before finishing. + * Intended for performance testing. + */ public class LimitedAirbyteSource implements AirbyteSource { private static final int TOTAL_RECORDS = 1_000_000; diff --git a/airbyte-commons-worker/src/test/java/io/airbyte/workers/general/ReplicationWorkerPerformanceTest.java b/airbyte-commons-worker/src/test/java/io/airbyte/workers/general/ReplicationWorkerPerformanceTest.java index f6ab918fed05..d983fbb25a94 100644 --- a/airbyte-commons-worker/src/test/java/io/airbyte/workers/general/ReplicationWorkerPerformanceTest.java +++ b/airbyte-commons-worker/src/test/java/io/airbyte/workers/general/ReplicationWorkerPerformanceTest.java @@ -4,6 +4,7 @@ package io.airbyte.workers.general; +import io.airbyte.config.JobSyncConfig.NamespaceDefinitionType; import io.airbyte.config.ReplicationOutput; import io.airbyte.config.StandardSyncInput; import io.airbyte.metrics.lib.NotImplementedMetricClient; @@ -17,6 +18,7 @@ import io.airbyte.workers.RecordSchemaValidator; import io.airbyte.workers.WorkerMetricReporter; import io.airbyte.workers.exception.WorkerException; +import io.airbyte.workers.internal.NamespacingMapper; import io.airbyte.workers.internal.book_keeping.AirbyteMessageTracker; import java.io.IOException; import java.nio.file.Path; @@ -25,31 +27,57 @@ import java.util.concurrent.atomic.AtomicReference; import lombok.extern.slf4j.Slf4j; import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.Warmup; @Slf4j public class ReplicationWorkerPerformanceTest { - // write better comment - @Benchmark @AirbyteBenchmark - public void benchmarkOrchestrator() throws InterruptedException { - var perSource = new LimitedAirbyteSource(); - var perDestination = new EmptyAirbyteDestination(); - var messageTracker = new AirbyteMessageTracker(); - var metricReporter = new WorkerMetricReporter(new NotImplementedMetricClient(), "test-image:0.01"); - var mapper = new StubAirbyteMapper(); - var validator = new RecordSchemaValidator(Map.of( + /** + * Hook up the DefaultReplicationWorker to a test harness with an insanely quick Source + * {@link LimitedAirbyteSource} and Destination {@link EmptyAirbyteDestination}. + *

+ * Harness uses Java Micro Benchmark to run the E2E sync a configured number of times. It then + * reports a time distribution for the time taken to run the E2E sync. + *

+ * Because the reported time does not explicitly include throughput numbers, throughput logging has + * been added. This class is intended to help devs understand the impact of changes on throughput. + *

+ * To run this, simply run the main method and run the logs. + */ + @Benchmark + // SampleTime = the time taken to run the benchmarked method. Use this because we only care about + // the time taken to sync the entire dataset. + @BenchmarkMode(Mode.SampleTime) + // Warming up the JVM stabilises results however takes longer. Skip this for now since we don't need + // that fine a result. + @Warmup(iterations = 0) + // How many runs to do. + @Fork(value = 1) + // Within each run, how many iterations to do. + @Measurement(iterations = 2) + public void executeOneSync() throws InterruptedException { + final var perSource = new LimitedAirbyteSource(); + final var perDestination = new EmptyAirbyteDestination(); + final var messageTracker = new AirbyteMessageTracker(); + final var metricReporter = new WorkerMetricReporter(new NotImplementedMetricClient(), "test-image:0.01"); + final var dstNamespaceMapper = new NamespacingMapper(NamespaceDefinitionType.DESTINATION, "", ""); + final var validator = new RecordSchemaValidator(Map.of( new AirbyteStreamNameNamespacePair("s1", null), CatalogHelpers.fieldsToJsonSchema(io.airbyte.protocol.models.Field.of("data", JsonSchemaType.STRING)))); - var worker = new DefaultReplicationWorker("1", 0, + final var worker = new DefaultReplicationWorker("1", 0, perSource, - mapper, + dstNamespaceMapper, perDestination, messageTracker, validator, metricReporter, false); - AtomicReference output = new AtomicReference<>(); + final AtomicReference output = new AtomicReference<>(); final Thread workerThread = new Thread(() -> { try { output.set(worker.run(new StandardSyncInput().withCatalog(new ConfiguredAirbyteCatalog() @@ -62,13 +90,14 @@ public void benchmarkOrchestrator() throws InterruptedException { workerThread.start(); workerThread.join(); - var summary = output.get().getReplicationAttemptSummary(); - var mbRead = summary.getBytesSynced() / 1_000_000; - var timeTakenSec = (summary.getEndTime() - summary.getStartTime()) / 1000.0; + final var summary = output.get().getReplicationAttemptSummary(); + final var mbRead = summary.getBytesSynced() / 1_000_000; + final var timeTakenSec = (summary.getEndTime() - summary.getStartTime()) / 1000.0; log.info("MBs read: {}, Time taken sec: {}, MB/s: {}", mbRead, timeTakenSec, mbRead / timeTakenSec); } public static void main(String[] args) throws IOException { + // Run this main class to start benchmarking. org.openjdk.jmh.Main.main(args); } diff --git a/airbyte-commons-worker/src/test/java/io/airbyte/workers/general/StubAirbyteMapper.java b/airbyte-commons-worker/src/test/java/io/airbyte/workers/general/StubAirbyteMapper.java index 2666b64dfa8c..d33634e5d9bc 100644 --- a/airbyte-commons-worker/src/test/java/io/airbyte/workers/general/StubAirbyteMapper.java +++ b/airbyte-commons-worker/src/test/java/io/airbyte/workers/general/StubAirbyteMapper.java @@ -8,6 +8,9 @@ import io.airbyte.protocol.models.ConfiguredAirbyteCatalog; import io.airbyte.workers.internal.AirbyteMapper; +/** + * Stub mapper testing what happens without any mapping. + */ public class StubAirbyteMapper implements AirbyteMapper { @Override From 6b4c27e179349415cfcfd28abb1aa2b4fef1bd5e Mon Sep 17 00:00:00 2001 From: Davin Chia Date: Wed, 4 Jan 2023 14:17:42 -0800 Subject: [PATCH 10/14] Move to dep.toml. --- airbyte-commons-worker/build.gradle | 6 +++--- deps.toml | 3 +++ 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/airbyte-commons-worker/build.gradle b/airbyte-commons-worker/build.gradle index 79388cd199d0..337afacd66c2 100644 --- a/airbyte-commons-worker/build.gradle +++ b/airbyte-commons-worker/build.gradle @@ -31,7 +31,7 @@ dependencies { testAnnotationProcessor platform(libs.micronaut.bom) testAnnotationProcessor libs.bundles.micronaut.test.annotation.processor - testAnnotationProcessor 'org.openjdk.jmh:jmh-generator-annprocess:1.36' + testAnnotationProcessor libs.jmh.annotations testImplementation libs.bundles.micronaut.test testImplementation 'com.jayway.jsonpath:json-path:2.7.0' @@ -39,8 +39,8 @@ dependencies { testImplementation libs.postgresql testImplementation libs.platform.testcontainers testImplementation libs.platform.testcontainers.postgresql - testImplementation 'org.openjdk.jmh:jmh-core:1.36' - testImplementation 'org.openjdk.jmh:jmh-generator-annprocess:1.36' + testImplementation libs.jmh.core + testImplementation libs.jmh.annotations testImplementation project(':airbyte-commons-docker') } diff --git a/deps.toml b/deps.toml index a663d065d620..d513376981ef 100644 --- a/deps.toml +++ b/deps.toml @@ -15,6 +15,7 @@ fasterxml_version = "2.14.0" flyway = "7.14.0" glassfish_version = "2.31" hikaricp = "5.0.1" +jmh = "1.36" jooq = "3.13.4" junit-jupiter = "5.9.1" log4j = "2.17.2" @@ -68,6 +69,8 @@ jackson-datatype = { module = "com.fasterxml.jackson.datatype:jackson-datatype-j java-dogstatsd-client = { module = "com.datadoghq:java-dogstatsd-client", version = "4.1.0" } javax-databind = { module = "javax.xml.bind:jaxb-api", version = "2.4.0-b180830.0359" } jcl-over-slf4j = { module = "org.slf4j:jcl-over-slf4j", version.ref = "slf4j" } +jmh-core = { module = "org.openjdk.jmh:jmh-core", version.ref = "jmh" } +jmh-annotations = { module = "org.openjdk.jmh:jmh-generator-annprocess", version.ref = "jmh" } jooq = { module = "org.jooq:jooq", version.ref = "jooq" } jooq-codegen = { module = "org.jooq:jooq-codegen", version.ref = "jooq" } jooq-meta = { module = "org.jooq:jooq-meta", version.ref = "jooq" } From 5cbb99a121dd19e8471b72ea5b576d6d67396afc Mon Sep 17 00:00:00 2001 From: Davin Chia Date: Wed, 4 Jan 2023 14:19:14 -0800 Subject: [PATCH 11/14] Clean up testing comments. --- .../io/airbyte/workers/general/DefaultReplicationWorker.java | 1 - .../airbyte/workers/general/DefaultReplicationWorkerTest.java | 2 -- 2 files changed, 3 deletions(-) diff --git a/airbyte-commons-worker/src/main/java/io/airbyte/workers/general/DefaultReplicationWorker.java b/airbyte-commons-worker/src/main/java/io/airbyte/workers/general/DefaultReplicationWorker.java index daaf983ece65..f699ff07e381 100644 --- a/airbyte-commons-worker/src/main/java/io/airbyte/workers/general/DefaultReplicationWorker.java +++ b/airbyte-commons-worker/src/main/java/io/airbyte/workers/general/DefaultReplicationWorker.java @@ -316,7 +316,6 @@ private static Runnable readFromSrcAndWriteToDstRunnable(final AirbyteSource sou populatedStreamToSelectedFields(catalog, streamToSelectedFields); } try { - // can this while be handled by a virtual thread too? while (!cancelled.get() && !source.isFinished()) { final Optional messageOptional; try { diff --git a/airbyte-commons-worker/src/test/java/io/airbyte/workers/general/DefaultReplicationWorkerTest.java b/airbyte-commons-worker/src/test/java/io/airbyte/workers/general/DefaultReplicationWorkerTest.java index d94ef0aeae88..d3b874f9e4ac 100644 --- a/airbyte-commons-worker/src/test/java/io/airbyte/workers/general/DefaultReplicationWorkerTest.java +++ b/airbyte-commons-worker/src/test/java/io/airbyte/workers/general/DefaultReplicationWorkerTest.java @@ -535,8 +535,6 @@ void testCancellation() throws InterruptedException { sleep(100); } - LOGGER.info("total records emitted: {}, total bytes emitted: {}", messageTracker.getTotalRecordsEmitted(), messageTracker.getTotalBytesEmitted()); - worker.cancel(); Assertions.assertTimeout(Duration.ofSeconds(5), (Executable) workerThread::join); assertNotNull(output.get()); From 89e60b5fcd9777973b0c2c60148a08821fa4d715 Mon Sep 17 00:00:00 2001 From: Davin Chia Date: Wed, 4 Jan 2023 14:26:40 -0800 Subject: [PATCH 12/14] Better comments. --- .../workers/general/ReplicationWorkerPerformanceTest.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/airbyte-commons-worker/src/test/java/io/airbyte/workers/general/ReplicationWorkerPerformanceTest.java b/airbyte-commons-worker/src/test/java/io/airbyte/workers/general/ReplicationWorkerPerformanceTest.java index d983fbb25a94..f163a3120c19 100644 --- a/airbyte-commons-worker/src/test/java/io/airbyte/workers/general/ReplicationWorkerPerformanceTest.java +++ b/airbyte-commons-worker/src/test/java/io/airbyte/workers/general/ReplicationWorkerPerformanceTest.java @@ -46,7 +46,8 @@ public class ReplicationWorkerPerformanceTest { * Because the reported time does not explicitly include throughput numbers, throughput logging has * been added. This class is intended to help devs understand the impact of changes on throughput. *

- * To run this, simply run the main method and run the logs. + * To use this, simply run the main method, make yourself a cup of coffee for 5 mins, then look the + * logs. */ @Benchmark // SampleTime = the time taken to run the benchmarked method. Use this because we only care about From ce90cc2bdf84ca3792629c2d0c09884e18b50345 Mon Sep 17 00:00:00 2001 From: Davin Chia Date: Wed, 4 Jan 2023 15:02:31 -0800 Subject: [PATCH 13/14] Fix PMD. --- .../workers/general/ReplicationWorkerPerformanceTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airbyte-commons-worker/src/test/java/io/airbyte/workers/general/ReplicationWorkerPerformanceTest.java b/airbyte-commons-worker/src/test/java/io/airbyte/workers/general/ReplicationWorkerPerformanceTest.java index f163a3120c19..13728602785c 100644 --- a/airbyte-commons-worker/src/test/java/io/airbyte/workers/general/ReplicationWorkerPerformanceTest.java +++ b/airbyte-commons-worker/src/test/java/io/airbyte/workers/general/ReplicationWorkerPerformanceTest.java @@ -57,7 +57,7 @@ public class ReplicationWorkerPerformanceTest { // that fine a result. @Warmup(iterations = 0) // How many runs to do. - @Fork(value = 1) + @Fork(1) // Within each run, how many iterations to do. @Measurement(iterations = 2) public void executeOneSync() throws InterruptedException { From 70506987971f08b89143f1cd7da4e0ddd171ad16 Mon Sep 17 00:00:00 2001 From: Davin Chia Date: Wed, 4 Jan 2023 15:28:28 -0800 Subject: [PATCH 14/14] Ignore JMH generated code. --- spotbugs-exclude-filter-file.xml | 3 +++ 1 file changed, 3 insertions(+) diff --git a/spotbugs-exclude-filter-file.xml b/spotbugs-exclude-filter-file.xml index c5da06291781..129c0baf73c8 100644 --- a/spotbugs-exclude-filter-file.xml +++ b/spotbugs-exclude-filter-file.xml @@ -8,6 +8,9 @@ + + +