Skip to content

Commit

Permalink
Fix the default value for store_dag_code (apache#9554)
Browse files Browse the repository at this point in the history
Related to apache#8255 (fixes the issue mentioned with `store_dag_code`, but does not address config interpolation).

The default value of `store_dag_code` should be the same as the `store_serialized_dags` setting. However, if `store_dag_code` is set explicitly, that value should be used instead.

(cherry picked from commit 57c722b)
  • Loading branch information
kaxil authored and Chris Fei committed Mar 5, 2021
1 parent 3eccf5f commit 8b59bb3
Show file tree
Hide file tree
Showing 7 changed files with 34 additions and 8 deletions.
4 changes: 2 additions & 2 deletions airflow/config_templates/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -445,8 +445,8 @@
``store_serialized_dags`` setting.
version_added: 1.10.10
type: string
example: ~
default: "%(store_serialized_dags)s"
example: "False"
default: ~
- name: max_num_rendered_ti_fields_per_task
description: |
Maximum number of Rendered Task Instance Fields (Template Fields) per task to store
Expand Down
3 changes: 2 additions & 1 deletion airflow/config_templates/default_airflow.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,8 @@ min_serialized_dag_update_interval = 30
# If set to True, Webserver reads file contents from DB instead of
# trying to access files in a DAG folder. Defaults to same as the
# ``store_serialized_dags`` setting.
store_dag_code = %(store_serialized_dags)s
# Example: store_dag_code = False
# store_dag_code =

# Maximum number of Rendered Task Instance Fields (Template Fields) per task to store
# in the Database.
Expand Down
2 changes: 1 addition & 1 deletion airflow/models/dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -1529,7 +1529,7 @@ def sync_to_db(self, owner=None, sync_time=None, session=None):
orm_dag.schedule_interval = self.schedule_interval
orm_dag.tags = self.get_dagtags(session=session)

if conf.getboolean('core', 'store_dag_code', fallback=False):
if settings.STORE_DAG_CODE:
DagCode.bulk_sync_to_db([orm_dag.fileloc])

session.commit()
Expand Down
4 changes: 2 additions & 2 deletions airflow/models/dagcode.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,9 @@

from sqlalchemy import BigInteger, Column, String, UnicodeText, and_, exists

from airflow.configuration import conf
from airflow.exceptions import AirflowException, DagCodeNotFound
from airflow.models import Base
from airflow.settings import STORE_DAG_CODE
from airflow.utils import timezone
from airflow.utils.file import correct_maybe_zipped, open_maybe_zipped
from airflow.utils.db import provide_session
Expand Down Expand Up @@ -178,7 +178,7 @@ def code(cls, fileloc):
:return: source code as string
"""
if conf.getboolean('core', 'store_dag_code', fallback=False):
if STORE_DAG_CODE:
return cls._get_code_from_db(fileloc)
else:
return cls._get_code_from_file(fileloc)
Expand Down
5 changes: 5 additions & 0 deletions airflow/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -401,6 +401,11 @@ def initialize():
MIN_SERIALIZED_DAG_UPDATE_INTERVAL = conf.getint(
'core', 'min_serialized_dag_update_interval', fallback=30)

# Whether to persist DAG file code in the DB. If set to True, the Webserver reads file contents
# from the DB instead of trying to access files in the DAG folder.
# Defaults to the same value as the store_serialized_dags setting.
STORE_DAG_CODE = conf.getboolean("core", "store_dag_code", fallback=STORE_SERIALIZED_DAGS)

# If donot_modify_handlers=True, we do not modify logging handlers in task_run command
# If the flag is set to False, we remove all handlers from the root logger
# and add all handlers from 'airflow.task' logger to the root Logger. This is done
Expand Down
4 changes: 2 additions & 2 deletions airflow/utils/dag_processing.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@
from airflow.exceptions import AirflowException
from airflow.settings import Stats
from airflow.models import errors
from airflow.settings import STORE_SERIALIZED_DAGS
from airflow.settings import STORE_DAG_CODE, STORE_SERIALIZED_DAGS
from airflow.utils import timezone
from airflow.utils.helpers import reap_process_group
from airflow.utils.db import provide_session
Expand Down Expand Up @@ -914,7 +914,7 @@ def _refresh_dag_dir(self):
SerializedDagModel.remove_deleted_dags(self._file_paths)
DagModel.deactivate_deleted_dags(self._file_paths)

if conf.getboolean('core', 'store_dag_code', fallback=False):
if STORE_DAG_CODE:
from airflow.models.dagcode import DagCode
DagCode.remove_deleted_code(self._file_paths)

Expand Down
20 changes: 20 additions & 0 deletions tests/test_configuration.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
from airflow import configuration
from airflow.configuration import conf, AirflowConfigParser, parameterized_config
from tests.compat import mock
from tests.test_utils.config import conf_vars
from tests.test_utils.reset_warning_registry import reset_warning_registry

if six.PY2:
Expand Down Expand Up @@ -494,3 +495,22 @@ def test_write_should_respect_env_variable(self):
conf.write(string_file)
content = string_file.getvalue()
self.assertIn("dags_folder = /tmp/test_folder", content)

# Verify the default resolution of ``store_dag_code``: when the option is not
# set, reading it with ``store_serialized_dags`` as the fallback yields the
# value of ``store_serialized_dags`` (True in this test).
# NOTE(review): conf_vars presumably overrides the given config values only for
# the duration of the test -- confirm against tests.test_utils.config.
@conf_vars({("core", "store_serialized_dags"): "True"})
def test_store_dag_code_default_config(self):
store_serialized_dags = conf.getboolean('core', 'store_serialized_dags', fallback=False)
# The fallback mirrors store_serialized_dags, so it is used when the option is absent.
store_dag_code = conf.getboolean("core", "store_dag_code", fallback=store_serialized_dags)
# Precondition: store_dag_code must have no configured value for the fallback to apply.
self.assertFalse(conf.has_option("core", "store_dag_code"))
self.assertTrue(store_serialized_dags)
self.assertTrue(store_dag_code)

# Verify that an explicitly configured ``store_dag_code`` takes precedence over
# the ``store_serialized_dags`` fallback: with store_serialized_dags="True" and
# store_dag_code="False", the resolved ``store_dag_code`` value is False.
# NOTE(review): conf_vars presumably overrides the given config values only for
# the duration of the test -- confirm against tests.test_utils.config.
@conf_vars({
("core", "store_serialized_dags"): "True",
("core", "store_dag_code"): "False"
})
def test_store_dag_code_config_when_set(self):
store_serialized_dags = conf.getboolean('core', 'store_serialized_dags', fallback=False)
# The fallback is supplied but should be ignored because the option is set.
store_dag_code = conf.getboolean("core", "store_dag_code", fallback=store_serialized_dags)
# Precondition: the option is present in the configuration.
self.assertTrue(conf.has_option("core", "store_dag_code"))
self.assertTrue(store_serialized_dags)
self.assertFalse(store_dag_code)

0 comments on commit 8b59bb3

Please sign in to comment.