Skip to content

Commit

Permalink
[AIRFLOW-5218] Less polling of AWS Batch job status (#5825)
Browse files Browse the repository at this point in the history
https://issues.apache.org/jira/browse/AIRFLOW-5218
- avoid the AWS API throttle limits for highly concurrent tasks
- a small increase in the backoff factor could avoid excessive polling
- random sleep before polling to allow the batch task to spin-up
  - the random sleep helps to avoid API throttling
- revise the retry logic slightly to avoid an unnecessary pause
  when no more retries are required

(cherry picked from commit fc972fb)
  • Loading branch information
dazza-codes authored and ashb committed Oct 11, 2019
1 parent f98ce4a commit a198969
Showing 1 changed file with 22 additions and 14 deletions.
36 changes: 22 additions & 14 deletions airflow/contrib/operators/awsbatch_operator.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import sys

from math import pow
from random import randint
from time import sleep

from airflow.exceptions import AirflowException
Expand Down Expand Up @@ -76,8 +77,8 @@ def __init__(self, job_name, job_definition, job_queue, overrides, max_retries=4
self.overrides = overrides
self.max_retries = max_retries

self.jobId = None
self.jobName = None
self.jobId = None # pylint: disable=invalid-name
self.jobName = None # pylint: disable=invalid-name

self.hook = self.get_hook()

Expand Down Expand Up @@ -130,19 +131,26 @@ def _wait_for_task_ended(self):
waiter.wait(jobs=[self.jobId])
except ValueError:
# If the waiter is not available, poll the job status with exponential backoff
retry = True
retries = 0

while retries < self.max_retries and retry:
self.log.info('AWS Batch retry in the next %s seconds', retries)
response = self.client.describe_jobs(
jobs=[self.jobId]
)
if response['jobs'][-1]['status'] in ['SUCCEEDED', 'FAILED']:
retry = False

sleep(1 + pow(retries * 0.1, 2))

# Allow a batch job some time to spin up. A random interval
# decreases the chances of exceeding an AWS API throttle
# limit when there are many concurrent tasks.
pause = randint(5, 30)

retries = 1
while retries <= self.max_retries:
self.log.info('AWS Batch job (%s) status check (%d of %d) in the next %.2f seconds',
self.jobId, retries, self.max_retries, pause)
sleep(pause)

response = self.client.describe_jobs(jobs=[self.jobId])
status = response['jobs'][-1]['status']
self.log.info('AWS Batch job (%s) status: %s', self.jobId, status)
if status in ['SUCCEEDED', 'FAILED']:
break

retries += 1
pause = 1 + pow(retries * 0.3, 2)

def _check_success_task(self):
response = self.client.describe_jobs(
Expand Down

0 comments on commit a198969

Please sign in to comment.