Skip to content

Commit

Permalink
[AIRFLOW-3002] Correctly test success of file download
Browse files Browse the repository at this point in the history
  • Loading branch information
jeffkpayne committed Sep 4, 2018
1 parent 9c0784a commit eb163f7
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 4 deletions.
4 changes: 2 additions & 2 deletions airflow/contrib/operators/dataflow_operator.py
Original file line number Diff line number Diff line change
Expand Up @@ -367,9 +367,9 @@ def google_cloud_to_local(self, file_name):
object_id = '/'.join(path_components[1:])
local_file = '/tmp/dataflow{}-{}'.format(str(uuid.uuid1())[:8],
path_components[-1])
file_size = self._gcs_hook.download(bucket_id, object_id, local_file)
self._gcs_hook.download(bucket_id, object_id, local_file)

if os.stat(file_size).st_size > 0:
if os.stat(local_file).st_size > 0:
return local_file
raise Exception(
'Failed to download Google Cloud Storage GCS object: {}'
Expand Down
26 changes: 24 additions & 2 deletions tests/contrib/operators/test_dataflow_operator.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
#
# http://www.apache.org/licenses/LICENSE-2.0
#
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
Expand Down Expand Up @@ -209,3 +209,25 @@ def test_invalid_object_path(self, mock_parent_init):
self.assertEquals(
'Invalid Google Cloud Storage (GCS) object path: {}.'.format(file_name),
str(context.exception))

@mock.patch(
'airflow.contrib.operators.dataflow_operator.GoogleCloudBucketHelper.__init__'
)
def test_valid_object_path(self, mock_parent_init):

file_name = 'gs://test-bucket/path/to/obj.jar'
mock_parent_init.return_value = None

gcs_bucket_helper = GoogleCloudBucketHelper()
gcs_bucket_helper._gcs_hook = mock.Mock()

def _mock_download(bucket, object, filename=None):
text_file_contents = "text file contents"
with open(filename, "w") as text_file:
text_file.write(text_file_contents)
return text_file_contents

gcs_bucket_helper._gcs_hook.download.side_effect = _mock_download

local_file = gcs_bucket_helper.google_cloud_to_local(file_name)
self.assertIn('obj.jar', local_file)

0 comments on commit eb163f7

Please sign in to comment.