Rename `[scheduler] max_threads` to `[scheduler] parsing_processes` (apache#12605)
From Airflow 2.0, the `max_threads` config under the `[scheduler]` section has been renamed to `parsing_processes`.

This aligns the name with the actual code: the Scheduler launches the number of processes defined by
`[scheduler] parsing_processes` to parse DAG files, calculate the next DagRun date for each DAG,
serialize them, and store them in the DB.
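For users upgrading, the change amounts to a one-line rename in `airflow.cfg`. An illustrative before/after fragment (the value 2 is the default):

```ini
# Airflow 1.10.x
[scheduler]
max_threads = 2

# Airflow 2.0+
[scheduler]
parsing_processes = 2
```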
kaxil authored Nov 25, 2020
1 parent c457c97 commit 4861344
Showing 9 changed files with 25 additions and 13 deletions.
8 changes: 8 additions & 0 deletions UPDATING.md
@@ -52,6 +52,14 @@ assists users migrating to a new version.
 
 ## Master
 
+### `[scheduler] max_threads` config has been renamed to `[scheduler] parsing_processes`
+
+From Airflow 2.0, `max_threads` config under `[scheduler]` section has been renamed to `parsing_processes`.
+
+This is to align the name with the actual code where the Scheduler launches the number of processes defined by
+`[scheduler] parsing_processes` to Parse DAG files, calculates next DagRun date for each DAG,
+serialize them and store them in the DB.
 
 ### Unify user session lifetime configuration
 
 In previous version of Airflow user session lifetime could be configured by
6 changes: 3 additions & 3 deletions airflow/config_templates/config.yml
@@ -1732,10 +1732,10 @@
       version_added: 2.0.0
       type: boolean
       default: ~
-    - name: max_threads
+    - name: parsing_processes
       description: |
-        The scheduler can run multiple threads in parallel to schedule dags.
-        This defines how many threads will run.
+        The scheduler can run multiple processes in parallel to parse dags.
+        This defines how many processes will run.
       version_added: ~
       type: string
       example: ~
6 changes: 3 additions & 3 deletions airflow/config_templates/default_airflow.cfg
@@ -866,9 +866,9 @@ use_row_level_locking = True
 # Default: True
 # schedule_after_task_execution =
 
-# The scheduler can run multiple threads in parallel to schedule dags.
-# This defines how many threads will run.
-max_threads = 2
+# The scheduler can run multiple processes in parallel to parse dags.
+# This defines how many processes will run.
+parsing_processes = 2
 
 # Turn off scheduler use of cron intervals by setting this to False.
 # DAGs submitted manually in the web UI or with trigger_dag will still run.
2 changes: 1 addition & 1 deletion airflow/config_templates/default_test.cfg
@@ -105,7 +105,7 @@ job_heartbeat_sec = 1
 schedule_after_task_execution = False
 scheduler_heartbeat_sec = 5
 scheduler_health_check_threshold = 30
-max_threads = 2
+parsing_processes = 2
 catchup_by_default = True
 scheduler_zombie_task_threshold = 300
 dag_dir_list_interval = 0
1 change: 1 addition & 0 deletions airflow/configuration.py
@@ -163,6 +163,7 @@ class AirflowConfigParser(ConfigParser):  # pylint: disable=too-many-ancestors
         ('metrics', 'statsd_datadog_enabled'): ('scheduler', 'statsd_datadog_enabled'),
         ('metrics', 'statsd_datadog_tags'): ('scheduler', 'statsd_datadog_tags'),
         ('metrics', 'statsd_custom_client_path'): ('scheduler', 'statsd_custom_client_path'),
+        ('scheduler', 'parsing_processes'): ('scheduler', 'max_threads'),
     }
 
     # A mapping of old default values that we want to change and warn the user
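The `deprecated_options` entry above drives Airflow's backward-compat lookup: reading the new key falls back to the old one with a deprecation warning. A simplified, self-contained sketch of that mechanism (not Airflow's actual class, just the idea — `get_with_fallback` is a hypothetical helper):

```python
import warnings
from configparser import ConfigParser

# Maps (section, new_option) -> (section, old_option), mirroring the
# deprecated_options entry added in this commit (simplified sketch).
DEPRECATED_OPTIONS = {
    ('scheduler', 'parsing_processes'): ('scheduler', 'max_threads'),
}


def get_with_fallback(parser: ConfigParser, section: str, option: str) -> str:
    """Return the new option if set, else fall back to its deprecated name."""
    if parser.has_option(section, option):
        return parser.get(section, option)
    old = DEPRECATED_OPTIONS.get((section, option))
    if old and parser.has_option(*old):
        warnings.warn(
            f"[{old[0]}] {old[1]} is deprecated; use [{section}] {option} instead.",
            DeprecationWarning,
        )
        return parser.get(*old)
    raise KeyError(f"[{section}] {option} is not configured")


# Example: an old-style config that only sets max_threads still works.
parser = ConfigParser()
parser.read_string("[scheduler]\nmax_threads = 4\n")
print(get_with_fallback(parser, 'scheduler', 'parsing_processes'))  # -> 4
```

Airflow's real `AirflowConfigParser` layers this over environment variables and defaults as well; the sketch shows only the rename fallback.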
4 changes: 2 additions & 2 deletions airflow/utils/dag_processing.py
@@ -510,10 +510,10 @@ def __init__(
         self._async_mode = async_mode
         self._parsing_start_time: Optional[datetime] = None
 
-        self._parallelism = conf.getint('scheduler', 'max_threads')
+        self._parallelism = conf.getint('scheduler', 'parsing_processes')
         if 'sqlite' in conf.get('core', 'sql_alchemy_conn') and self._parallelism > 1:
             self.log.warning(
-                "Because we cannot use more than 1 thread (max_threads = "
+                "Because we cannot use more than 1 thread (parsing_processes = "
                 "%d ) when using sqlite. So we set parallelism to 1.",
                 self._parallelism,
            )
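The guard in the hunk above caps parallelism at 1 when the metadata DB is SQLite, which serializes writes and cannot support concurrent parser processes. A standalone sketch of that check (`effective_parallelism` is a hypothetical helper, not Airflow API):

```python
def effective_parallelism(sql_alchemy_conn: str, parsing_processes: int) -> int:
    """Clamp DAG-parsing parallelism to 1 when the backend is SQLite.

    Mirrors the check in DagFileProcessorManager.__init__ (simplified).
    """
    if 'sqlite' in sql_alchemy_conn and parsing_processes > 1:
        # SQLite serializes writes, so extra parser processes would only contend.
        return 1
    return parsing_processes


print(effective_parallelism('sqlite:////tmp/airflow.db', 4))  # -> 1
print(effective_parallelism('postgresql://db/airflow', 4))    # -> 4
```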
7 changes: 5 additions & 2 deletions docs/faq.rst
@@ -205,8 +205,11 @@ This means ``explicit_defaults_for_timestamp`` is disabled in your mysql server
 How to reduce airflow dag scheduling latency in production?
 -----------------------------------------------------------
 
-- ``max_threads``: Scheduler will spawn multiple threads in parallel to schedule dags. This is controlled by ``max_threads`` with default value of 2. User should increase this value to a larger value (e.g numbers of cpus where scheduler runs - 1) in production.
-- If you're using Airflow 1.10.x, consider moving to Airflow 2, which has reduced dag scheduling latency dramatically, and allows for running multiple schedulers.
+- ``parsing_processes``: Scheduler will spawn multiple threads in parallel to parse dags.
+  This is controlled by ``parsing_processes`` with default value of 2.
+  User should increase this value to a larger value (e.g numbers of cpus where scheduler runs + 1) in production.
+- If you're using Airflow 1.10.x, consider moving to Airflow 2, which has reduced dag scheduling latency dramatically,
+  and allows for running multiple schedulers.
 
 Why next_ds or prev_ds might not contain expected values?
 ---------------------------------------------------------
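The FAQ's sizing advice can be expressed as a small helper. A sketch assuming the guidance of scaling `parsing_processes` with available CPUs; the exact offset is a tuning choice, not an Airflow API, and `suggested_parsing_processes` is a hypothetical name:

```python
import os


def suggested_parsing_processes(reserve: int = 1) -> int:
    """Suggest a parsing_processes value from the machine's CPU count.

    Reserves `reserve` CPUs for the scheduler loop itself; never below 1.
    """
    cpus = os.cpu_count() or 1  # os.cpu_count() may return None
    return max(cpus - reserve, 1)


print(suggested_parsing_processes())
```

The result would go into `airflow.cfg` as `parsing_processes` under `[scheduler]`, or be set via the `AIRFLOW__SCHEDULER__PARSING_PROCESSES` environment variable.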
2 changes: 1 addition & 1 deletion scripts/in_container/airflow_ci.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -53,4 +53,4 @@ _test_only_string = this is a test
[scheduler]
job_heartbeat_sec = 1
scheduler_heartbeat_sec = 5
max_threads = 2
parsing_processes = 2
2 changes: 1 addition & 1 deletion tests/utils/test_dag_processing.py
@@ -238,7 +238,7 @@ def test_handle_failure_callback_with_zombies_are_correctly_passed_to_dag_file_p
         file processors until the next zombie detection logic is invoked.
         """
         test_dag_path = os.path.join(TEST_DAG_FOLDER, 'test_example_bash_operator.py')
-        with conf_vars({('scheduler', 'max_threads'): '1', ('core', 'load_examples'): 'False'}):
+        with conf_vars({('scheduler', 'parsing_processes'): '1', ('core', 'load_examples'): 'False'}):
             dagbag = DagBag(test_dag_path, read_dags_from_db=False)
             with create_session() as session:
                 session.query(LJ).delete()