Skip to content

Commit

Permalink
Merge branch 'issue-19981_avoid-race-condition-on-airbyte-cdk-release…
Browse files Browse the repository at this point in the history
…' into issue-19981_notifications-on-publish-cdk-manually
  • Loading branch information
maxi297 committed Jan 10, 2023
2 parents 936794e + 090ca68 commit 55bccda
Show file tree
Hide file tree
Showing 576 changed files with 16,124 additions and 7,667 deletions.
5 changes: 2 additions & 3 deletions .env
Original file line number Diff line number Diff line change
Expand Up @@ -47,13 +47,13 @@ DATABASE_PORT=5432
DATABASE_DB=airbyte
# translate manually DATABASE_URL=jdbc:postgresql://${DATABASE_HOST}:${DATABASE_PORT}/${DATABASE_DB} (do not include the username or password here)
DATABASE_URL=jdbc:postgresql://db:5432/airbyte
JOBS_DATABASE_MINIMUM_FLYWAY_MIGRATION_VERSION=0.29.15.001
JOBS_DATABASE_MINIMUM_FLYWAY_MIGRATION_VERSION=0.40.26.001

# Airbyte Internal Config Database, defaults to Job Database if empty. Explicitly left empty to mute docker compose warnings.
CONFIG_DATABASE_USER=
CONFIG_DATABASE_PASSWORD=
CONFIG_DATABASE_URL=
CONFIGS_DATABASE_MINIMUM_FLYWAY_MIGRATION_VERSION=0.35.15.001
CONFIGS_DATABASE_MINIMUM_FLYWAY_MIGRATION_VERSION=0.40.23.002

### AIRBYTE SERVICES ###
TEMPORAL_HOST=airbyte-temporal:7233
Expand Down Expand Up @@ -115,4 +115,3 @@ OTEL_COLLECTOR_ENDPOINT="http://host.docker.internal:4317"

