diff --git a/airbyte-cdk/bulk/core/load/src/integrationTest/kotlin/io/airbyte/cdk/load/mock_integration_test/MockBasicFunctionalityIntegrationTest.kt b/airbyte-cdk/bulk/core/load/src/integrationTest/kotlin/io/airbyte/cdk/load/mock_integration_test/MockBasicFunctionalityIntegrationTest.kt index 5093fab4cf29..74a87145101c 100644 --- a/airbyte-cdk/bulk/core/load/src/integrationTest/kotlin/io/airbyte/cdk/load/mock_integration_test/MockBasicFunctionalityIntegrationTest.kt +++ b/airbyte-cdk/bulk/core/load/src/integrationTest/kotlin/io/airbyte/cdk/load/mock_integration_test/MockBasicFunctionalityIntegrationTest.kt @@ -8,6 +8,7 @@ import io.airbyte.cdk.load.test.util.NoopDestinationCleaner import io.airbyte.cdk.load.test.util.NoopExpectedRecordMapper import io.airbyte.cdk.load.test.util.NoopNameMapper import io.airbyte.cdk.load.write.BasicFunctionalityIntegrationTest +import org.junit.jupiter.api.Disabled import org.junit.jupiter.api.Test class MockBasicFunctionalityIntegrationTest : @@ -26,6 +27,7 @@ class MockBasicFunctionalityIntegrationTest : } @Test + @Disabled override fun testMidSyncCheckpointingStreamState() { super.testMidSyncCheckpointingStreamState() } diff --git a/airbyte-cdk/java/airbyte-cdk/README.md b/airbyte-cdk/java/airbyte-cdk/README.md index 48f390a35cc4..04a717ff3a1e 100644 --- a/airbyte-cdk/java/airbyte-cdk/README.md +++ b/airbyte-cdk/java/airbyte-cdk/README.md @@ -174,6 +174,7 @@ corresponds to that version. | Version | Date | Pull Request | Subject | |:-----------|:-----------|:------------------------------------------------------------|:---------------------------------------------------------------------------------------------------------------------------------------------------------------| +| 0.48.0 | 2024-10-23 | [\#46302](https://github.com/airbytehq/airbyte/pull/46302) | Add support for file transfer | | 0.47.3 | 2024-10-23 | [\#46689](https://github.com/airbytehq/airbyte/pull/46689) | Split DestinationAcceptanceTest| | 0.47.2 | 2024-10-21 | [\#47216](https://github.com/airbytehq/airbyte/pull/47216) | improve java compatibiilty| | 0.47.1 | 2024-09-27 | [\#45397](https://github.com/airbytehq/airbyte/pull/45397) | Allow logical replication from Postgres 16 read-replicas| diff --git a/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/destination/async/buffers/BufferDequeue.kt b/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/destination/async/buffers/BufferDequeue.kt index 31f80de484d5..60ab054a2fb6 100644 --- a/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/destination/async/buffers/BufferDequeue.kt +++ b/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/destination/async/buffers/BufferDequeue.kt @@ -63,7 +63,7 @@ class BufferDequeue( // otherwise pull records until we hit the memory limit. val newSize: Long = (memoryItem.size) + bytesRead.get() - if (newSize <= optimalBytesToRead) { + if (newSize <= optimalBytesToRead || output.isEmpty()) { memoryItem.size.let { bytesRead.addAndGet(it) } queue.poll()?.item?.let { output.add(it) } } else { diff --git a/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/destination/async/model/AirbyteRecordMessageFile.kt b/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/destination/async/model/AirbyteRecordMessageFile.kt new file mode 100644 index 000000000000..54df03f239d0 --- /dev/null +++ b/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/destination/async/model/AirbyteRecordMessageFile.kt @@ -0,0 +1,56 @@ +/* + * Copyright (c) 2024 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.cdk.integrations.destination.async.model + +import com.fasterxml.jackson.annotation.JsonProperty + +class AirbyteRecordMessageFile { + constructor( + fileUrl: String? = null, + bytes: Long? = null, + fileRelativePath: String? = null, + modified: Long? = null, + sourceFileUrl: String? = null + ) { + this.fileUrl = fileUrl + this.bytes = bytes + this.fileRelativePath = fileRelativePath + this.modified = modified + this.sourceFileUrl = sourceFileUrl + } + constructor() : + this( + fileUrl = null, + bytes = null, + fileRelativePath = null, + modified = null, + sourceFileUrl = null + ) + + @get:JsonProperty("file_url") + @set:JsonProperty("file_url") + @JsonProperty("file_url") + var fileUrl: String? = null + + @get:JsonProperty("bytes") + @set:JsonProperty("bytes") + @JsonProperty("bytes") + var bytes: Long? = null + + @get:JsonProperty("file_relative_path") + @set:JsonProperty("file_relative_path") + @JsonProperty("file_relative_path") + var fileRelativePath: String? = null + + @get:JsonProperty("modified") + @set:JsonProperty("modified") + @JsonProperty("modified") + var modified: Long? = null + + @get:JsonProperty("source_file_url") + @set:JsonProperty("source_file_url") + @JsonProperty("source_file_url") + var sourceFileUrl: String? = null +} diff --git a/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/destination/async/model/PartialAirbyteRecordMessage.kt b/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/destination/async/model/PartialAirbyteRecordMessage.kt index fd26f6ad5747..d01c70c15cf6 100644 --- a/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/destination/async/model/PartialAirbyteRecordMessage.kt +++ b/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/destination/async/model/PartialAirbyteRecordMessage.kt @@ -5,7 +5,6 @@ package io.airbyte.cdk.integrations.destination.async.model import com.fasterxml.jackson.annotation.JsonProperty -import com.fasterxml.jackson.annotation.JsonPropertyDescription import com.fasterxml.jackson.databind.JsonNode import io.airbyte.protocol.models.v0.AirbyteRecordMessageMeta import io.airbyte.protocol.models.v0.StreamDescriptor @@ -33,7 +32,6 @@ class PartialAirbyteRecordMessage { @get:JsonProperty("emitted_at") @set:JsonProperty("emitted_at") @JsonProperty("emitted_at") - @JsonPropertyDescription("when the data was emitted from the source. epoch in millisecond.") var emittedAt: Long = 0 @get:JsonProperty("meta") @@ -41,6 +39,11 @@ class PartialAirbyteRecordMessage { @JsonProperty("meta") var meta: AirbyteRecordMessageMeta? = null + @get:JsonProperty("file") + @set:JsonProperty("file") + @JsonProperty("file") + var file: AirbyteRecordMessageFile? = null + fun withNamespace(namespace: String?): PartialAirbyteRecordMessage { this.namespace = namespace return this @@ -66,6 +69,11 @@ class PartialAirbyteRecordMessage { return this } + fun withFile(file: AirbyteRecordMessageFile): PartialAirbyteRecordMessage { + this.file = file + return this + } + override fun equals(other: Any?): Boolean { if (this === other) { return true @@ -77,7 +85,8 @@ class PartialAirbyteRecordMessage { return namespace == that.namespace && stream == that.stream && emittedAt == that.emittedAt && - meta == that.meta + meta == that.meta && + file == that.file } override fun hashCode(): Int { @@ -98,6 +107,9 @@ class PartialAirbyteRecordMessage { ", meta='" + meta + '\'' + + ", file='" + + file + + '\'' + '}' } diff --git a/airbyte-cdk/java/airbyte-cdk/core/src/main/resources/version.properties b/airbyte-cdk/java/airbyte-cdk/core/src/main/resources/version.properties index 5312981f0d4d..bd2f8f35bab4 100644 --- a/airbyte-cdk/java/airbyte-cdk/core/src/main/resources/version.properties +++ b/airbyte-cdk/java/airbyte-cdk/core/src/main/resources/version.properties @@ -1 +1 @@ -version=0.47.3 +version=0.48.0 diff --git a/airbyte-cdk/java/airbyte-cdk/core/src/test/kotlin/io/airbyte/cdk/integrations/destination/async/AsyncStreamConsumerTest.kt b/airbyte-cdk/java/airbyte-cdk/core/src/test/kotlin/io/airbyte/cdk/integrations/destination/async/AsyncStreamConsumerTest.kt index e8634f5f8707..bd706681ed5a 100644 --- a/airbyte-cdk/java/airbyte-cdk/core/src/test/kotlin/io/airbyte/cdk/integrations/destination/async/AsyncStreamConsumerTest.kt +++ b/airbyte-cdk/java/airbyte-cdk/core/src/test/kotlin/io/airbyte/cdk/integrations/destination/async/AsyncStreamConsumerTest.kt @@ -765,6 +765,9 @@ class AsyncStreamConsumerTest { val throwable = assertThrows(RuntimeException::class.java) { consumer.accept(retyped, retyped.length) } // Ensure that the offending data has been scrubbed from the error message - assertFalse(throwable.message!!.contains(offender)) + assertFalse( + throwable.message!!.contains(offender), + "message should not contain the offender. Was ${throwable.message}" + ) } } diff --git a/airbyte-cdk/java/airbyte-cdk/db-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/standardtest/destination/BaseDestinationAcceptanceTest.kt b/airbyte-cdk/java/airbyte-cdk/db-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/standardtest/destination/BaseDestinationAcceptanceTest.kt index 6fdd1d4836bf..5be46433d1a8 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/standardtest/destination/BaseDestinationAcceptanceTest.kt +++ b/airbyte-cdk/java/airbyte-cdk/db-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/standardtest/destination/BaseDestinationAcceptanceTest.kt @@ -14,6 +14,7 @@ import io.airbyte.configoss.WorkerDestinationConfig import io.airbyte.protocol.models.v0.AirbyteMessage import io.airbyte.protocol.models.v0.AirbyteStateStats import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog +import io.airbyte.workers.exception.TestHarnessException import io.airbyte.workers.helper.ConnectorConfigUpdater import io.airbyte.workers.internal.AirbyteDestination import io.airbyte.workers.internal.DefaultAirbyteDestination @@ -215,7 +216,11 @@ abstract class BaseDestinationAcceptanceTest( } } - destination.close() + try { + destination.close() + } catch (e: TestHarnessException) { + throw TestHarnessException(e.message, e, destinationOutput) + } return destinationOutput } @@ -258,6 +263,7 @@ abstract class BaseDestinationAcceptanceTest( workspaceRoot, workspaceRoot.toString(), localRoot.toString(), + fileTransferMountSource, "host", getConnectorEnv() ) diff --git a/airbyte-cdk/java/airbyte-cdk/db-sources/src/testFixtures/kotlin/io/airbyte/cdk/integrations/standardtest/source/AbstractSourceConnectorTest.kt b/airbyte-cdk/java/airbyte-cdk/db-sources/src/testFixtures/kotlin/io/airbyte/cdk/integrations/standardtest/source/AbstractSourceConnectorTest.kt index 4761398a496c..a71aca115d2e 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-sources/src/testFixtures/kotlin/io/airbyte/cdk/integrations/standardtest/source/AbstractSourceConnectorTest.kt +++ b/airbyte-cdk/java/airbyte-cdk/db-sources/src/testFixtures/kotlin/io/airbyte/cdk/integrations/standardtest/source/AbstractSourceConnectorTest.kt @@ -117,6 +117,7 @@ abstract class AbstractSourceConnectorTest { workspaceRoot, workspaceRoot.toString(), localRoot.toString(), + fileTransferMountSource = null, "host", envMap ) diff --git a/airbyte-cdk/java/airbyte-cdk/dependencies/src/main/kotlin/io/airbyte/commons/features/EnvVariableFeatureFlags.kt b/airbyte-cdk/java/airbyte-cdk/dependencies/src/main/kotlin/io/airbyte/commons/features/EnvVariableFeatureFlags.kt index 5106ad19f598..ee70fe2c54d1 100644 --- a/airbyte-cdk/java/airbyte-cdk/dependencies/src/main/kotlin/io/airbyte/commons/features/EnvVariableFeatureFlags.kt +++ b/airbyte-cdk/java/airbyte-cdk/dependencies/src/main/kotlin/io/airbyte/commons/features/EnvVariableFeatureFlags.kt @@ -4,6 +4,7 @@ package io.airbyte.commons.features import io.github.oshai.kotlinlogging.KotlinLogging +import java.nio.file.Path import java.util.function.Function private val log = KotlinLogging.logger {} @@ -46,6 +47,16 @@ class EnvVariableFeatureFlags : FeatureFlags { return getEnvOrDefault(DEPLOYMENT_MODE, "") { arg: String -> arg } } + override fun airbyteStagingDirectory(): Path? { + return getEnvOrDefault(AIRBYTE_STAGING_DIRECTORY_PROPERTY_NAME, null) { arg: String -> + Path.of(arg) + } + } + + override fun useFileTransfer(): Boolean { + return getEnvOrDefault(USE_FILE_TRANSFER, false) { it.toBoolean() } + } + // TODO: refactor in order to use the same method than the ones in EnvConfigs.java fun getEnvOrDefault(key: String?, defaultValue: T, parser: Function): T { val value = System.getenv(key) @@ -73,5 +84,8 @@ class EnvVariableFeatureFlags : FeatureFlags { const val STRICT_COMPARISON_NORMALIZATION_TAG: String = "STRICT_COMPARISON_NORMALIZATION_TAG" const val DEPLOYMENT_MODE: String = "DEPLOYMENT_MODE" + val DEFAULT_AIRBYTE_STAGING_DIRECTORY: Path = Path.of("/staging/files") + const val AIRBYTE_STAGING_DIRECTORY_PROPERTY_NAME: String = "AIRBYTE_STAGING_DIRECTORY" + const val USE_FILE_TRANSFER = "USE_FILE_TRANSFER" } } diff --git a/airbyte-cdk/java/airbyte-cdk/dependencies/src/main/kotlin/io/airbyte/commons/features/FeatureFlags.kt b/airbyte-cdk/java/airbyte-cdk/dependencies/src/main/kotlin/io/airbyte/commons/features/FeatureFlags.kt index a8626b46ec64..e05e9608d8f4 100644 --- a/airbyte-cdk/java/airbyte-cdk/dependencies/src/main/kotlin/io/airbyte/commons/features/FeatureFlags.kt +++ b/airbyte-cdk/java/airbyte-cdk/dependencies/src/main/kotlin/io/airbyte/commons/features/FeatureFlags.kt @@ -3,6 +3,8 @@ */ package io.airbyte.commons.features +import java.nio.file.Path + /** * Interface that describe which features are activated in airbyte. Currently, the only * implementation relies on env. Ideally it should be on some DB. @@ -51,4 +53,8 @@ interface FeatureFlags { * @return empty string for the default deployment mode, "CLOUD" for cloud deployment mode. */ fun deploymentMode(): String? + + fun airbyteStagingDirectory(): Path? + + fun useFileTransfer(): Boolean } diff --git a/airbyte-cdk/java/airbyte-cdk/dependencies/src/main/kotlin/io/airbyte/commons/features/FeatureFlagsWrapper.kt b/airbyte-cdk/java/airbyte-cdk/dependencies/src/main/kotlin/io/airbyte/commons/features/FeatureFlagsWrapper.kt index 056c6730332c..abb16dbe463e 100644 --- a/airbyte-cdk/java/airbyte-cdk/dependencies/src/main/kotlin/io/airbyte/commons/features/FeatureFlagsWrapper.kt +++ b/airbyte-cdk/java/airbyte-cdk/dependencies/src/main/kotlin/io/airbyte/commons/features/FeatureFlagsWrapper.kt @@ -3,6 +3,8 @@ */ package io.airbyte.commons.features +import java.nio.file.Path + open class FeatureFlagsWrapper(private val wrapped: FeatureFlags) : FeatureFlags { override fun autoDetectSchema(): Boolean { return wrapped.autoDetectSchema() @@ -36,6 +38,14 @@ open class FeatureFlagsWrapper(private val wrapped: FeatureFlags) : FeatureFlags return wrapped.deploymentMode() } + override fun airbyteStagingDirectory(): Path? { + return wrapped.airbyteStagingDirectory() + } + + override fun useFileTransfer(): Boolean { + return wrapped.useFileTransfer() + } + companion object { /** Overrides the [FeatureFlags.deploymentMode] method in the feature flags. */ @JvmStatic diff --git a/airbyte-cdk/java/airbyte-cdk/dependencies/src/testFixtures/kotlin/io/airbyte/workers/exception/TestHarnessException.kt b/airbyte-cdk/java/airbyte-cdk/dependencies/src/testFixtures/kotlin/io/airbyte/workers/exception/TestHarnessException.kt index 15adcfdd2a06..b7f1edd1c0a6 100644 --- a/airbyte-cdk/java/airbyte-cdk/dependencies/src/testFixtures/kotlin/io/airbyte/workers/exception/TestHarnessException.kt +++ b/airbyte-cdk/java/airbyte-cdk/dependencies/src/testFixtures/kotlin/io/airbyte/workers/exception/TestHarnessException.kt @@ -3,8 +3,23 @@ */ package io.airbyte.workers.exception +import io.airbyte.protocol.models.v0.AirbyteMessage + class TestHarnessException : Exception { - constructor(message: String?) : super(message) + val outputMessages: List? + constructor(message: String?) : super(message) { + outputMessages = null + } + + constructor(message: String?, cause: Throwable?) : super(message, cause) { + outputMessages = null + } - constructor(message: String?, cause: Throwable?) : super(message, cause) + constructor( + message: String?, + cause: Throwable?, + outputMessages: List + ) : super(message, cause) { + this.outputMessages = outputMessages + } } diff --git a/airbyte-cdk/java/airbyte-cdk/dependencies/src/testFixtures/kotlin/io/airbyte/workers/process/DockerProcessFactory.kt b/airbyte-cdk/java/airbyte-cdk/dependencies/src/testFixtures/kotlin/io/airbyte/workers/process/DockerProcessFactory.kt index e58319655d9d..03d7a9b8c138 100644 --- a/airbyte-cdk/java/airbyte-cdk/dependencies/src/testFixtures/kotlin/io/airbyte/workers/process/DockerProcessFactory.kt +++ b/airbyte-cdk/java/airbyte-cdk/dependencies/src/testFixtures/kotlin/io/airbyte/workers/process/DockerProcessFactory.kt @@ -7,6 +7,7 @@ import com.google.common.annotations.VisibleForTesting import com.google.common.base.Joiner import com.google.common.base.Strings import com.google.common.collect.Lists +import io.airbyte.commons.features.EnvVariableFeatureFlags import io.airbyte.commons.io.IOs import io.airbyte.commons.io.LineGobbler import io.airbyte.commons.map.MoreMaps @@ -30,6 +31,7 @@ class DockerProcessFactory( private val workspaceRoot: Path, private val workspaceMountSource: String?, private val localMountSource: String?, + private val fileTransferMountSource: Path?, private val networkName: String?, private val envMap: Map ) : ProcessFactory { @@ -125,6 +127,20 @@ class DockerProcessFactory( cmd.add(String.format("%s:%s", localMountSource, LOCAL_MOUNT_DESTINATION)) } + if (fileTransferMountSource != null) { + cmd.add("-v") + cmd.add( + "$fileTransferMountSource:${EnvVariableFeatureFlags.DEFAULT_AIRBYTE_STAGING_DIRECTORY}" + ) + cmd.add("-e") + cmd.add( + "${EnvVariableFeatureFlags.AIRBYTE_STAGING_DIRECTORY_PROPERTY_NAME}=${EnvVariableFeatureFlags.DEFAULT_AIRBYTE_STAGING_DIRECTORY}" + ) + + cmd.add("-e") + cmd.add("${EnvVariableFeatureFlags.USE_FILE_TRANSFER}=true") + } + val allEnvMap = MoreMaps.merge(jobMetadata, envMap, additionalEnvironmentVariables) for ((key, value) in allEnvMap) { cmd.add("-e") diff --git a/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/s3/BaseS3Destination.kt b/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/s3/BaseS3Destination.kt index 84496b9951ec..cf4bac36654d 100644 --- a/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/s3/BaseS3Destination.kt +++ b/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/s3/BaseS3Destination.kt @@ -79,7 +79,8 @@ protected constructor( s3Config, catalog, memoryRatio, - nThreads + nThreads, + featureFlags ) } diff --git a/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/s3/S3ConsumerFactory.kt b/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/s3/S3ConsumerFactory.kt index fdf25cf0c6f7..a16a2a6baa7b 100644 --- a/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/s3/S3ConsumerFactory.kt +++ b/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/s3/S3ConsumerFactory.kt @@ -10,6 +10,8 @@ import io.airbyte.cdk.integrations.base.SerializedAirbyteMessageConsumer import io.airbyte.cdk.integrations.destination.StreamSyncSummary import io.airbyte.cdk.integrations.destination.async.AsyncStreamConsumer import io.airbyte.cdk.integrations.destination.async.buffers.BufferManager +import io.airbyte.cdk.integrations.destination.async.function.DestinationFlushFunction +import io.airbyte.cdk.integrations.destination.async.model.PartialAirbyteMessage import io.airbyte.cdk.integrations.destination.buffered_stream_consumer.BufferedStreamConsumer import io.airbyte.cdk.integrations.destination.buffered_stream_consumer.OnCloseFunction import io.airbyte.cdk.integrations.destination.buffered_stream_consumer.OnStartFunction @@ -21,12 +23,17 @@ import io.airbyte.cdk.integrations.destination.record_buffer.SerializableBuffer import io.airbyte.cdk.integrations.destination.record_buffer.SerializedBufferingStrategy import io.airbyte.cdk.integrations.destination.s3.SerializedBufferFactory.Companion.getCreateFunction import io.airbyte.commons.exceptions.ConfigErrorException +import io.airbyte.commons.features.FeatureFlags import io.airbyte.commons.json.Jsons import io.airbyte.protocol.models.v0.* import io.github.oshai.kotlinlogging.KotlinLogging +import java.io.File +import java.text.DecimalFormat import java.util.concurrent.Executors import java.util.function.Consumer import java.util.function.Function +import java.util.stream.Stream +import org.apache.commons.io.FileUtils import org.joda.time.DateTime import org.joda.time.DateTimeZone @@ -188,7 +195,8 @@ class S3ConsumerFactory { s3Config: S3DestinationConfig, catalog: ConfiguredAirbyteCatalog, memoryRatio: Double, - nThreads: Int + nThreads: Int, + featureFlags: FeatureFlags ): SerializedAirbyteMessageConsumer { val writeConfigs = createWriteConfigs(storageOps, s3Config, catalog) // Buffer creation function: yields a file buffer that converts @@ -203,15 +211,6 @@ class S3ConsumerFactory { descriptor to Pair(stream.generationId, stream.syncId) } - val createFunction = - getCreateFunction( - s3Config, - Function { fileExtension: String -> - FileBuffer(fileExtension) - }, - useV2FieldNames = true - ) - // Parquet has significantly higher overhead. This small adjustment // results in a ~5x performance improvement. val adjustedMemoryRatio = @@ -221,25 +220,52 @@ class S3ConsumerFactory { memoryRatio } + // This needs to be called before the creation of the flush function because it updates + // writeConfigs! + val onStartFunction = onStartFunction(storageOps, writeConfigs) + + val streamDescriptorToWriteConfig = + writeConfigs.associateBy { + StreamDescriptor().withNamespace(it.namespace).withName(it.streamName) + } + val flushFunction = + if (featureFlags.useFileTransfer()) { + FileTransferDestinationFlushFunction( + streamDescriptorToWriteConfig, + storageOps, + featureFlags + ) + } else { + val createFunction = + getCreateFunction( + s3Config, + Function { fileExtension: String -> + FileBuffer(fileExtension) + }, + useV2FieldNames = true + ) + S3DestinationFlushFunction( + // Ensure the file buffer is always larger than the memory buffer, + // as the file buffer will be flushed at the end of the memory flush. + optimalBatchSizeBytes = + (FileBuffer.MAX_PER_STREAM_BUFFER_SIZE_BYTES * 0.9).toLong(), + { + // Yield a new BufferingStrategy every time we flush (for thread-safety). + SerializedBufferingStrategy( + createFunction, + catalog, + flushBufferFunction(storageOps, writeConfigs, catalog) + ) + }, + generationAndSyncIds + ) + } + return AsyncStreamConsumer( outputRecordCollector, - onStartFunction(storageOps, writeConfigs), + onStartFunction, onCloseFunction(storageOps, writeConfigs), - S3DestinationFlushFunction( - // Ensure the file buffer is always larger than the memory buffer, - // as the file buffer will be flushed at the end of the memory flush. - optimalBatchSizeBytes = - (FileBuffer.MAX_PER_STREAM_BUFFER_SIZE_BYTES * 0.9).toLong(), - { - // Yield a new BufferingStrategy every time we flush (for thread-safety). - SerializedBufferingStrategy( - createFunction, - catalog, - flushBufferFunction(storageOps, writeConfigs, catalog) - ) - }, - generationAndSyncIds - ), + flushFunction, catalog, // S3 has no concept of default namespace // In the "namespace from destination case", the namespace @@ -252,6 +278,52 @@ class S3ConsumerFactory { ) } + private class FileTransferDestinationFlushFunction( + val streamDescriptorToWriteConfig: Map, + val storageOps: S3StorageOperations, + val featureFlags: FeatureFlags + ) : DestinationFlushFunction { + override fun flush( + streamDescriptor: StreamDescriptor, + stream: Stream + ) { + val records = stream.toList() + val writeConfig = streamDescriptorToWriteConfig.getValue(streamDescriptor) + if (records.isEmpty()) { + return + } + if (records.size > 1) { + throw RuntimeException( + "the destinationFlushFunction for RAW_FILES should be called with only 1 record" + ) + } + val file = records[0].record!!.file + if (file == null) { + throw RuntimeException(MISSING_FILE_FIELD_IN_FILE_TRANSFER_ERROR_MESSAGE) + } + val absolutePath = file.fileUrl!! + val relativePath = file.fileRelativePath!! + val fullObjectKey = writeConfig.fullOutputPath + relativePath + val dataFile = File(absolutePath) + val fileSize = dataFile.length() + val startTimeMs = System.currentTimeMillis() + storageOps.loadDataIntoBucket( + fullObjectKey = fullObjectKey, + fileName = dataFile.name, + fileContent = dataFile.inputStream(), + generationId = writeConfig.generationId + ) + val elapsedTimeSeconds = (System.currentTimeMillis() - startTimeMs) / 1_000.0 + val speedMBps = (fileSize / (1_024 * 1_024)) / elapsedTimeSeconds + LOGGER.info { + "wrote ${FileUtils.byteCountToDisplaySize(fileSize)} file in $elapsedTimeSeconds s, for a speed of ${decimalFormat.format(speedMBps)} MBps" + } + dataFile.delete() + } + + override val optimalBatchSizeBytes: Long = 1L + } + private fun isAppendSync(writeConfig: WriteConfig): Boolean { // This is an additional safety check, that this really is OVERWRITE // mode, this avoids bad things happening like deleting all objects @@ -323,8 +395,10 @@ class S3ConsumerFactory { } companion object { - + val decimalFormat = DecimalFormat("#.###") private val SYNC_DATETIME: DateTime = DateTime.now(DateTimeZone.UTC) + val MISSING_FILE_FIELD_IN_FILE_TRANSFER_ERROR_MESSAGE = + "the RECORD message doesn't have a file field in file transfer mode" private fun createWriteConfigs( storageOperations: BlobStorageOperations, diff --git a/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/s3/S3DestinationFlushFunction.kt b/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/s3/S3DestinationFlushFunction.kt index cb89081ddd0b..de15e206baac 100644 --- a/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/s3/S3DestinationFlushFunction.kt +++ b/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/s3/S3DestinationFlushFunction.kt @@ -27,13 +27,16 @@ class S3DestinationFlushFunction( strategyProvider().use { strategy -> for (partialMessage in stream) { val partialRecord = partialMessage.record!! - val data = + if (partialRecord.file != null) { + throw RuntimeException(FILE_RECORD_ERROR_MESSAGE) + } /** * This should always be null, but if something changes upstream to trigger a clone * of the record, then `null` becomes `JsonNull` and `data == null` goes from `true` * to `false` */ - if (partialRecord.data == null || partialRecord.data!!.isNull) { + val data = + if (partialRecord.data == null || partialRecord.data!!.isNull) { Jsons.deserialize(partialMessage.serialized) } else { partialRecord.data @@ -53,4 +56,9 @@ class S3DestinationFlushFunction( strategy.flushSingleStream(nameAndNamespace) } } + + companion object { + val FILE_RECORD_ERROR_MESSAGE = + "received a message of RECORD type with a populated `file` attribute. This should only happen in file transfer mode" + } } diff --git a/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/s3/S3StorageOperations.kt b/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/s3/S3StorageOperations.kt index 894e53789973..85a8d546a1f8 100644 --- a/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/s3/S3StorageOperations.kt +++ b/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/s3/S3StorageOperations.kt @@ -22,8 +22,7 @@ import io.airbyte.cdk.integrations.destination.s3.util.StreamTransferManagerFact import io.airbyte.cdk.integrations.util.ConnectorExceptionUtil import io.airbyte.commons.exceptions.ConfigErrorException import io.github.oshai.kotlinlogging.KotlinLogging -import java.io.IOException -import java.io.OutputStream +import java.io.* import java.util.* import java.util.concurrent.ConcurrentHashMap import java.util.concurrent.ConcurrentMap @@ -194,15 +193,32 @@ open class S3StorageOperations( } while (objectNameByPrefix.getValue(objectPath).contains(fullObjectKey)) return fullObjectKey } + @Throws(IOException::class) private fun loadDataIntoBucket( objectPath: String, recordsData: SerializableBuffer, generationId: Long + ): String { + val fullObjectKey: String = getFileName(objectPath, recordsData) + return loadDataIntoBucket( + fullObjectKey, + recordsData.filename, + recordsData.inputStream!!, + generationId + ) + } + + @Throws(IOException::class) + public fun loadDataIntoBucket( + fullObjectKey: String, + fileName: String, + fileContent: InputStream, + generationId: Long ): String { val partSize: Long = DEFAULT_PART_SIZE.toLong() val bucket: String? = s3Config.bucketName - val fullObjectKey: String = getFileName(objectPath, recordsData) + val metadata: MutableMap = HashMap() for (blobDecorator: BlobDecorator in blobDecorators) { blobDecorator.updateMetadata(metadata, getMetadataMapping()) @@ -232,13 +248,13 @@ open class S3StorageOperations( try { rawOutputStream.use { outputStream -> - recordsData.inputStream!!.use { dataStream -> + fileContent.use { dataStream -> dataStream.transferTo(outputStream) succeeded = true } } } catch (e: Exception) { - logger.error(e) { "Failed to load data into storage $objectPath" } + logger.error(e) { "Failed to load data into storage $fullObjectKey" } throw RuntimeException(e) } finally { if (!succeeded) { @@ -253,7 +269,7 @@ open class S3StorageOperations( } val newFilename: String = getFilename(fullObjectKey) logger.info { - "Uploaded buffer file to storage: ${recordsData.filename} -> $fullObjectKey (filename: $newFilename)" + "Uploaded buffer file to storage: $fileName -> $fullObjectKey (filename: $newFilename)" } return newFilename } @@ -611,7 +627,7 @@ open class S3StorageOperations( private const val FORMAT_VARIABLE_EPOCH: String = "\${EPOCH}" private const val FORMAT_VARIABLE_UUID: String = "\${UUID}" private const val GZ_FILE_EXTENSION: String = "gz" - private const val GENERATION_ID_USER_META_KEY = "ab-generation-id" + const val GENERATION_ID_USER_META_KEY = "ab-generation-id" @VisibleForTesting @JvmStatic fun getFilename(fullPath: String): String { diff --git a/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/destination/s3/S3BaseDestinationAcceptanceTest.kt b/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/destination/s3/S3BaseDestinationAcceptanceTest.kt new file mode 100644 index 000000000000..845b2bc9e637 --- /dev/null +++ b/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/destination/s3/S3BaseDestinationAcceptanceTest.kt @@ -0,0 +1,127 @@ +/* + * Copyright (c) 2024 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.cdk.integrations.destination.s3 + +import com.amazonaws.services.s3.AmazonS3 +import com.amazonaws.services.s3.model.S3ObjectSummary +import com.fasterxml.jackson.databind.JsonNode +import com.fasterxml.jackson.databind.node.ObjectNode +import io.airbyte.cdk.integrations.destination.NamingConventionTransformer +import io.airbyte.cdk.integrations.destination.s3.util.S3NameTransformer +import io.airbyte.cdk.integrations.standardtest.destination.BaseDestinationAcceptanceTest +import io.airbyte.cdk.integrations.standardtest.destination.DestinationAcceptanceTest +import io.airbyte.commons.io.IOs +import io.airbyte.commons.json.Jsons +import io.github.oshai.kotlinlogging.KotlinLogging +import java.nio.file.Path +import java.util.Comparator +import java.util.HashSet +import org.apache.commons.lang3.RandomStringUtils +import org.joda.time.DateTime +import org.joda.time.DateTimeZone +import org.mockito.Mockito + +private val LOGGER = KotlinLogging.logger {} + +abstract class S3BaseDestinationAcceptanceTest() : + BaseDestinationAcceptanceTest( + verifyIndividualStateAndCounts = true, + ) { + protected val secretFilePath: String = "secrets/config.json" + override val imageName: String + get() = "airbyte/destination-s3:dev" + protected var configJson: JsonNode? = null + + override fun getConfig(): JsonNode = configJson!! + protected open val baseConfigJson: JsonNode + get() = Jsons.deserialize(IOs.readFile(Path.of(secretFilePath))) + protected abstract val formatConfig: JsonNode? + get + protected var s3DestinationConfig: S3DestinationConfig = Mockito.mock() + protected var s3Client: AmazonS3? = null + protected var s3nameTransformer: NamingConventionTransformer = Mockito.mock() + protected var s3StorageOperations: S3StorageOperations? = null + var testBucketPath: String? = null + + fun storageProvider(): StorageProvider { + return StorageProvider.AWS_S3 + } + + /** + * This method does the following: + * * Construct the S3 destination config. + * * Construct the S3 client. + */ + override fun setup( + testEnv: DestinationAcceptanceTest.TestDestinationEnv, + TEST_SCHEMAS: HashSet + ) { + val baseConfigJson = baseConfigJson + // Set a random s3 bucket path for each integration test + val configJson = Jsons.clone(baseConfigJson) + testBucketPath = + String.format( + "test_%s", + RandomStringUtils.insecure().nextAlphanumeric(5), + ) + (configJson as ObjectNode) + .put("s3_bucket_path", testBucketPath) + .set("format", formatConfig) + this.configJson = configJson + this.s3DestinationConfig = + S3DestinationConfig.getS3DestinationConfig( + configJson, + storageProvider(), + getConnectorEnv() + ) + LOGGER.info { + "${"Test full path: {}/{}"} ${s3DestinationConfig.bucketName} ${s3DestinationConfig.bucketPath}" + } + + this.s3Client = s3DestinationConfig.getS3Client() + this.s3nameTransformer = S3NameTransformer() + this.s3StorageOperations = + S3StorageOperations(s3nameTransformer, s3Client!!, s3DestinationConfig) + } + + fun getDefaultSchema(): String { + if (configJson!!.has("s3_bucket_path")) { + return configJson!!["s3_bucket_path"].asText() + } + throw RuntimeException() + } + + /** Helper method to retrieve all synced objects inside the configured bucket path. */ + protected fun getAllSyncedObjects( + streamName: String, + ): List { + val namespaceStr = s3nameTransformer.getNamespace(getDefaultSchema()) + val streamNameStr = s3nameTransformer.getIdentifier(streamName) + val outputPrefix = + s3StorageOperations!!.getBucketObjectPath( + namespaceStr, + streamNameStr, + DateTime.now(DateTimeZone.UTC), + s3DestinationConfig.pathFormat!!, + ) + // the child folder contains a non-deterministic epoch timestamp, so use the parent folder + val parentFolder = outputPrefix.substring(0, outputPrefix.lastIndexOf("/") + 1) + val objectSummaries = + s3Client!! + .listObjects(s3DestinationConfig.bucketName, parentFolder) + .objectSummaries + .filter { o: S3ObjectSummary -> o.key.contains("$streamNameStr/") } + .sortedWith(Comparator.comparingLong { o: S3ObjectSummary -> o.lastModified.time }) + + LOGGER.info { + "${"All objects: {}"} ${ + objectSummaries.map { o: S3ObjectSummary -> + String.format("%s/%s", o.bucketName, o.key) + } + }" + } + return objectSummaries + } +} diff --git a/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/destination/s3/S3DestinationAcceptanceTest.kt b/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/destination/s3/S3DestinationAcceptanceTest.kt index 8095464a4d62..ff99342ce11c 100644 --- a/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/destination/s3/S3DestinationAcceptanceTest.kt +++ b/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/destination/s3/S3DestinationAcceptanceTest.kt @@ -12,11 +12,13 @@ import com.fasterxml.jackson.databind.node.JsonNodeFactory import com.fasterxml.jackson.databind.node.ObjectNode import com.google.common.collect.ImmutableMap import io.airbyte.cdk.integrations.destination.NamingConventionTransformer +import io.airbyte.cdk.integrations.destination.async.model.AirbyteRecordMessageFile import io.airbyte.cdk.integrations.destination.s3.util.S3NameTransformer import io.airbyte.cdk.integrations.standardtest.destination.DestinationAcceptanceTest import io.airbyte.cdk.integrations.standardtest.destination.argproviders.DataArgumentsProvider import io.airbyte.cdk.integrations.standardtest.destination.comparator.AdvancedTestDataComparator import io.airbyte.cdk.integrations.standardtest.destination.comparator.TestDataComparator +import io.airbyte.commons.features.EnvVariableFeatureFlags import io.airbyte.commons.io.IOs import io.airbyte.commons.jackson.MoreMappers import io.airbyte.commons.json.Jsons @@ -27,9 +29,11 @@ import io.github.oshai.kotlinlogging.KotlinLogging import java.nio.file.Path import java.time.Instant import java.util.* +import kotlin.test.assertContains import org.apache.commons.lang3.RandomStringUtils import org.joda.time.DateTime import org.joda.time.DateTimeZone +import org.junit.Assert.fail import org.junit.jupiter.api.Assumptions.* import org.junit.jupiter.api.Test import org.junit.jupiter.api.assertThrows @@ -734,6 +738,79 @@ protected constructor( ) } + @Test + open fun testFakeFileTransfer() { + val streamSchema = JsonNodeFactory.instance.objectNode() + streamSchema.set("properties", JsonNodeFactory.instance.objectNode()) + val streamName = "str" + RandomStringUtils.randomAlphanumeric(5) + val catalog = + ConfiguredAirbyteCatalog() + .withStreams( + java.util.List.of( + ConfiguredAirbyteStream() + .withSyncMode(SyncMode.INCREMENTAL) + .withDestinationSyncMode(DestinationSyncMode.APPEND_DEDUP) + .withGenerationId(0) + .withMinimumGenerationId(0) + .withSyncId(0) + .withStream( + AirbyteStream().withName(streamName).withJsonSchema(streamSchema) + ), + ), + ) + + val recordMessage = + AirbyteMessage() + .withType(AirbyteMessage.Type.RECORD) + .withRecord( + AirbyteRecordMessage() + .withStream(streamName) + .withEmittedAt(Instant.now().toEpochMilli()) + .withData(ObjectMapper().readTree("{}")) + .withAdditionalProperty( + "file", + AirbyteRecordMessageFile( + fileUrl = + "${EnvVariableFeatureFlags.DEFAULT_AIRBYTE_STAGING_DIRECTORY}/fakeFile", + bytes = 182776, + fileRelativePath = "fakeFile", + modified = 123456L, + sourceFileUrl = + "//sftp-testing-for-file-transfer/sftp-folder/simpsons_locations.csv", + ) + ) + ) + val streamCompleteMessage = + AirbyteMessage() + .withType(AirbyteMessage.Type.TRACE) + .withTrace( + AirbyteTraceMessage() + .withStreamStatus( + AirbyteStreamStatusTraceMessage() + .withStatus( + AirbyteStreamStatusTraceMessage.AirbyteStreamStatus.COMPLETE + ) + .withStreamDescriptor(StreamDescriptor().withName(streamName)) + ) + ) + try { + val destinationOutput = + runSync( + config = getConfig(), + messages = listOf(recordMessage, streamCompleteMessage), + catalog = catalog, + runNormalization = false, + imageName = imageName, + ) + fail("sync should have failed. Instead got output $destinationOutput") + } catch (e: TestHarnessException) { + assertContains( + e.outputMessages!![0].trace.error.internalMessage, + S3DestinationFlushFunction.FILE_RECORD_ERROR_MESSAGE + ) + } + } + companion object { @JvmStatic protected val MAPPER: ObjectMapper = MoreMappers.initMapper() diff --git a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/testFixtures/kotlin/io/airbyte/integrations/base/destination/typing_deduping/BaseTypingDedupingTest.kt b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/testFixtures/kotlin/io/airbyte/integrations/base/destination/typing_deduping/BaseTypingDedupingTest.kt index 1c7c08350d01..dcfdda420636 100644 --- a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/testFixtures/kotlin/io/airbyte/integrations/base/destination/typing_deduping/BaseTypingDedupingTest.kt +++ b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/testFixtures/kotlin/io/airbyte/integrations/base/destination/typing_deduping/BaseTypingDedupingTest.kt @@ -1525,6 +1525,7 @@ abstract class BaseTypingDedupingTest { workspaceRoot, workspaceRoot.toString(), localRoot.toString(), + fileTransferMountSource = null, "host", emptyMap() ) diff --git a/airbyte-integrations/connectors/destination-s3/build.gradle b/airbyte-integrations/connectors/destination-s3/build.gradle index b5faddcddae5..9cff28d2429a 100644 --- a/airbyte-integrations/connectors/destination-s3/build.gradle +++ b/airbyte-integrations/connectors/destination-s3/build.gradle @@ -4,9 +4,9 @@ plugins { } airbyteJavaConnector { - cdkVersionRequired = '0.46.1' + cdkVersionRequired = '0.48.0' features = ['db-destinations', 's3-destinations'] - useLocalCdk = true + useLocalCdk = false } airbyteJavaConnector.addCdkDependencies() diff --git a/airbyte-integrations/connectors/destination-s3/metadata.yaml b/airbyte-integrations/connectors/destination-s3/metadata.yaml index 727d7c75c210..5213f45e5c60 100644 --- a/airbyte-integrations/connectors/destination-s3/metadata.yaml +++ b/airbyte-integrations/connectors/destination-s3/metadata.yaml @@ -2,7 +2,7 @@ data: connectorSubtype: file connectorType: destination definitionId: 4816b78f-1489-44c1-9060-4b19d5fa9362 - dockerImageTag: 1.3.0 + dockerImageTag: 1.4.0 dockerRepository: airbyte/destination-s3 githubIssueLabel: destination-s3 icon: s3.svg @@ -34,6 +34,7 @@ data: ql: 300 supportLevel: certified supportsRefreshes: true + supportsFileTransfer: true connectorTestSuitesOptions: - suite: unitTests - suite: integrationTests diff --git a/airbyte-integrations/connectors/destination-s3/src/test-integration/kotlin/io/airbyte/integrations/destination/s3/S3FileTransferDestinationTest.kt b/airbyte-integrations/connectors/destination-s3/src/test-integration/kotlin/io/airbyte/integrations/destination/s3/S3FileTransferDestinationTest.kt new file mode 100644 index 000000000000..5976164f32a1 --- /dev/null +++ b/airbyte-integrations/connectors/destination-s3/src/test-integration/kotlin/io/airbyte/integrations/destination/s3/S3FileTransferDestinationTest.kt @@ -0,0 +1,211 @@ +/* + * Copyright (c) 2024 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.integrations.destination.s3 + +import com.fasterxml.jackson.databind.JsonNode +import com.fasterxml.jackson.databind.ObjectMapper +import com.fasterxml.jackson.databind.node.JsonNodeFactory +import com.fasterxml.jackson.databind.node.ObjectNode +import com.google.common.collect.ImmutableMap +import io.airbyte.cdk.integrations.destination.async.model.AirbyteRecordMessageFile +import io.airbyte.cdk.integrations.destination.s3.FileUploadFormat +import io.airbyte.cdk.integrations.destination.s3.S3BaseDestinationAcceptanceTest +import io.airbyte.cdk.integrations.destination.s3.S3ConsumerFactory +import io.airbyte.cdk.integrations.destination.s3.S3StorageOperations +import io.airbyte.cdk.integrations.destination.s3.constant.S3Constants +import io.airbyte.cdk.integrations.destination.s3.util.Flattening +import io.airbyte.commons.features.EnvVariableFeatureFlags +import io.airbyte.commons.json.Jsons +import io.airbyte.protocol.models.v0.* +import io.airbyte.workers.exception.TestHarnessException +import io.github.oshai.kotlinlogging.KotlinLogging +import java.nio.file.Path +import java.time.Instant +import kotlin.io.path.createDirectories +import kotlin.io.path.createFile +import kotlin.io.path.writeText +import kotlin.random.Random +import kotlin.test.* +import org.apache.commons.lang3.RandomStringUtils +import org.junit.jupiter.api.Test + +private val LOGGER = KotlinLogging.logger {} + +class S3FileTransferDestinationTest : S3BaseDestinationAcceptanceTest() { + override val supportsFileTransfer = true + override val formatConfig: JsonNode + get() = + Jsons.jsonNode( + java.util.Map.of( + "format_type", + FileUploadFormat.CSV, + "flattening", + Flattening.ROOT_LEVEL.value, + "compression", + Jsons.jsonNode(java.util.Map.of("compression_type", "No Compression")), + ) + ) + override val baseConfigJson: JsonNode + get() = + (super.baseConfigJson as ObjectNode).put( + S3Constants.S_3_PATH_FORMAT, + "\${NAMESPACE}/\${STREAM_NAME}/" + ) + + private fun getStreamCompleteMessage(streamName: String): AirbyteMessage { + return AirbyteMessage() + .withType(AirbyteMessage.Type.TRACE) + .withTrace( + AirbyteTraceMessage() + .withStreamStatus( + AirbyteStreamStatusTraceMessage() + .withStatus( + AirbyteStreamStatusTraceMessage.AirbyteStreamStatus.COMPLETE + ) + .withStreamDescriptor(StreamDescriptor().withName(streamName)) + ) + ) + } + + private fun createFakeFile(): Path { + val depth = Random.nextInt(10) + val dirPath = + (0..depth).joinToString("/") { + "dir" + RandomStringUtils.insecure().nextAlphanumeric(5) + } + val fileName = "fakeFile" + RandomStringUtils.insecure().nextAlphanumeric(5) + val filePath = "$dirPath/$fileName" + val fileSize = 1_024 * 1_024 + + fileTransferMountSource!!.resolve(dirPath).createDirectories() + val absoluteFilePath = + fileTransferMountSource!! + .resolve(filePath) + .createFile() + .writeText(RandomStringUtils.insecure().nextAlphanumeric(fileSize)) + return Path.of(filePath) + } + + private fun configureCatalog(streamName: String, generationId: Long): ConfiguredAirbyteCatalog { + val streamSchema = JsonNodeFactory.instance.objectNode() + streamSchema.set("properties", JsonNodeFactory.instance.objectNode()) + return ConfiguredAirbyteCatalog() + .withStreams( + java.util.List.of( + ConfiguredAirbyteStream() + .withSyncMode(SyncMode.INCREMENTAL) + .withDestinationSyncMode(DestinationSyncMode.APPEND_DEDUP) + .withGenerationId(generationId) + .withMinimumGenerationId(generationId) + .withSyncId(0) + .withStream( + AirbyteStream().withName(streamName).withJsonSchema(streamSchema) + ), + ), + ) + } + + private fun createMessageForFile(streamName: String, relativeFilePath: Path): AirbyteMessage { + val absoluteFilePath = + EnvVariableFeatureFlags.DEFAULT_AIRBYTE_STAGING_DIRECTORY.resolve(relativeFilePath) + return AirbyteMessage() + .withType(AirbyteMessage.Type.RECORD) + .withRecord( + AirbyteRecordMessage() + .withStream(streamName) + .withEmittedAt(Instant.now().toEpochMilli()) + .withData(ObjectMapper().readTree("{}")) + .withAdditionalProperty( + "file", + AirbyteRecordMessageFile( + fileUrl = absoluteFilePath.toString(), + bytes = absoluteFilePath.toFile().length(), + fileRelativePath = "$relativeFilePath", + modified = 123456L, + sourceFileUrl = "//sftp-testing-for-file-transfer/$relativeFilePath", + ) + ) + ) + } + + @Test + fun checkRecordSyncFails() { + val streamName = "str" + RandomStringUtils.insecure().nextAlphanumeric(5) + val catalog = configureCatalog(streamName, 0) + val recordBasedMessage = + AirbyteMessage() + .withType(AirbyteMessage.Type.RECORD) + .withRecord( + AirbyteRecordMessage() + .withStream(streamName) + .withEmittedAt(Instant.now().toEpochMilli()) + .withData( + Jsons.jsonNode( + ImmutableMap.builder() + .put("id", 1) + .put("currency", "USD") + .put("date", "2020-03-31T00:00:00Z") + .put("HKD", 10.1) + .put("NZD", 700.1) + .build(), + ) + ) + ) + try { + runSyncAndVerifyStateOutput( + getConfig(), + listOf(recordBasedMessage, getStreamCompleteMessage(streamName)), + catalog, + false + ) + fail("should have failed!") + } catch (e: TestHarnessException) { + assertContains( + e.outputMessages!![0].trace.error.internalMessage, + S3ConsumerFactory.MISSING_FILE_FIELD_IN_FILE_TRANSFER_ERROR_MESSAGE + ) + } + } + + @Test + fun testFakeFileTransfer() { + LOGGER.info { + "${EnvVariableFeatureFlags.DEFAULT_AIRBYTE_STAGING_DIRECTORY} is mounted from $fileTransferMountSource" + } + val streamName = "str" + RandomStringUtils.insecure().nextAlphanumeric(5) + val filePath = createFakeFile() + val file = fileTransferMountSource!!.resolve(filePath).toFile() + val fileLength = file.length() + val fileContent = file.readBytes() + val catalog = configureCatalog(streamName, 32) + val recordMessage = createMessageForFile(streamName, filePath) + + runSyncAndVerifyStateOutput( + getConfig(), + listOf(recordMessage, getStreamCompleteMessage(streamName)), + catalog, + false + ) + val allObjectsInStore = getAllSyncedObjects(streamName) + val objectInStore = allObjectsInStore[0] + val objectMetadata = + s3Client!!.getObjectMetadata(objectInStore.bucketName, objectInStore.key) + val generationId = + objectMetadata + .getUserMetaDataOf(S3StorageOperations.GENERATION_ID_USER_META_KEY) + .toLong() + assertEquals(generationId, 32L) + assertFalse(file.exists(), "file should have been deleted by the connector") + assertEquals(fileLength, objectInStore.size) + assertEquals("$testBucketPath/$streamName/${filePath.toString()}", objectInStore.key) + assertContentEquals( + fileContent, + s3Client!! + .getObject(objectInStore.bucketName, objectInStore.key) + .objectContent + .readBytes() + ) + } +} diff --git a/docs/integrations/destinations/s3.md b/docs/integrations/destinations/s3.md index fabfe3eb90e9..da0700b55fe9 100644 --- a/docs/integrations/destinations/s3.md +++ b/docs/integrations/destinations/s3.md @@ -536,7 +536,8 @@ To see connector limitations, or troubleshoot your S3 connector, see more [in ou | Version | Date | Pull Request | Subject | |:--------|:-----------|:-----------------------------------------------------------|:-----------------------------------------------------------------------------------------------------------------------------------------------------| -| 1.3.0 | 2024-10-30 | [46281](https://github.com/airbytehq/airbyte/pull/46281) | fix tests | +| 1.4.0 | 2024-10-23 | [46302](https://github.com/airbytehq/airbyte/pull/46302) | add support for file transfer | +| 1.3.0 | 2024-09-30 | [46281](https://github.com/airbytehq/airbyte/pull/46281) | fix tests | | 1.2.1 | 2024-09-20 | [45700](https://github.com/airbytehq/airbyte/pull/45700) | Improve resiliency to jsonschema fields | | 1.2.0 | 2024-09-18 | [45402](https://github.com/airbytehq/airbyte/pull/45402) | fix exception with columnless streams | | 1.1.0 | 2024-09-18 | [45436](https://github.com/airbytehq/airbyte/pull/45436) | upgrade all dependencies |