From a9190f7b71fd26daf8565ed6aae4107270248469 Mon Sep 17 00:00:00 2001
From: rconroy293
Date: Tue, 14 Jan 2020 05:36:27 -0800
Subject: [PATCH] [AIRFLOW-6522] Clear task log file before starting to fix duplication in S3TaskHandler (#7120)

The same task instance (including try number) can be run on a worker when
using a sensor in "reschedule" mode. Accordingly, this clears the local log
file when re-initializing the logger so that the old log lines aren't
uploaded again when the logger is closed.

(cherry picked from commit 88608caa56bf3621807af860a6a378242220de47)
---
 airflow/utils/log/s3_task_handler.py    |  6 ++++++
 tests/utils/log/test_s3_task_handler.py | 18 ++++++++++++++++++
 2 files changed, 24 insertions(+)

diff --git a/airflow/utils/log/s3_task_handler.py b/airflow/utils/log/s3_task_handler.py
index 609a9da7d26a73..f2885bab9bd362 100644
--- a/airflow/utils/log/s3_task_handler.py
+++ b/airflow/utils/log/s3_task_handler.py
@@ -59,6 +59,12 @@ def set_context(self, ti):
         self.log_relative_path = self._render_filename(ti, ti.try_number)
         self.upload_on_close = not ti.raw
 
+        # Clear the file first so that duplicate data is not uploaded
+        # when re-using the same path (e.g. with rescheduled sensors)
+        if self.upload_on_close:
+            with open(self.handler.baseFilename, 'w'):
+                pass
+
     def close(self):
         """
         Close and upload local log file to remote storage S3.
diff --git a/tests/utils/log/test_s3_task_handler.py b/tests/utils/log/test_s3_task_handler.py
index ccad3a07d51925..a3dd9fb97de6e7 100644
--- a/tests/utils/log/test_s3_task_handler.py
+++ b/tests/utils/log/test_s3_task_handler.py
@@ -107,6 +107,24 @@ def test_log_exists_no_hook(self):
             mock_hook.side_effect = Exception('Failed to connect')
             self.assertFalse(self.s3_task_handler.s3_log_exists(self.remote_log_location))
 
+    def test_set_context_raw(self):
+        self.ti.raw = True
+        mock_open = mock.mock_open()
+        with mock.patch('airflow.utils.log.s3_task_handler.open', mock_open):
+            self.s3_task_handler.set_context(self.ti)
+
+        self.assertFalse(self.s3_task_handler.upload_on_close)
+        mock_open.assert_not_called()
+
+    def test_set_context_not_raw(self):
+        mock_open = mock.mock_open()
+        with mock.patch('airflow.utils.log.s3_task_handler.open', mock_open):
+            self.s3_task_handler.set_context(self.ti)
+
+        self.assertTrue(self.s3_task_handler.upload_on_close)
+        mock_open.assert_called_once_with(os.path.abspath('local/log/location/1.log'), 'w')
+        mock_open().write.assert_not_called()
+
     def test_read(self):
         self.conn.put_object(Bucket='bucket', Key=self.remote_log_key, Body=b'Log line\n')
         self.assertEqual(