diff --git a/airbyte-config/models/src/main/java/io/airbyte/config/Configs.java b/airbyte-config/models/src/main/java/io/airbyte/config/Configs.java index 33c7e1e8497a..142014a34e99 100644 --- a/airbyte-config/models/src/main/java/io/airbyte/config/Configs.java +++ b/airbyte-config/models/src/main/java/io/airbyte/config/Configs.java @@ -211,6 +211,13 @@ public interface Configs { */ int getSyncJobMaxTimeoutDays(); + /** + * Defines whether job creation uses connector-specific resource requirements when spawning jobs. + * Works on both Docker and Kubernetes. Defaults to false for ease of use in OSS trials of Airbyte + * but recommended for production deployments. + */ + boolean connectorSpecificResourceDefaultsEnabled(); + /** * Define the job container's minimum CPU usage. Units follow either Docker or Kubernetes, depending * on the deployment. Defaults to none. diff --git a/airbyte-config/models/src/main/java/io/airbyte/config/EnvConfigs.java b/airbyte-config/models/src/main/java/io/airbyte/config/EnvConfigs.java index fe80f65856ad..e80edff0f689 100644 --- a/airbyte-config/models/src/main/java/io/airbyte/config/EnvConfigs.java +++ b/airbyte-config/models/src/main/java/io/airbyte/config/EnvConfigs.java @@ -64,6 +64,7 @@ public class EnvConfigs implements Configs { public static final String JOB_KUBE_CURL_IMAGE = "JOB_KUBE_CURL_IMAGE"; public static final String SYNC_JOB_MAX_ATTEMPTS = "SYNC_JOB_MAX_ATTEMPTS"; public static final String SYNC_JOB_MAX_TIMEOUT_DAYS = "SYNC_JOB_MAX_TIMEOUT_DAYS"; + private static final String CONNECTOR_SPECIFIC_RESOURCE_DEFAULTS_ENABLED = "CONNECTOR_SPECIFIC_RESOURCE_DEFAULTS_ENABLED"; private static final String MINIMUM_WORKSPACE_RETENTION_DAYS = "MINIMUM_WORKSPACE_RETENTION_DAYS"; private static final String MAXIMUM_WORKSPACE_RETENTION_DAYS = "MAXIMUM_WORKSPACE_RETENTION_DAYS"; private static final String MAXIMUM_WORKSPACE_SIZE_MB = "MAXIMUM_WORKSPACE_SIZE_MB"; @@ -419,6 +420,11 @@ public int getSyncJobMaxTimeoutDays() { return Integer.parseInt(getEnvOrDefault(SYNC_JOB_MAX_TIMEOUT_DAYS, "3")); } + @Override + public boolean connectorSpecificResourceDefaultsEnabled() { + return getEnvOrDefault(CONNECTOR_SPECIFIC_RESOURCE_DEFAULTS_ENABLED, false); + } + /** * Returns worker pod tolerations parsed from its own environment variable. The value of the env is * a string that represents one or more tolerations. diff --git a/airbyte-scheduler/app/src/main/java/io/airbyte/scheduler/app/JobScheduler.java b/airbyte-scheduler/app/src/main/java/io/airbyte/scheduler/app/JobScheduler.java index f5ec455ecd24..581565e4876d 100644 --- a/airbyte-scheduler/app/src/main/java/io/airbyte/scheduler/app/JobScheduler.java +++ b/airbyte-scheduler/app/src/main/java/io/airbyte/scheduler/app/JobScheduler.java @@ -47,7 +47,8 @@ public class JobScheduler implements Runnable { this.jobFactory = jobFactory; } - public JobScheduler(final JobPersistence jobPersistence, + public JobScheduler(final boolean connectorSpecificResourceDefaultsEnabled, + final JobPersistence jobPersistence, final ConfigRepository configRepository, final TrackingClient trackingClient, final WorkerConfigs workerConfigs) { @@ -56,6 +57,7 @@ public JobScheduler(final JobPersistence jobPersistence, configRepository, new ScheduleJobPredicate(Instant::now), new DefaultSyncJobFactory( + connectorSpecificResourceDefaultsEnabled, new DefaultJobCreator(jobPersistence, configRepository, workerConfigs.getResourceRequirements()), configRepository, new OAuthConfigSupplier(configRepository, trackingClient))); diff --git a/airbyte-scheduler/app/src/main/java/io/airbyte/scheduler/app/SchedulerApp.java b/airbyte-scheduler/app/src/main/java/io/airbyte/scheduler/app/SchedulerApp.java index ebfb0acbcb54..84e0ecf87b90 100644 --- a/airbyte-scheduler/app/src/main/java/io/airbyte/scheduler/app/SchedulerApp.java +++ b/airbyte-scheduler/app/src/main/java/io/airbyte/scheduler/app/SchedulerApp.java @@ -131,7 +131,12 @@ public void start() throws IOException { featureFlags); final JobRetrier jobRetrier = new JobRetrier(jobPersistence, Instant::now, jobNotifier, maxSyncJobAttempts); final TrackingClient trackingClient = TrackingClientSingleton.get(); - final JobScheduler jobScheduler = new JobScheduler(jobPersistence, configRepository, trackingClient, workerConfigs); + final JobScheduler jobScheduler = new JobScheduler( + configs.connectorSpecificResourceDefaultsEnabled(), + jobPersistence, + configRepository, + trackingClient, + workerConfigs); final JobSubmitter jobSubmitter = new JobSubmitter( workerThreadPool, jobPersistence, diff --git a/airbyte-scheduler/client/src/main/java/io/airbyte/scheduler/client/DefaultSchedulerJobClient.java b/airbyte-scheduler/client/src/main/java/io/airbyte/scheduler/client/DefaultSchedulerJobClient.java index 42e3d2d9823a..37dc4eb70abd 100644 --- a/airbyte-scheduler/client/src/main/java/io/airbyte/scheduler/client/DefaultSchedulerJobClient.java +++ b/airbyte-scheduler/client/src/main/java/io/airbyte/scheduler/client/DefaultSchedulerJobClient.java @@ -23,10 +23,14 @@ public class DefaultSchedulerJobClient implements SchedulerJobClient { private static final Logger LOGGER = LoggerFactory.getLogger(DefaultSchedulerJobClient.class); + private final boolean connectorSpecificResourceDefaultsEnabled; private final JobPersistence jobPersistence; private final JobCreator jobCreator; - public DefaultSchedulerJobClient(final JobPersistence jobPersistence, final JobCreator jobCreator) { + public DefaultSchedulerJobClient(final boolean connectorSpecificResourceDefaultsEnabled, + final JobPersistence jobPersistence, + final JobCreator jobCreator) { + this.connectorSpecificResourceDefaultsEnabled = connectorSpecificResourceDefaultsEnabled; this.jobPersistence = jobPersistence; this.jobCreator = jobCreator; } @@ -38,9 +42,19 @@ public Job createOrGetActiveSyncJob(final SourceConnection source, final String sourceDockerImage, final String destinationDockerImage, final List standardSyncOperations, - @Nullable final ActorDefinitionResourceRequirements sourceResourceRequirements, - @Nullable final ActorDefinitionResourceRequirements destinationResourceRequirements) + @Nullable final ActorDefinitionResourceRequirements ignorableSourceResourceRequirements, + @Nullable final ActorDefinitionResourceRequirements ignorableDestinationResourceRequirements) throws IOException { + + ActorDefinitionResourceRequirements sourceResourceRequirements = ignorableSourceResourceRequirements; + ActorDefinitionResourceRequirements destinationResourceRequirements = ignorableDestinationResourceRequirements; + + // for OSS users, make it possible to ignore default actor-level resource requirements + if (!connectorSpecificResourceDefaultsEnabled) { + sourceResourceRequirements = null; + destinationResourceRequirements = null; + } + final Optional jobIdOptional = jobCreator.createSyncJob( source, destination, diff --git a/airbyte-scheduler/client/src/test/java/io/airbyte/scheduler/client/DefaultSchedulerJobClientTest.java b/airbyte-scheduler/client/src/test/java/io/airbyte/scheduler/client/DefaultSchedulerJobClientTest.java index 172bf77b2907..b5054abac836 100644 --- a/airbyte-scheduler/client/src/test/java/io/airbyte/scheduler/client/DefaultSchedulerJobClientTest.java +++ b/airbyte-scheduler/client/src/test/java/io/airbyte/scheduler/client/DefaultSchedulerJobClientTest.java @@ -37,7 +37,7 @@ void setup() { jobPersistence = mock(JobPersistence.class); jobCreator = mock(JobCreator.class); job = mock(Job.class); - client = spy(new DefaultSchedulerJobClient(jobPersistence, jobCreator)); + client = spy(new DefaultSchedulerJobClient(true, jobPersistence, jobCreator)); } @Test diff --git a/airbyte-scheduler/persistence/src/main/java/io/airbyte/scheduler/persistence/job_factory/DefaultSyncJobFactory.java b/airbyte-scheduler/persistence/src/main/java/io/airbyte/scheduler/persistence/job_factory/DefaultSyncJobFactory.java index 3aeb545647c4..13b6b5e2310a 100644 --- a/airbyte-scheduler/persistence/src/main/java/io/airbyte/scheduler/persistence/job_factory/DefaultSyncJobFactory.java +++ b/airbyte-scheduler/persistence/src/main/java/io/airbyte/scheduler/persistence/job_factory/DefaultSyncJobFactory.java @@ -7,6 +7,7 @@ import com.fasterxml.jackson.databind.JsonNode; import com.google.common.collect.Lists; import io.airbyte.commons.docker.DockerUtils; +import io.airbyte.config.ActorDefinitionResourceRequirements; import io.airbyte.config.DestinationConnection; import io.airbyte.config.SourceConnection; import io.airbyte.config.StandardDestinationDefinition; @@ -23,13 +24,16 @@ public class DefaultSyncJobFactory implements SyncJobFactory { + private final boolean connectorSpecificResourceDefaultsEnabled; private final DefaultJobCreator jobCreator; private final ConfigRepository configRepository; private final OAuthConfigSupplier oAuthConfigSupplier; - public DefaultSyncJobFactory(final DefaultJobCreator jobCreator, + public DefaultSyncJobFactory(final boolean connectorSpecificResourceDefaultsEnabled, + final DefaultJobCreator jobCreator, final ConfigRepository configRepository, final OAuthConfigSupplier oAuthConfigSupplier) { + this.connectorSpecificResourceDefaultsEnabled = connectorSpecificResourceDefaultsEnabled; this.jobCreator = jobCreator; this.configRepository = configRepository; this.oAuthConfigSupplier = oAuthConfigSupplier; @@ -65,6 +69,15 @@ public Long create(final UUID connectionId) { standardSyncOperations.add(standardSyncOperation); } + ActorDefinitionResourceRequirements sourceResourceRequirements = sourceDefinition.getResourceRequirements(); + ActorDefinitionResourceRequirements destinationResourceRequirements = destinationDefinition.getResourceRequirements(); + + // for OSS users, make it possible to ignore default actor-level resource requirements + if (!connectorSpecificResourceDefaultsEnabled) { + sourceResourceRequirements = null; + destinationResourceRequirements = null; + } + return jobCreator.createSyncJob( sourceConnection, destinationConnection, @@ -72,8 +85,8 @@ public Long create(final UUID connectionId) { sourceImageName, destinationImageName, standardSyncOperations, - sourceDefinition.getResourceRequirements(), - destinationDefinition.getResourceRequirements()) + sourceResourceRequirements, + destinationResourceRequirements) .orElseThrow(() -> new IllegalStateException("We shouldn't be trying to create a new sync job if there is one running already.")); } catch (final IOException | JsonValidationException | ConfigNotFoundException e) { diff --git a/airbyte-scheduler/persistence/src/test/java/io/airbyte/scheduler/persistence/job_factory/DefaultSyncJobFactoryTest.java b/airbyte-scheduler/persistence/src/test/java/io/airbyte/scheduler/persistence/job_factory/DefaultSyncJobFactoryTest.java index bc263b2b0ac7..fc800a211319 100644 --- a/airbyte-scheduler/persistence/src/test/java/io/airbyte/scheduler/persistence/job_factory/DefaultSyncJobFactoryTest.java +++ b/airbyte-scheduler/persistence/src/test/java/io/airbyte/scheduler/persistence/job_factory/DefaultSyncJobFactoryTest.java @@ -72,7 +72,7 @@ void createSyncJobFromConnectionId() throws JsonValidationException, ConfigNotFo .thenReturn(new StandardDestinationDefinition().withDestinationDefinitionId(destinationDefinitionId).withDockerRepository(dstDockerRepo) .withDockerImageTag(dstDockerTag)); - final SyncJobFactory factory = new DefaultSyncJobFactory(jobCreator, configRepository, mock(OAuthConfigSupplier.class)); + final SyncJobFactory factory = new DefaultSyncJobFactory(true, jobCreator, configRepository, mock(OAuthConfigSupplier.class)); final long actualJobId = factory.create(connectionId); assertEquals(jobId, actualJobId); diff --git a/airbyte-server/src/main/java/io/airbyte/server/ServerApp.java b/airbyte-server/src/main/java/io/airbyte/server/ServerApp.java index dd1972100ebc..3f7128e233c7 100644 --- a/airbyte-server/src/main/java/io/airbyte/server/ServerApp.java +++ b/airbyte-server/src/main/java/io/airbyte/server/ServerApp.java @@ -186,7 +186,9 @@ public static ServerRunnable getServer(final ServerFactory apiFactory, final Con final TemporalClient temporalClient = TemporalClient.production(configs.getTemporalHost(), configs.getWorkspaceRoot(), configs); final OAuthConfigSupplier oAuthConfigSupplier = new OAuthConfigSupplier(configRepository, trackingClient); final SchedulerJobClient schedulerJobClient = - new DefaultSchedulerJobClient(jobPersistence, + new DefaultSchedulerJobClient( + configs.connectorSpecificResourceDefaultsEnabled(), + jobPersistence, new DefaultJobCreator(jobPersistence, configRepository, workerConfigs.getResourceRequirements())); final DefaultSynchronousSchedulerClient syncSchedulerClient = new DefaultSynchronousSchedulerClient(temporalClient, jobTracker, oAuthConfigSupplier); diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/WorkerApp.java b/airbyte-workers/src/main/java/io/airbyte/workers/WorkerApp.java index 22a4444445b2..5b59aa48f4b8 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/WorkerApp.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/WorkerApp.java @@ -386,6 +386,7 @@ private static void launchWorkerApp() throws IOException { configRepository); final TrackingClient trackingClient = TrackingClientSingleton.get(); final SyncJobFactory jobFactory = new DefaultSyncJobFactory( + configs.connectorSpecificResourceDefaultsEnabled(), new DefaultJobCreator(jobPersistence, configRepository, defaultWorkerConfigs.getResourceRequirements()), configRepository, new OAuthConfigSupplier(configRepository, trackingClient)); diff --git a/kube/overlays/stable-with-resource-limits/.env b/kube/overlays/stable-with-resource-limits/.env index 19e72c26e609..d7b0ae1d41c6 100644 --- a/kube/overlays/stable-with-resource-limits/.env +++ b/kube/overlays/stable-with-resource-limits/.env @@ -69,3 +69,4 @@ JOB_KUBE_MAIN_CONTAINER_IMAGE_PULL_POLICY= # Launch a separate pod to orchestrate sync steps CONTAINER_ORCHESTRATOR_ENABLED=false +CONNECTOR_SPECIFIC_RESOURCE_DEFAULTS_ENABLED=true diff --git a/kube/overlays/stable-with-resource-limits/set-resource-limits.yaml b/kube/overlays/stable-with-resource-limits/set-resource-limits.yaml index 2d7047970e02..6218fae28e75 100644 --- a/kube/overlays/stable-with-resource-limits/set-resource-limits.yaml +++ b/kube/overlays/stable-with-resource-limits/set-resource-limits.yaml @@ -21,6 +21,12 @@ spec: spec: containers: - name: airbyte-scheduler-container + env: + - name: CONNECTOR_SPECIFIC_RESOURCE_DEFAULTS_ENABLED + valueFrom: + configMapKeyRef: + name: airbyte-env + key: CONNECTOR_SPECIFIC_RESOURCE_DEFAULTS_ENABLED resources: limits: cpu: 2 @@ -35,6 +41,12 @@ spec: spec: containers: - name: airbyte-worker-container + env: + - name: CONNECTOR_SPECIFIC_RESOURCE_DEFAULTS_ENABLED + valueFrom: + configMapKeyRef: + name: airbyte-env + key: CONNECTOR_SPECIFIC_RESOURCE_DEFAULTS_ENABLED resources: limits: cpu: 2 @@ -49,6 +61,12 @@ spec: spec: containers: - name: airbyte-server-container + env: + - name: CONNECTOR_SPECIFIC_RESOURCE_DEFAULTS_ENABLED + valueFrom: + configMapKeyRef: + name: airbyte-env + key: CONNECTOR_SPECIFIC_RESOURCE_DEFAULTS_ENABLED resources: limits: cpu: 1