Skip to content

Commit

Permalink
[AIRFLOW-6522] Clear task log file before starting to fix duplication…
Browse files Browse the repository at this point in the history
… in S3TaskHandler (#7120)

The same task instance (including try number) can be run on a worker
when using a sensor in "reschedule" mode. Accordingly, this clears the
local log file when re-initializing the logger so that the old log
lines aren't uploaded again when the logger is closed.

(cherry picked from commit 88608ca)
  • Loading branch information
rconroy293 authored and potiuk committed Jan 26, 2020
1 parent 5e8d7d4 commit a9190f7
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 0 deletions.
6 changes: 6 additions & 0 deletions airflow/utils/log/s3_task_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,12 @@ def set_context(self, ti):
self.log_relative_path = self._render_filename(ti, ti.try_number)
self.upload_on_close = not ti.raw

# Clear the file first so that duplicate data is not uploaded
# when re-using the same path (e.g. with rescheduled sensors)
if self.upload_on_close:
with open(self.handler.baseFilename, 'w'):
pass

def close(self):
"""
Close and upload local log file to remote storage S3.
Expand Down
18 changes: 18 additions & 0 deletions tests/utils/log/test_s3_task_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,24 @@ def test_log_exists_no_hook(self):
mock_hook.side_effect = Exception('Failed to connect')
self.assertFalse(self.s3_task_handler.s3_log_exists(self.remote_log_location))

def test_set_context_raw(self):
self.ti.raw = True
mock_open = mock.mock_open()
with mock.patch('airflow.utils.log.s3_task_handler.open', mock_open):
self.s3_task_handler.set_context(self.ti)

self.assertFalse(self.s3_task_handler.upload_on_close)
mock_open.assert_not_called()

def test_set_context_not_raw(self):
mock_open = mock.mock_open()
with mock.patch('airflow.utils.log.s3_task_handler.open', mock_open):
self.s3_task_handler.set_context(self.ti)

self.assertTrue(self.s3_task_handler.upload_on_close)
mock_open.assert_called_once_with(os.path.abspath('local/log/location/1.log'), 'w')
mock_open().write.assert_not_called()

def test_read(self):
self.conn.put_object(Bucket='bucket', Key=self.remote_log_key, Body=b'Log line\n')
self.assertEqual(
Expand Down

0 comments on commit a9190f7

Please sign in to comment.