diff --git a/airflow/config_templates/config.yml b/airflow/config_templates/config.yml index 05b78b190b388f..f50fda963336f1 100644 --- a/airflow/config_templates/config.yml +++ b/airflow/config_templates/config.yml @@ -1738,13 +1738,14 @@ type: string example: ~ default: "True" - - name: delete_worker_pods_on_success + - name: delete_worker_pods_on_failure description: | - If True (default), worker pods will be deleted only on task success + If False (and delete_worker_pods is True), + failed worker pods will not be deleted so users can investigate them. version_added: ~ type: string example: ~ - default: "True" + default: "False" - name: worker_pods_creation_batch_size description: | Number of Kubernetes Worker Pod creation calls per scheduler loop. diff --git a/airflow/config_templates/default_airflow.cfg b/airflow/config_templates/default_airflow.cfg index 4020c97f0399b7..bee8547c06046f 100644 --- a/airflow/config_templates/default_airflow.cfg +++ b/airflow/config_templates/default_airflow.cfg @@ -810,8 +810,9 @@ worker_container_image_pull_policy = IfNotPresent # If True, all worker pods will be deleted upon termination delete_worker_pods = True -# If True (default), worker pods will be deleted only on task success -delete_worker_pods_on_success = True +# If False (and delete_worker_pods is True), +# failed worker pods will not be deleted so users can investigate them. +delete_worker_pods_on_failure = False # Number of Kubernetes Worker Pod creation calls per scheduler loop. 
# Note that the current default of "1" will only launch a single pod diff --git a/airflow/executors/kubernetes_executor.py b/airflow/executors/kubernetes_executor.py index d41a8797888d5d..5d9ebb20359364 100644 --- a/airflow/executors/kubernetes_executor.py +++ b/airflow/executors/kubernetes_executor.py @@ -104,8 +104,8 @@ def __init__(self): # pylint: disable=too-many-statements self.kube_labels = configuration_dict.get('kubernetes_labels', {}) self.delete_worker_pods = conf.getboolean( self.kubernetes_section, 'delete_worker_pods') - self.delete_worker_pods_on_success = conf.getboolean( - self.kubernetes_section, 'delete_worker_pods_on_success') + self.delete_worker_pods_on_failure = conf.getboolean( + self.kubernetes_section, 'delete_worker_pods_on_failure') self.worker_pods_creation_batch_size = conf.getint( self.kubernetes_section, 'worker_pods_creation_batch_size') self.worker_service_account_name = conf.get( @@ -895,15 +895,12 @@ def _change_state(self, pod_id: str, namespace: str) -> None: if state != State.RUNNING: - if self.kube_config.delete_worker_pods_on_success and state is State.SUCCESS: + if self.kube_config.delete_worker_pods: if not self.kube_scheduler: raise AirflowException(NOT_STARTED_MESSAGE) - self.kube_scheduler.delete_pod(pod_id, namespace) - elif self.kube_config.delete_worker_pods: - if not self.kube_scheduler: - raise AirflowException(NOT_STARTED_MESSAGE) - self.kube_scheduler.delete_pod(pod_id, namespace) - self.log.info('Deleted pod: %s in namespace %s', str(key), str(namespace)) + if state != State.FAILED or self.kube_config.delete_worker_pods_on_failure: + self.kube_scheduler.delete_pod(pod_id, namespace) + self.log.info('Deleted pod: %s in namespace %s', str(key), str(namespace)) try: self.running.remove(key) except KeyError: diff --git a/requirements/requirements-python3.6.txt b/requirements/requirements-python3.6.txt index 80b604fd7b4658..564726351be20a 100644 --- a/requirements/requirements-python3.6.txt +++ 
b/requirements/requirements-python3.6.txt @@ -123,21 +123,21 @@ future-fstrings==1.2.0 future==0.18.2 gcsfs==0.6.1 google-ads==4.0.0 -google-api-core==1.16.0 +google-api-core==1.17.0 google-api-python-client==1.8.0 google-auth-httplib2==0.0.3 google-auth-oauthlib==0.4.1 -google-auth==1.13.1 +google-auth==1.14.0 google-cloud-automl==0.10.0 google-cloud-bigquery-datatransfer==1.0.0 google-cloud-bigquery==1.24.0 google-cloud-bigtable==1.2.1 -google-cloud-container==0.4.0 +google-cloud-container==0.5.0 google-cloud-core==1.3.0 google-cloud-datacatalog==0.7.0 google-cloud-dataproc==0.7.0 google-cloud-dlp==0.13.0 -google-cloud-kms==1.3.0 +google-cloud-kms==1.4.0 google-cloud-language==1.3.0 google-cloud-logging==1.15.0 google-cloud-monitoring==0.34.0 @@ -178,7 +178,7 @@ iso8601==0.1.12 isodate==0.6.0 isort==4.3.21 itsdangerous==1.1.0 -jedi==0.16.0 +jedi==0.17.0 jira==2.0.0 jmespath==0.9.5 json-merge-patch==0.2 @@ -224,7 +224,7 @@ packaging==20.3 pandas-gbq==0.13.1 pandas==1.0.3 papermill==2.1.0 -parameterized==0.7.3 +parameterized==0.7.4 paramiko==2.7.1 parso==0.7.0 pathspec==0.8.0 @@ -279,7 +279,7 @@ pytz==2019.3 pytzdata==2019.3 pywinrm==0.4.1 pyzmq==19.0.0 -qds-sdk==1.15.0 +qds-sdk==1.15.1 redis==3.4.1 regex==2020.4.4 requests-kerberos==0.12.0 @@ -351,7 +351,7 @@ wcwidth==0.1.9 websocket-client==0.57.0 wrapt==1.12.1 xmltodict==0.12.0 -yamllint==1.22.0 +yamllint==1.22.1 yandexcloud==0.32.0 zdesk==2.7.1 zict==2.0.0 diff --git a/requirements/requirements-python3.7.txt b/requirements/requirements-python3.7.txt index e2ad1e5e1ca9c8..e88f2c020e92c5 100644 --- a/requirements/requirements-python3.7.txt +++ b/requirements/requirements-python3.7.txt @@ -123,21 +123,21 @@ future-fstrings==1.2.0 future==0.18.2 gcsfs==0.6.1 google-ads==5.0.2 -google-api-core==1.16.0 +google-api-core==1.17.0 google-api-python-client==1.8.0 google-auth-httplib2==0.0.3 google-auth-oauthlib==0.4.1 -google-auth==1.13.1 +google-auth==1.14.0 google-cloud-automl==0.10.0 
google-cloud-bigquery-datatransfer==1.0.0 google-cloud-bigquery==1.24.0 google-cloud-bigtable==1.2.1 -google-cloud-container==0.4.0 +google-cloud-container==0.5.0 google-cloud-core==1.3.0 google-cloud-datacatalog==0.7.0 google-cloud-dataproc==0.7.0 google-cloud-dlp==0.13.0 -google-cloud-kms==1.3.0 +google-cloud-kms==1.4.0 google-cloud-language==1.3.0 google-cloud-logging==1.15.0 google-cloud-monitoring==0.34.0 @@ -177,7 +177,7 @@ iso8601==0.1.12 isodate==0.6.0 isort==4.3.21 itsdangerous==1.1.0 -jedi==0.16.0 +jedi==0.17.0 jira==2.0.0 jmespath==0.9.5 json-merge-patch==0.2 @@ -223,7 +223,7 @@ packaging==20.3 pandas-gbq==0.13.1 pandas==1.0.3 papermill==2.1.0 -parameterized==0.7.3 +parameterized==0.7.4 paramiko==2.7.1 parso==0.7.0 pathspec==0.8.0 @@ -277,7 +277,7 @@ pytz==2019.3 pytzdata==2019.3 pywinrm==0.4.1 pyzmq==19.0.0 -qds-sdk==1.15.0 +qds-sdk==1.15.1 redis==3.4.1 regex==2020.4.4 requests-kerberos==0.12.0 @@ -348,7 +348,7 @@ wcwidth==0.1.9 websocket-client==0.57.0 wrapt==1.12.1 xmltodict==0.12.0 -yamllint==1.22.0 +yamllint==1.22.1 yandexcloud==0.32.0 zdesk==2.7.1 zict==2.0.0 diff --git a/tests/executors/test_kubernetes_executor.py b/tests/executors/test_kubernetes_executor.py index f2d3f1b98f7454..7a89d5758cef8a 100644 --- a/tests/executors/test_kubernetes_executor.py +++ b/tests/executors/test_kubernetes_executor.py @@ -264,7 +264,7 @@ def test_change_state_failed_no_deletion( ): executor = KubernetesExecutor() executor.kube_config.delete_worker_pods = False - executor.kube_config.delete_worker_pods_on_success = True + executor.kube_config.delete_worker_pods_on_failure = False executor.start() test_time = timezone.utcnow() key = ('dag_id', 'task_id', test_time, 'try_number3') @@ -281,7 +281,7 @@ def test_change_state_skip_pod_deletion(self, mock_delete_pod, mock_get_kube_cli test_time = timezone.utcnow() executor = KubernetesExecutor() executor.kube_config.delete_worker_pods = False - executor.kube_config.delete_worker_pods_on_success = False + 
executor.kube_config.delete_worker_pods_on_failure = False executor.start() key = ('dag_id', 'task_id', test_time, 'try_number2') @@ -295,7 +295,7 @@ def test_change_state_skip_pod_deletion(self, mock_delete_pod, mock_get_kube_cli def test_change_state_failed_pod_deletion(self, mock_delete_pod, mock_get_kube_client, mock_kubernetes_job_watcher): executor = KubernetesExecutor() - executor.kube_config.delete_worker_pods_on_success = True + executor.kube_config.delete_worker_pods_on_failure = True executor.start() key = ('dag_id', 'task_id', 'ex_time', 'try_number2')