Skip to content

Commit

Permalink
Merge branch 'master' into ddavydov/#20703-source-salesforce-fix-prop…
Browse files Browse the repository at this point in the history
…s-chunk-length
  • Loading branch information
davydov-d committed Feb 15, 2023
2 parents 31782a3 + 420fdf5 commit 3c2ec42
Show file tree
Hide file tree
Showing 132 changed files with 1,586 additions and 352 deletions.
2 changes: 2 additions & 0 deletions .github/workflows/connector_teams_review_requirements.yml
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ on:
paths:
- "airbyte-integrations/connectors/source-**"
pull_request_review:
paths:
- "airbyte-integrations/connectors/source-**"
jobs:
check-review-requirements:
name: "Check if a review is required from Connector teams"
Expand Down
3 changes: 2 additions & 1 deletion .github/workflows/deploy-docs-site.yml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ on:

pull_request:
types:
- closed
- opened
- reopened
- synchronize
Expand Down Expand Up @@ -42,7 +43,7 @@ jobs:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
run: |-
export SKIP_DEPLOY="yes"
if [ "${{github.event_name}}" = 'push' -o "${{github.event_name}}" = 'workflow_dispatch' ]; then
if [ "${{github.event_name}}" = 'push' -o "${{github.event_name}}" = 'workflow_dispatch' -o "${{github.event.pull_request.merged}}" = 'true']; then
export SKIP_DEPLOY="no"
fi
Expand Down
30 changes: 30 additions & 0 deletions airbyte-commons-converters/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
plugins {
id "java-library"
}

dependencies {
annotationProcessor platform(libs.micronaut.bom)
annotationProcessor libs.bundles.micronaut.annotation.processor

implementation platform(libs.micronaut.bom)
implementation libs.bundles.micronaut

implementation project(':airbyte-api')
implementation project(':airbyte-commons')
implementation project(':airbyte-commons-protocol')
implementation project(':airbyte-config:config-models')
implementation project(':airbyte-config:config-persistence')
implementation project(':airbyte-json-validation')
implementation project(':airbyte-persistence:job-persistence')
implementation project(':airbyte-protocol:protocol-models')
implementation libs.guava
implementation libs.slf4j.api

testAnnotationProcessor platform(libs.micronaut.bom)
testAnnotationProcessor libs.bundles.micronaut.test.annotation.processor
testAnnotationProcessor libs.jmh.annotations

testImplementation libs.bundles.micronaut.test
}

