Skip to content

Commit

Permalink
Set resource limits for connector definitions: api layer (#10482)
Browse files Browse the repository at this point in the history
  • Loading branch information
cgardens authored Feb 23, 2022
1 parent 1f2dbc8 commit 8078302
Show file tree
Hide file tree
Showing 13 changed files with 646 additions and 54 deletions.
46 changes: 46 additions & 0 deletions airbyte-api/src/main/openapi/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2124,6 +2124,8 @@ components:
format: uri
icon:
type: string
resourceRequirements:
$ref: "#/components/schemas/ActorDefinitionResourceRequirements"
SourceDefinitionUpdate:
type: object
description: Update the SourceDefinition. Currently, the only allowed attribute to update is the default docker image version.
Expand All @@ -2135,6 +2137,8 @@ components:
$ref: "#/components/schemas/SourceDefinitionId"
dockerImageTag:
type: string
resourceRequirements:
$ref: "#/components/schemas/ActorDefinitionResourceRequirements"
SourceDefinitionRead:
type: object
required:
Expand Down Expand Up @@ -2162,6 +2166,8 @@ components:
description: The date when this connector was first released, in yyyy-mm-dd format.
type: string
format: date
resourceRequirements:
$ref: "#/components/schemas/ActorDefinitionResourceRequirements"
SourceDefinitionReadList:
type: object
required:
Expand Down Expand Up @@ -2393,6 +2399,8 @@ components:
format: uri
icon:
type: string
resourceRequirements:
$ref: "#/components/schemas/ActorDefinitionResourceRequirements"
DestinationDefinitionUpdate:
type: object
required:
Expand All @@ -2403,6 +2411,8 @@ components:
$ref: "#/components/schemas/DestinationDefinitionId"
dockerImageTag:
type: string
resourceRequirements:
$ref: "#/components/schemas/ActorDefinitionResourceRequirements"
DestinationDefinitionRead:
type: object
required:
Expand Down Expand Up @@ -2431,6 +2441,8 @@ components:
description: The date when this connector was first released, in yyyy-mm-dd format.
type: string
format: date
resourceRequirements:
$ref: "#/components/schemas/ActorDefinitionResourceRequirements"
DestinationDefinitionReadList:
type: object
required:
Expand Down Expand Up @@ -3475,6 +3487,40 @@ components:
$ref: "#/components/schemas/ConnectionStateObject"
ConnectionStateObject:
type: object
ActorDefinitionResourceRequirements:
description: actor definition specific resource requirements. if default is set, these are the requirements that should be set for ALL jobs run for this actor definition. it is overriden by the job type specific configurations. if not set, the platform will use defaults. these values will be overriden by configuration at the connection level.
type: object
additionalProperties: false
properties:
default:
"$ref": "#/definitions/ResourceRequirements"
jobSpecific:
type: array
items:
"$ref": "#/definitions/JobTypeResourceLimit"
JobTypeResourceLimit:
description: sets resource requirements for a specific job type for an actor definition. these values override the default, if both are set.
type: object
additionalProperties: false
required:
- jobType
- resourceRequirements
properties:
jobType:
"$ref": "#/definitions/JobType"
resourceRequirements:
"$ref": "#/definitions/ResourceRequirements"
JobType:
description: enum that describes the different types of jobs that the platform runs.
type: string
enum:
- get_spec
- check_connection
- discover_schema
- sync
- reset_connection
- connection_updater
- replicate
ResourceRequirements:
description: optional resource requirements to run workers (blank for unbounded allocations)
type: object
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import io.airbyte.commons.util.MoreIterators;
import io.airbyte.commons.yaml.Yamls;
import io.airbyte.config.AirbyteConfig;
import io.airbyte.config.AirbyteConfigValidator;
import io.airbyte.config.ConfigSchema;
import io.airbyte.config.ConfigWithMetadata;
import io.airbyte.config.persistence.ConfigNotFoundException;
Expand Down Expand Up @@ -57,15 +58,19 @@ private YamlSeedConfigPersistence(final Class<?> seedResourceClass) throws IOExc
final Map<String, JsonNode> fullSourceDefinitionConfigs = sourceDefinitionConfigs.entrySet().stream()
.collect(Collectors.toMap(Entry::getKey, e -> {
final JsonNode withTombstone = addMissingTombstoneField(e.getValue());
return mergeSpecIntoDefinition(withTombstone, sourceSpecConfigs);
final JsonNode output = mergeSpecIntoDefinition(withTombstone, sourceSpecConfigs);
AirbyteConfigValidator.AIRBYTE_CONFIG_VALIDATOR.ensureAsRuntime(ConfigSchema.STANDARD_SOURCE_DEFINITION, output);
return output;
}));

final Map<String, JsonNode> destinationDefinitionConfigs = getConfigs(seedResourceClass, SeedType.STANDARD_DESTINATION_DEFINITION);
final Map<String, JsonNode> destinationSpecConfigs = getConfigs(seedResourceClass, SeedType.DESTINATION_SPEC);
final Map<String, JsonNode> fullDestinationDefinitionConfigs = destinationDefinitionConfigs.entrySet().stream()
.collect(Collectors.toMap(Entry::getKey, e -> {
final JsonNode withTombstone = addMissingTombstoneField(e.getValue());
return mergeSpecIntoDefinition(withTombstone, destinationSpecConfigs);
final JsonNode output = mergeSpecIntoDefinition(withTombstone, destinationSpecConfigs);
AirbyteConfigValidator.AIRBYTE_CONFIG_VALIDATOR.ensureAsRuntime(ConfigSchema.STANDARD_DESTINATION_DEFINITION, output);
return output;
}));

this.allSeedConfigs = ImmutableMap.<SeedType, Map<String, JsonNode>>builder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,22 @@
dockerImageTag: 0.4.15
documentationUrl: https://docs.airbyte.io/integrations/destinations/snowflake
icon: snowflake.svg
resourceRequirements:
# this is our first example of setting these requirements. they are guesses, not data driven.
# setting default cpu higher, because we have found that check and discover can be cpu constrained for dbs.
default:
cpu_limit: "1.0"
cpu_request: "1.0"
memory_limit: "300Mi"
memory_request: "300Mi"
jobSpecific:
# sync jobs are generally IO and memory bound and not cpu.
- jobType: sync
resourceRequirements:
cpu_limit: "0.5"
cpu_request: "0.5"
memory_limit: "600Mi"
memory_request: "600Mi"
- name: MariaDB ColumnStore
destinationDefinitionId: 294a4790-429b-40ae-9516-49826b9702e1
dockerRepository: airbyte/destination-mariadb-columnstore
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@

public class AirbyteConfigValidator extends AbstractSchemaValidator<ConfigSchema> {

public static AirbyteConfigValidator AIRBYTE_CONFIG_VALIDATOR = new AirbyteConfigValidator();

@Override
public Path getSchemaPath(final ConfigSchema configType) {
return configType.getConfigSchemaFile().toPath();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@
title: ActorDefinitionResourceRequirements
description: actor definition specific resource requirements
type: object
additionalProperties: true
# set to false because we need the validations on seeds to be strict. otherwise, we will just add whatever is in the seed file into the db.
additionalProperties: false
properties:
default:
description: if set, these are the requirements that should be set for ALL jobs run for this actor definition.
Expand All @@ -17,8 +18,10 @@ definitions:
JobTypeResourceLimit:
description: sets resource requirements for a specific job type for an actor definition. these values override the default, if both are set.
type: object
# set to false because we need the validations on seeds to be strict. otherwise, we will just add whatever is in the seed file into the db.
additionalProperties: false
required:
- jobtype
- jobType
- resourceRequirements
properties:
jobType:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
title: ResourceRequirements
description: generic configuration for pod source requirements
type: object
additionalProperties: true
additionalProperties: false
properties:
# todo (cgardens) - should be camel case for consistency.
cpu_request:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,33 +4,77 @@

package io.airbyte.server.converters;

import io.airbyte.api.model.ActorDefinitionResourceRequirements;
import io.airbyte.api.model.ConnectionRead;
import io.airbyte.api.model.ConnectionSchedule;
import io.airbyte.api.model.ConnectionStatus;
import io.airbyte.api.model.ConnectionUpdate;
import io.airbyte.api.model.JobType;
import io.airbyte.api.model.JobTypeResourceLimit;
import io.airbyte.api.model.ResourceRequirements;
import io.airbyte.commons.enums.Enums;
import io.airbyte.config.JobSyncConfig.NamespaceDefinitionType;
import io.airbyte.config.Schedule;
import io.airbyte.config.StandardSync;
import io.airbyte.workers.helper.CatalogConverter;
import java.util.stream.Collectors;

public class ApiPojoConverters {

public static io.airbyte.config.ResourceRequirements resourceRequirementsToInternal(final ResourceRequirements resourceRequirements) {
public static io.airbyte.config.ActorDefinitionResourceRequirements actorDefResourceReqsToInternal(final ActorDefinitionResourceRequirements actorDefResourceReqs) {
if (actorDefResourceReqs == null) {
return null;
}

return new io.airbyte.config.ActorDefinitionResourceRequirements()
.withDefault(actorDefResourceReqs.getDefault() == null ? null : resourceRequirementsToInternal(actorDefResourceReqs.getDefault()))
.withJobSpecific(actorDefResourceReqs.getJobSpecific() == null ? null
: actorDefResourceReqs.getJobSpecific()
.stream()
.map(jobSpecific -> new io.airbyte.config.JobTypeResourceLimit()
.withJobType(toInternalJobType(jobSpecific.getJobType()))
.withResourceRequirements(resourceRequirementsToInternal(jobSpecific.getResourceRequirements())))
.collect(Collectors.toList()));
}

public static ActorDefinitionResourceRequirements actorDefResourceReqsToApi(final io.airbyte.config.ActorDefinitionResourceRequirements actorDefResourceReqs) {
if (actorDefResourceReqs == null) {
return null;
}

return new ActorDefinitionResourceRequirements()
._default(actorDefResourceReqs.getDefault() == null ? null : resourceRequirementsToApi(actorDefResourceReqs.getDefault()))
.jobSpecific(actorDefResourceReqs.getJobSpecific() == null ? null
: actorDefResourceReqs.getJobSpecific()
.stream()
.map(jobSpecific -> new JobTypeResourceLimit()
.jobType(toApiJobType(jobSpecific.getJobType()))
.resourceRequirements(resourceRequirementsToApi(jobSpecific.getResourceRequirements())))
.collect(Collectors.toList()));
}

public static io.airbyte.config.ResourceRequirements resourceRequirementsToInternal(final ResourceRequirements resourceReqs) {
if (resourceReqs == null) {
return null;
}

return new io.airbyte.config.ResourceRequirements()
.withCpuRequest(resourceRequirements.getCpuRequest())
.withCpuLimit(resourceRequirements.getCpuLimit())
.withMemoryRequest(resourceRequirements.getMemoryRequest())
.withMemoryLimit(resourceRequirements.getMemoryLimit());
.withCpuRequest(resourceReqs.getCpuRequest())
.withCpuLimit(resourceReqs.getCpuLimit())
.withMemoryRequest(resourceReqs.getMemoryRequest())
.withMemoryLimit(resourceReqs.getMemoryLimit());
}

public static ResourceRequirements resourceRequirementsToApi(final io.airbyte.config.ResourceRequirements resourceRequirements) {
public static ResourceRequirements resourceRequirementsToApi(final io.airbyte.config.ResourceRequirements resourceReqs) {
if (resourceReqs == null) {
return null;
}

return new ResourceRequirements()
.cpuRequest(resourceRequirements.getCpuRequest())
.cpuLimit(resourceRequirements.getCpuLimit())
.memoryRequest(resourceRequirements.getMemoryRequest())
.memoryLimit(resourceRequirements.getMemoryLimit());
.cpuRequest(resourceReqs.getCpuRequest())
.cpuLimit(resourceReqs.getCpuLimit())
.memoryRequest(resourceReqs.getMemoryRequest())
.memoryLimit(resourceReqs.getMemoryLimit());
}

public static io.airbyte.config.StandardSync connectionUpdateToInternal(final ConnectionUpdate update) {
Expand Down Expand Up @@ -90,6 +134,14 @@ public static ConnectionRead internalToConnectionRead(final StandardSync standar
return connectionRead;
}

public static JobType toApiJobType(final io.airbyte.config.JobTypeResourceLimit.JobType jobType) {
return Enums.convertTo(jobType, JobType.class);
}

public static io.airbyte.config.JobTypeResourceLimit.JobType toInternalJobType(final JobType jobType) {
return Enums.convertTo(jobType, io.airbyte.config.JobTypeResourceLimit.JobType.class);
}

public static ConnectionSchedule.TimeUnitEnum toApiTimeUnit(final Schedule.TimeUnit apiTimeUnit) {
return Enums.convertTo(apiTimeUnit, ConnectionSchedule.TimeUnitEnum.class);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,14 @@
import io.airbyte.api.model.ReleaseStage;
import io.airbyte.commons.docker.DockerUtils;
import io.airbyte.commons.resources.MoreResources;
import io.airbyte.config.ActorDefinitionResourceRequirements;
import io.airbyte.config.StandardDestinationDefinition;
import io.airbyte.config.persistence.ConfigNotFoundException;
import io.airbyte.config.persistence.ConfigRepository;
import io.airbyte.protocol.models.ConnectorSpecification;
import io.airbyte.scheduler.client.SynchronousResponse;
import io.airbyte.scheduler.client.SynchronousSchedulerClient;
import io.airbyte.server.converters.ApiPojoConverters;
import io.airbyte.server.converters.SpecFetcher;
import io.airbyte.server.errors.InternalServerKnownException;
import io.airbyte.server.services.AirbyteGithubStore;
Expand Down Expand Up @@ -77,7 +79,8 @@ static DestinationDefinitionRead buildDestinationDefinitionRead(final StandardDe
.documentationUrl(new URI(standardDestinationDefinition.getDocumentationUrl()))
.icon(loadIcon(standardDestinationDefinition.getIcon()))
.releaseStage(getReleaseStage(standardDestinationDefinition))
.releaseDate(getReleaseDate(standardDestinationDefinition));
.releaseDate(getReleaseDate(standardDestinationDefinition))
.resourceRequirements(ApiPojoConverters.actorDefResourceReqsToApi(standardDestinationDefinition.getResourceRequirements()));
} catch (final URISyntaxException | NullPointerException e) {
throw new InternalServerKnownException("Unable to process retrieved latest destination definitions list", e);
}
Expand Down Expand Up @@ -127,23 +130,24 @@ public DestinationDefinitionRead getDestinationDefinition(final DestinationDefin
configRepository.getStandardDestinationDefinition(destinationDefinitionIdRequestBody.getDestinationDefinitionId()));
}

public DestinationDefinitionRead createCustomDestinationDefinition(final DestinationDefinitionCreate destinationDefinitionCreate)
public DestinationDefinitionRead createCustomDestinationDefinition(final DestinationDefinitionCreate destinationDefCreate)
throws JsonValidationException, IOException {
final ConnectorSpecification spec = getSpecForImage(
destinationDefinitionCreate.getDockerRepository(),
destinationDefinitionCreate.getDockerImageTag());
destinationDefCreate.getDockerRepository(),
destinationDefCreate.getDockerImageTag());

final UUID id = uuidSupplier.get();
final StandardDestinationDefinition destinationDefinition = new StandardDestinationDefinition()
.withDestinationDefinitionId(id)
.withDockerRepository(destinationDefinitionCreate.getDockerRepository())
.withDockerImageTag(destinationDefinitionCreate.getDockerImageTag())
.withDocumentationUrl(destinationDefinitionCreate.getDocumentationUrl().toString())
.withName(destinationDefinitionCreate.getName())
.withIcon(destinationDefinitionCreate.getIcon())
.withDockerRepository(destinationDefCreate.getDockerRepository())
.withDockerImageTag(destinationDefCreate.getDockerImageTag())
.withDocumentationUrl(destinationDefCreate.getDocumentationUrl().toString())
.withName(destinationDefCreate.getName())
.withIcon(destinationDefCreate.getIcon())
.withSpec(spec)
.withTombstone(false)
.withReleaseStage(StandardDestinationDefinition.ReleaseStage.CUSTOM);
.withReleaseStage(StandardDestinationDefinition.ReleaseStage.CUSTOM)
.withResourceRequirements(ApiPojoConverters.actorDefResourceReqsToInternal(destinationDefCreate.getResourceRequirements()));

configRepository.writeStandardDestinationDefinition(destinationDefinition);

Expand All @@ -162,6 +166,9 @@ public DestinationDefinitionRead updateDestinationDefinition(final DestinationDe
final ConnectorSpecification spec = specNeedsUpdate
? getSpecForImage(currentDestination.getDockerRepository(), destinationDefinitionUpdate.getDockerImageTag())
: currentDestination.getSpec();
final ActorDefinitionResourceRequirements updatedResourceReqs = destinationDefinitionUpdate.getResourceRequirements() != null
? ApiPojoConverters.actorDefResourceReqsToInternal(destinationDefinitionUpdate.getResourceRequirements())
: currentDestination.getResourceRequirements();

final StandardDestinationDefinition newDestination = new StandardDestinationDefinition()
.withDestinationDefinitionId(currentDestination.getDestinationDefinitionId())
Expand All @@ -173,7 +180,8 @@ public DestinationDefinitionRead updateDestinationDefinition(final DestinationDe
.withSpec(spec)
.withTombstone(currentDestination.getTombstone())
.withReleaseStage(currentDestination.getReleaseStage())
.withReleaseDate(currentDestination.getReleaseDate());
.withReleaseDate(currentDestination.getReleaseDate())
.withResourceRequirements(updatedResourceReqs);

configRepository.writeStandardDestinationDefinition(newDestination);
return buildDestinationDefinitionRead(newDestination);
Expand Down
Loading

0 comments on commit 8078302

Please sign in to comment.