Skip to content

Commit

Permalink
[AIRFLOW-5834] Option to skip serve_logs process with workers
Browse files Browse the repository at this point in the history
  • Loading branch information
msumit committed Dec 3, 2019
1 parent 9fed459 commit 42e45f9
Show file tree
Hide file tree
Showing 3 changed files with 50 additions and 5 deletions.
7 changes: 6 additions & 1 deletion airflow/bin/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -577,6 +577,11 @@ class CLIFactory:
'autoscale': Arg(
('-a', '--autoscale'),
help="Minimum and Maximum number of worker to autoscale"),
'skip_serve_logs': Arg(
("-s", "--skip_serve_logs"),
default=False,
help="Don't start the serve logs process along with the workers.",
action="store_true"),
}
subparsers = (
{
Expand Down Expand Up @@ -871,7 +876,7 @@ class CLIFactory:
'func': lazy_load_command('airflow.cli.commands.worker_command.worker'),
'help': "Start a Celery worker node",
'args': ('do_pickle', 'queues', 'concurrency', 'celery_hostname',
'pid', 'daemon', 'stdout', 'stderr', 'log_file', 'autoscale'),
'pid', 'daemon', 'stdout', 'stderr', 'log_file', 'autoscale', 'skip_serve_logs'),
}, {
'name': 'flower',
'func': lazy_load_command('airflow.cli.commands.flower_command.flower'),
Expand Down
19 changes: 15 additions & 4 deletions airflow/cli/commands/worker_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,14 @@
from airflow.utils.cli import setup_locations, setup_logging, sigint_handler


def _serve_logs(env, skip_serve_logs=False):
"""Starts serve_logs sub-process"""
if skip_serve_logs is False:
sub_proc = subprocess.Popen(['airflow', 'serve_logs'], env=env, close_fds=True)
return sub_proc
return None


@cli_utils.action_logging
def worker(args):
"""Starts Airflow Celery worker"""
Expand All @@ -43,8 +51,11 @@ def worker(args):
from celery.bin import worker # pylint: disable=redefined-outer-name

autoscale = args.autoscale
skip_serve_logs = args.skip_serve_logs

if autoscale is None and conf.has_option("celery", "worker_autoscale"):
autoscale = conf.get("celery", "worker_autoscale")

worker = worker.worker(app=celery_app) # pylint: disable=redefined-outer-name
options = {
'optimization': 'fair',
Expand Down Expand Up @@ -76,17 +87,17 @@ def worker(args):
stderr=stderr,
)
with ctx:
sub_proc = subprocess.Popen(['airflow', 'serve_logs'], env=env, close_fds=True)
sub_proc = _serve_logs(env, skip_serve_logs)
worker.run(**options)
sub_proc.kill()

stdout.close()
stderr.close()
else:
signal.signal(signal.SIGINT, sigint_handler)
signal.signal(signal.SIGTERM, sigint_handler)

sub_proc = subprocess.Popen(['airflow', 'serve_logs'], env=env, close_fds=True)

sub_proc = _serve_logs(env, skip_serve_logs)
worker.run(**options)

if sub_proc:
sub_proc.kill()
29 changes: 29 additions & 0 deletions tests/cli/commands/test_worker_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import sqlalchemy

import airflow
from airflow.bin import cli
from airflow.cli.commands import worker_command
from tests.compat import mock, patch
from tests.test_utils.config import conf_vars
Expand Down Expand Up @@ -59,3 +60,31 @@ def test_validate_session_dbapi_exception(self, mock_session):
"""
mock_session.side_effect = sqlalchemy.exc.OperationalError("m1", "m2", "m3", "m4")
self.assertEqual(airflow.settings.validate_session(), False)


class TestWorkerServeLogs(unittest.TestCase):
@classmethod
def setUpClass(cls):
cls.parser = cli.CLIFactory.get_parser()

def test_serve_logs_on_worker_start(self):
with patch('airflow.cli.commands.worker_command.subprocess.Popen') as mock_popen:
mock_popen.return_value.communicate.return_value = (b'output', b'error')
mock_popen.return_value.returncode = 0
args = self.parser.parse_args(['worker', '-c', '-1'])

with patch('celery.platforms.check_privileges') as mock_privil:
mock_privil.return_value = 0
worker_command.worker(args)
mock_popen.assert_called()

def test_skip_serve_logs_on_worker_start(self):
with patch('airflow.cli.commands.worker_command.subprocess.Popen') as mock_popen:
mock_popen.return_value.communicate.return_value = (b'output', b'error')
mock_popen.return_value.returncode = 0
args = self.parser.parse_args(['worker', '-c', '-1', '-s'])

with patch('celery.platforms.check_privileges') as mock_privil:
mock_privil.return_value = 0
worker_command.worker(args)
mock_popen.assert_not_called()

0 comments on commit 42e45f9

Please sign in to comment.