Skip to content

Commit

Permalink
[AIRFLOW-6885] Change delete-on-success to delete-on-failure (#8312)
Browse files Browse the repository at this point in the history
* Change delete-on-success to delete-on-failure

It makes more sense to by-default not delete failed pods
Users should explicitly state they want these pods deleted.

This feature has not yet been released so ethis will not be a breaking
change

* deps

* deps

Co-authored-by: Daniel Imberman <[email protected]>
  • Loading branch information
dimberman and astro-sql-decorator authored Apr 15, 2020
1 parent a266497 commit 0f5f172
Show file tree
Hide file tree
Showing 6 changed files with 32 additions and 33 deletions.
7 changes: 4 additions & 3 deletions airflow/config_templates/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -1738,13 +1738,14 @@
type: string
example: ~
default: "True"
- name: delete_worker_pods_on_success
- name: delete_worker_pods_on_failure
description: |
If True (default), worker pods will be deleted only on task success
If False (and delete_worker_pods is True),
failed worker pods will not be deleted so users can investigate them.
version_added: ~
type: string
example: ~
default: "True"
default: "False"
- name: worker_pods_creation_batch_size
description: |
Number of Kubernetes Worker Pod creation calls per scheduler loop.
Expand Down
5 changes: 3 additions & 2 deletions airflow/config_templates/default_airflow.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -810,8 +810,9 @@ worker_container_image_pull_policy = IfNotPresent
# If True, all worker pods will be deleted upon termination
delete_worker_pods = True

# If True (default), worker pods will be deleted only on task success
delete_worker_pods_on_success = True
# If False (and delete_worker_pods is True),
# failed worker pods will not be deleted so users can investigate them.
delete_worker_pods_on_failure = False

# Number of Kubernetes Worker Pod creation calls per scheduler loop.
# Note that the current default of "1" will only launch a single pod
Expand Down
15 changes: 6 additions & 9 deletions airflow/executors/kubernetes_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,8 +104,8 @@ def __init__(self): # pylint: disable=too-many-statements
self.kube_labels = configuration_dict.get('kubernetes_labels', {})
self.delete_worker_pods = conf.getboolean(
self.kubernetes_section, 'delete_worker_pods')
self.delete_worker_pods_on_success = conf.getboolean(
self.kubernetes_section, 'delete_worker_pods_on_success')
self.delete_worker_pods_on_failure = conf.getboolean(
self.kubernetes_section, 'delete_worker_pods_on_failure')
self.worker_pods_creation_batch_size = conf.getint(
self.kubernetes_section, 'worker_pods_creation_batch_size')
self.worker_service_account_name = conf.get(
Expand Down Expand Up @@ -895,15 +895,12 @@ def _change_state(self,
pod_id: str,
namespace: str) -> None:
if state != State.RUNNING:
if self.kube_config.delete_worker_pods_on_success and state is State.SUCCESS:
if self.kube_config.delete_worker_pods:
if not self.kube_scheduler:
raise AirflowException(NOT_STARTED_MESSAGE)
self.kube_scheduler.delete_pod(pod_id, namespace)
elif self.kube_config.delete_worker_pods:
if not self.kube_scheduler:
raise AirflowException(NOT_STARTED_MESSAGE)
self.kube_scheduler.delete_pod(pod_id, namespace)
self.log.info('Deleted pod: %s in namespace %s', str(key), str(namespace))
if state is not State.FAILED or self.kube_config.delete_worker_pods_on_failure:
self.kube_scheduler.delete_pod(pod_id, namespace)
self.log.info('Deleted pod: %s in namespace %s', str(key), str(namespace))
try:
self.running.remove(key)
except KeyError:
Expand Down
16 changes: 8 additions & 8 deletions requirements/requirements-python3.6.txt
Original file line number Diff line number Diff line change
Expand Up @@ -123,21 +123,21 @@ future-fstrings==1.2.0
future==0.18.2
gcsfs==0.6.1
google-ads==4.0.0
google-api-core==1.16.0
google-api-core==1.17.0
google-api-python-client==1.8.0
google-auth-httplib2==0.0.3
google-auth-oauthlib==0.4.1
google-auth==1.13.1
google-auth==1.14.0
google-cloud-automl==0.10.0
google-cloud-bigquery-datatransfer==1.0.0
google-cloud-bigquery==1.24.0
google-cloud-bigtable==1.2.1
google-cloud-container==0.4.0
google-cloud-container==0.5.0
google-cloud-core==1.3.0
google-cloud-datacatalog==0.7.0
google-cloud-dataproc==0.7.0
google-cloud-dlp==0.13.0
google-cloud-kms==1.3.0
google-cloud-kms==1.4.0
google-cloud-language==1.3.0
google-cloud-logging==1.15.0
google-cloud-monitoring==0.34.0
Expand Down Expand Up @@ -178,7 +178,7 @@ iso8601==0.1.12
isodate==0.6.0
isort==4.3.21
itsdangerous==1.1.0
jedi==0.16.0
jedi==0.17.0
jira==2.0.0
jmespath==0.9.5
json-merge-patch==0.2
Expand Down Expand Up @@ -224,7 +224,7 @@ packaging==20.3
pandas-gbq==0.13.1
pandas==1.0.3
papermill==2.1.0
parameterized==0.7.3
parameterized==0.7.4
paramiko==2.7.1
parso==0.7.0
pathspec==0.8.0
Expand Down Expand Up @@ -279,7 +279,7 @@ pytz==2019.3
pytzdata==2019.3
pywinrm==0.4.1
pyzmq==19.0.0
qds-sdk==1.15.0
qds-sdk==1.15.1
redis==3.4.1
regex==2020.4.4
requests-kerberos==0.12.0
Expand Down Expand Up @@ -351,7 +351,7 @@ wcwidth==0.1.9
websocket-client==0.57.0
wrapt==1.12.1
xmltodict==0.12.0
yamllint==1.22.0
yamllint==1.22.1
yandexcloud==0.32.0
zdesk==2.7.1
zict==2.0.0
Expand Down
16 changes: 8 additions & 8 deletions requirements/requirements-python3.7.txt
Original file line number Diff line number Diff line change
Expand Up @@ -123,21 +123,21 @@ future-fstrings==1.2.0
future==0.18.2
gcsfs==0.6.1
google-ads==5.0.2
google-api-core==1.16.0
google-api-core==1.17.0
google-api-python-client==1.8.0
google-auth-httplib2==0.0.3
google-auth-oauthlib==0.4.1
google-auth==1.13.1
google-auth==1.14.0
google-cloud-automl==0.10.0
google-cloud-bigquery-datatransfer==1.0.0
google-cloud-bigquery==1.24.0
google-cloud-bigtable==1.2.1
google-cloud-container==0.4.0
google-cloud-container==0.5.0
google-cloud-core==1.3.0
google-cloud-datacatalog==0.7.0
google-cloud-dataproc==0.7.0
google-cloud-dlp==0.13.0
google-cloud-kms==1.3.0
google-cloud-kms==1.4.0
google-cloud-language==1.3.0
google-cloud-logging==1.15.0
google-cloud-monitoring==0.34.0
Expand Down Expand Up @@ -177,7 +177,7 @@ iso8601==0.1.12
isodate==0.6.0
isort==4.3.21
itsdangerous==1.1.0
jedi==0.16.0
jedi==0.17.0
jira==2.0.0
jmespath==0.9.5
json-merge-patch==0.2
Expand Down Expand Up @@ -223,7 +223,7 @@ packaging==20.3
pandas-gbq==0.13.1
pandas==1.0.3
papermill==2.1.0
parameterized==0.7.3
parameterized==0.7.4
paramiko==2.7.1
parso==0.7.0
pathspec==0.8.0
Expand Down Expand Up @@ -277,7 +277,7 @@ pytz==2019.3
pytzdata==2019.3
pywinrm==0.4.1
pyzmq==19.0.0
qds-sdk==1.15.0
qds-sdk==1.15.1
redis==3.4.1
regex==2020.4.4
requests-kerberos==0.12.0
Expand Down Expand Up @@ -348,7 +348,7 @@ wcwidth==0.1.9
websocket-client==0.57.0
wrapt==1.12.1
xmltodict==0.12.0
yamllint==1.22.0
yamllint==1.22.1
yandexcloud==0.32.0
zdesk==2.7.1
zict==2.0.0
Expand Down
6 changes: 3 additions & 3 deletions tests/executors/test_kubernetes_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -264,7 +264,7 @@ def test_change_state_failed_no_deletion(
):
executor = KubernetesExecutor()
executor.kube_config.delete_worker_pods = False
executor.kube_config.delete_worker_pods_on_success = True
executor.kube_config.delete_worker_pods_on_failure = False
executor.start()
test_time = timezone.utcnow()
key = ('dag_id', 'task_id', test_time, 'try_number3')
Expand All @@ -281,7 +281,7 @@ def test_change_state_skip_pod_deletion(self, mock_delete_pod, mock_get_kube_cli
test_time = timezone.utcnow()
executor = KubernetesExecutor()
executor.kube_config.delete_worker_pods = False
executor.kube_config.delete_worker_pods_on_success = False
executor.kube_config.delete_worker_pods_on_failure = False

executor.start()
key = ('dag_id', 'task_id', test_time, 'try_number2')
Expand All @@ -295,7 +295,7 @@ def test_change_state_skip_pod_deletion(self, mock_delete_pod, mock_get_kube_cli
def test_change_state_failed_pod_deletion(self, mock_delete_pod, mock_get_kube_client,
mock_kubernetes_job_watcher):
executor = KubernetesExecutor()
executor.kube_config.delete_worker_pods_on_success = True
executor.kube_config.delete_worker_pods_on_failure = True

executor.start()
key = ('dag_id', 'task_id', 'ex_time', 'try_number2')
Expand Down

0 comments on commit 0f5f172

Please sign in to comment.