diff --git a/airflow/io/path.py b/airflow/io/path.py
index bd7c320653aae..d65d837e7e5db 100644
--- a/airflow/io/path.py
+++ b/airflow/io/path.py
@@ -52,6 +52,9 @@ def __init__(
         conn_id: str | None = None,
         **kwargs: typing.Any,
     ) -> None:
+        # warning: we are not calling super().__init__ here
+        # as it will try to create a new fs from a different
+        # set of registered filesystems
         if parsed_url and parsed_url.scheme:
            self._store = attach(parsed_url.scheme, conn_id)
         else:
@@ -173,10 +176,16 @@ def bucket(self) -> str:
     @property
     def key(self) -> str:
         if self._url:
-            return self._url.path
+            # per convention, we strip the leading slashes to ensure a relative key is returned
+            # we keep the trailing slash to allow for directory-like semantics
+            return self._url.path.lstrip(self.sep)
         else:
             return ""
 
+    @property
+    def namespace(self) -> str:
+        return f"{self.protocol}://{self.bucket}" if self.bucket else self.protocol
+
     def stat(self) -> stat_result:  # type: ignore[override]
         """Call ``stat`` and return the result."""
         return stat_result(
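A quick sketch of the new `key` and `namespace` semantics, mirroring the `file://` values used in the updated tests below (bucket and key names are illustrative only):

```python
from airflow.io.path import ObjectStoragePath

path = ObjectStoragePath("file://bucket/key/part1/part2")

# keys are now relative: the leading separator is stripped
assert path.key == "key/part1/part2"  # was "/key/part1/part2" before this change

# namespace is protocol + bucket, or just the protocol when there is no bucket
assert path.namespace == "file://bucket"
```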
"['3.8']", @@ -538,7 +538,7 @@ def assert_outputs_are_printed(expected_outputs: dict[str, str], stderr: str): "docs-build": "false", "run-kubernetes-tests": "false", "upgrade-to-newer-dependencies": "false", - "parallel-test-types-list-as-string": "Always Providers[common.io]", + "parallel-test-types-list-as-string": "Always Providers[common.io,openlineage]", }, id="Only Always and Common.IO tests should run when only common.io and tests/always changed", ), diff --git a/docs/apache-airflow/core-concepts/objectstorage.rst b/docs/apache-airflow/core-concepts/objectstorage.rst index f5b113a861f36..5a3919e433a47 100644 --- a/docs/apache-airflow/core-concepts/objectstorage.rst +++ b/docs/apache-airflow/core-concepts/objectstorage.rst @@ -76,7 +76,8 @@ object you want to interact with. For example, to point to a bucket in s3, you w base = ObjectStoragePath("s3://aws_default@my-bucket/") -The username part of the URI is optional. It can alternatively be passed in as a separate keyword argument: +The username part of the URI represents the Airflow connection id and is optional. It can alternatively be passed +in as a separate keyword argument: .. code-block:: python @@ -242,6 +243,11 @@ key Returns the object key. +namespace +^^^^^^^^^ + +Returns the namespace of the object. Typically this is the protocol, like ``s3://`` with the +bucket name. path ^^^^ diff --git a/generated/provider_dependencies.json b/generated/provider_dependencies.json index fe07b77f44c9f..0366b3ebab8e1 100644 --- a/generated/provider_dependencies.json +++ b/generated/provider_dependencies.json @@ -300,7 +300,9 @@ "deps": [ "apache-airflow>=2.8.0" ], - "cross-providers-deps": [], + "cross-providers-deps": [ + "openlineage" + ], "excluded-python-versions": [], "state": "ready" }, diff --git a/tests/io/test_path.py b/tests/io/test_path.py index af6044c150192..ab143b038e00b 100644 --- a/tests/io/test_path.py +++ b/tests/io/test_path.py @@ -72,18 +72,21 @@ def test_alias(self): def test_init_objectstoragepath(self): path = ObjectStoragePath("file://bucket/key/part1/part2") assert path.bucket == "bucket" - assert path.key == "/key/part1/part2" + assert path.key == "key/part1/part2" assert path.protocol == "file" + assert path.path == "bucket/key/part1/part2" path2 = ObjectStoragePath(path / "part3") assert path2.bucket == "bucket" - assert path2.key == "/key/part1/part2/part3" + assert path2.key == "key/part1/part2/part3" assert path2.protocol == "file" + assert path2.path == "bucket/key/part1/part2/part3" path3 = ObjectStoragePath(path2 / "2023") assert path3.bucket == "bucket" - assert path3.key == "/key/part1/part2/part3/2023" + assert path3.key == "key/part1/part2/part3/2023" assert path3.protocol == "file" + assert path3.path == "bucket/key/part1/part2/part3/2023" def test_read_write(self): o = ObjectStoragePath(f"file:///tmp/{str(uuid.uuid4())}") @@ -171,7 +174,7 @@ def test_bucket_key_protocol(self): o = ObjectStoragePath(f"{protocol}://{bucket}/{key}") assert o.bucket == bucket assert o.container == bucket - assert o.key == f"/{key}" + assert o.key == f"{key}" assert o.protocol == protocol def test_cwd_home(self): diff --git a/tests/providers/common/io/operators/test_file_transfer.py b/tests/providers/common/io/operators/test_file_transfer.py index e2cb68ca43b66..c48a61c314322 100644 --- a/tests/providers/common/io/operators/test_file_transfer.py +++ b/tests/providers/common/io/operators/test_file_transfer.py @@ -19,6 +19,8 @@ from unittest import mock +from openlineage.client.run import Dataset + from 
diff --git a/dev/breeze/tests/test_selective_checks.py b/dev/breeze/tests/test_selective_checks.py
index 495c98fd877a1..8c11e2628e393 100644
--- a/dev/breeze/tests/test_selective_checks.py
+++ b/dev/breeze/tests/test_selective_checks.py
@@ -525,7 +525,7 @@ def assert_outputs_are_printed(expected_outputs: dict[str, str], stderr: str):
                 "tests/providers/common/io/operators/test_file_transfer.py",
             ),
             {
-                "affected-providers-list-as-string": "common.io",
+                "affected-providers-list-as-string": "common.io openlineage",
                 "all-python-versions": "['3.8']",
                 "all-python-versions-list-as-string": "3.8",
                 "python-versions": "['3.8']",
@@ -538,7 +538,7 @@ def assert_outputs_are_printed(expected_outputs: dict[str, str], stderr: str):
                 "docs-build": "false",
                 "run-kubernetes-tests": "false",
                 "upgrade-to-newer-dependencies": "false",
-                "parallel-test-types-list-as-string": "Always Providers[common.io]",
+                "parallel-test-types-list-as-string": "Always Providers[common.io,openlineage]",
             },
             id="Only Always and Common.IO tests should run when only common.io and tests/always changed",
         ),
diff --git a/docs/apache-airflow/core-concepts/objectstorage.rst b/docs/apache-airflow/core-concepts/objectstorage.rst
index f5b113a861f36..5a3919e433a47 100644
--- a/docs/apache-airflow/core-concepts/objectstorage.rst
+++ b/docs/apache-airflow/core-concepts/objectstorage.rst
@@ -76,7 +76,8 @@ object you want to interact with. For example, to point to a bucket in s3, you w
 
     base = ObjectStoragePath("s3://aws_default@my-bucket/")
 
-The username part of the URI is optional. It can alternatively be passed in as a separate keyword argument:
+The username part of the URI represents the Airflow connection id and is optional. It can alternatively be passed
+in as a separate keyword argument:
 
 .. code-block:: python
 
@@ -242,6 +243,11 @@ key
 
 Returns the object key.
 
+namespace
+^^^^^^^^^
+
+Returns the namespace of the object. Typically this is the protocol combined with the
+bucket name, e.g. ``s3://my-bucket``.
 
 path
 ^^^^
diff --git a/generated/provider_dependencies.json b/generated/provider_dependencies.json
index fe07b77f44c9f..0366b3ebab8e1 100644
--- a/generated/provider_dependencies.json
+++ b/generated/provider_dependencies.json
@@ -300,7 +300,9 @@
     "deps": [
       "apache-airflow>=2.8.0"
     ],
-    "cross-providers-deps": [],
+    "cross-providers-deps": [
+      "openlineage"
+    ],
     "excluded-python-versions": [],
     "state": "ready"
   },
diff --git a/tests/io/test_path.py b/tests/io/test_path.py
index af6044c150192..ab143b038e00b 100644
--- a/tests/io/test_path.py
+++ b/tests/io/test_path.py
@@ -72,18 +72,21 @@ def test_alias(self):
     def test_init_objectstoragepath(self):
         path = ObjectStoragePath("file://bucket/key/part1/part2")
         assert path.bucket == "bucket"
-        assert path.key == "/key/part1/part2"
+        assert path.key == "key/part1/part2"
         assert path.protocol == "file"
+        assert path.path == "bucket/key/part1/part2"
 
         path2 = ObjectStoragePath(path / "part3")
         assert path2.bucket == "bucket"
-        assert path2.key == "/key/part1/part2/part3"
+        assert path2.key == "key/part1/part2/part3"
         assert path2.protocol == "file"
+        assert path2.path == "bucket/key/part1/part2/part3"
 
         path3 = ObjectStoragePath(path2 / "2023")
         assert path3.bucket == "bucket"
-        assert path3.key == "/key/part1/part2/part3/2023"
+        assert path3.key == "key/part1/part2/part3/2023"
         assert path3.protocol == "file"
+        assert path3.path == "bucket/key/part1/part2/part3/2023"
 
     def test_read_write(self):
         o = ObjectStoragePath(f"file:///tmp/{str(uuid.uuid4())}")
@@ -171,7 +174,7 @@ def test_bucket_key_protocol(self):
         o = ObjectStoragePath(f"{protocol}://{bucket}/{key}")
         assert o.bucket == bucket
         assert o.container == bucket
-        assert o.key == f"/{key}"
+        assert o.key == f"{key}"
         assert o.protocol == protocol
 
     def test_cwd_home(self):
diff --git a/tests/providers/common/io/operators/test_file_transfer.py b/tests/providers/common/io/operators/test_file_transfer.py
index e2cb68ca43b66..c48a61c314322 100644
--- a/tests/providers/common/io/operators/test_file_transfer.py
+++ b/tests/providers/common/io/operators/test_file_transfer.py
@@ -19,6 +19,8 @@
 
 from unittest import mock
 
+from openlineage.client.run import Dataset
+
 from airflow.providers.common.io.operators.file_transfer import FileTransferOperator
@@ -45,3 +47,25 @@
     )
     source_path.copy.assert_called_once_with(target_path)
     target_path.copy.assert_not_called()
+
+
+def test_get_openlineage_facets_on_start():
+    src_bucket = "src-bucket"
+    src_key = "src-key"
+    dst_bucket = "dst-bucket"
+    dst_key = "dst-key"
+
+    expected_input = Dataset(namespace=f"s3://{src_bucket}", name=src_key)
+    expected_output = Dataset(namespace=f"s3://{dst_bucket}", name=dst_key)
+
+    op = FileTransferOperator(
+        task_id="test",
+        src=f"s3://{src_bucket}/{src_key}",
+        dst=f"s3://{dst_bucket}/{dst_key}",
+    )
+
+    lineage = op.get_openlineage_facets_on_start()
+    assert len(lineage.inputs) == 1
+    assert len(lineage.outputs) == 1
+    assert lineage.inputs[0] == expected_input
+    assert lineage.outputs[0] == expected_output
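As a usage note on the documentation change above: both spellings below bind the same connection to a path (`aws_default` is the stock AWS connection id; the bucket name is illustrative):

```python
from airflow.io.path import ObjectStoragePath

# connection id embedded in the URI as the "username" part
base = ObjectStoragePath("s3://aws_default@my-bucket/")

# equivalent: connection id passed as a keyword argument
base = ObjectStoragePath("s3://my-bucket/", conn_id="aws_default")
```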