Skip to content

Commit

Permalink
Exclude connectors with unsupported protocol version from seed updates (
Browse files Browse the repository at this point in the history
#19328)

* Filter out connectors with unsupported protocol in ApplyDefinitionsHelper

* Format

* Remove code dupl
  • Loading branch information
gosusnp authored Nov 15, 2022
1 parent 41f3c0a commit c47890d
Show file tree
Hide file tree
Showing 9 changed files with 164 additions and 36 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,8 @@ public BootloaderApp(final Configs configs,

postLoadExecution = () -> {
try {
final ApplyDefinitionsHelper applyDefinitionsHelper = new ApplyDefinitionsHelper(configRepository, this.definitionsProvider.get());
final ApplyDefinitionsHelper applyDefinitionsHelper =
new ApplyDefinitionsHelper(configRepository, this.definitionsProvider.get(), jobPersistence);
applyDefinitionsHelper.apply();

if (featureFlags.forceSecretMigration() || !jobPersistence.isSecretMigrated()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ public ProtocolVersionChecker(final JobPersistence jobPersistence,
*/
public Optional<AirbyteProtocolVersionRange> validate(final boolean supportAutoUpgrade) throws IOException {
final Optional<AirbyteVersion> currentAirbyteVersion = getCurrentAirbyteVersion();
final AirbyteProtocolVersionRange currentRange = getCurrentProtocolVersionRange();
final Optional<AirbyteProtocolVersionRange> currentRange = jobPersistence.getCurrentProtocolVersionRange();
final AirbyteProtocolVersionRange targetRange = getTargetProtocolVersionRange();

// Checking if there is a pre-existing version of airbyte.
Expand All @@ -73,13 +73,13 @@ public Optional<AirbyteProtocolVersionRange> validate(final boolean supportAutoU
return Optional.of(targetRange);
}

if (currentRange.equals(targetRange)) {
if (currentRange.isEmpty() || currentRange.get().equals(targetRange)) {
log.info("Using AirbyteProtocolVersion range [{}:{}]", targetRange.min().serialize(), targetRange.max().serialize());
return Optional.of(targetRange);
}

log.info("Detected an AirbyteProtocolVersion range change from [{}:{}] to [{}:{}]",
currentRange.min().serialize(), currentRange.max().serialize(),
currentRange.get().min().serialize(), currentRange.get().max().serialize(),
targetRange.min().serialize(), targetRange.max().serialize());

final Map<ActorType, Set<UUID>> conflicts = getConflictingActorDefinitions(targetRange);
Expand Down Expand Up @@ -123,22 +123,6 @@ protected Optional<AirbyteVersion> getCurrentAirbyteVersion() throws IOException
return jobPersistence.getVersion().map(AirbyteVersion::new);
}

protected AirbyteProtocolVersionRange getCurrentProtocolVersionRange() throws IOException {
Optional<Version> min = jobPersistence.getAirbyteProtocolVersionMin();
Optional<Version> max = jobPersistence.getAirbyteProtocolVersionMax();

if (min.isPresent() != max.isPresent()) {
// Flagging this because this would be highly suspicious but not bad enough that we should fail
// hard.
// If the new config is fine, the system should self-heal.
log.warn("Inconsistent AirbyteProtocolVersion found, only one of min/max was found. (min:{}, max:{})",
min.map(Version::serialize).orElse(""), max.map(Version::serialize).orElse(""));
}

return new AirbyteProtocolVersionRange(min.orElse(AirbyteProtocolVersion.DEFAULT_AIRBYTE_PROTOCOL_VERSION),
max.orElse(AirbyteProtocolVersion.DEFAULT_AIRBYTE_PROTOCOL_VERSION));
}

protected AirbyteProtocolVersionRange getTargetProtocolVersionRange() {
return new AirbyteProtocolVersionRange(configs.getAirbyteProtocolVersionMin(), configs.getAirbyteProtocolVersionMax());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,13 +62,6 @@ void testFirstInstallCheck(final boolean supportAutoUpgrade) throws IOException
assertEquals(Optional.of(new AirbyteProtocolVersionRange(V0_0_0, V1_0_0)), protocolVersionChecker.validate(supportAutoUpgrade));
}

@Test
void testGetCurrentRange() throws IOException {
setCurrentProtocolRangeRange(V0_0_0, V1_0_0);

assertEquals(new AirbyteProtocolVersionRange(V0_0_0, V1_0_0), protocolVersionChecker.getCurrentProtocolVersionRange());
}

@Test
void testGetTargetRange() throws IOException {
setTargetProtocolRangeRange(V1_0_0, V2_0_0);
Expand Down Expand Up @@ -317,6 +310,7 @@ void testValidateSucceedsWhenNoProtocolRangeChangeWithoutDefinitionsProvider(fin
}

private void setCurrentProtocolRangeRange(final Version min, final Version max) throws IOException {
when(jobPersistence.getCurrentProtocolVersionRange()).thenReturn(Optional.of(new AirbyteProtocolVersionRange(min, max)));
when(jobPersistence.getAirbyteProtocolVersionMin()).thenReturn(Optional.of(min));
when(jobPersistence.getAirbyteProtocolVersionMax()).thenReturn(Optional.of(max));
}
Expand Down
1 change: 1 addition & 0 deletions airbyte-config/init/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ dependencies {

implementation project(':airbyte-config:config-models')
implementation project(':airbyte-config:config-persistence')
implementation project(':airbyte-persistence:job-persistence')
implementation project(':airbyte-protocol:protocol-models')
implementation project(':airbyte-commons-docker')
implementation project(':airbyte-json-validation')
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,25 +4,43 @@

package io.airbyte.config.init;

import io.airbyte.commons.version.AirbyteProtocolVersion;
import io.airbyte.commons.version.AirbyteProtocolVersionRange;
import io.airbyte.config.StandardDestinationDefinition;
import io.airbyte.config.StandardSourceDefinition;
import io.airbyte.config.persistence.ConfigRepository;
import io.airbyte.persistence.job.JobPersistence;
import io.airbyte.validation.json.JsonValidationException;
import java.io.IOException;
import java.util.List;
import java.util.Optional;
import lombok.extern.slf4j.Slf4j;

/**
* Helper class used to apply actor definitions from a DefinitionsProvider to the database. This is
* here to enable easy reuse of definition application logic in bootloader and cron.
*/
@Slf4j
public class ApplyDefinitionsHelper {

private final ConfigRepository configRepository;
private final DefinitionsProvider definitionsProvider;
private final JobPersistence jobPersistence;

// Remove once cloud has been migrated
@Deprecated(forRemoval = true)
public ApplyDefinitionsHelper(final ConfigRepository configRepository, final DefinitionsProvider definitionsProvider) {
this.configRepository = configRepository;
this.definitionsProvider = definitionsProvider;
this.jobPersistence = null;
}

public ApplyDefinitionsHelper(final ConfigRepository configRepository,
final DefinitionsProvider definitionsProvider,
final JobPersistence jobPersistence) {
this.configRepository = configRepository;
this.definitionsProvider = definitionsProvider;
this.jobPersistence = jobPersistence;
}

public void apply() throws JsonValidationException, IOException {
Expand All @@ -35,23 +53,70 @@ public void apply() throws JsonValidationException, IOException {
* @param updateAll - Whether we should overwrite all stored definitions
*/
public void apply(final boolean updateAll) throws JsonValidationException, IOException {
final Optional<AirbyteProtocolVersionRange> currentProtocolRange = getCurrentProtocolRange();

if (updateAll) {
final List<StandardSourceDefinition> latestSourceDefinitions = definitionsProvider.getSourceDefinitions();
for (final StandardSourceDefinition def : latestSourceDefinitions) {
for (final StandardSourceDefinition def : filterStandardSourceDefinitions(currentProtocolRange, latestSourceDefinitions)) {
configRepository.writeStandardSourceDefinition(def);
}

final List<StandardDestinationDefinition> latestDestinationDefinitions = definitionsProvider.getDestinationDefinitions();
for (final StandardDestinationDefinition def : latestDestinationDefinitions) {
for (final StandardDestinationDefinition def : filterStandardDestinationDefinitions(currentProtocolRange, latestDestinationDefinitions)) {
configRepository.writeStandardDestinationDefinition(def);
}
} else {
// todo (pedroslopez): Logic to apply definitions should be moved outside of the
// DatabaseConfigPersistence class and behavior standardized
configRepository.seedActorDefinitions(
definitionsProvider.getSourceDefinitions(),
definitionsProvider.getDestinationDefinitions());
filterStandardSourceDefinitions(currentProtocolRange, definitionsProvider.getSourceDefinitions()),
filterStandardDestinationDefinitions(currentProtocolRange, definitionsProvider.getDestinationDefinitions()));
}
}

private List<StandardDestinationDefinition> filterStandardDestinationDefinitions(final Optional<AirbyteProtocolVersionRange> protocolVersionRange,
final List<StandardDestinationDefinition> destDefs) {
if (protocolVersionRange.isEmpty()) {
return destDefs;
}

return destDefs.stream().filter(def -> {
final boolean isSupported = isProtocolVersionSupported(protocolVersionRange.get(), def.getSpec().getProtocolVersion());
if (!isSupported) {
log.warn("Destination {} {} has an incompatible protocol version ({})... ignoring.",
def.getDestinationDefinitionId(), def.getName(), def.getSpec().getProtocolVersion());
}
return isSupported;
}).toList();
}

private List<StandardSourceDefinition> filterStandardSourceDefinitions(final Optional<AirbyteProtocolVersionRange> protocolVersionRange,
final List<StandardSourceDefinition> sourceDefs) {
if (protocolVersionRange.isEmpty()) {
return sourceDefs;
}

return sourceDefs.stream().filter(def -> {
final boolean isSupported = isProtocolVersionSupported(protocolVersionRange.get(), def.getSpec().getProtocolVersion());
if (!isSupported) {
log.warn("Source {} {} has an incompatible protocol version ({})... ignoring.",
def.getSourceDefinitionId(), def.getName(), def.getSpec().getProtocolVersion());
}
return isSupported;
}).toList();
}

private boolean isProtocolVersionSupported(final AirbyteProtocolVersionRange protocolVersionRange, final String protocolVersion) {
return protocolVersionRange.isSupported(AirbyteProtocolVersion.getWithDefault(protocolVersion));
}

private Optional<AirbyteProtocolVersionRange> getCurrentProtocolRange() throws IOException {
if (jobPersistence == null) {
// TODO Remove this once cloud has been migrated and job persistence is always defined
return Optional.empty();
}

return jobPersistence.getCurrentProtocolVersionRange();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -9,16 +9,23 @@
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;

import io.airbyte.commons.version.AirbyteProtocolVersionRange;
import io.airbyte.commons.version.Version;
import io.airbyte.config.StandardDestinationDefinition;
import io.airbyte.config.StandardSourceDefinition;
import io.airbyte.config.persistence.ConfigRepository;
import io.airbyte.persistence.job.JobPersistence;
import io.airbyte.protocol.models.ConnectorSpecification;
import io.airbyte.validation.json.JsonValidationException;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.UUID;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

class ApplyDefinitionsHelperTest {

Expand All @@ -29,41 +36,51 @@ class ApplyDefinitionsHelperTest {
private static final String DOCUMENTATION_URL = "https://wwww.example.com";
private static final String DOCKER_REPOSITORY = "airbyte/connector";
private static final String DOCKER_TAG = "0.1.0";
private static final String PROTOCOL_VERSION_1 = "1.0.0";
private static final String PROTOCOL_VERSION_2 = "2.0.0";
public static final StandardSourceDefinition SOURCE_DEF1 = new StandardSourceDefinition()
.withSourceDefinitionId(SOURCE_DEF_ID1)
.withDockerRepository(DOCKER_REPOSITORY)
.withDockerImageTag(DOCKER_TAG)
.withName(CONNECT_NAME1)
.withDocumentationUrl(DOCUMENTATION_URL);
.withDocumentationUrl(DOCUMENTATION_URL)
.withSpec(new ConnectorSpecification().withProtocolVersion(PROTOCOL_VERSION_1));
public static final StandardSourceDefinition SOURCE_DEF2 = new StandardSourceDefinition()
.withSourceDefinitionId(SOURCE_DEF_ID1)
.withDockerRepository(DOCKER_REPOSITORY)
.withDockerImageTag(DOCKER_TAG)
.withName(CONNECT_NAME2)
.withDocumentationUrl(DOCUMENTATION_URL);
.withDocumentationUrl(DOCUMENTATION_URL)
.withSpec(new ConnectorSpecification().withProtocolVersion(PROTOCOL_VERSION_2));

public static final StandardDestinationDefinition DEST_DEF1 = new StandardDestinationDefinition()
.withDestinationDefinitionId(DEST_DEF_ID2)
.withDockerRepository(DOCKER_REPOSITORY)
.withDockerImageTag(DOCKER_TAG)
.withName(CONNECT_NAME1)
.withDocumentationUrl(DOCUMENTATION_URL);
.withDocumentationUrl(DOCUMENTATION_URL)
.withSpec(new ConnectorSpecification().withProtocolVersion(PROTOCOL_VERSION_2));

public static final StandardDestinationDefinition DEST_DEF2 = new StandardDestinationDefinition()
.withDestinationDefinitionId(DEST_DEF_ID2)
.withDockerRepository(DOCKER_REPOSITORY)
.withDockerImageTag(DOCKER_TAG)
.withName(CONNECT_NAME2)
.withDocumentationUrl(DOCUMENTATION_URL);
.withDocumentationUrl(DOCUMENTATION_URL)
.withSpec(new ConnectorSpecification().withProtocolVersion(PROTOCOL_VERSION_1));

private ConfigRepository configRepository;
private DefinitionsProvider definitionsProvider;
private JobPersistence jobPersistence;
private ApplyDefinitionsHelper applyDefinitionsHelper;

@BeforeEach
void setup() throws JsonValidationException, IOException {
configRepository = mock(ConfigRepository.class);
definitionsProvider = mock(DefinitionsProvider.class);
jobPersistence = mock(JobPersistence.class);

applyDefinitionsHelper = new ApplyDefinitionsHelper(configRepository, definitionsProvider);
applyDefinitionsHelper = new ApplyDefinitionsHelper(configRepository, definitionsProvider, jobPersistence);

// default calls to empty.
when(configRepository.listStandardDestinationDefinitions(true)).thenReturn(Collections.emptyList());
Expand Down Expand Up @@ -132,4 +149,24 @@ void testApplyOSS() throws JsonValidationException, IOException {
verifyNoMoreInteractions(definitionsProvider);
}

@ParameterizedTest
@ValueSource(booleans = {false, true})
void testDefinitionsFiltering(final boolean updateAll) throws JsonValidationException, IOException {
when(jobPersistence.getCurrentProtocolVersionRange())
.thenReturn(Optional.of(new AirbyteProtocolVersionRange(new Version("2.0.0"), new Version("3.0.0"))));

when(definitionsProvider.getSourceDefinitions()).thenReturn(List.of(SOURCE_DEF1, SOURCE_DEF2));
when(definitionsProvider.getDestinationDefinitions()).thenReturn(List.of(DEST_DEF1, DEST_DEF2));

applyDefinitionsHelper.apply(updateAll);

if (updateAll) {
verify(configRepository).writeStandardSourceDefinition(SOURCE_DEF2);
verify(configRepository).writeStandardDestinationDefinition(DEST_DEF1);
verifyNoMoreInteractions(configRepository);
} else {
verify(configRepository).seedActorDefinitions(List.of(SOURCE_DEF2), List.of(DEST_DEF1));
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import io.airbyte.commons.text.Names;
import io.airbyte.commons.text.Sqls;
import io.airbyte.commons.version.AirbyteProtocolVersion;
import io.airbyte.commons.version.AirbyteProtocolVersionRange;
import io.airbyte.commons.version.AirbyteVersion;
import io.airbyte.commons.version.Version;
import io.airbyte.config.AttemptFailureSummary;
Expand Down Expand Up @@ -838,6 +839,27 @@ public void setAirbyteProtocolVersionMin(final Version version) throws IOExcepti
setMetadata(AirbyteProtocolVersion.AIRBYTE_PROTOCOL_VERSION_MIN_KEY_NAME, version.serialize());
}

@Override
public Optional<AirbyteProtocolVersionRange> getCurrentProtocolVersionRange() throws IOException {
final Optional<Version> min = getAirbyteProtocolVersionMin();
final Optional<Version> max = getAirbyteProtocolVersionMax();

if (min.isPresent() != max.isPresent()) {
// Flagging this because this would be highly suspicious but not bad enough that we should fail
// hard.
// If the new config is fine, the system should self-heal.
LOGGER.warn("Inconsistent AirbyteProtocolVersion found, only one of min/max was found. (min:{}, max:{})",
min.map(Version::serialize).orElse(""), max.map(Version::serialize).orElse(""));
}

if (min.isEmpty() && max.isEmpty()) {
return Optional.empty();
}

return Optional.of(new AirbyteProtocolVersionRange(min.orElse(AirbyteProtocolVersion.DEFAULT_AIRBYTE_PROTOCOL_VERSION),
max.orElse(AirbyteProtocolVersion.DEFAULT_AIRBYTE_PROTOCOL_VERSION)));
}

private Stream<String> getMetadata(final String keyName) throws IOException {
return jobDatabase.query(ctx -> ctx.select()
.from(AIRBYTE_METADATA_TABLE)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package io.airbyte.persistence.job;

import com.fasterxml.jackson.databind.JsonNode;
import io.airbyte.commons.version.AirbyteProtocolVersionRange;
import io.airbyte.commons.version.Version;
import io.airbyte.config.AttemptFailureSummary;
import io.airbyte.config.JobConfig;
Expand Down Expand Up @@ -262,6 +263,11 @@ List<JobWithStatusAndTimestamp> listJobStatusAndTimestampWithConnection(UUID con
*/
void setAirbyteProtocolVersionMin(Version version) throws IOException;

/**
* Get the current Airbyte Protocol Version range if defined
*/
Optional<AirbyteProtocolVersionRange> getCurrentProtocolVersionRange() throws IOException;

/**
* Returns a deployment UUID.
*/
Expand Down
Loading

0 comments on commit c47890d

Please sign in to comment.