[AIRFLOW-6529] Pickle error occurs when the scheduler tries to run on macOS. #7128

Closed. Wants to merge 19 commits.
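For context: since Python 3.8 the default multiprocessing start method on macOS is "spawn" rather than "fork". With "spawn", the target callable and its arguments are pickled before being sent to the child process, so lambdas, closures, and other non-module-level callables fail. A minimal sketch of that failure mode (illustrative only, not Airflow code):

```python
# Minimal sketch (not Airflow code) of the pickle error this PR works around.
# Since Python 3.8 the default start method on macOS is "spawn", which pickles
# the target and its arguments; nested functions and lambdas are not picklable.
import multiprocessing as mp


def main():
    def nested_target():  # a local function, like a closure-based factory
        print("hello from the child process")

    ctx = mp.get_context("spawn")
    p = ctx.Process(target=nested_target)
    # Raises in the parent while pickling the child's bootstrap data, e.g.:
    #   AttributeError: Can't pickle local object 'main.<locals>.nested_target'
    p.start()
    p.join()


if __name__ == "__main__":
    main()
```
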
76 changes: 54 additions & 22 deletions airflow/jobs/scheduler_job.py
@@ -19,7 +19,7 @@
#

import logging
import multiprocessing
import multiprocessing as mp
import os
import signal
import sys
@@ -36,7 +36,7 @@

from airflow import models, settings
from airflow.configuration import conf
from airflow.exceptions import AirflowException
from airflow.exceptions import AirflowConfigException, AirflowException
from airflow.executors.local_executor import LocalExecutor
from airflow.executors.sequential_executor import SequentialExecutor
from airflow.jobs.base_job import BaseJob
@@ -101,7 +101,8 @@ def _run_file_processor(result_channel,
pickle_dags,
dag_id_white_list,
thread_name,
zombies):
zombies,
inherited_conf=None):
"""
Process the given file.

@@ -126,7 +127,14 @@
log = logging.getLogger("airflow.processor")

set_context(log, file_path)
setproctitle("airflow scheduler - DagFileProcessor {}".format(file_path))

if inherited_conf is not None: # pylint: disable=too-many-nested-blocks
for section in inherited_conf:
for key, value in inherited_conf[section].items():
if value not in conf:
conf.set(section, key, value.replace("%", "%%"))

setproctitle("airflow scheduler - DagFileProcessor {}".format(file_path))
Member:

When is inherited_conf used? I can't find it used anywhere either.

We want to set the process title always.

Member Author:

Ah, this is just a mis-indentation. I'll fix it.


try:
# redirect stdout/stderr to log
@@ -168,17 +176,34 @@ def start(self):
"""
Launch the process and start processing the DAG.
"""
self._parent_channel, _child_channel = multiprocessing.Pipe()
self._process = multiprocessing.Process(
if conf.has_option('core', 'mp_start_method'):
mp_start_method = conf.get('core', 'mp_start_method')
else:
mp_start_method = mp.get_start_method()

possible_value_list = mp.get_all_start_methods()
if mp_start_method not in possible_value_list:
raise AirflowConfigException(
"mp_start_method should not be " + mp_start_method +
". Possible value is one of " + str(possible_value_list))
cxt = mp.get_context(mp_start_method)

self._parent_channel, _child_channel = cxt.Pipe()
args_list = [
_child_channel,
self.file_path,
self._pickle_dags,
self._dag_id_white_list,
"DagFileProcessor{}".format(self._instance_id),
self._zombies
]

if cxt.get_start_method() != "fork":
args_list.append(conf)

self._process = cxt.Process(
target=type(self)._run_file_processor,
args=(
_child_channel,
self.file_path,
self._pickle_dags,
self._dag_id_white_list,
"DagFileProcessor{}".format(self._instance_id),
self._zombies
),
args=args_list,
name="DagFileProcessor{}-Process".format(self._instance_id)
)
self._start_time = timezone.utcnow()
@@ -848,6 +873,19 @@ def process_file(self, file_path, zombies, pickle_dags=False, session=None):
return simple_dags, len(dagbag.import_errors)


# To be picklable on both Linux and macOS,
# this function needs to be defined at module scope.
# SchedulerJob is not picklable on Linux for now, so
# this function can't be a member of the class.
def processor_factory(file_path, zombies, dag_ids, pickle_dags):
return DagFileProcessorProcess(
file_path=file_path,
pickle_dags=pickle_dags,
dag_id_white_list=dag_ids,
zombies=zombies
)
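
As a small illustration of the constraint described in the comment above (standard library only, names are made up): under the "spawn" start method the factory is pickled when it crosses the process boundary to the DagFileProcessorManager, so a module-level function works while a closure over the SchedulerJob instance would not.

```python
# Illustrative sketch (standard library only): why the factory must live at
# module scope when it has to cross a process boundary under "spawn".
import pickle


def module_level_factory(file_path, zombies, dag_ids, pickle_dags):
    # stand-in for DagFileProcessorProcess(...)
    return (file_path, zombies, dag_ids, pickle_dags)


class FakeSchedulerJob:
    dag_ids = ["example_dag"]

    def make_nested_factory(self):
        def nested_factory(file_path, zombies):  # closes over self, like the old code
            return (file_path, zombies, self.dag_ids)
        return nested_factory


pickle.dumps(module_level_factory)  # fine: resolvable by module and qualified name
try:
    pickle.dumps(FakeSchedulerJob().make_nested_factory())
except (pickle.PicklingError, AttributeError) as exc:
    print("not picklable:", exc)  # Can't pickle local object '...nested_factory'
```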


class SchedulerJob(BaseJob):
"""
This SchedulerJob runs for a specific time interval and schedules the jobs
@@ -1477,14 +1515,6 @@ def _execute(self):
known_file_paths = list_py_file_paths(self.subdir)
self.log.info("There are %s files in %s", len(known_file_paths), self.subdir)

def processor_factory(file_path, zombies):
return DagFileProcessorProcess(
file_path=file_path,
pickle_dags=pickle_dags,
dag_id_white_list=self.dag_ids,
zombies=zombies
)

# When using sqlite, we do not use async_mode
# so the scheduler job and DAG parser don't access the DB at the same time.
async_mode = not self.using_sqlite
@@ -1496,6 +1526,8 @@ def processor_factory(file_path, zombies):
self.num_runs,
processor_factory,
processor_timeout,
self.dag_ids,
pickle_dags,
async_mode)

try:
75 changes: 60 additions & 15 deletions airflow/utils/dag_processing.py
@@ -19,7 +19,7 @@
import enum
import importlib
import logging
import multiprocessing
import multiprocessing as mp
import os
import signal
import sys
@@ -37,7 +37,7 @@
import airflow.models
from airflow.configuration import conf
from airflow.dag.base_dag import BaseDag, BaseDagBag
from airflow.exceptions import AirflowException
from airflow.exceptions import AirflowConfigException, AirflowException
from airflow.models import Connection, errors
from airflow.models.taskinstance import SimpleTaskInstance
from airflow.settings import STORE_SERIALIZED_DAGS
@@ -292,6 +292,8 @@ def __init__(self,
max_runs,
processor_factory,
processor_timeout,
dag_ids,
pickle_dags,
async_mode):
"""
:param dag_directory: Directory where DAG definitions are kept. All
@@ -307,6 +309,10 @@
:type processor_factory: (unicode, unicode, list) -> (AbstractDagFileProcessorProcess)
:param processor_timeout: How long to wait before timing out a DAG file processor
:type processor_timeout: timedelta
:param dag_ids: if specified, only schedule tasks with these DAG IDs
:type dag_ids: list[str]
:param pickle_dags: whether to pickle DAGs.
:type pickle_dags: bool
:param async_mode: Whether to start agent in async mode
:type async_mode: bool
"""
@@ -316,6 +322,8 @@
self._max_runs = max_runs
self._processor_factory = processor_factory
self._processor_timeout = processor_timeout
self._dag_ids = dag_ids
self._pickle_dags = pickle_dags
self._async_mode = async_mode
# Map from file path to the processor
self._processors = {}
@@ -332,18 +340,37 @@ def start(self):
"""
Launch DagFileProcessorManager processor and start DAG parsing loop in manager.
"""
self._parent_signal_conn, child_signal_conn = multiprocessing.Pipe()
self._process = multiprocessing.Process(
if conf.has_option('core', 'mp_start_method'):
mp_start_method = conf.get('core', 'mp_start_method')
else:
mp_start_method = mp.get_start_method()

possible_value_list = mp.get_all_start_methods()
if mp_start_method not in possible_value_list:
raise AirflowConfigException(
"mp_start_method should not be " + mp_start_method +
". Possible value is one of " + str(possible_value_list))
cxt = mp.get_context(mp_start_method)

self._parent_signal_conn, child_signal_conn = cxt.Pipe()
args_list = [
self._dag_directory,
self._file_paths,
self._max_runs,
self._processor_factory,
self._processor_timeout,
child_signal_conn,
self._dag_ids,
self._pickle_dags,
self._async_mode
]

if cxt.get_start_method() != "fork":
args_list.append(conf)

self._process = cxt.Process(
target=type(self)._run_processor_manager,
args=(
self._dag_directory,
self._file_paths,
self._max_runs,
self._processor_factory,
self._processor_timeout,
child_signal_conn,
self._async_mode,
)
args=args_list
)
self._process.start()

@@ -388,12 +415,20 @@ def _run_processor_manager(dag_directory,
processor_factory,
processor_timeout,
signal_conn,
async_mode):
dag_ids,
pickle_dags,
async_mode,
inherited_conf=None):

# Make this process start as a new process group - that makes it easy
# to kill all sub-process of this at the OS-level, rather than having
# to iterate the child processes
os.setpgid(0, 0)
if inherited_conf is not None: # pylint: disable=too-many-nested-blocks
for section in inherited_conf:
for key, value in inherited_conf[section].items():
if value not in conf:
conf.set(section, key, value.replace("%", "%%"))
Member:

This looks suspect. What's going on here?

Member:

I also don't see anywhere that we actually pass a value here?

Member Author:

The spawn method inherits very little of the parent's environment, so conf is not inherited by the child process either. I noticed that test_reload_module in test_dag_processing.py fails because the configured DAG_PROCESSOR_MANAGER_LOG_LOCATION is not inherited.

Member Author:

Sorry, not DAG_PROCESSOR_MANAGER_LOG_LOCATION but logging_config_class.

Member Author:

@ashb If only test_reload_module needs to dynamically change the configuration and propagate it to child processes, I'll remove inherited_conf and its related code. What do you think?

setproctitle("airflow scheduler -- DagFileProcessorManager")
# Reload configurations and settings to avoid collision with parent process.
@@ -414,6 +449,8 @@
processor_factory,
processor_timeout,
signal_conn,
dag_ids,
pickle_dags,
async_mode)

processor_manager.start()
@@ -527,6 +564,10 @@ class DagFileProcessorManager(LoggingMixin):  # pylint: disable=too-many-instanc
:type processor_timeout: timedelta
:param signal_conn: connection to communicate signal with processor agent.
:type signal_conn: airflow.models.connection.Connection
:param dag_ids: if specified, only schedule tasks with these DAG IDs
:type dag_ids: list[str]
:param pickle_dags: whether to pickle DAGs.
:type pickle_dags: bool
:param async_mode: whether to start the manager in async mode
:type async_mode: bool
"""
Expand All @@ -538,13 +579,17 @@ def __init__(self,
processor_factory: Callable[[str, List[Any]], AbstractDagFileProcessorProcess],
processor_timeout: timedelta,
signal_conn: Connection,
dag_ids: List[str],
pickle_dags: bool,
async_mode: bool = True):
self._file_paths = file_paths
self._file_path_queue: List[str] = []
self._dag_directory = dag_directory
self._max_runs = max_runs
self._processor_factory = processor_factory
self._signal_conn = signal_conn
self._pickle_dags = pickle_dags
self._dag_ids = dag_ids
self._async_mode = async_mode
self._parsing_start_time: Optional[datetime] = None

@@ -1055,7 +1100,7 @@ def heartbeat(self):
# Start more processors if we have enough slots and files to process
while self._parallelism - len(self._processors) > 0 and self._file_path_queue:
file_path = self._file_path_queue.pop(0)
processor = self._processor_factory(file_path, self._zombies)
processor = self._processor_factory(file_path, self._zombies, self._dag_ids, self._pickle_dags)
Stats.incr('dag_processing.processes')

processor.start()
25 changes: 25 additions & 0 deletions tests/jobs/test_scheduler_job.py
@@ -2193,6 +2193,31 @@ def test_scheduler_multiprocessing(self):
self.assertEqual(
len(session.query(TI).filter(TI.dag_id == dag_id).all()), 0)

def test_scheduler_multiprocessing_with_spawn_method(self):
"""
Test that the scheduler can successfully queue multiple dags in parallel when the multiprocessing start method is 'spawn'
"""
try:
conf.set('core', 'mp_start_method', 'spawn')
dag_ids = ['test_start_date_scheduling', 'test_dagrun_states_success']
for dag_id in dag_ids:
dag = self.dagbag.get_dag(dag_id)
dag.clear()

scheduler = SchedulerJob(dag_ids=dag_ids,
executor=self.null_exec,
subdir=os.path.join(TEST_DAG_FOLDER, 'test_scheduler_dags.py'),
num_runs=1)
scheduler.run()

# zero tasks ran
dag_id = 'test_start_date_scheduling'
session = settings.Session()
self.assertEqual(
len(session.query(TI).filter(TI.dag_id == dag_id).all()), 0)
finally:
conf.remove_option('core', 'mp_start_method')

@patch.object(TI, 'pool_full')
def test_scheduler_verify_pool_full(self, mock_pool_full):
"""
Expand Down
9 changes: 8 additions & 1 deletion tests/test_utils/mock_executor.py
@@ -36,10 +36,17 @@ def __init__(self, do_update=True, *args, **kwargs):
self.history = []
# All the tasks, in a stable sort order
self.sorted_tasks = []
self.mock_task_results = defaultdict(lambda: State.SUCCESS)

# If multiprocessing runs in spawn mode,
# arguments must be pickled, but a lambda is not picklable.
# So we pass self.success instead of a lambda.
self.mock_task_results = defaultdict(self.success)

super().__init__(*args, **kwargs)

def success(self):
return State.SUCCESS
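
A short sketch of the reasoning in the comment above (standard library only, names are made up): the defaultdict's default_factory has to survive pickling when the mock executor's state crosses a "spawn" process boundary; a lambda cannot, while a bound method of a picklable instance can.

```python
# Illustrative sketch: a defaultdict whose default_factory is a lambda cannot
# be pickled, while one built from a bound method of a picklable object can.
import pickle
from collections import defaultdict


class MockResults:
    def success(self):
        return "success"


ok = defaultdict(MockResults().success)
pickle.loads(pickle.dumps(ok))  # round-trips fine

try:
    pickle.dumps(defaultdict(lambda: "success"))
except (pickle.PicklingError, AttributeError) as exc:
    print("not picklable:", exc)  # lambdas cannot be looked up by qualified name
```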

def heartbeat(self):
if not self.do_update:
return