diff --git a/airflow/providers/amazon/aws/utils/task_log_fetcher.py b/airflow/providers/amazon/aws/utils/task_log_fetcher.py index a4cad6c099c2f6..83c42f685792a7 100644 --- a/airflow/providers/amazon/aws/utils/task_log_fetcher.py +++ b/airflow/providers/amazon/aws/utils/task_log_fetcher.py @@ -59,8 +59,20 @@ def run(self) -> None: while not self.is_stopped(): time.sleep(self.fetch_interval.total_seconds()) log_events = self._get_log_events(continuation_token) + prev_timestamp_event = None for log_event in log_events: + current_timestamp_event = datetime.fromtimestamp( + log_event["timestamp"] / 1000.0, tz=timezone.utc + ) + if current_timestamp_event == prev_timestamp_event: + # When multiple events have the same timestamp, somehow, only one event is logged + # As a consequence, some logs are missed in the log group (in case they have the same + # timestamp) + # When a slight delay is added before logging the event, that solves the issue + # See https://github.com/apache/airflow/issues/40875 + time.sleep(0.1) self.logger.info(self.event_to_str(log_event)) + prev_timestamp_event = current_timestamp_event def _get_log_events(self, skip_token: AwsLogsHook.ContinuationToken | None = None) -> Generator: if skip_token is None: