Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove the dependency between the orchestrator container and the worker app #17570

Merged
merged 17 commits into from
Oct 12, 2022
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.workers.temporal.sync;
package io.airbyte.commons.temporal.sync;

import com.uber.m3.util.ImmutableSet;
import io.airbyte.commons.features.EnvVariableFeatureFlags;
Expand Down
55 changes: 55 additions & 0 deletions airbyte-commons-worker/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
import org.jsonschema2pojo.SourceType

plugins {
id "java-library"
id 'com.github.eirnym.js2p' version '1.0'
}

dependencies {
annotationProcessor platform(libs.micronaut.bom)
annotationProcessor libs.bundles.micronaut.annotation.processor

implementation platform(libs.micronaut.bom)
implementation libs.bundles.micronaut

implementation 'io.fabric8:kubernetes-client:5.12.2'
implementation 'io.temporal:temporal-sdk:1.8.1'
implementation 'org.apache.ant:ant:1.10.10'
implementation 'org.apache.commons:commons-text:1.9'

implementation project(':airbyte-commons-protocol')
implementation project(':airbyte-commons-temporal')
implementation project(':airbyte-config:config-models')
implementation project(':airbyte-json-validation')
implementation project(':airbyte-metrics:metrics-lib')
implementation project(':airbyte-persistence:job-persistence')
implementation project(':airbyte-protocol:protocol-models')

testAnnotationProcessor platform(libs.micronaut.bom)
testAnnotationProcessor libs.bundles.micronaut.test.annotation.processor

testImplementation libs.bundles.micronaut.test
testImplementation 'com.jayway.jsonpath:json-path:2.7.0'
testImplementation 'org.mockito:mockito-inline:4.7.0'
testImplementation libs.postgresql
testImplementation libs.platform.testcontainers
testImplementation libs.platform.testcontainers.postgresql

testImplementation project(':airbyte-commons-docker')
}

jsonSchema2Pojo {
sourceType = SourceType.YAMLSCHEMA
source = files("${sourceSets.main.output.resourcesDir}/workers_models")
targetDirectory = new File(project.buildDir, 'generated/src/gen/java/')
removeOldOutput = true

targetPackage = 'io.airbyte.persistence.job.models'
Copy link
Contributor

@jdpgrailsdev jdpgrailsdev Oct 11, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this the correct target package for these generated classes? If it is, should we use this change to rename it to better reflect that it is worker models and not persistence job models?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wanted to limit the change of the PR so the package name are the same than the ones in the worker module. I'll included it in #17809. Does it sounds ok to you?


useLongIntegers = true
generateBuilders = true
includeConstructors = false
includeSetters = true
}

Task publishArtifactsTask = getPublishArtifactsTask("$rootProject.ext.version", project)
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
package io.airbyte.workers;

import io.airbyte.config.Configs.WorkerEnvironment;
import io.airbyte.workers.general.DocumentStoreClient;
import io.airbyte.workers.storage.DocumentStoreClient;
import io.fabric8.kubernetes.client.KubernetesClient;

