Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

AIRFLOW-5581: Join KubernetesJobWatcher in terminate call and unblock queues from blocking forever #6237

Merged

Conversation

kpathak13
Copy link
Contributor

@kpathak13 kpathak13 commented Oct 2, 2019

Make sure you have checked all steps below.

Jira

Description

  • It fixes a bug in kubernetes_executor module

If run_duration or num_runs is enabled, KubernetesJobWatcher dies prematurely causing below error:

[2019-10-01 19:40:35,323] {kubernetes_executor.py:327} ERROR - Unknown error in KubernetesJobWatcher. Failing
Traceback (most recent call last):
File "/home/vccorp/.local/lib/python3.6/site-packages/airflow/contrib/executors/kubernetes_executor.py", line 325, in run
self.worker_uuid, self.kube_config)
File "/home/vccorp/.local/lib/python3.6/site-packages/airflow/contrib/executors/kubernetes_executor.py", line 359, in _run
task.metadata.resource_version
File "/home/vccorp/.local/lib/python3.6/site-packages/airflow/contrib/executors/kubernetes_executor.py", line 391, in process_status
self.watcher_queue.put((pod_id, None, labels, resource_version))
File "", line 2, in put
File "/usr/lib64/python3.6/multiprocessing/managers.py", line 756, in _callmethod
conn.send((self._id, methodname, args, kwds))
File "/usr/lib64/python3.6/multiprocessing/connection.py", line 206, in send
self._send_bytes(_ForkingPickler.dumps(obj))
File "/usr/lib64/python3.6/multiprocessing/connection.py", line 404, in _send_bytes
self._send(header + buf)
File "/usr/lib64/python3.6/multiprocessing/connection.py", line 368, in _send
n = write(self._handle, buf)
BrokenPipeError: [Errno 32] Broken pipe
Process KubernetesJobWatcher-3:
Traceback (most recent call last):
File "/usr/lib64/python3.6/multiprocessing/process.py", line 258, in _bootstrap
self.run()
File "/home/vccorp/.local/lib/python3.6/site-packages/airflow/contrib/executors/kubernetes_executor.py", line 325, in run
self.worker_uuid, self.kube_config)
File "/home/vccorp/.local/lib/python3.6/site-packages/airflow/contrib/executors/kubernetes_executor.py", line 359, in _run
task.metadata.resource_version
File "/home/vccorp/.local/lib/python3.6/site-packages/airflow/contrib/executors/kubernetes_executor.py", line 391, in process_status
self.watcher_queue.put((pod_id, None, labels, resource_version))
File "", line 2, in put
File "/usr/lib64/python3.6/multiprocessing/managers.py", line 756, in _callmethod
conn.send((self._id, methodname, args, kwds))
File "/usr/lib64/python3.6/multiprocessing/connection.py", line 206, in send
self._send_bytes(_ForkingPickler.dumps(obj))
File "/usr/lib64/python3.6/multiprocessing/connection.py", line 404, in _send_bytes
self._send(header + buf)
File "/usr/lib64/python3.6/multiprocessing/connection.py", line 368, in _send
n = write(self._handle, buf)
BrokenPipeError: [Errno 32] Broken pipe

Tests

  • This does not have test case since it's a single line update.

@kpathak13 kpathak13 force-pushed the feature/AIRFLOW-5581-k8jobwatcher-bug-fix branch from 118dee2 to 577ffbe Compare October 2, 2019 15:22
@kpathak13 kpathak13 force-pushed the feature/AIRFLOW-5581-k8jobwatcher-bug-fix branch 2 times, most recently from 2f9afae to 2bc46e3 Compare October 2, 2019 15:39
@mik-laj mik-laj added the k8s label Oct 3, 2019
@kpathak13 kpathak13 force-pushed the feature/AIRFLOW-5581-k8jobwatcher-bug-fix branch from 2bc46e3 to b04ceac Compare October 4, 2019 09:11
@kpathak13 kpathak13 changed the title AIRFLOW-5581: Join KubernetesJobWatcher in terminate call AIRFLOW-5581: Join KubernetesJobWatcher in terminate call and unblock queues from blocking forever Oct 4, 2019
@kpathak13 kpathak13 force-pushed the feature/AIRFLOW-5581-k8jobwatcher-bug-fix branch 2 times, most recently from 4e9d69b to 1ab4c59 Compare October 4, 2019 10:03
@codecov-io
Copy link

codecov-io commented Oct 4, 2019

Codecov Report

Merging #6237 into master will decrease coverage by 0.15%.
The diff coverage is 6.66%.

Impacted file tree graph

@@            Coverage Diff             @@
##           master    #6237      +/-   ##
==========================================
- Coverage   80.04%   79.89%   -0.16%     
==========================================
  Files         610      610              
  Lines       35264    35306      +42     
==========================================
- Hits        28227    28207      -20     
- Misses       7037     7099      +62
Impacted Files Coverage Δ
airflow/executors/kubernetes_executor.py 58.89% <6.66%> (-6.34%) ⬇️
airflow/operators/postgres_operator.py 0% <0%> (-100%) ⬇️
airflow/hooks/postgres_hook.py 94.73% <0%> (-1.76%) ⬇️
airflow/utils/sqlalchemy.py 91.52% <0%> (-1.7%) ⬇️
airflow/hooks/dbapi_hook.py 86.44% <0%> (-1.7%) ⬇️
airflow/models/baseoperator.py 95.27% <0%> (ø) ⬆️
airflow/bin/cli.py 66.38% <0%> (+0.2%) ⬆️
airflow/models/taskinstance.py 93.75% <0%> (+0.5%) ⬆️

Continue to review full report at Codecov.

Legend - Click here to learn more
Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Powered by Codecov. Last update 18c62da...55ba865. Read the comment docs.

@kpathak13 kpathak13 requested a review from ashb October 4, 2019 14:05
@dimberman
Copy link
Contributor

@kpathak13 LGTM. Can you rebase? I'll merge when ready :)

@kpathak13 kpathak13 force-pushed the feature/AIRFLOW-5581-k8jobwatcher-bug-fix branch from 1ab4c59 to bf23c36 Compare October 6, 2019 10:37
self.log.debug("Terminating kube_watcher...")
self.kube_watcher.terminate()
self.kube_watcher.join()
self.log.debug("kube_watcher=%s alive?=%s", self.kube_watcher, self.kube_watcher.is_alive())
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This isn't useful - after a call to .join() the kube_watcher should never be alive, right?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removed is alive? and rebased.

@kpathak13 kpathak13 force-pushed the feature/AIRFLOW-5581-k8jobwatcher-bug-fix branch from bf23c36 to 55ba865 Compare October 7, 2019 10:16
@kpathak13 kpathak13 requested a review from ashb October 8, 2019 16:00
@ashb
Copy link
Member

ashb commented Oct 10, 2019

The code looks okay, but the important question: how have you tested this?

@kpathak13
Copy link
Contributor Author

@ashb , I tested this on my k8 cluster:

1. Scheduled multiple high frequency jobs 
2. Set Scheduler run duration to 2 minutes
3. Killed several pods in flight at different stages (multiple times)
4. Deleted Scheduler pod

Never observed race condition.

@ashb ashb merged commit 12f916a into apache:master Oct 11, 2019
ashb pushed a commit that referenced this pull request Oct 11, 2019
…er shutdown on SIGTERM (#6237)

(cherry picked from commit 12f916a)
ashb pushed a commit that referenced this pull request Oct 11, 2019
…er shutdown on SIGTERM (#6237)

(cherry picked from commit 12f916a)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

5 participants