Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature: Move celery.default_queue to operators.default_queue to allow re-use between executors #14699

Merged
merged 3 commits into from
Mar 31, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions UPDATING.md
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,10 @@ This setting can be raised to an even higher value, currently it is
set to `6000` seconds (10 minutes) to
serve as a DagBag cache burst time.

### `default_queue` configuration has been moved to the `operators` section.

The `default_queue` configuration option has been moved from `[celery]` section to `[operators]` section to allow for re-use between different executors.

## Airflow 2.0.1

### Permission to view Airflow Configurations has been removed from `User` and `Viewer` role
Expand Down
2 changes: 1 addition & 1 deletion airflow/cli/cli_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -502,7 +502,7 @@ def _check(value):
ARG_QUEUES = Arg(
("-q", "--queues"),
help="Comma delimited list of queues to serve",
default=conf.get('celery', 'DEFAULT_QUEUE'),
default=conf.get('operators', 'DEFAULT_QUEUE'),
)
ARG_CONCURRENCY = Arg(
("-c", "--concurrency"),
Expand Down
14 changes: 7 additions & 7 deletions airflow/config_templates/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -867,6 +867,13 @@
type: string
example: ~
default: "0"
- name: default_queue
description: |
Default queue that tasks get assigned to and that worker listen on.
version_added: ~
type: string
example: ~
default: "default"
- name: allow_illegal_arguments
description: |
Is allowed to pass additional/unused arguments (args, kwargs) to the BaseOperator operator.
Expand Down Expand Up @@ -1507,13 +1514,6 @@
sensitive: true
example: "user1:password1,user2:password2"
default: ""
- name: default_queue
description: |
Default queue that tasks get assigned to and that worker listen on.
version_added: ~
type: string
example: ~
default: "default"
- name: sync_parallelism
description: |
How many processes CeleryExecutor uses to sync task state.
Expand Down
6 changes: 3 additions & 3 deletions airflow/config_templates/default_airflow.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -445,6 +445,9 @@ default_ram = 512
default_disk = 512
default_gpus = 0

# Default queue that tasks get assigned to and that worker listen on.
default_queue = default

# Is allowed to pass additional/unused arguments (args, kwargs) to the BaseOperator operator.
# If set to False, an exception will be thrown, otherwise only the console message will be displayed.
allow_illegal_arguments = False
Expand Down Expand Up @@ -753,9 +756,6 @@ flower_port = 5555
# Example: flower_basic_auth = user1:password1,user2:password2
flower_basic_auth =

# Default queue that tasks get assigned to and that worker listen on.
default_queue = default

# How many processes CeleryExecutor uses to sync task state.
# 0 means to use max(1, number of cores - 1) processes.
sync_parallelism = 0
Expand Down
4 changes: 2 additions & 2 deletions airflow/config_templates/default_celery.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,8 @@ def _broker_supports_visibility_timeout(url):
'event_serializer': 'json',
'worker_prefetch_multiplier': conf.getint('celery', 'worker_prefetch_multiplier', fallback=1),
'task_acks_late': True,
'task_default_queue': conf.get('celery', 'DEFAULT_QUEUE'),
'task_default_exchange': conf.get('celery', 'DEFAULT_QUEUE'),
'task_default_queue': conf.get('operators', 'DEFAULT_QUEUE'),
'task_default_exchange': conf.get('operators', 'DEFAULT_QUEUE'),
'task_track_started': conf.get('celery', 'task_track_started', fallback=True),
'broker_url': broker_url,
'broker_transport_options': broker_transport_options,
Expand Down
1 change: 0 additions & 1 deletion airflow/config_templates/default_test.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,6 @@ broker_url = sqla+mysql://airflow:airflow@localhost:3306/airflow
result_backend = db+mysql://airflow:airflow@localhost:3306/airflow
flower_host = 0.0.0.0
flower_port = 5555
default_queue = default
sync_parallelism = 0
worker_precheck = False

Expand Down
1 change: 1 addition & 0 deletions airflow/configuration.py
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,7 @@ class AirflowConfigParser(ConfigParser): # pylint: disable=too-many-ancestors
('metrics', 'statsd_datadog_tags'): ('scheduler', 'statsd_datadog_tags', '2.0.0'),
('metrics', 'statsd_custom_client_path'): ('scheduler', 'statsd_custom_client_path', '2.0.0'),
('scheduler', 'parsing_processes'): ('scheduler', 'max_threads', '1.10.14'),
('operators', 'default_queue'): ('celery', 'default_queue', '2.1.0'),
leonsmith marked this conversation as resolved.
Show resolved Hide resolved
}

# A mapping of old default values that we want to change and warn the user
Expand Down
2 changes: 1 addition & 1 deletion airflow/models/baseoperator.py
Original file line number Diff line number Diff line change
Expand Up @@ -367,7 +367,7 @@ def __init__(
default_args: Optional[Dict] = None, # pylint: disable=unused-argument
priority_weight: int = 1,
weight_rule: str = WeightRule.DOWNSTREAM,
queue: str = conf.get('celery', 'default_queue'),
queue: str = conf.get('operators', 'default_queue'),
pool: Optional[str] = None,
pool_slots: int = 1,
sla: Optional[timedelta] = None,
Expand Down
3 changes: 2 additions & 1 deletion chart/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -738,11 +738,12 @@ config:
statsd_port: 9125
statsd_prefix: airflow
statsd_host: '{{ printf "%s-statsd" .Release.Name }}'
operators:
default_queue: celery
webserver:
enable_proxy_fix: 'True'
rbac: 'True'
celery:
default_queue: celery
worker_concurrency: 16
scheduler:
scheduler_heartbeat_sec: 5
Expand Down
2 changes: 1 addition & 1 deletion docs/apache-airflow/executor/celery.rst
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ Queues
When using the CeleryExecutor, the Celery queues that tasks are sent to
can be specified. ``queue`` is an attribute of BaseOperator, so any
task can be assigned to any queue. The default queue for the environment
is defined in the ``airflow.cfg``'s ``celery -> default_queue``. This defines
is defined in the ``airflow.cfg``'s ``operators -> default_queue``. This defines
the queue that tasks get assigned to when not specified, as well as which
queue Airflow workers listen to when started.

Expand Down