public record ContainerOrchestratorConfig(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
package io.airbyte.workers;

import io.airbyte.workers.exception.WorkerException;
import io.airbyte.workers.general.DefaultReplicationWorker;
import java.nio.file.Path;

public interface Worker<InputType, OutputType> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@
import io.airbyte.protocol.models.AirbyteMessage;
import io.airbyte.protocol.models.AirbyteMessage.Type;
import io.airbyte.protocol.models.AirbyteRecordMessage;
import io.airbyte.workers.*;
import io.airbyte.workers.RecordSchemaValidator;
import io.airbyte.workers.WorkerMetricReporter;
import io.airbyte.workers.WorkerUtils;
import io.airbyte.workers.exception.RecordSchemaValidationException;
import io.airbyte.workers.exception.WorkerException;
import io.airbyte.workers.helper.FailureHelper;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
import io.airbyte.config.EnvConfigs;
import io.airbyte.config.ResourceRequirements;
import io.airbyte.config.helpers.LogClientSingleton;
import io.airbyte.workers.general.DocumentStoreClient;
import io.airbyte.workers.storage.DocumentStoreClient;
import io.fabric8.kubernetes.api.model.ContainerBuilder;
import io.fabric8.kubernetes.api.model.ContainerPort;
import io.fabric8.kubernetes.api.model.DeletionPropagation;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
package io.airbyte.workers.storage;

import io.airbyte.commons.io.IOs;
import io.airbyte.workers.general.DocumentStoreClient;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.workers.general;
package io.airbyte.workers.storage;

import java.util.Optional;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
import com.google.cloud.storage.Storage;
import io.airbyte.config.storage.CloudStorageConfigs.GcsConfig;
import io.airbyte.config.storage.DefaultGcsClientFactory;
import io.airbyte.workers.general.DocumentStoreClient;
import java.nio.charset.StandardCharsets;
import java.nio.file.Path;
import java.util.Optional;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
import io.airbyte.config.storage.CloudStorageConfigs.S3Config;
import io.airbyte.config.storage.DefaultS3ClientFactory;
import io.airbyte.config.storage.MinioS3ClientFactory;
import io.airbyte.workers.general.DocumentStoreClient;
import java.nio.charset.StandardCharsets;
import java.nio.file.Path;
import java.util.Optional;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
package io.airbyte.workers.storage;

import io.airbyte.config.storage.CloudStorageConfigs;
import io.airbyte.workers.general.DocumentStoreClient;
import java.nio.file.Path;

public class StateClients {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.workers.temporal.sync;
package io.airbyte.workers.sync;

import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.temporal.TemporalUtils;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,14 @@
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.workers.temporal.sync;
package io.airbyte.workers.sync;

import com.google.common.base.Stopwatch;
import io.airbyte.commons.features.EnvVariableFeatureFlags;
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.lang.Exceptions;
import io.airbyte.commons.temporal.TemporalUtils;
import io.airbyte.commons.temporal.sync.OrchestratorConstants;
import io.airbyte.config.ResourceRequirements;
import io.airbyte.persistence.job.models.JobRunConfig;
import io.airbyte.workers.ContainerOrchestratorConfig;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.workers.temporal.sync;
package io.airbyte.workers.sync;

import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.temporal.TemporalUtils;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.workers.temporal.sync;
package io.airbyte.workers.sync;

import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.temporal.TemporalUtils;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.workers.internal;
package io.airbyte.workers.test_utils;

import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.collect.ImmutableMap;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.workers;
package io.airbyte.workers.test_utils;

import com.fasterxml.jackson.databind.JsonNode;
import io.airbyte.commons.json.Jsons;
Expand Down
39 changes: 39 additions & 0 deletions airbyte-commons-worker/src/main/resources/image_exists.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
#!/usr/bin/env bash

set -e

function _error() {
echo "$@" && exit 1
}

function _usage() {
_error "Usage: ./image_exists.sh imageName"
}

function docker_exists_in_local() {
docker inspect --type=image "$1" 2> /dev/null | jq '. | length'
}

main() {
[[ $# -eq 1 ]] || _usage
imageName=$1

echo "Checking if ${imageName} exists..."
# handle the case where the image exists ONLY on the local machine.
LOCAL=$(docker_exists_in_local ${imageName})

if [[ $LOCAL -eq 0 ]]; then
echo "${imageName} not found locally. Attempting to pull the image..."
# handle the case where the image exists in the remote and either has never been pulled or has already been pulled
# and is already up to date.
RESULT=$(docker pull $imageName 2> /dev/null | awk '/Status: Image is up to date/ || /Status: Downloaded newer image/')
[ -z "$RESULT" ] && _error "Image does not exist."
echo "Pulled ${imageName} from remote."
else
echo "${imageName} was found locally."
fi

exit 0
}

main "$@"
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@
import io.airbyte.config.StandardSyncInput;
import io.airbyte.protocol.models.AirbyteMessage;
import io.airbyte.workers.exception.RecordSchemaValidationException;
import io.airbyte.workers.internal.AirbyteMessageUtils;
import io.airbyte.workers.test_utils.AirbyteMessageUtils;
import io.airbyte.workers.test_utils.TestConfigHelpers;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import io.airbyte.config.StandardSync;
import io.airbyte.config.StandardSyncInput;
import io.airbyte.workers.internal.HeartbeatMonitor;
import io.airbyte.workers.test_utils.TestConfigHelpers;
import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.Map;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,11 @@
import io.airbyte.config.StandardSync;
import io.airbyte.config.StandardSyncInput;
import io.airbyte.protocol.models.AirbyteTraceMessage;
import io.airbyte.workers.TestConfigHelpers;
import io.airbyte.workers.WorkerConfigs;
import io.airbyte.workers.exception.WorkerException;
import io.airbyte.workers.internal.AirbyteMessageUtils;
import io.airbyte.workers.normalization.NormalizationRunner;
import io.airbyte.workers.test_utils.AirbyteMessageUtils;
import io.airbyte.workers.test_utils.TestConfigHelpers;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.stream.Stream;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,10 @@
import io.airbyte.workers.helper.FailureHelper;
import io.airbyte.workers.internal.AirbyteDestination;
import io.airbyte.workers.internal.AirbyteMessageTracker;
import io.airbyte.workers.internal.AirbyteMessageUtils;
import io.airbyte.workers.internal.AirbyteSource;
import io.airbyte.workers.internal.NamespacingMapper;
import io.airbyte.workers.test_utils.AirbyteMessageUtils;
import io.airbyte.workers.test_utils.TestConfigHelpers;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
import io.airbyte.protocol.models.AirbyteErrorTraceMessage;
import io.airbyte.protocol.models.AirbyteTraceMessage;
import io.airbyte.workers.helper.FailureHelper.ConnectorCommand;
import io.airbyte.workers.internal.AirbyteMessageUtils;
import io.airbyte.workers.test_utils.AirbyteMessageUtils;
import java.util.List;
import java.util.Map;
import java.util.Set;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import io.airbyte.workers.helper.FailureHelper;
import io.airbyte.workers.internal.StateDeltaTracker.StateDeltaTrackerException;
import io.airbyte.workers.internal.state_aggregator.StateAggregator;
import io.airbyte.workers.test_utils.AirbyteMessageUtils;
import java.util.HashMap;
import java.util.Map;
import org.junit.jupiter.api.BeforeEach;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,11 @@

package io.airbyte.workers.internal;

import static org.junit.Assert.assertFalse;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;

import io.airbyte.commons.json.Jsons;
import io.airbyte.workers.test_utils.AirbyteMessageUtils;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,12 @@
import io.airbyte.config.helpers.LogClientSingleton;
import io.airbyte.config.helpers.LogConfigs;
import io.airbyte.protocol.models.AirbyteMessage;
import io.airbyte.workers.TestConfigHelpers;
import io.airbyte.workers.WorkerConstants;
import io.airbyte.workers.WorkerUtils;
import io.airbyte.workers.exception.WorkerException;
import io.airbyte.workers.process.IntegrationLauncher;
import io.airbyte.workers.test_utils.AirbyteMessageUtils;
import io.airbyte.workers.test_utils.TestConfigHelpers;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import io.airbyte.workers.WorkerConstants;
import io.airbyte.workers.exception.WorkerException;
import io.airbyte.workers.process.IntegrationLauncher;
import io.airbyte.workers.test_utils.AirbyteMessageUtils;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import io.airbyte.commons.logging.MdcScope.Builder;
import io.airbyte.protocol.models.AirbyteLogMessage;
import io.airbyte.protocol.models.AirbyteMessage;
import io.airbyte.workers.test_utils.AirbyteMessageUtils;
import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.InputStream;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import io.airbyte.protocol.models.Field;
import io.airbyte.protocol.models.JsonSchemaType;
import io.airbyte.workers.test_utils.AirbyteMessageUtils;
import org.junit.jupiter.api.Test;

class NamespacingMapperTest {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import io.airbyte.protocol.models.AirbyteStateMessage;
import io.airbyte.workers.internal.StateMetricsTracker.StateMetricsTrackerNoStateMatchException;
import io.airbyte.workers.internal.StateMetricsTracker.StateMetricsTrackerOomException;
import io.airbyte.workers.test_utils.AirbyteMessageUtils;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import org.junit.jupiter.api.BeforeEach;
Expand Down
2 changes: 1 addition & 1 deletion airbyte-container-orchestrator/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,11 @@ dependencies {
implementation project(':airbyte-config:config-models')
implementation project(':airbyte-config:config-persistence')
implementation project(':airbyte-commons-temporal')
implementation project(':airbyte-commons-worker')
implementation project(':airbyte-db:db-lib')
implementation project(':airbyte-json-validation')
implementation project(':airbyte-protocol:protocol-models')
implementation project(':airbyte-persistence:job-persistence')
implementation project(':airbyte-workers')
implementation project(':airbyte-metrics:metrics-lib')

testImplementation 'org.mockito:mockito-inline:2.13.0'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import io.airbyte.commons.features.FeatureFlags;
import io.airbyte.commons.logging.LoggingHelper;
import io.airbyte.commons.logging.MdcScope;
import io.airbyte.commons.temporal.sync.OrchestratorConstants;
import io.airbyte.config.Configs;
import io.airbyte.config.EnvConfigs;
import io.airbyte.config.helpers.LogClientSingleton;
Expand All @@ -23,10 +24,9 @@
import io.airbyte.workers.process.KubeProcessFactory;
import io.airbyte.workers.process.ProcessFactory;
import io.airbyte.workers.storage.StateClients;
import io.airbyte.workers.temporal.sync.DbtLauncherWorker;
import io.airbyte.workers.temporal.sync.NormalizationLauncherWorker;
import io.airbyte.workers.temporal.sync.OrchestratorConstants;
import io.airbyte.workers.temporal.sync.ReplicationLauncherWorker;
import io.airbyte.workers.sync.DbtLauncherWorker;
import io.airbyte.workers.sync.NormalizationLauncherWorker;
import io.airbyte.workers.sync.ReplicationLauncherWorker;
import io.fabric8.kubernetes.client.DefaultKubernetesClient;
import io.fabric8.kubernetes.client.KubernetesClient;
import java.io.IOException;
Expand Down
Loading