diff --git a/airflow/config_templates/config.yml b/airflow/config_templates/config.yml index 0cfcd4d6879778..7f71647eca5de1 100644 --- a/airflow/config_templates/config.yml +++ b/airflow/config_templates/config.yml @@ -338,8 +338,8 @@ ``store_serialized_dags`` setting. version_added: 1.10.10 type: string - example: ~ - default: "%(store_serialized_dags)s" + example: "False" + default: ~ - name: max_num_rendered_ti_fields_per_task description: | Maximum number of Rendered Task Instance Fields (Template Fields) per task to store diff --git a/airflow/config_templates/default_airflow.cfg b/airflow/config_templates/default_airflow.cfg index cb4f22c796e65e..41feb52b731556 100644 --- a/airflow/config_templates/default_airflow.cfg +++ b/airflow/config_templates/default_airflow.cfg @@ -195,7 +195,8 @@ min_serialized_dag_update_interval = 30 # If set to True, Webserver reads file contents from DB instead of # trying to access files in a DAG folder. Defaults to same as the # ``store_serialized_dags`` setting. -store_dag_code = %(store_serialized_dags)s +# Example: store_dag_code = False +# store_dag_code = # Maximum number of Rendered Task Instance Fields (Template Fields) per task to store # in the Database. 
diff --git a/airflow/models/dag.py b/airflow/models/dag.py index ee3a475ec7959e..3e1dbed10ba7ac 100644 --- a/airflow/models/dag.py +++ b/airflow/models/dag.py @@ -1589,7 +1589,7 @@ def bulk_sync_to_db(cls, dags: Collection["DAG"], sync_time=None, session=None): orm_dag.tags.append(dag_tag_orm) session.add(dag_tag_orm) - if conf.getboolean('core', 'store_dag_code', fallback=False): + if settings.STORE_DAG_CODE: DagCode.bulk_sync_to_db([dag.fileloc for dag in orm_dags]) session.commit() diff --git a/airflow/models/dagcode.py b/airflow/models/dagcode.py index 3e0e27633133d6..1780638033519c 100644 --- a/airflow/models/dagcode.py +++ b/airflow/models/dagcode.py @@ -22,9 +22,9 @@ from sqlalchemy import BigInteger, Column, String, UnicodeText, and_, exists -from airflow.configuration import conf from airflow.exceptions import AirflowException, DagCodeNotFound from airflow.models import Base +from airflow.settings import STORE_DAG_CODE from airflow.utils import timezone from airflow.utils.file import correct_maybe_zipped, open_maybe_zipped from airflow.utils.session import provide_session @@ -181,7 +181,7 @@ def code(cls, fileloc) -> str: :return: source code as string """ - if conf.getboolean('core', 'store_dag_code', fallback=False): + if STORE_DAG_CODE: return cls._get_code_from_db(fileloc) else: return cls._get_code_from_file(fileloc) diff --git a/airflow/settings.py b/airflow/settings.py index cce745999b1ca5..246bac1aa0ee85 100644 --- a/airflow/settings.py +++ b/airflow/settings.py @@ -340,6 +340,11 @@ def initialize(): MIN_SERIALIZED_DAG_UPDATE_INTERVAL = conf.getint( 'core', 'min_serialized_dag_update_interval', fallback=30) +# Whether to persist DAG file source code in the DB. If set to True, the Webserver reads file contents +# from the DB instead of trying to access files in the DAG folder. +# Defaults to the same value as the store_serialized_dags setting. 
+STORE_DAG_CODE = conf.getboolean("core", "store_dag_code", fallback=STORE_SERIALIZED_DAGS) + # If donot_modify_handlers=True, we do not modify logging handlers in task_run command # If the flag is set to False, we remove all handlers from the root logger # and add all handlers from 'airflow.task' logger to the root Logger. This is done diff --git a/airflow/utils/dag_processing.py b/airflow/utils/dag_processing.py index 4e2990fee2dd23..63f0d34a5b5ff3 100644 --- a/airflow/utils/dag_processing.py +++ b/airflow/utils/dag_processing.py @@ -41,7 +41,7 @@ from airflow.exceptions import AirflowException from airflow.models import errors from airflow.models.taskinstance import SimpleTaskInstance, TaskInstance -from airflow.settings import STORE_SERIALIZED_DAGS +from airflow.settings import STORE_DAG_CODE, STORE_SERIALIZED_DAGS from airflow.stats import Stats from airflow.utils import timezone from airflow.utils.file import list_py_file_paths @@ -631,7 +631,7 @@ def __init__(self, conf.getint('scheduler', 'scheduler_zombie_task_threshold')) # Should store dag file source in a database? 
- self.store_dag_code = conf.getboolean('core', 'store_dag_code', fallback=False) + self.store_dag_code = STORE_DAG_CODE # Map from file path to the processor self._processors: Dict[str, AbstractDagFileProcessorProcess] = {} diff --git a/tests/test_configuration.py b/tests/test_configuration.py index 65a0e1d5b0d05c..09ab57adc5b9af 100644 --- a/tests/test_configuration.py +++ b/tests/test_configuration.py @@ -599,3 +599,22 @@ def test_run_command(self): def test_confirm_unittest_mod(self): self.assertTrue(conf.get('core', 'unit_test_mode')) + + @conf_vars({("core", "store_serialized_dags"): "True"}) + def test_store_dag_code_default_config(self): + store_serialized_dags = conf.getboolean('core', 'store_serialized_dags', fallback=False) + store_dag_code = conf.getboolean("core", "store_dag_code", fallback=store_serialized_dags) + self.assertFalse(conf.has_option("core", "store_dag_code")) + self.assertTrue(store_serialized_dags) + self.assertTrue(store_dag_code) + + @conf_vars({ + ("core", "store_serialized_dags"): "True", + ("core", "store_dag_code"): "False" + }) + def test_store_dag_code_config_when_set(self): + store_serialized_dags = conf.getboolean('core', 'store_serialized_dags', fallback=False) + store_dag_code = conf.getboolean("core", "store_dag_code", fallback=store_serialized_dags) + self.assertTrue(conf.has_option("core", "store_dag_code")) + self.assertTrue(store_serialized_dags) + self.assertFalse(store_dag_code)