diff --git a/airflow/utils/log/s3_task_handler.py b/airflow/utils/log/s3_task_handler.py
index 609a9da7d26a7..f2885bab9bd36 100644
--- a/airflow/utils/log/s3_task_handler.py
+++ b/airflow/utils/log/s3_task_handler.py
@@ -59,6 +59,12 @@ def set_context(self, ti):
         self.log_relative_path = self._render_filename(ti, ti.try_number)
         self.upload_on_close = not ti.raw
 
+        # Clear the file first so that duplicate data is not uploaded
+        # when re-using the same path (e.g. with rescheduled sensors)
+        if self.upload_on_close:
+            with open(self.handler.baseFilename, 'w'):
+                pass
+
     def close(self):
         """
         Close and upload local log file to remote storage S3.
diff --git a/tests/utils/log/test_s3_task_handler.py b/tests/utils/log/test_s3_task_handler.py
index ccad3a07d5192..a3dd9fb97de6e 100644
--- a/tests/utils/log/test_s3_task_handler.py
+++ b/tests/utils/log/test_s3_task_handler.py
@@ -107,6 +107,24 @@ def test_log_exists_no_hook(self):
         mock_hook.side_effect = Exception('Failed to connect')
         self.assertFalse(self.s3_task_handler.s3_log_exists(self.remote_log_location))
 
+    def test_set_context_raw(self):
+        self.ti.raw = True
+        mock_open = mock.mock_open()
+        with mock.patch('airflow.utils.log.s3_task_handler.open', mock_open):
+            self.s3_task_handler.set_context(self.ti)
+
+        self.assertFalse(self.s3_task_handler.upload_on_close)
+        mock_open.assert_not_called()
+
+    def test_set_context_not_raw(self):
+        mock_open = mock.mock_open()
+        with mock.patch('airflow.utils.log.s3_task_handler.open', mock_open):
+            self.s3_task_handler.set_context(self.ti)
+
+        self.assertTrue(self.s3_task_handler.upload_on_close)
+        mock_open.assert_called_once_with(os.path.abspath('local/log/location/1.log'), 'w')
+        mock_open().write.assert_not_called()
+
     def test_read(self):
         self.conn.put_object(Bucket='bucket', Key=self.remote_log_key, Body=b'Log line\n')
         self.assertEqual(
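
Why this matters, as a self-contained sketch (not the Airflow implementation; the class name SketchTaskHandler, the temp-file layout, and the simplified set_context signature below are all illustrative): the handler appends to a local file and uploads the whole file when the task finishes, so when a rescheduled sensor re-uses the same log path, leftover content from the previous try would be uploaded again. Truncating the file in set_context, as the patch does, keeps each upload limited to the current try.

# Minimal sketch of the failure mode and the fix. Illustrative names only;
# this is not the Airflow code, but the truncation guard mirrors the patch.
import logging
import os
import tempfile


class SketchTaskHandler(logging.FileHandler):
    """Appends to a local file; the file body is what would be uploaded."""

    def __init__(self, filename):
        super().__init__(filename, mode='a')

    def set_context(self, upload_on_close):
        self.upload_on_close = upload_on_close
        if self.upload_on_close:
            # Same guard as the patch: truncate so a re-used path starts empty.
            with open(self.baseFilename, 'w'):
                pass

    def read_for_upload(self):
        with open(self.baseFilename) as f:
            return f.read()


log_path = os.path.join(tempfile.mkdtemp(), '1.log')
handler = SketchTaskHandler(log_path)
logger = logging.getLogger('sketch')
logger.addHandler(handler)

# First "try": context is set, a line is logged, the file would be uploaded.
handler.set_context(upload_on_close=True)
logger.warning('attempt 1')
assert handler.read_for_upload().count('attempt 1') == 1

# Rescheduled "try" re-uses the same path. Without the truncation in
# set_context(), 'attempt 1' would still be in the file and get uploaded twice.
handler.set_context(upload_on_close=True)
logger.warning('attempt 2')
body = handler.read_for_upload()
assert 'attempt 1' not in body and 'attempt 2' in body

The truncation uses the same open(path, 'w') idiom as the patch, which is also why the tests above can patch the module-level open and assert it is called exactly once, with mode 'w', only when upload_on_close is set.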