Skip to content

Commit

Permalink
Fix KubernetesPodOperator reattachment (apache#10230)
Browse files Browse the repository at this point in the history
  • Loading branch information
dimberman authored Aug 11, 2020
1 parent a9f7222 commit 8cd2be9
Showing 1 changed file with 35 additions and 13 deletions.
48 changes: 35 additions & 13 deletions airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py
Original file line number Diff line number Diff line change
Expand Up @@ -277,23 +277,16 @@ def execute(self, context) -> Optional[str]:

pod_list = client.list_namespaced_pod(self.namespace, label_selector=label_selector)

if len(pod_list.items) > 1:
if len(pod_list.items) > 1 and self.reattach_on_restart:
raise AirflowException(
'More than one pod running with labels: '
'{label_selector}'.format(label_selector=label_selector))

launcher = pod_launcher.PodLauncher(kube_client=client, extract_xcom=self.do_xcom_push)

if len(pod_list.items) == 1 and \
self._try_numbers_do_not_match(context, pod_list.items[0]) and \
self.reattach_on_restart:
self.log.info("found a running pod with labels %s but a different try_number"
"Will attach to this pod and monitor instead of starting new one", labels)
final_state, result = self.monitor_launched_pod(launcher, pod_list.items[0])
elif len(pod_list.items) == 1:
self.log.info("found a running pod with labels %s."
"Will monitor this pod instead of starting new one", labels)
final_state, result = self.monitor_launched_pod(launcher, pod_list.items[0])
if len(pod_list.items) == 1:
try_numbers_match = self._try_numbers_match(context, pod_list.items[0])
final_state, result = self.handle_pod_overlap(labels, try_numbers_match, launcher, pod_list)
else:
self.log.info("creating pod with labels %s and launcher %s", labels, launcher)
final_state, _, result = self.create_new_pod_for_operator(labels, launcher)
Expand All @@ -304,14 +297,43 @@ def execute(self, context) -> Optional[str]:
except AirflowException as ex:
raise AirflowException('Pod Launching failed: {error}'.format(error=ex))

def handle_pod_overlap(self, labels, try_numbers_match, launcher, pod_list):
    """
    Decide what to do when a pod matching this task's labels already exists.

    In cases where the Scheduler restarts while a KubernetesPodOperator task is running,
    this function will either continue to monitor the existing pod or launch a new pod
    based on the `reattach_on_restart` parameter.

    :param labels: labels used to determine if a pod is repeated
    :type labels: dict
    :param try_numbers_match: do the try numbers match? Only needed for logging purposes
    :type try_numbers_match: bool
    :param launcher: PodLauncher
    :param pod_list: list of pods found; only ``pod_list.items[0]`` is monitored
    :return: tuple of ``(final_state, result)`` taken from either the monitored
        pod or the newly created one
    """
    if try_numbers_match:
        log_line = "found a running pod with labels {} and the same try_number.".format(labels)
    else:
        log_line = "found a running pod with labels {} but a different try_number.".format(labels)

    if self.reattach_on_restart:
        log_line += " Will attach to this pod and monitor instead of starting new one"
        self.log.info(log_line)
        final_state, result = self.monitor_launched_pod(launcher, pod_list.items[0])
    else:
        # The leading space separates this sentence from the "...try_number." prefix;
        # without it the two sentences run together in the log output.
        log_line += " creating pod with labels {} and launcher {}".format(labels, launcher)
        self.log.info(log_line)
        # create_new_pod_for_operator returns a 3-tuple; the middle element is unused here.
        final_state, _, result = self.create_new_pod_for_operator(labels, launcher)
    return final_state, result

@staticmethod
def _get_pod_identifying_label_string(labels):
filtered_labels = {label_id: label for label_id, label in labels.items() if label_id != 'try_number'}
return ','.join([label_id + '=' + label for label_id, label in sorted(filtered_labels.items())])

@staticmethod
def _try_numbers_do_not_match(context, pod):
return pod.metadata.labels['try_number'] != context['ti'].try_number
def _try_numbers_match(context, pod):
return pod.metadata.labels['try_number'] == context['ti'].try_number

@staticmethod
def _set_resources(resources):
Expand Down

0 comments on commit 8cd2be9

Please sign in to comment.