
Version Normalization as if it were part of core #1208

Closed (5 commits)
1 change: 1 addition & 0 deletions .github/workflows/gradle.yml
@@ -89,6 +89,7 @@ jobs:
run: |
docker login -u airbytebot -p ${DOCKER_PASSWORD}
docker-compose -f docker-compose.build.yaml push
docker-compose -f docker-compose.normalization.build.yaml push
env:
DOCKER_PASSWORD: ${{ secrets.DOCKER_PASSWORD }}

3 changes: 0 additions & 3 deletions airbyte-integrations/bases/base-normalization/Dockerfile
@@ -15,6 +15,3 @@ COPY dbt-project-template/ ./dbt-template/
WORKDIR /airbyte

ENTRYPOINT ["/airbyte/entrypoint.sh"]

LABEL io.airbyte.version=0.1.1
LABEL io.airbyte.name=airbyte/normalization
1 change: 0 additions & 1 deletion airbyte-integrations/bases/base-normalization/build.gradle
@@ -1,5 +1,4 @@
plugins {
id 'airbyte-docker'
id 'airbyte-python'
}

@@ -386,7 +386,7 @@ private void runSync(JsonNode config, List<AirbyteMessage> messages, ConfiguredA
return;
}

final NormalizationRunner runner = NormalizationRunnerFactory.create(
final NormalizationRunner runner = new NormalizationRunnerFactory("dev").create(
getImageName(),
pbf, targetConfig.getDestinationConnectionConfiguration());
runner.start();
@@ -18,7 +18,8 @@ dependencies {
implementation project(':airbyte-queue')

integrationTestImplementation project(':airbyte-integrations:bases:standard-destination-test')
integrationTestImplementation files(project(':airbyte-integrations:bases:base-normalization').airbyteDocker.outputs)
// depends on normalization docker image creation, which is handled in composeBuild.
integrationTestImplementation files(task('composeBuild').outputs)
Contributor Author: Unfortunately, I think this is pretty intolerable. It pushes me far enough to think that we should scrap this approach.

Contributor Author: This is needed because the only way the normalization image gets built now is composeBuild instead of :airbyte-integrations:bases:base-normalization:airbyteDocker. We could keep the airbyteDocker task for normalization as well, but then we'd have two sources of truth for how the container gets built, which also seems really terrible.


implementation files(project(':airbyte-integrations:bases:base-java').airbyteDocker.outputs)
}
@@ -20,5 +20,6 @@ dependencies {
integrationTestImplementation "org.testcontainers:postgresql:1.15.0-rc2"

implementation files(project(':airbyte-integrations:bases:base-java').airbyteDocker.outputs)
integrationTestImplementation files(project(':airbyte-integrations:bases:base-normalization').airbyteDocker.outputs)
// depends on normalization docker image creation, which is handled in composeBuild.
integrationTestImplementation files(task('composeBuild').outputs)
}
@@ -21,5 +21,6 @@ dependencies {
integrationTestImplementation "org.testcontainers:postgresql:1.15.0-rc2"

implementation files(project(':airbyte-integrations:bases:base-java').airbyteDocker.outputs)
integrationTestImplementation files(project(':airbyte-integrations:bases:base-normalization').airbyteDocker.outputs)
// depends on normalization docker image creation, which is handled in composeBuild.
integrationTestImplementation files(task('composeBuild').outputs)
}
@@ -32,5 +32,6 @@ dependencies {
integrationTestImplementation project(':airbyte-integrations:bases:standard-destination-test')

implementation files(project(':airbyte-integrations:bases:base-java').airbyteDocker.outputs)
integrationTestImplementation files(project(':airbyte-integrations:bases:base-normalization').airbyteDocker.outputs)
// depends on normalization docker image creation, which is handled in composeBuild.
integrationTestImplementation files(task('composeBuild').outputs)
}
@@ -17,7 +17,8 @@ dependencies {

integrationTestImplementation project(':airbyte-integrations:bases:standard-destination-test')
integrationTestImplementation project(':airbyte-integrations:connectors:destination-snowflake')
integrationTestImplementation files(project(':airbyte-integrations:bases:base-normalization').airbyteDocker.outputs)
// depends on normalization docker image creation, which is handled in composeBuild.
integrationTestImplementation files(task('composeBuild').outputs)
integrationTestImplementation 'org.apache.commons:commons-lang3:3.11'

implementation files(project(':airbyte-integrations:bases:base-java').airbyteDocker.outputs)
@@ -36,6 +36,7 @@
import io.airbyte.db.Databases;
import io.airbyte.scheduler.persistence.DefaultJobPersistence;
import io.airbyte.scheduler.persistence.JobPersistence;
import io.airbyte.workers.normalization.NormalizationRunnerFactory;
import io.airbyte.workers.process.DockerProcessBuilderFactory;
import io.airbyte.workers.process.ProcessBuilderFactory;
import java.nio.file.Path;
@@ -68,21 +69,24 @@ public class SchedulerApp {
private final ProcessBuilderFactory pbf;
private final JobPersistence jobPersistence;
private final ConfigRepository configRepository;
private final String airbyteVersion;

public SchedulerApp(Path workspaceRoot,
ProcessBuilderFactory pbf,
JobPersistence jobPersistence,
ConfigRepository configRepository) {
ConfigRepository configRepository,
String airbyteVersion) {
this.workspaceRoot = workspaceRoot;
this.pbf = pbf;
this.jobPersistence = jobPersistence;
this.configRepository = configRepository;
this.airbyteVersion = airbyteVersion;
}

public void start() {
final ExecutorService workerThreadPool = Executors.newFixedThreadPool(MAX_WORKERS, THREAD_FACTORY);
final ScheduledExecutorService scheduledPool = Executors.newSingleThreadScheduledExecutor();
final WorkerRunFactory workerRunFactory = new WorkerRunFactory(workspaceRoot, pbf);
final WorkerRunFactory workerRunFactory = new WorkerRunFactory(workspaceRoot, pbf, new NormalizationRunnerFactory(airbyteVersion));

final JobRetrier jobRetrier = new JobRetrier(jobPersistence, Instant::now);
final JobScheduler jobScheduler = new JobScheduler(jobPersistence, configRepository);
@@ -129,7 +133,7 @@ public static void main(String[] args) {
TrackingClientSingleton.initialize(configs.getTrackingStrategy(), configs.getAirbyteVersion(), configRepository);

LOGGER.info("Launching scheduler...");
new SchedulerApp(workspaceRoot, pbf, jobPersistence, configRepository).start();
new SchedulerApp(workspaceRoot, pbf, jobPersistence, configRepository, configs.getAirbyteVersion()).start();
}

}
@@ -65,18 +65,22 @@ public class WorkerRunFactory {
private final Path workspaceRoot;
private final ProcessBuilderFactory pbf;
private final Creator creator;
private final NormalizationRunnerFactory normalizationRunnerFactory;

public WorkerRunFactory(final Path workspaceRoot,
final ProcessBuilderFactory pbf) {
this(workspaceRoot, pbf, WorkerRun::new);
final ProcessBuilderFactory pbf,
final NormalizationRunnerFactory normalizationRunnerFactory) {
this(workspaceRoot, pbf, WorkerRun::new, normalizationRunnerFactory);
}

WorkerRunFactory(final Path workspaceRoot,
final ProcessBuilderFactory pbf,
final Creator creator) {
final Creator creator,
final NormalizationRunnerFactory normalizationRunnerFactory) {
this.workspaceRoot = workspaceRoot;
this.pbf = pbf;
this.creator = creator;
this.normalizationRunnerFactory = normalizationRunnerFactory;
}

public WorkerRun create(final Job job) {
@@ -141,7 +145,7 @@ private WorkerRun createSyncWorker(JobSyncConfig config, Path jobRoot) {
new DefaultAirbyteSource(sourceLauncher),
new DefaultAirbyteDestination(destinationLauncher),
new AirbyteMessageTracker(),
NormalizationRunnerFactory.create(
normalizationRunnerFactory.create(
config.getDestinationDockerImage(),
pbf,
syncInput.getDestinationConnection().getConfiguration()))));
@@ -40,6 +40,7 @@
import io.airbyte.config.StandardDiscoverCatalogInput;
import io.airbyte.config.StandardSyncInput;
import io.airbyte.workers.Worker;
import io.airbyte.workers.normalization.NormalizationRunnerFactory;
import io.airbyte.workers.process.ProcessBuilderFactory;
import io.airbyte.workers.wrappers.JobOutputCheckConnectionWorker;
import io.airbyte.workers.wrappers.JobOutputDiscoverSchemaWorker;
@@ -65,6 +66,8 @@ class WorkerRunFactoryTest {
private WorkerRunFactory.Creator creator;

private WorkerRunFactory factory;
private ProcessBuilderFactory pbf;
private NormalizationRunnerFactory normalizationRunnerFactory;

@BeforeEach
void setUp() throws IOException {
@@ -74,9 +77,11 @@ void setUp() throws IOException {
when(job.getConfig().getSync().getDestinationDockerImage()).thenReturn("airbyte/destination-moon:0.1.0");

creator = mock(WorkerRunFactory.Creator.class);
pbf = mock(ProcessBuilderFactory.class);
normalizationRunnerFactory = mock(NormalizationRunnerFactory.class);
rootPath = Files.createTempDirectory(Files.createDirectories(TEST_ROOT), "test");

factory = new WorkerRunFactory(rootPath, mock(ProcessBuilderFactory.class), creator);
factory = new WorkerRunFactory(rootPath, pbf, creator, normalizationRunnerFactory);
}

@SuppressWarnings("unchecked")
@@ -114,18 +119,21 @@ void testSchema() {
void testSync() {
when(job.getConfig().getConfigType()).thenReturn(JobConfig.ConfigType.SYNC);

factory.create(job);

StandardSyncInput expectedInput = new StandardSyncInput()
final StandardSyncInput expectedInput = new StandardSyncInput()
.withSourceConnection(job.getConfig().getSync().getSourceConnection())
.withDestinationConnection(job.getConfig().getSync().getDestinationConnection())
.withCatalog(AirbyteProtocolConverters.toConfiguredCatalog(job.getConfig().getSync().getStandardSync().getSchema()))
.withConnectionId(job.getConfig().getSync().getStandardSync().getConnectionId())
.withSyncMode(job.getConfig().getSync().getStandardSync().getSyncMode())
.withState(job.getConfig().getSync().getState());

factory.create(job);

ArgumentCaptor<Worker<StandardSyncInput, JobOutput>> argument = ArgumentCaptor.forClass(Worker.class);
verify(creator).create(eq(rootPath.resolve("1").resolve("2")), eq(expectedInput), argument.capture());
verify(normalizationRunnerFactory)
.create(job.getConfig().getSync().getDestinationDockerImage(), pbf, expectedInput.getDestinationConnection().getConfiguration());

Assertions.assertTrue(argument.getValue() instanceof JobOutputSyncWorker);
}

@@ -43,10 +43,11 @@ public class DefaultNormalizationRunner implements NormalizationRunner {

private static final Logger LOGGER = LoggerFactory.getLogger(DefaultNormalizationRunner.class);

public static final String NORMALIZATION_IMAGE_NAME = "airbyte/normalization:dev";
public static final String NORMALIZATION_IMAGE_NAME = "airbyte/normalization";

private final DestinationType destinationType;
private final ProcessBuilderFactory pbf;
private final String version;

private Process process = null;

@@ -57,9 +58,10 @@ public enum DestinationType {
SNOWFLAKE
}

public DefaultNormalizationRunner(final DestinationType destinationType, final ProcessBuilderFactory pbf) {
public DefaultNormalizationRunner(final DestinationType destinationType, final ProcessBuilderFactory pbf, final String version) {
this.destinationType = destinationType;
this.pbf = pbf;
this.version = version;
}

@Override
@@ -68,7 +70,7 @@ public boolean normalize(Path jobRoot, JsonNode config, ConfiguredAirbyteCatalog
IOs.writeFile(jobRoot, WorkerConstants.CATALOG_JSON_FILENAME, Jsons.serialize(catalog));

try {
process = pbf.create(jobRoot, NORMALIZATION_IMAGE_NAME, "run",
process = pbf.create(jobRoot, NORMALIZATION_IMAGE_NAME + ":" + version, "run",
"--integration-type", destinationType.toString().toLowerCase(),
"--config", WorkerConstants.TARGET_CONFIG_JSON_FILENAME,
"--catalog", WorkerConstants.CATALOG_JSON_FILENAME).start();
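The runner above now builds the image reference as NORMALIZATION_IMAGE_NAME + ":" + version instead of hard-coding the :dev tag. A minimal Java sketch of that composition (the ImageRef class and imageFor helper are illustrative names, not part of the PR):

```java
public class ImageRef {

    // Untagged image name, as in DefaultNormalizationRunner.
    public static final String NORMALIZATION_IMAGE_NAME = "airbyte/normalization";

    // Compose "name:version"; the version is injected rather than hard-coded to "dev".
    static String imageFor(String version) {
        return NORMALIZATION_IMAGE_NAME + ":" + version;
    }

    public static void main(String[] args) {
        // The scheduler passes configs.getAirbyteVersion(); tests pass "dev".
        System.out.println(imageFor("dev")); // airbyte/normalization:dev
    }
}
```

This is what lets a released scheduler run the normalization image matching its own Airbyte version instead of always running :dev.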
@@ -42,15 +42,21 @@ public class NormalizationRunnerFactory {
.put("airbyte/destination-snowflake", DefaultNormalizationRunner.DestinationType.SNOWFLAKE)
.build();

public static NormalizationRunner create(String imageName, ProcessBuilderFactory pbf, JsonNode config) {
private final String normalizationVersion;

public NormalizationRunnerFactory(String normalizationVersion) {
this.normalizationVersion = normalizationVersion;
}

public NormalizationRunner create(String imageName, ProcessBuilderFactory pbf, JsonNode config) {
if (!shouldNormalize(config)) {
return new NoOpNormalizationRunner();
}

final String imageNameWithoutTag = imageName.split(":")[0];

if (NORMALIZATION_MAPPING.containsKey(imageNameWithoutTag)) {
return new DefaultNormalizationRunner(NORMALIZATION_MAPPING.get(imageNameWithoutTag), pbf);
return new DefaultNormalizationRunner(NORMALIZATION_MAPPING.get(imageNameWithoutTag), pbf, normalizationVersion);
} else {
throw new IllegalStateException(
String.format("Requested normalization for %s, but it is not included in the normalization mapping.", imageName));
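The factory strips the tag from the destination image name before the mapping lookup, so any version of a supported destination resolves to the same destination type. A hedged sketch of that lookup in plain Java (class and method names are illustrative; the real mapping stores DestinationType enum values rather than strings):

```java
import java.util.Map;

public class NormalizationMapping {

    // Mirrors the NORMALIZATION_MAPPING entries shown in the diff.
    static final Map<String, String> MAPPING = Map.of(
        "airbyte/destination-bigquery", "BIGQUERY",
        "airbyte/destination-postgres", "POSTGRES",
        "airbyte/destination-snowflake", "SNOWFLAKE");

    // Drop the ":tag" suffix, then look up the bare image name.
    static String destinationTypeFor(String imageName) {
        final String imageNameWithoutTag = imageName.split(":")[0];
        if (!MAPPING.containsKey(imageNameWithoutTag)) {
            throw new IllegalStateException(
                String.format("Requested normalization for %s, but it is not included in the normalization mapping.", imageName));
        }
        return MAPPING.get(imageNameWithoutTag);
    }
}
```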
@@ -45,6 +45,8 @@

class DefaultNormalizationRunnerTest {

private static final String NORMALIZATION_IMAGE_VERSION = "organic";

private Path jobRoot;
private ProcessBuilderFactory pbf;
private Process process;
@@ -61,7 +63,7 @@ void setup() throws IOException, WorkerException {
config = mock(JsonNode.class);
catalog = mock(ConfiguredAirbyteCatalog.class);

when(pbf.create(jobRoot, DefaultNormalizationRunner.NORMALIZATION_IMAGE_NAME, "run",
when(pbf.create(jobRoot, DefaultNormalizationRunner.NORMALIZATION_IMAGE_NAME + ":" + NORMALIZATION_IMAGE_VERSION, "run",
"--integration-type", "bigquery",
"--config", WorkerConstants.TARGET_CONFIG_JSON_FILENAME,
"--catalog", WorkerConstants.CATALOG_JSON_FILENAME))
@@ -73,7 +75,7 @@

@Test
void test() throws Exception {
final NormalizationRunner runner = new DefaultNormalizationRunner(DestinationType.BIGQUERY, pbf);
final NormalizationRunner runner = new DefaultNormalizationRunner(DestinationType.BIGQUERY, pbf, NORMALIZATION_IMAGE_VERSION);

when(process.exitValue()).thenReturn(0);

@@ -84,7 +86,7 @@
public void testClose() throws Exception {
when(process.isAlive()).thenReturn(true).thenReturn(false);

final NormalizationRunner runner = new DefaultNormalizationRunner(DestinationType.BIGQUERY, pbf);
final NormalizationRunner runner = new DefaultNormalizationRunner(DestinationType.BIGQUERY, pbf, NORMALIZATION_IMAGE_VERSION);
runner.normalize(jobRoot, config, catalog);
runner.close();

@@ -95,7 +97,7 @@
public void testFailure() {
doThrow(new RuntimeException()).when(process).exitValue();

final NormalizationRunner runner = new DefaultNormalizationRunner(DestinationType.BIGQUERY, pbf);
final NormalizationRunner runner = new DefaultNormalizationRunner(DestinationType.BIGQUERY, pbf, NORMALIZATION_IMAGE_VERSION);
assertThrows(RuntimeException.class, () -> runner.normalize(jobRoot, config, catalog));

verify(process).destroy();
@@ -43,30 +43,32 @@ class NormalizationRunnerFactoryTest {

private static final JsonNode CONFIG_WITH_NORMALIZATION = Jsons.jsonNode(ImmutableMap.of("basic_normalization", true));
private ProcessBuilderFactory pbf;
private NormalizationRunnerFactory normalizationRunnerFactory;

@BeforeEach
void setup() {
pbf = mock(ProcessBuilderFactory.class);
normalizationRunnerFactory = new NormalizationRunnerFactory("organic");
}

@Test
void testMappings() {
assertEquals(DestinationType.BIGQUERY,
((DefaultNormalizationRunner) NormalizationRunnerFactory.create(
((DefaultNormalizationRunner) normalizationRunnerFactory.create(
"airbyte/destination-bigquery:0.1.0", pbf, CONFIG_WITH_NORMALIZATION)).getDestinationType());
assertEquals(DestinationType.POSTGRES,
((DefaultNormalizationRunner) NormalizationRunnerFactory.create(
((DefaultNormalizationRunner) normalizationRunnerFactory.create(
"airbyte/destination-postgres:0.1.0", pbf, CONFIG_WITH_NORMALIZATION)).getDestinationType());
assertEquals(DestinationType.SNOWFLAKE,
((DefaultNormalizationRunner) NormalizationRunnerFactory.create(
((DefaultNormalizationRunner) normalizationRunnerFactory.create(
"airbyte/destination-snowflake:0.1.0", pbf, CONFIG_WITH_NORMALIZATION)).getDestinationType());
assertThrows(IllegalStateException.class,
() -> NormalizationRunnerFactory.create("airbyte/destination-csv:0.1.0", pbf, CONFIG_WITH_NORMALIZATION));
() -> normalizationRunnerFactory.create("airbyte/destination-csv:0.1.0", pbf, CONFIG_WITH_NORMALIZATION));
}

@Test
void testShouldNotNormalize() {
assertTrue(NormalizationRunnerFactory.create("airbyte/destination-bigquery:0.1.0", pbf,
assertTrue(normalizationRunnerFactory.create("airbyte/destination-bigquery:0.1.0", pbf,
Jsons.jsonNode(Collections.emptyMap())) instanceof NoOpNormalizationRunner);
}

19 changes: 12 additions & 7 deletions build.gradle
@@ -91,15 +91,20 @@ allprojects {
apply plugin: 'base'

afterEvaluate { project ->
// format is '{project.group}.{project.name}'.
// note: project.group returns the period-delimited fully qualified package name.
def composeDeps = [
"airbyte-config:init",
Contributor Author: Anything that had a ':' in it wasn't working here, because project.name returns just the name with no prefix (e.g. init).

"airbyte-db",
"airbyte-scheduler",
"airbyte-server",
"airbyte-webapp",
'io.airbyte.airbyte-config.init',
'io.airbyte.airbyte-db',
'io.airbyte.airbyte-scheduler',
'io.airbyte.airbyte-server',
'io.airbyte.airbyte-webapp',
'io.airbyte.airbyte-integrations.bases.base-normalization'
].toSet().asImmutable()

if (project.name in composeDeps) {

def fullProjectName = (project.group + "." + project.name).toString()
if (fullProjectName in composeDeps) {
composeBuild.dependsOn(project.tasks.assemble)
}
}
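The composeDeps check above now compares against the fully qualified '{project.group}.{project.name}' string rather than the bare project name. The same logic, sketched in Java (the ComposeDeps class is illustrative; in Gradle this runs inside afterEvaluate):

```java
import java.util.Set;

public class ComposeDeps {

    // Fully qualified project names, as in the updated build.gradle.
    static final Set<String> COMPOSE_DEPS = Set.of(
        "io.airbyte.airbyte-config.init",
        "io.airbyte.airbyte-db",
        "io.airbyte.airbyte-scheduler",
        "io.airbyte.airbyte-server",
        "io.airbyte.airbyte-webapp",
        "io.airbyte.airbyte-integrations.bases.base-normalization");

    // project.group + "." + project.name; matching the bare name (e.g. "init") would miss.
    static boolean isComposeDep(String group, String name) {
        return COMPOSE_DEPS.contains(group + "." + name);
    }
}
```

Matching on the bare project.name was the bug the inline comment describes: a nested project like airbyte-config:init reports its name as just "init", which never matched the old "airbyte-config:init" entry.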
@@ -194,7 +199,7 @@ task composeBuild {
doFirst {
exec {
workingDir rootDir
commandLine 'docker-compose', '-f', 'docker-compose.build.yaml', 'build', '--parallel', '--quiet'
commandLine 'docker-compose', '-f', 'docker-compose.build.yaml', '-f', 'docker-compose.normalization.build.yaml', 'build', '--parallel', '--quiet'
}
}
}
10 changes: 10 additions & 0 deletions docker-compose.normalization.build.yaml
@@ -0,0 +1,10 @@
version: "3.7"
Contributor Author: It is still valuable to leverage docker-compose here so that we are guaranteed the same env variables as the rest of the core app, but ultimately we want the config to be separate because the normalization image is not run as part of compose up.

Contributor Author: Keep in mind we will kill this normalization container at some point and roll it into the destination, so this will hopefully be short-ish lived, but it will definitely be around for several weeks.

Contributor: Since normalization has the same lifecycle as the rest of core, it shouldn't be separate. I don't see docker-compose.build.yaml as something that tells us what to run, but what to build for core, so it is OK if it is declared as a service. Having every core artifact listed in the same place will save us a lot of headaches and incomplete releases.

Contributor Author: Normalization does not have the same lifecycle as the rest of core. All of core is run on docker-compose up; normalization is not. Normalization runs when it is invoked from the worker.

Contributor Author: If we add it to docker-compose.yaml, docker-compose up will fail. It is not meant to run at that time.

Contributor: I think Michel is suggesting adding it to the docker-compose.build.yaml file, not docker-compose.yaml.

Contributor Author: Discussed offline and agreed to keep it separate. The issue with adding it to docker-compose.build.yaml is that when we push to Docker Hub to release, we rely on both docker-compose.yaml AND docker-compose.build.yaml; they stack on each other, so docker-compose.yaml is part of the build/release process, but it is also how the containers get run. Agreed for now to keep it separate, but we all think it is sad. 😭


services:
normalization:
image: airbyte/normalization:dev
build:
dockerfile: Dockerfile
context: airbyte-integrations/bases/base-normalization
labels:
io.airbyte.git-revision: ${GIT_REVISION}