-
Notifications
You must be signed in to change notification settings - Fork 4.1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Introduce Default Replication Worker Performance Test Harness #20956
Changes from all commits
b276b57
097ed19
4ee048f
bec9d04
a05ab50
c508e43
799eaaa
58f50c7
46a03fa
3c3086f
eecceb8
0004daa
6b4c27e
a09c219
b1ac9a9
5cbb99a
89e60b5
ce90cc2
7050698
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,56 @@ | ||
/* | ||
* Copyright (c) 2022 Airbyte, Inc., all rights reserved. | ||
*/ | ||
|
||
package io.airbyte.workers.general; | ||
|
||
import io.airbyte.config.WorkerDestinationConfig; | ||
import io.airbyte.protocol.models.AirbyteMessage; | ||
import io.airbyte.workers.internal.AirbyteDestination; | ||
import java.nio.file.Path; | ||
import java.util.Optional; | ||
|
||
/** | ||
* Empty Airbyte Destination. Does nothing with messages. Intended for performance testing. | ||
*/ | ||
public class EmptyAirbyteDestination implements AirbyteDestination { | ||
|
||
@Override | ||
public void start(WorkerDestinationConfig destinationConfig, Path jobRoot) throws Exception { | ||
|
||
} | ||
|
||
@Override | ||
public void accept(AirbyteMessage message) throws Exception { | ||
|
||
} | ||
|
||
@Override | ||
public void notifyEndOfInput() throws Exception { | ||
|
||
} | ||
|
||
@Override | ||
public boolean isFinished() { | ||
return true; | ||
} | ||
|
||
@Override | ||
public int getExitValue() { | ||
return 0; | ||
} | ||
|
||
@Override | ||
public Optional<AirbyteMessage> attemptRead() { | ||
return Optional.empty(); | ||
} | ||
|
||
@Override | ||
public void close() throws Exception {} | ||
|
||
@Override | ||
public void cancel() throws Exception { | ||
|
||
} | ||
|
||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,57 @@ | ||
/* | ||
* Copyright (c) 2022 Airbyte, Inc., all rights reserved. | ||
*/ | ||
|
||
package io.airbyte.workers.general; | ||
|
||
import io.airbyte.config.WorkerSourceConfig; | ||
import io.airbyte.protocol.models.AirbyteMessage; | ||
import io.airbyte.workers.internal.AirbyteSource; | ||
import io.airbyte.workers.test_utils.AirbyteMessageUtils; | ||
import java.nio.file.Path; | ||
import java.util.Optional; | ||
|
||
/** | ||
* Basic Airbyte Source that emits {@link LimitedAirbyteSource#TOTAL_RECORDS} before finishing. | ||
* Intended for performance testing. | ||
*/ | ||
public class LimitedAirbyteSource implements AirbyteSource { | ||
|
||
private static final int TOTAL_RECORDS = 1_000_000; | ||
|
||
private int currentRecords = 0; | ||
|
||
@Override | ||
public void start(WorkerSourceConfig sourceConfig, Path jobRoot) throws Exception { | ||
|
||
} | ||
|
||
@Override | ||
public boolean isFinished() { | ||
return currentRecords == TOTAL_RECORDS; | ||
} | ||
|
||
@Override | ||
public int getExitValue() { | ||
return 0; | ||
} | ||
|
||
@Override | ||
public Optional<AirbyteMessage> attemptRead() { | ||
currentRecords++; | ||
return Optional.of(AirbyteMessageUtils.createRecordMessage("s1", "data", | ||
"This is a fairly long sentence to provide some bytes here. More bytes is better as it helps us measure performance." | ||
+ "Random append to prevent dead code generation: " + currentRecords)); | ||
} | ||
|
||
@Override | ||
public void close() throws Exception { | ||
|
||
} | ||
|
||
@Override | ||
public void cancel() throws Exception { | ||
|
||
} | ||
|
||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,105 @@ | ||
/* | ||
* Copyright (c) 2022 Airbyte, Inc., all rights reserved. | ||
*/ | ||
|
||
package io.airbyte.workers.general; | ||
|
||
import io.airbyte.config.JobSyncConfig.NamespaceDefinitionType; | ||
import io.airbyte.config.ReplicationOutput; | ||
import io.airbyte.config.StandardSyncInput; | ||
import io.airbyte.metrics.lib.NotImplementedMetricClient; | ||
import io.airbyte.protocol.models.AirbyteStream; | ||
import io.airbyte.protocol.models.AirbyteStreamNameNamespacePair; | ||
import io.airbyte.protocol.models.CatalogHelpers; | ||
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog; | ||
import io.airbyte.protocol.models.ConfiguredAirbyteStream; | ||
import io.airbyte.protocol.models.JsonSchemaType; | ||
import io.airbyte.protocol.models.SyncMode; | ||
import io.airbyte.workers.RecordSchemaValidator; | ||
import io.airbyte.workers.WorkerMetricReporter; | ||
import io.airbyte.workers.exception.WorkerException; | ||
import io.airbyte.workers.internal.NamespacingMapper; | ||
import io.airbyte.workers.internal.book_keeping.AirbyteMessageTracker; | ||
import java.io.IOException; | ||
import java.nio.file.Path; | ||
import java.util.List; | ||
import java.util.Map; | ||
import java.util.concurrent.atomic.AtomicReference; | ||
import lombok.extern.slf4j.Slf4j; | ||
import org.openjdk.jmh.annotations.Benchmark; | ||
import org.openjdk.jmh.annotations.BenchmarkMode; | ||
import org.openjdk.jmh.annotations.Fork; | ||
import org.openjdk.jmh.annotations.Measurement; | ||
import org.openjdk.jmh.annotations.Mode; | ||
import org.openjdk.jmh.annotations.Warmup; | ||
|
||
@Slf4j | ||
public class ReplicationWorkerPerformanceTest { | ||
|
||
/** | ||
* Hook up the DefaultReplicationWorker to a test harness with an insanely quick Source | ||
* {@link LimitedAirbyteSource} and Destination {@link EmptyAirbyteDestination}. | ||
* <p> | ||
* Harness uses Java Micro Benchmark to run the E2E sync a configured number of times. It then | ||
* reports a time distribution for the time taken to run the E2E sync. | ||
* <p> | ||
* Because the reported time does not explicitly include throughput numbers, throughput logging has | ||
* been added. This class is intended to help devs understand the impact of changes on throughput. | ||
* <p> | ||
* To use this, simply run the main method, make yourself a cup of coffee for 5 mins, then look the | ||
* logs. | ||
*/ | ||
@Benchmark | ||
// SampleTime = the time taken to run the benchmarked method. Use this because we only care about | ||
// the time taken to sync the entire dataset. | ||
@BenchmarkMode(Mode.SampleTime) | ||
// Warming up the JVM stabilises results however takes longer. Skip this for now since we don't need | ||
// that fine a result. | ||
@Warmup(iterations = 0) | ||
// How many runs to do. | ||
@Fork(1) | ||
// Within each run, how many iterations to do. | ||
@Measurement(iterations = 2) | ||
public void executeOneSync() throws InterruptedException { | ||
final var perSource = new LimitedAirbyteSource(); | ||
final var perDestination = new EmptyAirbyteDestination(); | ||
final var messageTracker = new AirbyteMessageTracker(); | ||
final var metricReporter = new WorkerMetricReporter(new NotImplementedMetricClient(), "test-image:0.01"); | ||
final var dstNamespaceMapper = new NamespacingMapper(NamespaceDefinitionType.DESTINATION, "", ""); | ||
final var validator = new RecordSchemaValidator(Map.of( | ||
new AirbyteStreamNameNamespacePair("s1", null), | ||
CatalogHelpers.fieldsToJsonSchema(io.airbyte.protocol.models.Field.of("data", JsonSchemaType.STRING)))); | ||
|
||
final var worker = new DefaultReplicationWorker("1", 0, | ||
perSource, | ||
dstNamespaceMapper, | ||
perDestination, | ||
messageTracker, | ||
validator, | ||
metricReporter, | ||
false); | ||
final AtomicReference<ReplicationOutput> output = new AtomicReference<>(); | ||
final Thread workerThread = new Thread(() -> { | ||
try { | ||
output.set(worker.run(new StandardSyncInput().withCatalog(new ConfiguredAirbyteCatalog() | ||
.withStreams(List.of(new ConfiguredAirbyteStream().withSyncMode(SyncMode.FULL_REFRESH).withStream(new AirbyteStream().withName("s1"))))), | ||
Path.of("/"))); | ||
} catch (final WorkerException e) { | ||
throw new RuntimeException(e); | ||
} | ||
}); | ||
|
||
workerThread.start(); | ||
workerThread.join(); | ||
Comment on lines
+92
to
+93
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is this test in the same process, but using different threads? If that's the case, then this also bypasses the stdio streams as well, which also present their own bottlenecks (cc @colesnodgrass) There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Agree. Let me see if I can quickly work something up. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It does show that the core serialising isn't the bottleneck like we thought. |
||
final var summary = output.get().getReplicationAttemptSummary(); | ||
final var mbRead = summary.getBytesSynced() / 1_000_000; | ||
final var timeTakenSec = (summary.getEndTime() - summary.getStartTime()) / 1000.0; | ||
log.info("MBs read: {}, Time taken sec: {}, MB/s: {}", mbRead, timeTakenSec, mbRead / timeTakenSec); | ||
} | ||
|
||
public static void main(String[] args) throws IOException { | ||
// Run this main class to start benchmarking. | ||
org.openjdk.jmh.Main.main(args); | ||
} | ||
|
||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,26 @@ | ||
/* | ||
* Copyright (c) 2022 Airbyte, Inc., all rights reserved. | ||
*/ | ||
|
||
package io.airbyte.workers.general; | ||
|
||
import io.airbyte.protocol.models.AirbyteMessage; | ||
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog; | ||
import io.airbyte.workers.internal.AirbyteMapper; | ||
|
||
/** | ||
* Stub mapper testing what happens without any mapping. | ||
*/ | ||
public class StubAirbyteMapper implements AirbyteMapper { | ||
davinchia marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
@Override | ||
public ConfiguredAirbyteCatalog mapCatalog(ConfiguredAirbyteCatalog catalog) { | ||
return null; | ||
} | ||
|
||
@Override | ||
public AirbyteMessage mapMessage(AirbyteMessage message) { | ||
return message; | ||
} | ||
|
||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@colesnodgrass unfortunately because these annotations don't have the target annotation type, we cannot fold them into a meta annotation. I think this is fine for now.