Commit fc9d567

Merge branch 'master' into leeti/add-clear-cta-to-trial-expired-message
* master: (24 commits)
  Discover worker starts to use API to write schema result (#21875)
  🪟 🎉  Connector Builder Landing Page (#22122)
  Fix pnpm cache path (#22418)
  Add additional shorter setup guides (#22318)
  Source Amazon Ads: fix reports stream records primary keys (#21677)
  Connector acceptance test: Fix discovered catalog caching for different configs (#22301)
  🪟🐛 Make modal scrollable (#21973)
  only compute diff if the schema discovery actually succeeded (#22377)
  Source Klaviyo: fix schema (#22071)
  🪟 🔧 Switch to `pnpm` for package managing (#22053)
  Source Sentry: turn on default availability strategy (#22303)
  Source freshdesk: deduplicate table names (#22164)
  Update connector-acceptance-tests-reference.md (#22370)
  Update the default security groups for the EC2 runner (#22347)
  Trace refresh schema operations (#22326)
  Remove manual docker upgrades from workflows (#22344)
  Update CODEOWNERS for connector acceptance tests to connector ops (#22341)
  🐛 source: airtable - handle singleSelect types (#22311)
  Source tiktok: chunk advertiser IDs (#22309)
  🪟 🧪 E2E Tests for auto-detect schema changes (#20682)
  ...
letiescanciano committed Feb 6, 2023
2 parents 2c63876 + 36698ce commit fc9d567
Showing 2,002 changed files with 21,423 additions and 61,314 deletions.
4 changes: 2 additions & 2 deletions .github/CODEOWNERS
@@ -1,7 +1,7 @@
-# CDK and SAT
+# CDK and Connector Acceptance Tests
 /airbyte-cdk/ @airbytehq/connector-extensibility
-/airbyte-integrations/bases/source-acceptance-tests/ @airbytehq/connector-extensibility
 /airbyte-integrations/connector-templates/ @airbytehq/connector-extensibility
+/airbyte-integrations/bases/connector-acceptance-tests/ @airbytehq/connector-operations
 
 # Oauth
 /airbyte-oauth/ @airbytehq/connector-operations
10 changes: 0 additions & 10 deletions .github/actions/build-and-push-branch/action.yml
@@ -13,16 +13,6 @@ inputs:
 runs:
   using: "composite"
   steps:
-    - name: Delete default old docker and replace it with a new one
-      shell: bash
-      run: |
-        sudo rm /var/lib/dpkg/lock
-        sudo rm /var/lib/dpkg/lock-frontend
-        sudo apt-get remove docker.io || sudo apt-get remove docker
-        curl -fsSL https://get.docker.com | bash -
-        sudo rm -f /var/lib/dpkg/lock
-        sudo rm -f /var/lib/dpkg/lock-frontend
     - name: Build
       id: build
       uses: ./.github/actions/build-branch
9 changes: 9 additions & 0 deletions .github/actions/cache-build-artifacts/action.yml
@@ -31,6 +31,15 @@ runs:
       restore-keys: |
         ${{ inputs.cache-key }}-npm-${{ runner.os }}-
+    - name: pnpm Caching
+      uses: actions/cache@v3
+      with:
+        path: |
+          ~/.local/share/pnpm/store
+        key: ${{ inputs.cache-key }}-pnpm-${{ runner.os }}-${{ hashFiles('**/pnpm-lock.yaml') }}
+        restore-keys: |
+          ${{ inputs.cache-key }}-pnpm-${{ runner.os }}-
     # this intentionally does not use restore-keys so we don't mess with gradle caching
     - name: Gradle and Python Caching
       uses: actions/cache@v3
2 changes: 1 addition & 1 deletion .github/actions/start-aws-runner/action.yml
@@ -18,7 +18,7 @@ inputs:
default: "subnet-0469a9e68a379c1d3"
required: true
security-group-id:
default: "sg-0793f3c9413f21970"
default: "sg-0793f3c9413f21970,sg-023656a050e7d5634"
required: true
label:
required: false
2 changes: 1 addition & 1 deletion .github/workflows/connector_integration_tests.yml
@@ -26,7 +26,7 @@ jobs:
           python -m pip install --upgrade pip
           pip install PyYAML requests
       - name: Launch Integration Tests
-        run: python ./tools/bin/ci_integration_workflow_launcher.py base-normalization source-acceptance-test source:beta source:GA destination:beta destination:GA
+        run: python ./tools/bin/ci_integration_workflow_launcher.py base-normalization connector-acceptance-test source:beta source:GA destination:beta destination:GA
         env:
           GITHUB_TOKEN: ${{ secrets.GH_PAT_MAINTENANCE_OSS }}
   launch_integration_tests_alpha_only:
20 changes: 0 additions & 20 deletions .github/workflows/gradle.yml
@@ -528,16 +528,6 @@ jobs:
         with:
           python-version: "3.9"
 
-      - name: Delete default old docker and replace it with a new one
-        shell: bash
-        run: |
-          sudo rm /var/lib/dpkg/lock
-          sudo rm /var/lib/dpkg/lock-frontend
-          sudo apt-get remove docker.io || sudo apt-get remove docker
-          curl -fsSL https://get.docker.com | bash -
-          sudo rm -f /var/lib/dpkg/lock
-          sudo rm -f /var/lib/dpkg/lock-frontend
       - name: Set up CI Gradle Properties
         run: |
           mkdir -p ~/.gradle/
@@ -655,16 +645,6 @@ jobs:
         with:
           node-version: "lts/*"
 
-      - name: Delete default old docker and replace it with a new one
-        shell: bash
-        run: |
-          sudo rm /var/lib/dpkg/lock
-          sudo rm /var/lib/dpkg/lock-frontend
-          sudo apt-get remove docker.io || sudo apt-get remove docker
-          curl -fsSL https://get.docker.com | bash -
-          sudo rm -f /var/lib/dpkg/lock
-          sudo rm -f /var/lib/dpkg/lock-frontend
       - name: Set up CI Gradle Properties
         run: |
           mkdir -p ~/.gradle/
@@ -126,6 +126,7 @@ definitions:
       - stream_slicers
     properties:
       type:
+        type: string
         enum: [CartesianProductStreamSlicer]
       stream_slicers:
         type: array
@@ -746,6 +747,7 @@ definitions:
       - type
     properties:
       type:
+        type: string
         enum: [JsonFileSchemaLoader, JsonSchema] # TODO As part of Beta, remove JsonSchema and update connectors to use JsonFileSchemaLoader
       file_path:
         type: string
@@ -186,7 +186,7 @@ def split_config(config: Mapping[str, Any]) -> Tuple[dict, InternalConfig]:
     config - Dict object that has been loaded from config file.
 
     :return tuple of user defined config dict with filtered out internal
-    parameters and SAT internal config object.
+    parameters and connector acceptance test internal config object.
     """
     main_config = {}
     internal_config = {}
@@ -269,7 +269,7 @@ public SourceDiscoverSchemaRead discoverSchemaForSourceFromSourceId(final Source
         isCustomConnector);
     final SourceDiscoverSchemaRead discoveredSchema = retrieveDiscoveredSchema(persistedCatalogId, sourceDef);
 
-    if (discoverSchemaRequestBody.getConnectionId() != null) {
+    if (persistedCatalogId.isSuccess() && discoverSchemaRequestBody.getConnectionId() != null) {
       // modify discoveredSchema object to add CatalogDiff, containsBreakingChange, and connectionStatus
       generateCatalogDiffsAndDisableConnectionsIfNeeded(discoveredSchema, discoverSchemaRequestBody);
     }
@@ -977,6 +977,40 @@ void testDiscoverSchemaForSourceMultipleConnectionsFeatureFlagOn() throws IOExce
     assertEquals(ConnectionStatus.INACTIVE, connectionUpdateValues.get(2).getStatus());
   }
 
+  @Test
+  void testDiscoverSchemaFromSourceIdWithConnectionUpdateNonSuccessResponse() throws IOException, JsonValidationException, ConfigNotFoundException {
+    final SourceConnection source = SourceHelpers.generateSource(UUID.randomUUID());
+    final SourceDiscoverSchemaRequestBody request = new SourceDiscoverSchemaRequestBody().sourceId(source.getSourceId())
+        .connectionId(UUID.randomUUID());
+
+    // Mock the source definition.
+    when(configRepository.getStandardSourceDefinition(source.getSourceDefinitionId()))
+        .thenReturn(new StandardSourceDefinition()
+            .withDockerRepository(SOURCE_DOCKER_REPO)
+            .withDockerImageTag(SOURCE_DOCKER_TAG)
+            .withProtocolVersion(SOURCE_PROTOCOL_VERSION)
+            .withSourceDefinitionId(source.getSourceDefinitionId()));
+    // Mock the source itself.
+    when(configRepository.getSourceConnection(source.getSourceId())).thenReturn(source);
+    // Mock the Discover job results.
+    final SynchronousResponse<UUID> discoverResponse = (SynchronousResponse<UUID>) jobResponse;
+    final SynchronousJobMetadata metadata = mock(SynchronousJobMetadata.class);
+    when(discoverResponse.isSuccess()).thenReturn(false);
+    when(discoverResponse.getMetadata()).thenReturn(metadata);
+    when(metadata.isSucceeded()).thenReturn(false);
+    when(synchronousSchedulerClient.createDiscoverSchemaJob(source, SOURCE_DOCKER_IMAGE, SOURCE_DOCKER_TAG, new Version(SOURCE_PROTOCOL_VERSION),
+        false))
+            .thenReturn(discoverResponse);
+
+    final SourceDiscoverSchemaRead actual = schedulerHandler.discoverSchemaForSourceFromSourceId(request);
+
+    assertNull(actual.getCatalog());
+    assertNotNull(actual.getJobInfo());
+    assertFalse(actual.getJobInfo().getSucceeded());
+    verify(synchronousSchedulerClient).createDiscoverSchemaJob(source, SOURCE_DOCKER_IMAGE, SOURCE_DOCKER_TAG, new Version(SOURCE_PROTOCOL_VERSION),
+        false);
+  }
+
   @Test
   void testDiscoverSchemaForSourceFromSourceCreate() throws JsonValidationException, IOException, ConfigNotFoundException {
     final SourceConnection source = new SourceConnection()
@@ -27,6 +27,7 @@
 import java.net.http.HttpClient;
 import java.net.http.HttpClient.Version;
 import java.security.interfaces.RSAPrivateKey;
+import java.time.Duration;
 import java.util.Date;
 import java.util.concurrent.TimeUnit;
 import lombok.extern.slf4j.Slf4j;
@@ -53,6 +54,8 @@ public ApiClient apiClient(
         .setPort(parsePort(airbyteApiHost))
         .setBasePath("/api")
         .setHttpClientBuilder(HttpClient.newBuilder().version(Version.HTTP_1_1))
+        .setConnectTimeout(Duration.ofSeconds(30))
+        .setReadTimeout(Duration.ofSeconds(30))
         .setRequestInterceptor(builder -> {
           builder.setHeader("User-Agent", "WorkerApp");
           // internalApiAuthToken is in BeanProvider because we want to create a new token each
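Note: the two setters added above give the generated client a 30-second connect and read timeout. A rough standalone sketch of how such settings map onto the underlying JDK java.net.http client (assumption: the ApiClient builder forwards them approximately like this; the health-check URI below is hypothetical):

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.time.Duration;

public class TimeoutSketch {

  public static void main(String[] args) throws Exception {
    // The connect timeout is configured once, on the client itself.
    final HttpClient client = HttpClient.newBuilder()
        .version(HttpClient.Version.HTTP_1_1)
        .connectTimeout(Duration.ofSeconds(30))
        .build();
    // java.net.http has no client-wide read timeout; the equivalent
    // deadline is attached to each request instead.
    final HttpRequest request = HttpRequest.newBuilder(URI.create("http://localhost:8001/api/v1/health"))
        .timeout(Duration.ofSeconds(30))
        .GET()
        .build();
    final HttpResponse<String> response = client.send(request, HttpResponse.BodyHandlers.ofString());
    System.out.println(response.statusCode());
  }
}

Without an explicit read deadline, a hung server connection could block a worker thread indefinitely, which is presumably what these timeouts guard against.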
@@ -11,13 +11,15 @@

 import com.fasterxml.jackson.databind.JsonNode;
 import datadog.trace.api.Trace;
+import io.airbyte.api.client.AirbyteApiClient;
+import io.airbyte.api.client.model.generated.DiscoverCatalogResult;
+import io.airbyte.api.client.model.generated.SourceDiscoverSchemaWriteRequestBody;
 import io.airbyte.commons.io.LineGobbler;
 import io.airbyte.commons.json.Jsons;
 import io.airbyte.config.ConnectorJobOutput;
 import io.airbyte.config.ConnectorJobOutput.OutputType;
 import io.airbyte.config.FailureReason;
 import io.airbyte.config.StandardDiscoverCatalogInput;
-import io.airbyte.config.persistence.ConfigRepository;
 import io.airbyte.metrics.lib.ApmTraceUtils;
 import io.airbyte.protocol.models.AirbyteCatalog;
 import io.airbyte.protocol.models.AirbyteControlConnectorConfigMessage;
@@ -26,6 +28,7 @@
 import io.airbyte.workers.WorkerConstants;
 import io.airbyte.workers.WorkerUtils;
 import io.airbyte.workers.exception.WorkerException;
+import io.airbyte.workers.helper.CatalogClientConverters;
 import io.airbyte.workers.helper.ConnectorConfigUpdater;
 import io.airbyte.workers.internal.AirbyteStreamFactory;
 import io.airbyte.workers.internal.DefaultAirbyteStreamFactory;
@@ -43,29 +46,28 @@
 public class DefaultDiscoverCatalogWorker implements DiscoverCatalogWorker {
 
   private static final Logger LOGGER = LoggerFactory.getLogger(DefaultDiscoverCatalogWorker.class);
-
-  private final ConfigRepository configRepository;
+  private static final String WRITE_DISCOVER_CATALOG_LOGS_TAG = "call to write discover schema result";
 
   private final IntegrationLauncher integrationLauncher;
   private final AirbyteStreamFactory streamFactory;
   private final ConnectorConfigUpdater connectorConfigUpdater;
 
+  private final AirbyteApiClient airbyteApiClient;
   private volatile Process process;
 
-  public DefaultDiscoverCatalogWorker(final ConfigRepository configRepository,
+  public DefaultDiscoverCatalogWorker(final AirbyteApiClient airbyteApiClient,
                                       final IntegrationLauncher integrationLauncher,
                                       final ConnectorConfigUpdater connectorConfigUpdater,
                                       final AirbyteStreamFactory streamFactory) {
-    this.configRepository = configRepository;
+    this.airbyteApiClient = airbyteApiClient;
     this.integrationLauncher = integrationLauncher;
     this.streamFactory = streamFactory;
     this.connectorConfigUpdater = connectorConfigUpdater;
   }
 
-  public DefaultDiscoverCatalogWorker(final ConfigRepository configRepository,
+  public DefaultDiscoverCatalogWorker(final AirbyteApiClient airbyteApiClient,
                                       final IntegrationLauncher integrationLauncher,
                                       final ConnectorConfigUpdater connectorConfigUpdater) {
-    this(configRepository, integrationLauncher, connectorConfigUpdater, new DefaultAirbyteStreamFactory());
+    this(airbyteApiClient, integrationLauncher, connectorConfigUpdater, new DefaultAirbyteStreamFactory());
   }
 
   @Trace(operationName = WORKER_OPERATION_NAME)
@@ -108,14 +110,11 @@ public ConnectorJobOutput run(final StandardDiscoverCatalogInput discoverSchemaI
       }
 
       if (catalog.isPresent()) {
-        final UUID catalogId =
-            configRepository.writeActorCatalogFetchEvent(catalog.get(),
-                // NOTE: sourceId is marked required in the OpenAPI config but the code generator doesn't enforce
-                // it, so we check again here.
-                discoverSchemaInput.getSourceId() == null ? null : UUID.fromString(discoverSchemaInput.getSourceId()),
-                discoverSchemaInput.getConnectorVersion(),
-                discoverSchemaInput.getConfigHash());
-        jobOutput.setDiscoverCatalogId(catalogId);
+        final DiscoverCatalogResult result =
+            AirbyteApiClient.retryWithJitter(() -> airbyteApiClient.getSourceApi()
+                .writeDiscoverCatalogResult(buildSourceDiscoverSchemaWriteRequestBody(discoverSchemaInput, catalog.get())),
+                WRITE_DISCOVER_CATALOG_LOGS_TAG);
+        jobOutput.setDiscoverCatalogId(result.getCatalogId());
       } else if (failureReason.isEmpty()) {
         WorkerUtils.throwWorkerException("Integration failed to output a catalog struct and did not output a failure reason", process);
       }
@@ -129,6 +128,19 @@ public ConnectorJobOutput run(final StandardDiscoverCatalogInput discoverSchemaI
     }
   }
 
+  private SourceDiscoverSchemaWriteRequestBody buildSourceDiscoverSchemaWriteRequestBody(final StandardDiscoverCatalogInput discoverSchemaInput,
+                                                                                         final AirbyteCatalog catalog) {
+    return new SourceDiscoverSchemaWriteRequestBody().catalog(
+        CatalogClientConverters.toAirbyteCatalogClientApi(catalog)).sourceId(
+            // NOTE: sourceId is marked required in the OpenAPI config but the code generator doesn't enforce
+            // it, so we check again here.
+            discoverSchemaInput.getSourceId() == null ? null : UUID.fromString(discoverSchemaInput.getSourceId()))
+        .connectorVersion(discoverSchemaInput.getConnectorVersion())
+        .configurationHash(discoverSchemaInput.getConfigHash());
+  }
+
   private Map<String, Object> generateTraceTags(final StandardDiscoverCatalogInput discoverSchemaInput, final Path jobRoot) {
     final Map<String, Object> tags = new HashMap<>();
 
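Note: the worker now reports the discovered catalog through the server API, wrapping the call in AirbyteApiClient.retryWithJitter tagged with WRITE_DISCOVER_CATALOG_LOGS_TAG for logging. The sketch below illustrates the general retry-with-jitter pattern only; the attempt count, backoff curve, and error handling are assumptions, not Airbyte's actual implementation:

import java.util.Random;
import java.util.function.Supplier;

public final class RetrySketch {

  private static final int MAX_ATTEMPTS = 5;
  private static final Random RANDOM = new Random();

  // Retry with exponential backoff plus random jitter, so that many workers
  // hitting the same endpoint do not retry in lockstep.
  public static <T> T retryWithJitter(final Supplier<T> call, final String description) {
    for (int attempt = 1;; attempt++) {
      try {
        return call.get();
      } catch (final RuntimeException e) {
        if (attempt == MAX_ATTEMPTS) {
          throw e;
        }
        final long backoffMs = (1L << attempt) * 100 + RANDOM.nextInt(100);
        System.out.printf("Attempt %d of '%s' failed, retrying in %d ms%n", attempt, description, backoffMs);
        try {
          Thread.sleep(backoffMs);
        } catch (final InterruptedException ie) {
          Thread.currentThread().interrupt();
          throw new IllegalStateException("Interrupted while retrying " + description, ie);
        }
      }
    }
  }
}

The jitter term matters when many workers fail at once, for example during a brief API outage: randomizing the backoff spreads retries out instead of producing a synchronized thundering herd.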
@@ -0,0 +1,66 @@
/*
 * Copyright (c) 2022 Airbyte, Inc., all rights reserved.
 */

package io.airbyte.workers.helper;

import io.airbyte.commons.enums.Enums;
import io.airbyte.commons.text.Names;
import io.airbyte.protocol.models.AirbyteStream;
import java.util.stream.Collectors;

/**
 * Utilities to convert a protocol Catalog into the Catalog API client model. This class is similar
 * to the existing logic in CatalogConverter.java, but the code cannot be shared because the
 * protocol model is converted to two different API models. If the logic changes in either place,
 * the other must be updated as well.
 */
public class CatalogClientConverters {

  /**
   * Converts a protocol AirbyteCatalog to an OpenAPI client versioned AirbyteCatalog.
   */
  public static io.airbyte.api.client.model.generated.AirbyteCatalog toAirbyteCatalogClientApi(
      final io.airbyte.protocol.models.AirbyteCatalog catalog) {
    return new io.airbyte.api.client.model.generated.AirbyteCatalog()
        .streams(catalog.getStreams()
            .stream()
            .map(stream -> toAirbyteStreamClientApi(stream))
            .map(s -> new io.airbyte.api.client.model.generated.AirbyteStreamAndConfiguration()
                .stream(s)
                .config(generateDefaultConfiguration(s)))
            .collect(Collectors.toList()));
  }

  private static io.airbyte.api.client.model.generated.AirbyteStreamConfiguration generateDefaultConfiguration(
      final io.airbyte.api.client.model.generated.AirbyteStream stream) {
    final io.airbyte.api.client.model.generated.AirbyteStreamConfiguration result =
        new io.airbyte.api.client.model.generated.AirbyteStreamConfiguration()
            .aliasName(Names.toAlphanumericAndUnderscore(stream.getName()))
            .cursorField(stream.getDefaultCursorField())
            .destinationSyncMode(io.airbyte.api.client.model.generated.DestinationSyncMode.APPEND)
            .primaryKey(stream.getSourceDefinedPrimaryKey())
            .selected(true);
    if (stream.getSupportedSyncModes().size() > 0) {
      result.setSyncMode(Enums.convertTo(stream.getSupportedSyncModes().get(0),
          io.airbyte.api.client.model.generated.SyncMode.class));
    } else {
      result.setSyncMode(io.airbyte.api.client.model.generated.SyncMode.INCREMENTAL);
    }
    return result;
  }

  private static io.airbyte.api.client.model.generated.AirbyteStream toAirbyteStreamClientApi(
      final AirbyteStream stream) {
    return new io.airbyte.api.client.model.generated.AirbyteStream()
        .name(stream.getName())
        .jsonSchema(stream.getJsonSchema())
        .supportedSyncModes(Enums.convertListTo(stream.getSupportedSyncModes(),
            io.airbyte.api.client.model.generated.SyncMode.class))
        .sourceDefinedCursor(stream.getSourceDefinedCursor())
        .defaultCursorField(stream.getDefaultCursorField())
        .sourceDefinedPrimaryKey(stream.getSourceDefinedPrimaryKey())
        .namespace(stream.getNamespace());
  }

}
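Note: a minimal, hypothetical usage sketch for the converter above; discoveredCatalog stands in for a catalog produced by a connector's discover phase:

// Hypothetical call site: convert a discovered protocol catalog into the
// API client model just before writing it back through the server API.
final io.airbyte.api.client.model.generated.AirbyteCatalog apiCatalog =
    CatalogClientConverters.toAirbyteCatalogClientApi(discoveredCatalog);

Duplicating this converter rather than sharing CatalogConverter.java keeps the worker free of a dependency on the server-side config model, at the cost of the double maintenance the Javadoc warns about.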