From a198969b5e3acaee67479ebab145d29866607453 Mon Sep 17 00:00:00 2001
From: Darren Weber
Date: Fri, 23 Aug 2019 09:22:14 -0700
Subject: [PATCH] [AIRFLOW-5218] Less polling of AWS Batch job status (#5825)

https://issues.apache.org/jira/browse/AIRFLOW-5218

- avoid the AWS API throttle limits for highly concurrent tasks
- a small increase in the backoff factor could avoid excessive polling
- random sleep before polling to allow the batch task to spin-up
- the random sleep helps to avoid API throttling
- revise the retry logic slightly to avoid unnecessary pause when there
  are no more retries required

(cherry picked from commit fc972fb6c82010f9809a437eb6b9772918a584d2)
---
 .../contrib/operators/awsbatch_operator.py | 36 +++++++++++--------
 1 file changed, 22 insertions(+), 14 deletions(-)

diff --git a/airflow/contrib/operators/awsbatch_operator.py b/airflow/contrib/operators/awsbatch_operator.py
index 28bea813c45aed..730aa5a8a45a59 100644
--- a/airflow/contrib/operators/awsbatch_operator.py
+++ b/airflow/contrib/operators/awsbatch_operator.py
@@ -20,6 +20,7 @@
 import sys
 
 from math import pow
+from random import randint
 from time import sleep
 
 from airflow.exceptions import AirflowException
@@ -76,8 +77,8 @@ def __init__(self, job_name, job_definition, job_queue, overrides, max_retries=4
         self.overrides = overrides
         self.max_retries = max_retries
 
-        self.jobId = None
-        self.jobName = None
+        self.jobId = None  # pylint: disable=invalid-name
+        self.jobName = None  # pylint: disable=invalid-name
 
         self.hook = self.get_hook()
 
@@ -130,19 +131,26 @@ def _wait_for_task_ended(self):
             waiter.wait(jobs=[self.jobId])
         except ValueError:
             # If waiter not available use expo
-            retry = True
-            retries = 0
-
-            while retries < self.max_retries and retry:
-                self.log.info('AWS Batch retry in the next %s seconds', retries)
-                response = self.client.describe_jobs(
-                    jobs=[self.jobId]
-                )
-                if response['jobs'][-1]['status'] in ['SUCCEEDED', 'FAILED']:
-                    retry = False
-
-                sleep(1 + pow(retries * 0.1, 2))
+
+            # Allow a batch job some time to spin up. A random interval
+            # decreases the chances of exceeding an AWS API throttle
+            # limit when there are many concurrent tasks.
+            pause = randint(5, 30)
+
+            retries = 1
+            while retries <= self.max_retries:
+                self.log.info('AWS Batch job (%s) status check (%d of %d) in the next %.2f seconds',
+                              self.jobId, retries, self.max_retries, pause)
+                sleep(pause)
+
+                response = self.client.describe_jobs(jobs=[self.jobId])
+                status = response['jobs'][-1]['status']
+                self.log.info('AWS Batch job (%s) status: %s', self.jobId, status)
+                if status in ['SUCCEEDED', 'FAILED']:
+                    break
+                retries += 1
+                pause = 1 + pow(retries * 0.3, 2)
 
     def _check_success_task(self):
         response = self.client.describe_jobs(
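
For context, here is a minimal standalone sketch of the jittered backoff polling that the hunk above introduces, separate from the operator class. It assumes a boto3 Batch client (describe_jobs is the same API call the operator makes); the function name poll_batch_job and its signature are illustrative only and are not part of this change.

    from math import pow
    from random import randint
    from time import sleep


    def poll_batch_job(batch_client, job_id, max_retries):
        """Poll an AWS Batch job until it reaches a terminal state.

        The first pause is randomized (5-30 s) so that many concurrent tasks
        do not hit the AWS API at the same moment; later pauses grow
        quadratically with the retry count, mirroring the backoff above.
        """
        pause = randint(5, 30)
        retries = 1
        while retries <= max_retries:
            sleep(pause)
            response = batch_client.describe_jobs(jobs=[job_id])
            status = response['jobs'][-1]['status']
            if status in ('SUCCEEDED', 'FAILED'):
                return status
            retries += 1
            pause = 1 + pow(retries * 0.3, 2)
        raise RuntimeError('AWS Batch job %s still running after %d status checks'
                           % (job_id, max_retries))

Called, for example, as poll_batch_job(boto3.client('batch'), job_id, max_retries=10), it returns the terminal status or raises once the retry budget is exhausted; the random initial pause is what spreads out the describe_jobs calls across concurrent tasks.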