Fix Task Adoption in KubernetesExecutor (#14795)
Ensure that we use ti.queued_by_job_id when searching for pods to adopt.
queued_by_job_id is the value that adopt_launched_task writes into the pod
labels when it adopts a task. Without this, after the scheduler has been
restarted a third time it no longer finds the pods, because it is still
searching for the id of the original scheduler (ti.external_executor_id).
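
A minimal sketch (not part of this commit) of the adoption lookup described
above, assuming a list of task instances `tis` and a Kubernetes API client
`kube_client`; the helper name find_adoptable_pods is hypothetical:

# Hypothetical sketch only; the real logic lives in try_adopt_task_instances below.
def find_adoptable_pods(tis, kube_client, namespace='default'):
    # One pod listing per previous scheduler id, keyed off queued_by_job_id --
    # the same value adopt_launched_task later writes back into the pod's
    # "airflow-worker" label, so the next scheduler restart can still find the pod.
    scheduler_job_ids = {ti.queued_by_job_id for ti in tis if ti.queued_by_job_id}
    pods = []
    for scheduler_job_id in scheduler_job_ids:
        resp = kube_client.list_namespaced_pod(
            namespace=namespace,
            label_selector=f'airflow-worker={scheduler_job_id}',
        )
        pods.extend(resp.items)
    return pods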

Co-Authored-By: samwedge <[email protected]>
Co-Authored-By: philip-hope <[email protected]>
Co-authored-by: Jed Cunningham <[email protected]>
(cherry picked from commit 344e829)
atrbgithub authored and potiuk committed May 9, 2021
1 parent aa58a0a commit 8e00bc9
Showing 2 changed files with 79 additions and 3 deletions.
6 changes: 3 additions & 3 deletions airflow/executors/kubernetes_executor.py
@@ -586,15 +586,15 @@ def _change_state(self, key: TaskInstanceKey, state: Optional[str], pod_id: str,
             self.event_buffer[key] = state, None
 
     def try_adopt_task_instances(self, tis: List[TaskInstance]) -> List[TaskInstance]:
-        tis_to_flush = [ti for ti in tis if not ti.external_executor_id]
-        scheduler_job_ids = [ti.external_executor_id for ti in tis]
+        tis_to_flush = [ti for ti in tis if not ti.queued_by_job_id]
+        scheduler_job_ids = {ti.queued_by_job_id for ti in tis}
         pod_ids = {
             create_pod_id(
                 dag_id=pod_generator.make_safe_label_value(ti.dag_id),
                 task_id=pod_generator.make_safe_label_value(ti.task_id),
             ): ti
             for ti in tis
-            if ti.external_executor_id
+            if ti.queued_by_job_id
         }
         kube_client: client.CoreV1Api = self.kube_client
         for scheduler_job_id in scheduler_job_ids:
76 changes: 76 additions & 0 deletions tests/executors/test_kubernetes_executor.py
@@ -375,6 +375,82 @@ def test_change_state_failed_pod_deletion(
         assert executor.event_buffer[key][0] == State.FAILED
         mock_delete_pod.assert_called_once_with('pod_id', 'test-namespace')
 
+    @mock.patch('airflow.executors.kubernetes_executor.KubernetesExecutor.adopt_launched_task')
+    @mock.patch('airflow.executors.kubernetes_executor.KubernetesExecutor._adopt_completed_pods')
+    def test_try_adopt_task_instances(self, mock_adopt_completed_pods, mock_adopt_launched_task):
+        executor = self.kubernetes_executor
+        executor.scheduler_job_id = "10"
+        mock_ti = mock.MagicMock(queued_by_job_id="1", external_executor_id="1", dag_id="dag", task_id="task")
+        pod = k8s.V1Pod(metadata=k8s.V1ObjectMeta(name="foo", labels={"dag_id": "dag", "task_id": "task"}))
+        pod_id = create_pod_id(dag_id="dag", task_id="task")
+        mock_kube_client = mock.MagicMock()
+        mock_kube_client.list_namespaced_pod.return_value.items = [pod]
+        executor.kube_client = mock_kube_client
+
+        # First adoption
+        executor.try_adopt_task_instances([mock_ti])
+        mock_kube_client.list_namespaced_pod.assert_called_once_with(
+            namespace='default', label_selector='airflow-worker=1'
+        )
+        mock_adopt_launched_task.assert_called_once_with(mock_kube_client, pod, {pod_id: mock_ti})
+        mock_adopt_completed_pods.assert_called_once()
+        # We aren't checking the return value of `try_adopt_task_instances` because it relies on
+        # `adopt_launched_task` mutating its arg. This should be refactored, but not right now.
+
+        # Second adoption (queued_by_job_id and external_executor_id no longer match)
+        mock_kube_client.reset_mock()
+        mock_adopt_launched_task.reset_mock()
+        mock_adopt_completed_pods.reset_mock()
+
+        mock_ti.queued_by_job_id = "10"  # scheduler_job would have updated this after the first adoption
+        executor.scheduler_job_id = "20"
+
+        executor.try_adopt_task_instances([mock_ti])
+        mock_kube_client.list_namespaced_pod.assert_called_once_with(
+            namespace='default', label_selector='airflow-worker=10'
+        )
+        mock_adopt_launched_task.assert_called_once_with(mock_kube_client, pod, {pod_id: mock_ti})
+        mock_adopt_completed_pods.assert_called_once()
+
+    @mock.patch('airflow.executors.kubernetes_executor.KubernetesExecutor._adopt_completed_pods')
+    def test_try_adopt_task_instances_multiple_scheduler_ids(self, mock_adopt_completed_pods):
+        """We try to find pods only once per scheduler id"""
+        executor = self.kubernetes_executor
+        mock_kube_client = mock.MagicMock()
+        executor.kube_client = mock_kube_client
+
+        mock_tis = [
+            mock.MagicMock(queued_by_job_id="10", external_executor_id="1", dag_id="dag", task_id="task"),
+            mock.MagicMock(queued_by_job_id="40", external_executor_id="1", dag_id="dag", task_id="task2"),
+            mock.MagicMock(queued_by_job_id="40", external_executor_id="1", dag_id="dag", task_id="task3"),
+        ]
+
+        executor.try_adopt_task_instances(mock_tis)
+        assert mock_kube_client.list_namespaced_pod.call_count == 2
+        mock_kube_client.list_namespaced_pod.assert_has_calls(
+            [
+                mock.call(namespace='default', label_selector='airflow-worker=10'),
+                mock.call(namespace='default', label_selector='airflow-worker=40'),
+            ],
+            any_order=True,
+        )
+
+    @mock.patch('airflow.executors.kubernetes_executor.KubernetesExecutor.adopt_launched_task')
+    @mock.patch('airflow.executors.kubernetes_executor.KubernetesExecutor._adopt_completed_pods')
+    def test_try_adopt_task_instances_no_matching_pods(
+        self, mock_adopt_completed_pods, mock_adopt_launched_task
+    ):
+        executor = self.kubernetes_executor
+        mock_ti = mock.MagicMock(queued_by_job_id="1", external_executor_id="1", dag_id="dag", task_id="task")
+        mock_kube_client = mock.MagicMock()
+        mock_kube_client.list_namespaced_pod.return_value.items = []
+        executor.kube_client = mock_kube_client
+
+        tis_to_flush = executor.try_adopt_task_instances([mock_ti])
+        assert tis_to_flush == [mock_ti]
+        mock_adopt_launched_task.assert_not_called()
+        mock_adopt_completed_pods.assert_called_once()
+
     @mock.patch('airflow.executors.kubernetes_executor.get_kube_client')
     def test_adopt_launched_task(self, mock_kube_client):
         executor = self.kubernetes_executor
