Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[AIRFLOW-6529] Pickle error occurs when the scheduler tries to run on macOS. #7128

Closed
wants to merge 19 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 31 additions & 12 deletions airflow/jobs/scheduler_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
#

import logging
import multiprocessing
import multiprocessing as mp
import os
import signal
import sys
Expand All @@ -36,7 +36,7 @@

from airflow import models, settings
from airflow.configuration import conf
from airflow.exceptions import AirflowException
from airflow.exceptions import AirflowConfigException, AirflowException
from airflow.executors.local_executor import LocalExecutor
from airflow.executors.sequential_executor import SequentialExecutor
from airflow.jobs.base_job import BaseJob
Expand Down Expand Up @@ -168,8 +168,20 @@ def start(self):
"""
Launch the process and start processing the DAG.
"""
self._parent_channel, _child_channel = multiprocessing.Pipe()
self._process = multiprocessing.Process(
if conf.has_option('core', 'mp_start_method'):
mp_start_method = conf.get('core', 'mp_start_method')
else:
mp_start_method = mp.get_start_method()

possible_value_list = mp.get_all_start_methods()
if mp_start_method not in possible_value_list:
raise AirflowConfigException(
"mp_start_method should not be " + mp_start_method +
". Possible value is one of " + str(possible_value_list))
cxt = mp.get_context(mp_start_method)

self._parent_channel, _child_channel = cxt.Pipe()
self._process = cxt.Process(
target=type(self)._run_file_processor,
args=(
_child_channel,
Expand Down Expand Up @@ -848,6 +860,19 @@ def process_file(self, file_path, zombies, pickle_dags=False, session=None):
return simple_dags, len(dagbag.import_errors)


# To be picklable for both Linux and macOS,
# this function is needed to be defined at module scope.
# SchedulerJob is not pickleable for Linux for now so
# this function can't be a member of the class.
def processor_factory(file_path, zombies, dag_ids, pickle_dags):
return DagFileProcessorProcess(
file_path=file_path,
pickle_dags=pickle_dags,
dag_id_white_list=dag_ids,
zombies=zombies
)


class SchedulerJob(BaseJob):
"""
This SchedulerJob runs for a specific time interval and schedules the jobs
Expand Down Expand Up @@ -1477,14 +1502,6 @@ def _execute(self):
known_file_paths = list_py_file_paths(self.subdir)
self.log.info("There are %s files in %s", len(known_file_paths), self.subdir)

def processor_factory(file_path, zombies):
return DagFileProcessorProcess(
file_path=file_path,
pickle_dags=pickle_dags,
dag_id_white_list=self.dag_ids,
zombies=zombies
)

# When using sqlite, we do not use async_mode
# so the scheduler job and DAG parser don't access the DB at the same time.
async_mode = not self.using_sqlite
Expand All @@ -1496,6 +1513,8 @@ def processor_factory(file_path, zombies):
self.num_runs,
processor_factory,
processor_timeout,
self.dag_ids,
pickle_dags,
async_mode)

try:
Expand Down
46 changes: 40 additions & 6 deletions airflow/utils/dag_processing.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
import enum
import importlib
import logging
import multiprocessing
import multiprocessing as mp
import os
import signal
import sys
Expand All @@ -37,7 +37,7 @@
import airflow.models
from airflow.configuration import conf
from airflow.dag.base_dag import BaseDag, BaseDagBag
from airflow.exceptions import AirflowException
from airflow.exceptions import AirflowConfigException, AirflowException
from airflow.models import Connection, errors
from airflow.models.taskinstance import SimpleTaskInstance
from airflow.settings import STORE_SERIALIZED_DAGS
Expand Down Expand Up @@ -292,6 +292,8 @@ def __init__(self,
max_runs,
processor_factory,
processor_timeout,
dag_ids,
pickle_dags,
async_mode):
"""
:param dag_directory: Directory where DAG definitions are kept. All
Expand All @@ -307,6 +309,10 @@ def __init__(self,
:type processor_factory: (unicode, unicode, list) -> (AbstractDagFileProcessorProcess)
:param processor_timeout: How long to wait before timing out a DAG file processor
:type processor_timeout: timedelta
:param dag_ids: if specified, only schedule tasks with these DAG IDs
:type dag_ids: list[str]
:param pickle_dags: whether to pickle DAGs.
:type: pickle_dags: bool
:param async_mode: Whether to start agent in async mode
:type async_mode: bool
"""
Expand All @@ -316,6 +322,8 @@ def __init__(self,
self._max_runs = max_runs
self._processor_factory = processor_factory
self._processor_timeout = processor_timeout
self._dag_ids = dag_ids
self._pickle_dags = pickle_dags
self._async_mode = async_mode
# Map from file path to the processor
self._processors = {}
Expand All @@ -332,8 +340,20 @@ def start(self):
"""
Launch DagFileProcessorManager processor and start DAG parsing loop in manager.
"""
self._parent_signal_conn, child_signal_conn = multiprocessing.Pipe()
self._process = multiprocessing.Process(
if conf.has_option('core', 'mp_start_method'):
mp_start_method = conf.get('core', 'mp_start_method')
else:
mp_start_method = mp.get_start_method()

possible_value_list = mp.get_all_start_methods()
if mp_start_method not in possible_value_list:
raise AirflowConfigException(
"mp_start_method should not be " + mp_start_method +
". Possible value is one of " + str(possible_value_list))
cxt = mp.get_context(mp_start_method)

self._parent_signal_conn, child_signal_conn = cxt.Pipe()
self._process = cxt.Process(
Comment on lines +343 to +356
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It looks like this block of code is identical to https://github.com/apache/airflow/pull/7128/files#diff-c35269bcfbbe386e269ffa7487e86192R171-R184. Given that they would probably need to change at the same time if either ever changed, this might be a good case for a utility mixin. Lines 343-352 could go into a MultiProcessingConfigMixin class or similar.

target=type(self)._run_processor_manager,
args=(
self._dag_directory,
Expand All @@ -342,7 +362,9 @@ def start(self):
self._processor_factory,
self._processor_timeout,
child_signal_conn,
self._async_mode,
self._dag_ids,
self._pickle_dags,
self._async_mode
)
)
self._process.start()
Expand Down Expand Up @@ -388,6 +410,8 @@ def _run_processor_manager(dag_directory,
processor_factory,
processor_timeout,
signal_conn,
dag_ids,
pickle_dags,
async_mode):

# Make this process start as a new process group - that makes it easy
Expand All @@ -414,6 +438,8 @@ def _run_processor_manager(dag_directory,
processor_factory,
processor_timeout,
signal_conn,
dag_ids,
pickle_dags,
async_mode)

processor_manager.start()
Expand Down Expand Up @@ -527,6 +553,10 @@ class DagFileProcessorManager(LoggingMixin): # pylint: disable=too-many-instanc
:type processor_timeout: timedelta
:param signal_conn: connection to communicate signal with processor agent.
:type signal_conn: airflow.models.connection.Connection
:param dag_ids: if specified, only schedule tasks with these DAG IDs
:type dag_ids: list[str]
:param pickle_dags: whether to pickle DAGs.
:type pickle_dags: bool
:param async_mode: whether to start the manager in async mode
:type async_mode: bool
"""
Expand All @@ -538,13 +568,17 @@ def __init__(self,
processor_factory: Callable[[str, List[Any]], AbstractDagFileProcessorProcess],
processor_timeout: timedelta,
signal_conn: Connection,
dag_ids: List[str],
pickle_dags: bool,
async_mode: bool = True):
self._file_paths = file_paths
self._file_path_queue: List[str] = []
self._dag_directory = dag_directory
self._max_runs = max_runs
self._processor_factory = processor_factory
self._signal_conn = signal_conn
self._pickle_dags = pickle_dags
self._dag_ids = dag_ids
self._async_mode = async_mode
self._parsing_start_time: Optional[datetime] = None

Expand Down Expand Up @@ -1055,7 +1089,7 @@ def heartbeat(self):
# Start more processors if we have enough slots and files to process
while self._parallelism - len(self._processors) > 0 and self._file_path_queue:
file_path = self._file_path_queue.pop(0)
processor = self._processor_factory(file_path, self._zombies)
processor = self._processor_factory(file_path, self._zombies, self._dag_ids, self._pickle_dags)
Stats.incr('dag_processing.processes')

processor.start()
Expand Down
25 changes: 25 additions & 0 deletions tests/jobs/test_scheduler_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -2193,6 +2193,31 @@ def test_scheduler_multiprocessing(self):
self.assertEqual(
len(session.query(TI).filter(TI.dag_id == dag_id).all()), 0)

def test_scheduler_multiprocessing_with_spawn_method(self):
"""
Test that the scheduler can successfully queue multiple dags in parallel
"""
try:
conf.set('core', 'mp_start_method', 'spawn')
dag_ids = ['test_start_date_scheduling', 'test_dagrun_states_success']
for dag_id in dag_ids:
dag = self.dagbag.get_dag(dag_id)
dag.clear()

scheduler = SchedulerJob(dag_ids=dag_ids,
executor=self.null_exec,
subdir=os.path.join(TEST_DAG_FOLDER, 'test_scheduler_dags.py'),
num_runs=1)
scheduler.run()

# zero tasks ran
dag_id = 'test_start_date_scheduling'
session = settings.Session()
self.assertEqual(
len(session.query(TI).filter(TI.dag_id == dag_id).all()), 0)
finally:
conf.remove_option('core', 'mp_start_method')

@patch.object(TI, 'pool_full')
def test_scheduler_verify_pool_full(self, mock_pool_full):
"""
Expand Down
9 changes: 8 additions & 1 deletion tests/test_utils/mock_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,17 @@ def __init__(self, do_update=True, *args, **kwargs):
self.history = []
# All the tasks, in a stable sort order
self.sorted_tasks = []
self.mock_task_results = defaultdict(lambda: State.SUCCESS)

# If multiprocessing runs in spawn mode,
# arguments are to be pickled but lambda is not picclable.
# So we should pass self.success instead of lambda.
ashb marked this conversation as resolved.
Show resolved Hide resolved
self.mock_task_results = defaultdict(self.success)

super().__init__(*args, **kwargs)

def success(self):
return State.SUCCESS
ashb marked this conversation as resolved.
Show resolved Hide resolved

def heartbeat(self):
if not self.do_update:
return
Expand Down
Loading