Skip to content

Commit

Permalink
[AIRFLOW-5834] Option to skip serve_logs process with workers (#6709)
Browse files Browse the repository at this point in the history
(cherry-picked from fac3653)
  • Loading branch information
msumit authored and ashb committed Dec 19, 2019
1 parent 8d78de8 commit 2383cd9
Show file tree
Hide file tree
Showing 2 changed files with 49 additions and 4 deletions.
25 changes: 21 additions & 4 deletions airflow/bin/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -1059,6 +1059,14 @@ def serve_logs(filename): # noqa
flask_app.run(host='0.0.0.0', port=worker_log_server_port)


def _serve_logs(env, skip_serve_logs=False):
"""Starts serve_logs sub-process"""
if skip_serve_logs is False:
sub_proc = subprocess.Popen(['airflow', 'serve_logs'], env=env, close_fds=True)
return sub_proc
return None


@cli_utils.action_logging
def worker(args):
env = os.environ.copy()
Expand All @@ -1073,8 +1081,11 @@ def worker(args):
from celery.bin import worker

autoscale = args.autoscale
skip_serve_logs = args.skip_serve_logs

if autoscale is None and conf.has_option("celery", "worker_autoscale"):
autoscale = conf.get("celery", "worker_autoscale")

worker = worker.worker(app=celery_app)
options = {
'optimization': 'fair',
Expand Down Expand Up @@ -1106,19 +1117,20 @@ def worker(args):
stderr=stderr,
)
with ctx:
sp = subprocess.Popen(['airflow', 'serve_logs'], env=env, close_fds=True)
sp = _serve_logs(env, skip_serve_logs)
worker.run(**options)
sp.kill()

stdout.close()
stderr.close()
else:
signal.signal(signal.SIGINT, sigint_handler)
signal.signal(signal.SIGTERM, sigint_handler)

sp = subprocess.Popen(['airflow', 'serve_logs'], env=env, close_fds=True)
sp = _serve_logs(env, skip_serve_logs)

worker.run(**options)

if sp:
sp.kill()


Expand Down Expand Up @@ -2024,6 +2036,11 @@ class CLIFactory(object):
'autoscale': Arg(
('-a', '--autoscale'),
help="Minimum and Maximum number of worker to autoscale"),
'skip_serve_logs': Arg(
("-s", "--skip_serve_logs"),
default=False,
help="Don't start the serve logs process along with the workers.",
action="store_true"),
}
subparsers = (
{
Expand Down Expand Up @@ -2173,7 +2190,7 @@ class CLIFactory(object):
'func': worker,
'help': "Start a Celery worker node",
'args': ('do_pickle', 'queues', 'concurrency', 'celery_hostname',
'pid', 'daemon', 'stdout', 'stderr', 'log_file', 'autoscale'),
'pid', 'daemon', 'stdout', 'stderr', 'log_file', 'autoscale', 'skip_serve_logs'),
}, {
'func': flower,
'help': "Start a Celery Flower",
Expand Down
28 changes: 28 additions & 0 deletions tests/cli/test_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -520,3 +520,31 @@ def test_run_naive_taskinstance(self, mock_local_job):
pickle_id=None,
pool=None,
)


class TestWorkerServeLogs(unittest.TestCase):
@classmethod
def setUpClass(cls):
cls.parser = cli.CLIFactory.get_parser()

def test_serve_logs_on_worker_start(self):
with patch('airflow.bin.cli.subprocess.Popen') as mock_popen:
mock_popen.return_value.communicate.return_value = (b'output', b'error')
mock_popen.return_value.returncode = 0
args = self.parser.parse_args(['worker', '-c', '-1'])

with patch('celery.platforms.check_privileges') as mock_privil:
mock_privil.return_value = 0
cli.worker(args)
mock_popen.assert_called()

def test_skip_serve_logs_on_worker_start(self):
with patch('airflow.bin.cli.subprocess.Popen') as mock_popen:
mock_popen.return_value.communicate.return_value = (b'output', b'error')
mock_popen.return_value.returncode = 0
args = self.parser.parse_args(['worker', '-c', '-1', '-s'])

with patch('celery.platforms.check_privileges') as mock_privil:
mock_privil.return_value = 0
cli.worker(args)
mock_popen.assert_not_called()

0 comments on commit 2383cd9

Please sign in to comment.