Skip to content

Commit

Permalink
KubernetesJobWatcher no longer inherits from Process (#11017)
Browse files Browse the repository at this point in the history
multiprocessing.Process is set up in a very unfortunate manner
that pretty much makes it impossible to test a class that inherits from
Process or use any of its internal functions. For this reason we decided
to seperate the actual process based functionality into a class member
  • Loading branch information
dimberman authored Sep 18, 2020
1 parent 1a46e9b commit 1539bd0
Show file tree
Hide file tree
Showing 2 changed files with 80 additions and 2 deletions.
27 changes: 26 additions & 1 deletion airflow/executors/kubernetes_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ def _get_security_context_val(self, scontext: str) -> Union[str, int]:
return int(val)


class KubernetesJobWatcher(multiprocessing.Process, LoggingMixin):
class KubernetesJobWatcher(LoggingMixin):
"""Watches for Kubernetes jobs"""

def __init__(self,
Expand All @@ -142,6 +142,31 @@ def __init__(self,
self.watcher_queue = watcher_queue
self.resource_version = resource_version
self.kube_config = kube_config
self.watcher_process = multiprocessing.Process(target=self.run, args=())

def start(self):
"""
Start the watcher process
"""
self.watcher_process.start()

def is_alive(self):
"""
Check if the watcher process is alive
"""
self.watcher_process.is_alive()

def join(self):
"""
Join watcher process
"""
self.watcher_process.join()

def terminate(self):
"""
Terminate watcher process
"""
self.watcher_process.terminate()

def run(self) -> None:
"""Performs watching"""
Expand Down
55 changes: 54 additions & 1 deletion tests/executors/test_kubernetes_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
# specific language governing permissions and limitations
# under the License.
#
import multiprocessing
import random
import re
import string
Expand All @@ -30,14 +31,66 @@
try:
from kubernetes.client.rest import ApiException

from airflow.executors.kubernetes_executor import AirflowKubernetesScheduler, KubernetesExecutor
from airflow.executors.kubernetes_executor import (
AirflowKubernetesScheduler, KubernetesExecutor, KubernetesJobWatcher,
)
from airflow.kubernetes import pod_generator
from airflow.kubernetes.pod_generator import PodGenerator
from airflow.utils.state import State
except ImportError:
AirflowKubernetesScheduler = None # type: ignore


class TestKubernetesJobWatcher(unittest.TestCase):
def setUp(self) -> None:
self.watcher_queue = multiprocessing.Manager().Queue()
self.watcher = KubernetesJobWatcher(
namespace="namespace",
multi_namespace_mode=False,
watcher_queue=self.watcher_queue,
resource_version="0",
worker_uuid="0",
kube_config=None,
)

def test_running_task(self):
self.watcher.process_status(
pod_id="pod_id",
namespace="namespace",
status="Running",
annotations={"foo": "bar"},
resource_version="5",
event={"type": "ADDED"}
)
self.assertTrue(self.watcher_queue.empty())

def test_succeeded_task(self):
self.watcher.process_status(
pod_id="pod_id",
namespace="namespace",
status="Succeeded",
annotations={"foo": "bar"},
resource_version="5",
event={"type": "ADDED"}
)
result = self.watcher_queue.get_nowait()
self.assertEqual(('pod_id', 'namespace', None, {'foo': 'bar'}, '5'), result)
self.assertTrue(self.watcher_queue.empty())

def test_failed_task(self):
self.watcher.process_status(
pod_id="pod_id",
namespace="namespace",
status="Failed",
annotations={"foo": "bar"},
resource_version="5",
event={"type": "ADDED"}
)
result = self.watcher_queue.get_nowait()
self.assertEqual(('pod_id', 'namespace', "failed", {'foo': 'bar'}, '5'), result)
self.assertTrue(self.watcher_queue.empty())


# pylint: disable=unused-argument
class TestAirflowKubernetesScheduler(unittest.TestCase):
@staticmethod
Expand Down

0 comments on commit 1539bd0

Please sign in to comment.