USE_STREAM_CAPABLE_STATE=true
AUTO_DETECT_SCHEMA=false

Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,12 @@
* This class is meant to consolidate all our API endpoints into a fluent-ish client. Currently, all
* open API generators create a separate class per API "root-route". For example, if our API has two
* routes "/v1/First/get" and "/v1/Second/get", OpenAPI generates (essentially) the following files:
*
* <p>
* ApiClient.java, FirstApi.java, SecondApi.java
*
* <p>
* To call the API type-safely, we'd do new FirstApi(new ApiClient()).get() or new SecondApi(new
* ApiClient()).get(), which can get cumbersome if we're interacting with many pieces of the API.
*
* <p>
* This is currently manually maintained. We could look into autogenerating it if needed.
*/
public class AirbyteApiClient {
Expand Down
8 changes: 6 additions & 2 deletions airbyte-api/src/main/openapi/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4730,9 +4730,13 @@ components:
workspaceId:
$ref: "#/components/schemas/WorkspaceId"
sourceId:
$ref: "#/components/schemas/SourceId"
type: array
items:
$ref: "#/components/schemas/SourceId"
destinationId:
$ref: "#/components/schemas/DestinationId"
type: array
items:
$ref: "#/components/schemas/DestinationId"
WebBackendConnectionListItem:
type: object
description: Information about a connection that shows up in the connection list view.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ def stream_slices(self, sync_mode: SyncMode, stream_state: Mapping[str, Any]) ->
"""
Partition the daterange into slices of size = step.
The start of the window is the minimum datetime between start_datetime - looback_window and the stream_state's datetime
The start of the window is the minimum datetime between start_datetime - lookback_window and the stream_state's datetime
The end of the window is the minimum datetime between the start of the window and end_datetime.
:param sync_mode:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -792,7 +792,7 @@ def test_validation_type_missing_required_fields():
min_datetime: "{{ config['start_time'] + day_delta(2) }}"
end_datetime: "{{ config['end_time'] }}"
cursor_field: "created"
lookback_window: "5d"
lookback_window: "P5D"
start_time_option:
inject_into: request_parameter
field_name: created[gte]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@

import com.google.common.annotations.VisibleForTesting;
import com.google.protobuf.ByteString;
import io.airbyte.commons.temporal.config.WorkerMode;
import io.airbyte.commons.temporal.exception.DeletedWorkflowException;
import io.airbyte.commons.temporal.exception.UnreachableWorkflowException;
import io.airbyte.commons.temporal.scheduling.CheckConnectionWorkflow;
Expand All @@ -30,7 +29,6 @@
import io.airbyte.persistence.job.models.IntegrationLauncherConfig;
import io.airbyte.persistence.job.models.JobRunConfig;
import io.airbyte.protocol.models.StreamDescriptor;
import io.micronaut.context.annotation.Requires;
import io.temporal.api.common.v1.WorkflowType;
import io.temporal.api.enums.v1.WorkflowExecutionStatus;
import io.temporal.api.workflowservice.v1.ListClosedWorkflowExecutionsRequest;
Expand Down Expand Up @@ -62,7 +60,6 @@

@Slf4j
@Singleton
@Requires(env = WorkerMode.CONTROL_PLANE)
public class TemporalClient {

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,15 +30,18 @@ public WorkflowState(final UUID id, final WorkflowStateChangedListener stateChan
private boolean cancelled = false;
private boolean failed = false;
@Deprecated
@Getter(AccessLevel.NONE)
private final boolean resetConnection = false;
@Deprecated
@Getter(AccessLevel.NONE)
private final boolean continueAsReset = false;
@Deprecated
@Getter(AccessLevel.NONE)
private boolean quarantined = false;
private boolean success = true;
private boolean cancelledForReset = false;
@Deprecated
@Getter(AccessLevel.NONE)
private final boolean resetWithScheduling = false;
private boolean doneWaiting = false;
private boolean skipSchedulingNextWorkflow = false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,13 @@ public class ApiClientBeanFactory {
private static final int JWT_TTL_MINUTES = 5;

@Singleton
public ApiClient apiClient(@Value("${airbyte.internal.api.auth-header.name}") final String airbyteApiAuthHeaderName,
@Named("apiClient")
public ApiClient apiClient(
@Value("${airbyte.internal.api.auth-header.name}") final String airbyteApiAuthHeaderName,
@Value("${airbyte.internal.api.host}") final String airbyteApiHost,
@Named("internalApiAuthToken") final BeanProvider<String> internalApiAuthToken,
@Named("internalApiScheme") final String internalApiScheme) {
return new io.airbyte.api.client.invoker.generated.ApiClient()
return new ApiClient()
.setScheme(internalApiScheme)
.setHost(parseHostName(airbyteApiHost))
.setPort(parsePort(airbyteApiHost))
Expand All @@ -66,7 +68,7 @@ public AirbyteApiClient airbyteApiClient(final ApiClient apiClient) {
}

@Singleton
public SourceApi sourceApi(final ApiClient apiClient) {
public SourceApi sourceApi(@Named("apiClient") final ApiClient apiClient) {
return new SourceApi(apiClient);
}

Expand All @@ -87,7 +89,7 @@ public WorkspaceApi workspaceApi(final ApiClient apiClient) {

@Singleton
public HttpClient httpClient() {
return HttpClient.newHttpClient();
return HttpClient.newBuilder().version(HttpClient.Version.HTTP_1_1).build();
}

@Singleton
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
import com.google.common.base.Preconditions;
import io.airbyte.commons.enums.Enums;
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.temporal.config.WorkerMode;
import io.airbyte.config.BasicSchedule;
import io.airbyte.config.JobSyncConfig.NamespaceDefinitionType;
import io.airbyte.config.Schedule;
Expand All @@ -18,7 +17,6 @@
import io.airbyte.config.persistence.ConfigRepository;
import io.airbyte.persistence.job.WorkspaceHelper;
import io.airbyte.validation.json.JsonValidationException;
import io.micronaut.context.annotation.Requires;
import jakarta.inject.Singleton;
import java.io.IOException;
import java.util.List;
Expand All @@ -28,7 +26,6 @@
// todo (cgardens) - we are not getting any value out of instantiating this class. we should just
// use it as statics. not doing it now, because already in the middle of another refactor.
@Singleton
@Requires(env = WorkerMode.CONTROL_PLANE)
public class ConnectionHelper {

private final ConfigRepository configRepository;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,13 @@
package io.airbyte.commons.features;

import java.util.function.Function;
import lombok.extern.slf4j.Slf4j;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Slf4j
public class EnvVariableFeatureFlags implements FeatureFlags {

private static final Logger log = LoggerFactory.getLogger(EnvVariableFeatureFlags.class);

public static final String USE_STREAM_CAPABLE_STATE = "USE_STREAM_CAPABLE_STATE";
public static final String AUTO_DETECT_SCHEMA = "AUTO_DETECT_SCHEMA";
// Set this value to true to see all messages from the source to destination, set to one second
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ public File getServerLogFile(final Path workspaceRoot, final WorkerEnvironment w
}
final var cloudLogPath = sanitisePath(APP_LOGGING_CLOUD_PREFIX, getServerLogsRoot(workspaceRoot));
try {
createCloudClientIfNull(logConfigs);
return logClient.downloadCloudLog(logConfigs, cloudLogPath);
} catch (final IOException e) {
throw new RuntimeException("Error retrieving log file: " + cloudLogPath + " from S3", e);
Expand All @@ -95,6 +96,7 @@ public File getSchedulerLogFile(final Path workspaceRoot, final WorkerEnvironmen

final var cloudLogPath = APP_LOGGING_CLOUD_PREFIX + getSchedulerLogsRoot(workspaceRoot);
try {
createCloudClientIfNull(logConfigs);
return logClient.downloadCloudLog(logConfigs, cloudLogPath);
} catch (final IOException e) {
throw new RuntimeException("Error retrieving log file: " + cloudLogPath + " from S3", e);
Expand All @@ -111,6 +113,7 @@ public List<String> getJobLogFile(final WorkerEnvironment workerEnvironment, fin
}

final var cloudLogPath = sanitisePath(JOB_LOGGING_CLOUD_PREFIX, logPath);
createCloudClientIfNull(logConfigs);
return logClient.tailCloudLog(logConfigs, cloudLogPath, LOG_TAIL_SIZE);
}

Expand All @@ -127,6 +130,7 @@ public void deleteLogs(final WorkerEnvironment workerEnvironment, final LogConfi
throw new NotImplementedException("Local log deletes not supported.");
}
final var cloudLogPath = sanitisePath(JOB_LOGGING_CLOUD_PREFIX, Path.of(logPath));
createCloudClientIfNull(logConfigs);
logClient.deleteLogs(logConfigs, cloudLogPath);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@
"OptionalUsedAsFieldOrParameterType"})
public class ConfigRepository {

public record StandardSyncQuery(@Nonnull UUID workspaceId, UUID sourceId, UUID destinationId, boolean includeDeleted) {}
public record StandardSyncQuery(@Nonnull UUID workspaceId, List<UUID> sourceId, List<UUID> destinationId, boolean includeDeleted) {}

private static final Logger LOGGER = LoggerFactory.getLogger(ConfigRepository.class);
private static final String OPERATION_IDS_AGG_FIELD = "operation_ids_agg";
Expand Down Expand Up @@ -126,7 +126,7 @@ public ConfigRepository(final Database database) {
*/
public boolean healthCheck() {
try {
database.query(ctx -> ctx.select(WORKSPACE.ID).from(WORKSPACE).limit(1).fetch());
database.query(ctx -> ctx.select(WORKSPACE.ID).from(WORKSPACE).limit(1).fetch()).stream().count();
} catch (final Exception e) {
LOGGER.error("Health check error: ", e);
return false;
Expand Down Expand Up @@ -294,7 +294,8 @@ private Stream<StandardSourceDefinition> sourceDefQuery(final Optional<UUID> sou
.where(ACTOR_DEFINITION.ACTOR_TYPE.eq(ActorType.source))
.and(sourceDefId.map(ACTOR_DEFINITION.ID::eq).orElse(noCondition()))
.and(includeTombstone ? noCondition() : ACTOR_DEFINITION.TOMBSTONE.notEqual(true))
.fetchStream())
.fetch())
.stream()
.map(DbConverter::buildStandardSourceDefinition)
// Ensure version is set. Needed for connectors not upgraded since we added versioning.
.map(def -> def.withProtocolVersion(AirbyteProtocolVersion.getWithDefault(def.getProtocolVersion()).serialize()));
Expand Down Expand Up @@ -356,7 +357,8 @@ private Stream<StandardDestinationDefinition> destDefQuery(final Optional<UUID>
.where(ACTOR_DEFINITION.ACTOR_TYPE.eq(ActorType.destination))
.and(destDefId.map(ACTOR_DEFINITION.ID::eq).orElse(noCondition()))
.and(includeTombstone ? noCondition() : ACTOR_DEFINITION.TOMBSTONE.notEqual(true))
.fetchStream())
.fetch())
.stream()
.map(DbConverter::buildStandardDestinationDefinition)
// Ensure version is set. Needed for connectors not upgraded since we added versioning.
.map(def -> def.withProtocolVersion(AirbyteProtocolVersion.getWithDefault(def.getProtocolVersion()).serialize()));
Expand Down Expand Up @@ -865,8 +867,10 @@ public List<StandardSync> listWorkspaceStandardSyncs(final StandardSyncQuery sta
// join with source actors so that we can filter by workspaceId
.join(ACTOR).on(CONNECTION.SOURCE_ID.eq(ACTOR.ID))
.where(ACTOR.WORKSPACE_ID.eq(standardSyncQuery.workspaceId)
.and(standardSyncQuery.destinationId == null ? noCondition() : CONNECTION.DESTINATION_ID.eq(standardSyncQuery.destinationId))
.and(standardSyncQuery.sourceId == null ? noCondition() : CONNECTION.SOURCE_ID.eq(standardSyncQuery.sourceId))
.and(standardSyncQuery.destinationId == null || standardSyncQuery.destinationId.isEmpty() ? noCondition()
: CONNECTION.DESTINATION_ID.in(standardSyncQuery.destinationId))
.and(standardSyncQuery.sourceId == null || standardSyncQuery.sourceId.isEmpty() ? noCondition()
: CONNECTION.SOURCE_ID.in(standardSyncQuery.sourceId))
.and(standardSyncQuery.includeDeleted ? noCondition() : CONNECTION.STATUS.notEqual(StatusType.deprecated)))

// group by connection.id so that the groupConcat above works
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,8 @@ private static Stream<Record4<UUID, String, ActorType, String>> getActorDefiniti
return ctx.select(ACTOR_DEFINITION.ID, ACTOR_DEFINITION.DOCKER_REPOSITORY, ACTOR_DEFINITION.ACTOR_TYPE, ACTOR_DEFINITION.PROTOCOL_VERSION)
.from(ACTOR_DEFINITION)
.join(ACTOR).on(ACTOR.ACTOR_DEFINITION_ID.equal(ACTOR_DEFINITION.ID))
.fetchStream();
.fetch()
.stream();
}

static void writeStandardSourceDefinition(final List<StandardSourceDefinition> configs, final DSLContext ctx) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -289,7 +289,8 @@ private Stream<StandardSyncIdsWithProtocolVersions> findDisabledSyncs(final DSLC
.where(
CONNECTION.UNSUPPORTED_PROTOCOL_VERSION.eq(true).and(
(actorType == ActorType.DESTINATION ? destDef : sourceDef).ID.eq(actorDefId)))
.fetchStream()
.fetch()
.stream()
.map(r -> new StandardSyncIdsWithProtocolVersions(
r.get(CONNECTION.ID),
r.get(sourceDef.ID),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -198,10 +198,10 @@ void testListWorkspaceStandardSyncAll() throws IOException {
@Test
void testListWorkspaceStandardSyncWithAllFiltering() throws IOException {
final UUID workspaceId = MockData.standardWorkspaces().get(0).getWorkspaceId();
final StandardSyncQuery query = new StandardSyncQuery(workspaceId, MockData.SOURCE_ID_1, MockData.DESTINATION_ID_1, false);
final StandardSyncQuery query = new StandardSyncQuery(workspaceId, List.of(MockData.SOURCE_ID_1), List.of(MockData.DESTINATION_ID_1), false);
final List<StandardSync> expectedSyncs = MockData.standardSyncs().subList(0, 3).stream()
.filter(sync -> sync.getDestinationId().equals(query.destinationId()))
.filter(sync -> sync.getSourceId().equals(query.sourceId()))
.filter(sync -> query.destinationId().contains(sync.getDestinationId()))
.filter(sync -> query.sourceId().contains(sync.getSourceId()))
.toList();
final List<StandardSync> actualSyncs = configRepository.listWorkspaceStandardSyncs(query);

Expand All @@ -211,9 +211,9 @@ void testListWorkspaceStandardSyncWithAllFiltering() throws IOException {
@Test
void testListWorkspaceStandardSyncDestinationFiltering() throws IOException {
final UUID workspaceId = MockData.standardWorkspaces().get(0).getWorkspaceId();
final StandardSyncQuery query = new StandardSyncQuery(workspaceId, null, MockData.DESTINATION_ID_1, false);
final StandardSyncQuery query = new StandardSyncQuery(workspaceId, null, List.of(MockData.DESTINATION_ID_1), false);
final List<StandardSync> expectedSyncs = MockData.standardSyncs().subList(0, 3).stream()
.filter(sync -> sync.getDestinationId().equals(query.destinationId()))
.filter(sync -> query.destinationId().contains(sync.getDestinationId()))
.toList();
final List<StandardSync> actualSyncs = configRepository.listWorkspaceStandardSyncs(query);

Expand All @@ -223,9 +223,9 @@ void testListWorkspaceStandardSyncDestinationFiltering() throws IOException {
@Test
void testListWorkspaceStandardSyncSourceFiltering() throws IOException {
final UUID workspaceId = MockData.standardWorkspaces().get(0).getWorkspaceId();
final StandardSyncQuery query = new StandardSyncQuery(workspaceId, MockData.SOURCE_ID_2, null, false);
final StandardSyncQuery query = new StandardSyncQuery(workspaceId, List.of(MockData.SOURCE_ID_2), null, false);
final List<StandardSync> expectedSyncs = MockData.standardSyncs().subList(0, 3).stream()
.filter(sync -> sync.getSourceId().equals(query.sourceId()))
.filter(sync -> query.sourceId().contains(sync.getSourceId()))
.toList();
final List<StandardSync> actualSyncs = configRepository.listWorkspaceStandardSyncs(query);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,8 @@ private Set<StandardSyncProtocolVersionFlag> getProtocolVersionFlagForSyncs(fina
.select(CONNECTION.ID, CONNECTION.UNSUPPORTED_PROTOCOL_VERSION)
.from(CONNECTION)
.where(CONNECTION.ID.in(standardSync.stream().map(StandardSync::getConnectionId).toList()))
.fetchStream())
.fetch())
.stream()
.map(r -> new StandardSyncProtocolVersionFlag(r.get(CONNECTION.ID), r.get(CONNECTION.UNSUPPORTED_PROTOCOL_VERSION)))
.collect(Collectors.toSet());
}
Expand Down
49 changes: 49 additions & 0 deletions airbyte-config/init/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -28,3 +28,52 @@ tasks.named("buildDockerImage") {
}

Task publishArtifactsTask = getPublishArtifactsTask("$rootProject.ext.version", project)

/**
* Simple task that checks if all icons in the seed definition files exist as well as
* that no icon in the icons folder is unused.
*/
class IconValidationTask extends DefaultTask {

private slurper = new groovy.yaml.YamlSlurper()

@InputFile
File sourceDefinitions = project.file("src/main/resources/seed/source_definitions.yaml")

@InputFile
File destinationDefinitions = project.file("src/main/resources/seed/destination_definitions.yaml")

@InputDirectory
File iconDirectory = project.file("src/main/resources/icons")

private String[] getIconsFromYaml(File file) {
def yaml = this.slurper.parse(file.newReader())
return yaml.collect { it.icon }.findAll { it != null }
}

@TaskAction
void validateIcons() {
def sourceIcons = this.getIconsFromYaml(this.sourceDefinitions)
def destinationIcons = this.getIconsFromYaml(this.destinationDefinitions)
def icons = sourceIcons.plus(destinationIcons) as Collection<String>

def iconFiles = project.fileTree(this.iconDirectory).collect { it.name }
def nonExistingIconFiles = icons - iconFiles
def orphanedFiles = iconFiles - icons.intersect(iconFiles)

def errors = []
if (!nonExistingIconFiles.isEmpty()) {
errors.push("The following icon files have been referenced inside the seed files, but don't exist:\n\n${nonExistingIconFiles.join('\n')}")
}
if (!orphanedFiles.isEmpty()) {
errors.push("The following icons are not used in the seed files and should be removed:\n\n${orphanedFiles.join('\n')}")
}

if (!errors.isEmpty()) {
throw new Error(errors.join('\n\n'))
}
}
}

task validateIcons(type: IconValidationTask)
check.dependsOn validateIcons
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Loading

0 comments on commit 55bccda

Please sign in to comment.