diff --git a/airflow/contrib/operators/kubernetes_pod_operator.py b/airflow/contrib/operators/kubernetes_pod_operator.py index 41f0df3e212ef0..98464b74d8ebaa 100644 --- a/airflow/contrib/operators/kubernetes_pod_operator.py +++ b/airflow/contrib/operators/kubernetes_pod_operator.py @@ -270,23 +270,16 @@ def execute(self, context): pod_list = client.list_namespaced_pod(self.namespace, label_selector=label_selector) - if len(pod_list.items) > 1: + if len(pod_list.items) > 1 and self.reattach_on_restart: raise AirflowException( 'More than one pod running with labels: ' '{label_selector}'.format(label_selector=label_selector)) launcher = pod_launcher.PodLauncher(kube_client=client, extract_xcom=self.do_xcom_push) - if len(pod_list.items) == 1 and \ - self._try_numbers_do_not_match(context, pod_list.items[0]) and \ - self.reattach_on_restart: - self.log.info("found a running pod with labels %s but a different try_number" - "Will attach to this pod and monitor instead of starting new one", labels) - final_state, _, result = self.create_new_pod_for_operator(labels, launcher) - elif len(pod_list.items) == 1: - self.log.info("found a running pod with labels %s." - "Will monitor this pod instead of starting new one", labels) - final_state, result = self.monitor_launched_pod(launcher, pod_list[0]) + if len(pod_list.items) == 1: + try_numbers_match = self._try_numbers_match(context, pod_list.items[0]) + final_state, result = self.handle_pod_overlap(labels, try_numbers_match, launcher, pod_list) else: final_state, _, result = self.create_new_pod_for_operator(labels, launcher) if final_state != State.SUCCESS: @@ -296,14 +289,41 @@ def execute(self, context): except AirflowException as ex: raise AirflowException('Pod Launching failed: {error}'.format(error=ex)) + def handle_pod_overlap(self, labels, try_numbers_match, launcher, pod_list): + """ + In cases where the Scheduler restarts while a KubernetsPodOperator task is running, + this function will either continue to monitor the existing pod or launch a new pod + based on the `reattach_on_restart` parameter. + :param labels: labels used to determine if a pod is repeated + :type labels: dict + :param try_numbers_match: do the try numbers match? Only needed for logging purposes + :type try_numbers_match: bool + :param launcher: PodLauncher + :param pod_list: list of pods found + """ + if try_numbers_match: + log_line = "found a running pod with labels {} and the same try_number.".format(labels) + else: + log_line = "found a running pod with labels {} but a different try_number.".format(labels) + + if self.reattach_on_restart: + log_line = log_line + " Will attach to this pod and monitor instead of starting new one" + self.log.info(log_line) + final_state, result = self.monitor_launched_pod(launcher, pod_list.items[0]) + else: + log_line = log_line + "creating pod with labels {} and launcher {}".format(labels, launcher) + self.log.info(log_line) + final_state, _, result = self.create_new_pod_for_operator(labels, launcher) + return final_state, result + @staticmethod def _get_pod_identifying_label_string(labels): filtered_labels = {label_id: label for label_id, label in labels.items() if label_id != 'try_number'} return ','.join([label_id + '=' + label for label_id, label in sorted(filtered_labels.items())]) @staticmethod - def _try_numbers_do_not_match(context, pod): - return pod.metadata.labels['try_number'] != context['ti'].try_number + def _try_numbers_match(context, pod): + return pod.metadata.labels['try_number'] == context['ti'].try_number @staticmethod def _set_resources(resources):