Task publishArtifactsTask = getPublishArtifactsTask("$rootProject.ext.version", project)
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
* Copyright (c) 2023 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.workers.helper;
package io.airbyte.commons.converters;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
Expand Down Expand Up @@ -34,12 +34,12 @@ public class CatalogClientConverters {
*/
public static io.airbyte.protocol.models.AirbyteCatalog toAirbyteProtocol(final io.airbyte.api.client.model.generated.AirbyteCatalog catalog) {

io.airbyte.protocol.models.AirbyteCatalog protoCatalog =
final io.airbyte.protocol.models.AirbyteCatalog protoCatalog =
new io.airbyte.protocol.models.AirbyteCatalog();
var airbyteStream = catalog.getStreams().stream().map(stream -> {
final var airbyteStream = catalog.getStreams().stream().map(stream -> {
try {
return toConfiguredProtocol(stream.getStream(), stream.getConfig());
} catch (JsonValidationException e) {
} catch (final JsonValidationException e) {
return null;
}
}).collect(Collectors.toList());
Expand All @@ -50,7 +50,7 @@ public static io.airbyte.protocol.models.AirbyteCatalog toAirbyteProtocol(final

@SuppressWarnings("PMD.AvoidLiteralsInIfCondition")
private static io.airbyte.protocol.models.AirbyteStream toConfiguredProtocol(final io.airbyte.api.client.model.generated.AirbyteStream stream,
AirbyteStreamConfiguration config)
final AirbyteStreamConfiguration config)
throws JsonValidationException {
if (config.getFieldSelectionEnabled() != null && config.getFieldSelectionEnabled()) {
// Validate the selected field paths.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
* Copyright (c) 2023 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.workers.helper;
package io.airbyte.commons.converters;

import com.google.common.base.Preconditions;
import io.airbyte.commons.enums.Enums;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
* Copyright (c) 2023 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.workers.helper;
package io.airbyte.commons.converters;

import com.google.common.hash.Hashing;
import io.airbyte.api.client.AirbyteApiClient;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
* Copyright (c) 2023 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.workers.helper;
package io.airbyte.commons.converters;

import io.airbyte.api.model.generated.StreamDescriptor;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
* Copyright (c) 2023 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.workers.helper;
package io.airbyte.commons.converters;

import io.airbyte.api.model.generated.ConnectionState;
import io.airbyte.api.model.generated.ConnectionStateType;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
* Copyright (c) 2023 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.workers.helper;
package io.airbyte.commons.converters;

/**
* This class exists to track timing information for the sync. It needs to be thread-safe as
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
* Copyright (c) 2023 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.workers.helper;
package io.airbyte.commons.converters;

import static org.junit.jupiter.api.Assertions.assertEquals;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
* Copyright (c) 2023 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.workers.helper;
package io.airbyte.commons.converters;

import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
Expand Down
2 changes: 1 addition & 1 deletion airbyte-commons-server/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@ dependencies {

implementation project(':airbyte-analytics')
implementation project(':airbyte-api')
implementation project(':airbyte-commons-converters')
implementation project(':airbyte-commons-temporal')
implementation project(':airbyte-commons-worker')
implementation project(':airbyte-config:init')
implementation project(':airbyte-config:config-models')
implementation project(':airbyte-config:config-persistence')
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import io.airbyte.api.model.generated.NonBreakingChangesPreference;
import io.airbyte.api.model.generated.NormalizationDestinationDefinitionConfig;
import io.airbyte.api.model.generated.ResourceRequirements;
import io.airbyte.commons.converters.StateConverter;
import io.airbyte.commons.enums.Enums;
import io.airbyte.commons.server.handlers.helpers.CatalogConverter;
import io.airbyte.config.BasicSchedule;
Expand All @@ -28,7 +29,6 @@
import io.airbyte.config.State;
import io.airbyte.config.StateWrapper;
import io.airbyte.config.helpers.StateMessageHelper;
import io.airbyte.workers.helper.StateConverter;
import java.util.Optional;
import java.util.UUID;
import java.util.stream.Collectors;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,10 @@
import io.airbyte.api.model.generated.FieldSchemaUpdate;
import io.airbyte.api.model.generated.FieldTransform;
import io.airbyte.api.model.generated.StreamTransform;
import io.airbyte.commons.converters.ProtocolConverters;
import io.airbyte.commons.enums.Enums;
import io.airbyte.protocol.models.transform_models.FieldTransformType;
import io.airbyte.protocol.models.transform_models.StreamTransformType;
import io.airbyte.workers.helper.ProtocolConverters;
import java.util.List;
import java.util.Optional;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import io.airbyte.api.model.generated.ResetConfig;
import io.airbyte.api.model.generated.SourceDefinitionRead;
import io.airbyte.api.model.generated.SynchronousJobRead;
import io.airbyte.commons.converters.ProtocolConverters;
import io.airbyte.commons.enums.Enums;
import io.airbyte.commons.server.scheduler.SynchronousJobMetadata;
import io.airbyte.commons.server.scheduler.SynchronousResponse;
Expand All @@ -44,7 +45,6 @@
import io.airbyte.persistence.job.models.Attempt;
import io.airbyte.persistence.job.models.AttemptNormalizationStatus;
import io.airbyte.persistence.job.models.Job;
import io.airbyte.workers.helper.ProtocolConverters;
import jakarta.inject.Singleton;
import java.io.IOException;
import java.nio.file.Path;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import io.airbyte.api.model.generated.SourceSearch;
import io.airbyte.api.model.generated.StreamDescriptor;
import io.airbyte.api.model.generated.WorkspaceIdRequestBody;
import io.airbyte.commons.converters.ConnectionHelper;
import io.airbyte.commons.enums.Enums;
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.server.converters.ApiPojoConverters;
Expand Down Expand Up @@ -55,7 +56,6 @@
import io.airbyte.protocol.models.CatalogHelpers;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import io.airbyte.validation.json.JsonValidationException;
import io.airbyte.workers.helper.ConnectionHelper;
import jakarta.inject.Singleton;
import java.io.IOException;
import java.util.Collections;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ public JobReadList listJobsFor(final JobListRequestBody request) throws IOExcept
for (final AttemptRead a : jwar.getAttempts()) {
final var stat = stats.get(new JobAttemptPair(jwar.getJob().getId(), a.getId().intValue()));
if (stat == null) {
log.error("Missing stats for job {} attempt {}", jwar.getJob().getId(), a.getId().intValue());
log.warn("Missing stats for job {} attempt {}", jwar.getJob().getId(), a.getId().intValue());
continue;
}

Expand Down Expand Up @@ -197,7 +197,7 @@ public JobInfoLightRead getJobInfoLight(final JobIdRequestBody jobIdRequestBody)
}

public JobOptionalRead getLastReplicationJob(final ConnectionIdRequestBody connectionIdRequestBody) throws IOException {
Optional<Job> job = jobPersistence.getLastReplicationJob(connectionIdRequestBody.getConnectionId());
final Optional<Job> job = jobPersistence.getLastReplicationJob(connectionIdRequestBody.getConnectionId());
if (job.isEmpty()) {
return new JobOptionalRead();
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@
import io.airbyte.api.model.generated.ConnectionIdRequestBody;
import io.airbyte.api.model.generated.ConnectionState;
import io.airbyte.api.model.generated.ConnectionStateCreateOrUpdate;
import io.airbyte.commons.converters.StateConverter;
import io.airbyte.config.StateWrapper;
import io.airbyte.config.persistence.StatePersistence;
import io.airbyte.workers.helper.StateConverter;
import jakarta.inject.Singleton;
import java.io.IOException;
import java.util.Optional;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import io.airbyte.api.model.generated.WebBackendOperationCreateOrUpdate;
import io.airbyte.api.model.generated.WebBackendWorkspaceState;
import io.airbyte.api.model.generated.WebBackendWorkspaceStateResult;
import io.airbyte.commons.converters.ProtocolConverters;
import io.airbyte.commons.enums.Enums;
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.lang.MoreBooleans;
Expand All @@ -61,7 +62,6 @@
import io.airbyte.config.persistence.ConfigRepository.StandardSyncQuery;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import io.airbyte.validation.json.JsonValidationException;
import io.airbyte.workers.helper.ProtocolConverters;
import jakarta.inject.Singleton;
import java.io.IOException;
import java.util.ArrayList;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import io.airbyte.api.model.generated.StreamDescriptor;
import io.airbyte.api.model.generated.SyncMode;
import io.airbyte.api.model.generated.WorkspaceIdRequestBody;
import io.airbyte.commons.converters.ConnectionHelper;
import io.airbyte.commons.enums.Enums;
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.server.converters.ApiPojoConverters;
Expand Down Expand Up @@ -70,7 +71,6 @@
import io.airbyte.persistence.job.WorkspaceHelper;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import io.airbyte.validation.json.JsonValidationException;
import io.airbyte.workers.helper.ConnectionHelper;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import io.airbyte.api.model.generated.ConnectionStateType;
import io.airbyte.api.model.generated.GlobalState;
import io.airbyte.api.model.generated.StreamState;
import io.airbyte.commons.converters.ProtocolConverters;
import io.airbyte.commons.enums.Enums;
import io.airbyte.commons.json.Jsons;
import io.airbyte.config.StateType;
Expand All @@ -24,7 +25,6 @@
import io.airbyte.protocol.models.AirbyteStateMessage.AirbyteStateType;
import io.airbyte.protocol.models.AirbyteStreamState;
import io.airbyte.protocol.models.StreamDescriptor;
import io.airbyte.workers.helper.ProtocolConverters;
import java.io.IOException;
import java.util.List;
import java.util.Optional;
Expand Down
1 change: 1 addition & 0 deletions airbyte-commons-worker/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ dependencies {
implementation libs.bundles.datadog

implementation project(':airbyte-api')
implementation project(':airbyte-commons-converters')
implementation project(':airbyte-commons-protocol')
implementation project(':airbyte-commons-temporal')
implementation project(':airbyte-config:config-models')
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

import com.fasterxml.jackson.databind.JsonNode;
import datadog.trace.api.Trace;
import io.airbyte.commons.converters.ConnectorConfigUpdater;
import io.airbyte.commons.enums.Enums;
import io.airbyte.commons.io.LineGobbler;
import io.airbyte.commons.json.Jsons;
Expand All @@ -26,7 +27,6 @@
import io.airbyte.workers.WorkerConstants;
import io.airbyte.workers.WorkerUtils;
import io.airbyte.workers.exception.WorkerException;
import io.airbyte.workers.helper.ConnectorConfigUpdater;
import io.airbyte.workers.internal.AirbyteStreamFactory;
import io.airbyte.workers.internal.DefaultAirbyteStreamFactory;
import io.airbyte.workers.process.IntegrationLauncher;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
import io.airbyte.api.client.AirbyteApiClient;
import io.airbyte.api.client.model.generated.DiscoverCatalogResult;
import io.airbyte.api.client.model.generated.SourceDiscoverSchemaWriteRequestBody;
import io.airbyte.commons.converters.CatalogClientConverters;
import io.airbyte.commons.converters.ConnectorConfigUpdater;
import io.airbyte.commons.io.LineGobbler;
import io.airbyte.commons.json.Jsons;
import io.airbyte.config.ConnectorJobOutput;
Expand All @@ -28,8 +30,6 @@
import io.airbyte.workers.WorkerConstants;
import io.airbyte.workers.WorkerUtils;
import io.airbyte.workers.exception.WorkerException;
import io.airbyte.workers.helper.CatalogClientConverters;
import io.airbyte.workers.helper.ConnectorConfigUpdater;
import io.airbyte.workers.internal.AirbyteStreamFactory;
import io.airbyte.workers.internal.DefaultAirbyteStreamFactory;
import io.airbyte.workers.process.IntegrationLauncher;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import datadog.trace.api.Trace;
import io.airbyte.commons.converters.ConnectorConfigUpdater;
import io.airbyte.commons.converters.ThreadedTimeTracker;
import io.airbyte.commons.io.LineGobbler;
import io.airbyte.config.FailureReason;
import io.airbyte.config.ReplicationAttemptSummary;
Expand All @@ -37,9 +39,7 @@
import io.airbyte.workers.WorkerUtils;
import io.airbyte.workers.exception.RecordSchemaValidationException;
import io.airbyte.workers.exception.WorkerException;
import io.airbyte.workers.helper.ConnectorConfigUpdater;
import io.airbyte.workers.helper.FailureHelper;
import io.airbyte.workers.helper.ThreadedTimeTracker;
import io.airbyte.workers.internal.AirbyteDestination;
import io.airbyte.workers.internal.AirbyteMapper;
import io.airbyte.workers.internal.AirbyteSource;
Expand Down Expand Up @@ -606,8 +606,8 @@ private List<FailureReason> getFailureReasons(final AtomicReference<FailureReaso
}

private static void validateSchema(final RecordSchemaValidator recordSchemaValidator,
Map<AirbyteStreamNameNamespacePair, Set<String>> streamToAllFields,
Map<AirbyteStreamNameNamespacePair, Set<String>> unexpectedFields,
final Map<AirbyteStreamNameNamespacePair, Set<String>> streamToAllFields,
final Map<AirbyteStreamNameNamespacePair, Set<String>> unexpectedFields,
final Map<AirbyteStreamNameNamespacePair, ImmutablePair<Set<String>, Integer>> validationErrors,
final AirbyteMessage message) {
if (message.getRecord() == null) {
Expand Down Expand Up @@ -639,10 +639,12 @@ private static void validateSchema(final RecordSchemaValidator recordSchemaValid
}
}

private static void populateUnexpectedFieldNames(AirbyteRecordMessage record, Set<String> fieldsInCatalog, Set<String> unexpectedFieldNames) {
private static void populateUnexpectedFieldNames(final AirbyteRecordMessage record,
final Set<String> fieldsInCatalog,
final Set<String> unexpectedFieldNames) {
final JsonNode data = record.getData();
if (data.isObject()) {
Iterator<String> fieldNamesInRecord = data.fieldNames();
final Iterator<String> fieldNamesInRecord = data.fieldNames();
while (fieldNamesInRecord.hasNext()) {
final String fieldName = fieldNamesInRecord.next();
if (!fieldsInCatalog.contains(fieldName)) {
Expand Down
Loading

0 comments on commit 3c2ec42

Please sign in to comment.