Skip to content

Commit

Permalink
[Fix apache#41763]: Redundant forward slash in SFTPToGCSOperator when…
Browse files Browse the repository at this point in the history
… destination_path is not specified or have default value (apache#41928)

* fix the redundant forward issue if the has

* Add a test for fix

* respect airflow standards linting

* update destination path to destination_path in comment for clarity

---------

Co-authored-by: Mayuresh Kedari <[email protected]>
  • Loading branch information
2 people authored and Illumaria committed Sep 3, 2024
1 parent 0d61450 commit b7c4bcd
Show file tree
Hide file tree
Showing 2 changed files with 55 additions and 0 deletions.
6 changes: 6 additions & 0 deletions airflow/providers/google/cloud/transfers/sftp_to_gcs.py
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,12 @@ def execute(self, context: Context):

for file in files:
destination_path = file.replace(base_path, self.destination_path, 1)
# See issue: https://github.com/apache/airflow/issues/41763
# If the destination_path is not specified, it defaults to an empty string. As a result,
# replacing base_path with an empty string is ineffective, causing the destination_path to
# retain the "/" prefix, if it has.
if not self.destination_path:
destination_path = destination_path.lstrip("/")
self._copy_single_object(gcs_hook, sftp_hook, file, destination_path)

else:
Expand Down
49 changes: 49 additions & 0 deletions tests/providers/google/cloud/transfers/test_sftp_to_gcs.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@

TEST_BUCKET = "test-bucket"
SOURCE_OBJECT_WILDCARD_FILENAME = "main_dir/test_object*.json"
SOURCE_OBJECT_WILDCARD_TXT_FILENAME = "main_dir/test_object*.txt"
SOURCE_OBJECT_NO_WILDCARD = "main_dir/test_object3.json"
SOURCE_OBJECT_MULTIPLE_WILDCARDS = "main_dir/csv/*/test_*.csv"

Expand Down Expand Up @@ -252,3 +253,51 @@ def test_execute_more_than_one_wildcard_exception(self, sftp_hook, gcs_hook):

err = ctx.value
assert "Only one wildcard '*' is allowed in source_path parameter" in str(err)

@mock.patch("airflow.providers.google.cloud.transfers.sftp_to_gcs.GCSHook")
@mock.patch("airflow.providers.google.cloud.transfers.sftp_to_gcs.SFTPHook")
def test_execute_copy_with_wildcard_and_default_destination_path(self, sftp_hook, gcs_hook):
sftp_hook.return_value.get_tree_map.return_value = [
["main_dir/test_object1.txt", "main_dir/test_object2.txt"],
[],
[],
]

task = SFTPToGCSOperator(
task_id=TASK_ID,
source_path=SOURCE_OBJECT_WILDCARD_TXT_FILENAME,
destination_bucket=TEST_BUCKET,
gcp_conn_id=GCP_CONN_ID,
sftp_conn_id=SFTP_CONN_ID,
)
task.execute(None)

sftp_hook.return_value.get_tree_map.assert_called_with(
"main_dir", prefix="main_dir/test_object", delimiter=".txt"
)

sftp_hook.return_value.retrieve_file.assert_has_calls(
[
mock.call("main_dir/test_object1.txt", mock.ANY, prefetch=True),
mock.call("main_dir/test_object2.txt", mock.ANY, prefetch=True),
]
)

gcs_hook.return_value.upload.assert_has_calls(
[
mock.call(
bucket_name=TEST_BUCKET,
object_name="test_object1.txt",
mime_type=DEFAULT_MIME_TYPE,
filename=mock.ANY,
gzip=False,
),
mock.call(
bucket_name=TEST_BUCKET,
object_name="test_object2.txt",
mime_type=DEFAULT_MIME_TYPE,
filename=mock.ANY,
gzip=False,
),
]
)

0 comments on commit b7c4bcd

Please sign in to comment.