Skip to content

Commit

Permalink
cdk-java: add file transfer mount to DestinationAcceptanceTest
Browse files Browse the repository at this point in the history
  • Loading branch information
stephane-airbyte committed Oct 3, 2024
1 parent bc093d8 commit 9861dee
Show file tree
Hide file tree
Showing 10 changed files with 26 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import io.airbyte.commons.features.FeatureFlags
import io.airbyte.commons.json.Jsons
import io.airbyte.commons.resources.MoreResources
import io.airbyte.protocol.models.v0.ConnectorSpecification
import java.nio.file.Path

abstract class BaseConnector : Integration {
open val featureFlags: FeatureFlags = EnvVariableFeatureFlags()
Expand All @@ -31,4 +32,8 @@ abstract class BaseConnector : Integration {
val resourceString = MoreResources.readResource("spec.json")
return Jsons.deserialize(resourceString, ConnectorSpecification::class.java)
}

companion object {
val FILE_TRANSFER_DIRECTORY: Path = Path.of("/file-transfer")
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,8 @@ abstract class DestinationAcceptanceTest(
protected var testSchemas: HashSet<String> = HashSet()

private lateinit var testEnv: TestDestinationEnv
protected var fileTransferMountSource: Path? = null
private set
protected open val isCloudTest: Boolean = true
protected val featureFlags: FeatureFlags =
if (isCloudTest) {
Expand Down Expand Up @@ -435,12 +437,15 @@ abstract class DestinationAcceptanceTest(
mConnectorConfigUpdater = Mockito.mock(ConnectorConfigUpdater::class.java)
testSchemas = HashSet()
setup(testEnv, testSchemas)
fileTransferMountSource =
if (supportsFileTransfer) Files.createTempDirectory(testDir, "file_transfer") else null

processFactory =
DockerProcessFactory(
workspaceRoot,
workspaceRoot.toString(),
localRoot.toString(),
fileTransferMountSource,
"host",
getConnectorEnv()
)
Expand Down Expand Up @@ -2684,6 +2689,8 @@ abstract class DestinationAcceptanceTest(
return supportsInDestinationNormalization() || normalizationFromDefinition()
}

protected open val supportsFileTransfer: Boolean = false

companion object {
private val RANDOM = Random()
private const val NORMALIZATION_VERSION = "dev"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ abstract class AbstractSourceConnectorTest {
workspaceRoot,
workspaceRoot.toString(),
localRoot.toString(),
fileTransferMountSource = null,
"host",
envMap
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import com.google.common.annotations.VisibleForTesting
import com.google.common.base.Joiner
import com.google.common.base.Strings
import com.google.common.collect.Lists
import io.airbyte.cdk.integrations.BaseConnector
import io.airbyte.commons.io.IOs
import io.airbyte.commons.io.LineGobbler
import io.airbyte.commons.map.MoreMaps
Expand All @@ -30,6 +31,7 @@ class DockerProcessFactory(
private val workspaceRoot: Path,
private val workspaceMountSource: String?,
private val localMountSource: String?,
private val fileTransferMountSource: Path?,
private val networkName: String?,
private val envMap: Map<String, String>
) : ProcessFactory {
Expand Down Expand Up @@ -125,6 +127,11 @@ class DockerProcessFactory(
cmd.add(String.format("%s:%s", localMountSource, LOCAL_MOUNT_DESTINATION))
}

if (fileTransferMountSource != null) {
cmd.add("-v")
cmd.add("$fileTransferMountSource:${BaseConnector.FILE_TRANSFER_DIRECTORY}")
}

val allEnvMap = MoreMaps.merge(jobMetadata, envMap, additionalEnvironmentVariables)
for ((key, value) in allEnvMap) {
cmd.add("-e")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,8 @@ protected constructor(
override val imageName: String
get() = "airbyte/destination-s3:dev"

override val supportsFileTransfer = true

override fun getDefaultSchema(config: JsonNode): String? {
if (config.has("s3_bucket_path")) {
return config["s3_bucket_path"].asText()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1525,6 +1525,7 @@ abstract class BaseTypingDedupingTest {
workspaceRoot,
workspaceRoot.toString(),
localRoot.toString(),
fileTransferMountSource = null,
"host",
emptyMap()
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ plugins {
airbyteJavaConnector {
cdkVersionRequired = '0.46.1'
features = ['db-destinations', 's3-destinations']
useLocalCdk = false
useLocalCdk = true
}

airbyteJavaConnector.addCdkDependencies()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ data:
connectorSubtype: file
connectorType: destination
definitionId: 4816b78f-1489-44c1-9060-4b19d5fa9362
dockerImageTag: 1.2.1
dockerImageTag: 1.3.0
dockerRepository: airbyte/destination-s3
githubIssueLabel: destination-s3
icon: s3.svg
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
package io.airbyte.integrations.destination.s3

import com.google.common.annotations.VisibleForTesting
import io.airbyte.cdk.integrations.base.IntegrationRunner
import io.airbyte.cdk.integrations.destination.s3.BaseS3Destination
import io.airbyte.cdk.integrations.destination.s3.S3DestinationConfigFactory
import io.airbyte.cdk.integrations.destination.s3.StorageProvider
Expand All @@ -21,12 +20,4 @@ open class S3Destination : BaseS3Destination {
override fun storageProvider(): StorageProvider {
return StorageProvider.AWS_S3
}

companion object {
@Throws(Exception::class)
@JvmStatic
fun main(args: Array<String>) {
IntegrationRunner(S3Destination()).run(args)
}
}
}
1 change: 1 addition & 0 deletions docs/integrations/destinations/s3.md
Original file line number Diff line number Diff line change
Expand Up @@ -536,6 +536,7 @@ To see connector limitations, or troubleshoot your S3 connector, see more [in ou

| Version | Date | Pull Request | Subject |
|:--------|:-----------|:-----------------------------------------------------------|:-----------------------------------------------------------------------------------------------------------------------------------------------------|
| 1.3.0 | 2024-10-30 | [46281](https://github.com/airbytehq/airbyte/pull/46281) | enable file transfer |
| 1.2.1 | 2024-09-20 | [45700](https://github.com/airbytehq/airbyte/pull/45700) | Improve resiliency to jsonschema fields |
| 1.2.0 | 2024-09-18 | [45402](https://github.com/airbytehq/airbyte/pull/45402) | fix exception with columnless streams |
| 1.1.0 | 2024-09-18 | [45436](https://github.com/airbytehq/airbyte/pull/45436) | upgrade all dependencies |
Expand Down

0 comments on commit 9861dee

Please sign in to comment.