
[AIRFLOW-6529] Pickle error occurs when the scheduler tries to run on macOS. #7128

Closed
sarutak wants to merge 19 commits

Conversation

@sarutak (Member) commented Jan 10, 2020

When we try to run the scheduler on macOS, we get a serialization error like the following.

  ____________       _____________
 ____    |__( )_________  __/__  /________      __
____  /| |_  /__  ___/_  /_ __  /_  __ \_ | /| / /
___  ___ |  / _  /   _  __/ _  / / /_/ /_ |/ |/ /
 _/_/  |_/_/  /_/    /_/    /_/  \____/____/|__/
[2020-01-10 19:54:41,974] {executor_loader.py:59} INFO - Using executor SequentialExecutor
[2020-01-10 19:54:41,983] {scheduler_job.py:1462} INFO - Starting the scheduler
[2020-01-10 19:54:41,984] {scheduler_job.py:1469} INFO - Processing each file at most -1 times
[2020-01-10 19:54:41,984] {scheduler_job.py:1472} INFO - Searching for files in /Users/sarutak/airflow/dags
[2020-01-10 19:54:42,025] {scheduler_job.py:1474} INFO - There are 27 files in /Users/sarutak/airflow/dags
[2020-01-10 19:54:42,025] {scheduler_job.py:1527} INFO - Resetting orphaned tasks for active dag runs
[2020-01-10 19:54:42,059] {scheduler_job.py:1500} ERROR - Exception when executing execute_helper
Traceback (most recent call last):
  File "/Users/sarutak/work/oss/airflow-env/master-python3.8.1/lib/python3.8/site-packages/airflow/jobs/scheduler_job.py", line 1498, in _execute
    self._execute_helper()
  File "/Users/sarutak/work/oss/airflow-env/master-python3.8.1/lib/python3.8/site-packages/airflow/jobs/scheduler_job.py", line 1531, in _execute_helper
    self.processor_agent.start()
  File "/Users/sarutak/work/oss/airflow-env/master-python3.8.1/lib/python3.8/site-packages/airflow/utils/dag_processing.py", line 348, in start
    self._process.start()
  File "/opt/python/3.8.1/lib/python3.8/multiprocessing/process.py", line 121, in start
    self._popen = self._Popen(self)
  File "/opt/python/3.8.1/lib/python3.8/multiprocessing/context.py", line 224, in _Popen
    return _default_context.get_context().Process._Popen(process_obj)
  File "/opt/python/3.8.1/lib/python3.8/multiprocessing/context.py", line 283, in _Popen
    return Popen(process_obj)
  File "/opt/python/3.8.1/lib/python3.8/multiprocessing/popen_spawn_posix.py", line 32, in __init__
    super().__init__(process_obj)
  File "/opt/python/3.8.1/lib/python3.8/multiprocessing/popen_fork.py", line 19, in __init__
    self._launch(process_obj)
  File "/opt/python/3.8.1/lib/python3.8/multiprocessing/popen_spawn_posix.py", line 47, in _launch
    reduction.dump(process_obj, fp)
  File "/opt/python/3.8.1/lib/python3.8/multiprocessing/reduction.py", line 60, in dump
    ForkingPickler(file, protocol).dump(obj)
AttributeError: Can't pickle local object 'SchedulerJob._execute.<locals>.processor_factory'

The reason is that the scheduler tries to launch subprocesses via multiprocessing in spawn mode, and spawn mode pickles objects in order to send them to the child process. In this case the inner function processor_factory has to be pickled, but local (nested) functions cannot be pickled.
As of Python 3.8, spawn is the default start method on macOS.

The solution I propose is to pull the function out of the enclosing method so it is defined at module level and therefore picklable; a minimal sketch of the failure and the fix is shown below.
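For illustration, here is a minimal, self-contained reproduction (not Airflow code; all names and paths are hypothetical) of why spawn mode fails on a nested function and why a module-level function works:

```python
import multiprocessing as mp

# Standalone illustration (not Airflow code) of the spawn-mode pickle failure.


def run_processor(file_path):
    # Module-level functions are pickled by reference, so spawn mode can
    # re-import this module in the child process and look the function up.
    print("processing", file_path)


def broken():
    def processor_factory(file_path):  # local function: cannot be pickled
        print("processing", file_path)

    ctx = mp.get_context("spawn")
    proc = ctx.Process(target=processor_factory, args=("/tmp/example_dag.py",))
    proc.start()  # AttributeError: Can't pickle local object 'broken.<locals>.processor_factory'
    proc.join()


def fixed():
    ctx = mp.get_context("spawn")
    proc = ctx.Process(target=run_processor, args=("/tmp/example_dag.py",))
    proc.start()  # works: the child re-imports the module and finds run_processor
    proc.join()


if __name__ == "__main__":
    fixed()
```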

Issue link: AIRFLOW-6529

  • Description above provides context of the change
  • Commit message/PR title starts with [AIRFLOW-NNNN]. AIRFLOW-NNNN = JIRA ID*
  • Unit tests coverage for changes (not needed for documentation changes)
  • Commits follow "How to write a good git commit message"
  • Relevant documentation is updated including usage instructions.
  • I will engage committers as explained in Contribution Workflow Example.

* For document-only changes commit message can start with [AIRFLOW-XXXX].


In case of fundamental code change, Airflow Improvement Proposal (AIP) is needed.
In case of a new dependency, check compliance with the ASF 3rd Party License Policy.
In case of backwards incompatible changes please leave a note in UPDATING.md.
Read the Pull Request Guidelines for more information.

@boring-cyborg bot added the area:Scheduler label Jan 10, 2020
@sarutak (Member Author) commented Jan 10, 2020

I've tried to add a new test case for this issue but found it's difficult because multiprocessing.set_start_method can be called at most once.
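For reference, multiprocessing.get_context offers a way around this: it returns a context bound to a specific start method without touching the process-global setting, so it can be used any number of times. This is also the approach the PR's dag_processing.py change takes (see the quoted diff further down). A rough sketch, not the PR's test code:

```python
import multiprocessing as mp

# Illustrative sketch; not the PR's test code.


def child():
    print("hello from", mp.current_process().name)


if __name__ == "__main__":
    # mp.set_start_method() may only be called once per process, but
    # mp.get_context() can create independent contexts as often as needed.
    for method in mp.get_all_start_methods():
        ctx = mp.get_context(method)
        proc = ctx.Process(target=child, name=method + "-child")
        proc.start()
        proc.join()
```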

@kaxil requested a review from ashb January 10, 2020 11:46
tests/test_utils/mock_executor.py
@@ -168,6 +169,7 @@ def test_set_file_paths_when_processor_file_path_is_in_new_file_paths(self):
            processor_factory=MagicMock().return_value,
            processor_timeout=timedelta.max,
            signal_conn=MagicMock(),
            pickle_dags=True,
Member:

We should use False in all these tests, which I think is the default?

Member Author:

OK, I'll replace those changes with False.

@ashb changed the title from "[AIRFLOW-6529] Serialization error occurs when the scheduler tries to run on macOS." to "[AIRFLOW-6529] Pickle error occurs when the scheduler tries to run on macOS." Jan 10, 2020
@codecov-io commented Jan 11, 2020

Codecov Report

Merging #7128 into master will decrease coverage by 0.32%.
The diff coverage is 89.65%.


@@            Coverage Diff             @@
##           master    #7128      +/-   ##
==========================================
- Coverage   85.40%   85.08%   -0.33%     
==========================================
  Files         710      723      +13     
  Lines       39485    39564      +79     
==========================================
- Hits        33724    33663      -61     
- Misses       5761     5901     +140     
Impacted Files Coverage Δ
airflow/jobs/scheduler_job.py 89.16% <84.61%> (-0.19%) ⬇️
airflow/utils/dag_processing.py 88.24% <93.75%> (+0.05%) ⬆️
airflow/contrib/hooks/azure_data_lake_hook.py 0.00% <0.00%> (-93.11%) ⬇️
airflow/contrib/sensors/azure_cosmos_sensor.py 0.00% <0.00%> (-81.25%) ⬇️
airflow/kubernetes/volume_mount.py 44.44% <0.00%> (-55.56%) ⬇️
airflow/kubernetes/volume.py 52.94% <0.00%> (-47.06%) ⬇️
airflow/kubernetes/pod_launcher.py 45.25% <0.00%> (-46.72%) ⬇️
airflow/kubernetes/refresh_config.py 50.98% <0.00%> (-23.53%) ⬇️
...rflow/contrib/operators/kubernetes_pod_operator.py 78.31% <0.00%> (-20.49%) ⬇️
airflow/configuration.py 88.01% <0.00%> (-3.43%) ⬇️
... and 79 more

Legend: Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Powered by Codecov. Last update 30b54d3...b612321.

@sarutak (Member Author) commented Jan 12, 2020

I found another multiprocessing issue with spawn mode (actually, with any non-fork mode).
The new problem was that updated configuration is not inherited by subprocesses,
so test_dag_processing.py::TestDagFileProcessorAgent::test_reload_module failed on macOS.

@potiuk (Member) commented Jan 12, 2020

> I found another multiprocessing issue with spawn mode (actually, with any non-fork mode).
> The new problem was that updated configuration is not inherited by subprocesses,
> so test_dag_processing.py::TestDagFileProcessorAgent::test_reload_module failed on macOS.

Yet another reason why we should test it automatically, I think.

@potiuk (Member) left a comment:

I think we need just one test to run (and the current way will not work as you think) - see the details in the review.

.travis.yml (outdated)
@potiuk (Member) commented Jan 15, 2020

BTW, it would be great if you rebased, to make sure that we only see the changes from your PR, @sarutak.

@sarutak (Member Author) commented Jan 15, 2020

@potiuk All right. I'll do it.

        for section in inherited_conf:
            for key, value in inherited_conf[section].items():
                if value not in conf:
                    conf.set(section, key, value.replace("%", "%%"))
Member:

This looks suspect. What's going on here?

Member:

I also don't see anywhere that we actually pass a value here?

Member Author:

The spawn method inherits very little of the parent process's state, so the in-memory conf is not inherited by the child process either (see the sketch below). I noticed that test_reload_module in test_dag_processing.py fails because the configured DAG_PROCESSOR_MANAGER_LOG_LOCATION is not inherited.

Member Author:

Sorry, not DAG_PROCESSOR_MANAGER_LOG_LOCATION but logging_config_class.
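To illustrate the inheritance difference, here is a standalone sketch (not Airflow code; the dict is a stand-in for Airflow's in-memory conf):

```python
import multiprocessing as mp

# Stand-in for Airflow's in-memory configuration (hypothetical).
CONFIG = {"logging_config_class": "default"}


def show_config():
    # Under fork the child gets a copy of the parent's memory, so it sees the
    # runtime modification. Under spawn the child re-imports this module and
    # only sees the module-level default.
    print(CONFIG["logging_config_class"])


if __name__ == "__main__":
    CONFIG["logging_config_class"] = "my.custom.LOGGING_CONFIG"
    for method in ("fork", "spawn"):  # fork is unavailable on Windows
        proc = mp.get_context(method).Process(target=show_config)
        proc.start()
        proc.join()
    # The fork child prints "my.custom.LOGGING_CONFIG"; the spawn child prints "default".
```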

Member Author:

@ashb If only test_reload_module needs to dynamically change the configuration and propagate it to child processes, I'll remove inherited_conf and its related code.
What do you think?

                if value not in conf:
                    conf.set(section, key, value.replace("%", "%%"))

                setproctitle("airflow scheduler - DagFileProcessor {}".format(file_path))
Member:

When is inherited_conf used -- I can't find this one used anywhere either.

We want to set the process title always.

Member Author:

Ah, this is just a mis-indentation. I'll fix it.

@stale bot commented Mar 1, 2020

This issue has been automatically marked as stale because it has not had recent activity. It will be closed if no further activity occurs. Thank you for your contributions.

@stale bot added and then removed the stale label (Stale PRs per the .github/workflows/stale.yml policy file) Mar 1, 2020
        return self._result


def fake_dag_file_processor_factory(file_path, zombies, dag_ids, pickle_dags):
Contributor:

For clarity, it might be worth keeping the argument order consistent. So the preceding line would be:
def fake_dag_file_processor_factory(file_path, pickle_dags, dag_ids, zombies):

Comment on lines +343 to +356
        if conf.has_option('core', 'mp_start_method'):
            mp_start_method = conf.get('core', 'mp_start_method')
        else:
            mp_start_method = mp.get_start_method()

        possible_value_list = mp.get_all_start_methods()
        if mp_start_method not in possible_value_list:
            raise AirflowConfigException(
                "mp_start_method should not be " + mp_start_method +
                ". Possible value is one of " + str(possible_value_list))
        cxt = mp.get_context(mp_start_method)

        self._parent_signal_conn, child_signal_conn = cxt.Pipe()
        self._process = cxt.Process(
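This block reads an optional mp_start_method option from the [core] section, so, if I read the diff right, one could presumably pin the start method in airflow.cfg, for example:

```ini
# hypothetical airflow.cfg excerpt (option name taken from the diff above)
[core]
mp_start_method = fork
```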
Contributor:

It looks like this block of code is identical to https://github.com/apache/airflow/pull/7128/files#diff-c35269bcfbbe386e269ffa7487e86192R171-R184. Given that they would probably need to change at the same time if either ever changed, this might be a good case for a utility mixin. Lines 343-352 could go into a MultiProcessingConfigMixin class or similar.
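For illustration, such a mixin might look roughly like this (the class name is the reviewer's suggestion; the body just factors out the quoted block, and the imports assume Airflow's module layout at the time):

```python
import multiprocessing as mp

from airflow.configuration import conf
from airflow.exceptions import AirflowConfigException


class MultiProcessingConfigMixin:
    """Sketch of the reviewer's suggested mixin; not part of the PR."""

    @staticmethod
    def _get_multiprocessing_context():
        # Prefer an explicit start method from airflow.cfg, if configured.
        if conf.has_option('core', 'mp_start_method'):
            mp_start_method = conf.get('core', 'mp_start_method')
        else:
            mp_start_method = mp.get_start_method()

        possible_value_list = mp.get_all_start_methods()
        if mp_start_method not in possible_value_list:
            raise AirflowConfigException(
                "mp_start_method should not be " + mp_start_method +
                ". Possible value is one of " + str(possible_value_list))
        return mp.get_context(mp_start_method)
```

Both call sites could then replace the duplicated block with cxt = self._get_multiprocessing_context().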

    # as its processing result w/o actually parsing anything.
    def __init__(self, file_path, pickle_dags, dag_id_white_list, zombies):
        super().__init__(file_path, pickle_dags, dag_id_white_list, zombies)
        self._result = zombies, 0
Contributor:

Might be worth defining the zero to provide some context for what it represents.

@jhtimmins (Contributor) commented

@sarutak @potiuk @ashb do any of you know what the status of this is? It looks like all requested changes have been made. It won't solve all the problems related to adding Python 3.8 support, but it gets at least part of the way there.

@ashb (Member) commented Apr 29, 2020

@jhtimmins have you tried running with this PR? Is it closer to py3.8 support with it?

It looks like someone will need to resolve the conflicts, too.

Labels: area:dev-tools, area:Scheduler including HA (high availability) scheduler
5 participants