Skip to content

Commit

Permalink
Fix the default value for store_dag_code (apache#9554)
Browse files Browse the repository at this point in the history
related to apache#8255 (fixes the issue mentioned with `store_dag_code` but does not address Config interpolation)

The default value of `store_dag_code` should be same as `store_serialized_dags` setting.  But if the value is set it should use that value
  • Loading branch information
kaxil authored Jun 29, 2020
1 parent 6b18ed4 commit 57c722b
Show file tree
Hide file tree
Showing 7 changed files with 33 additions and 8 deletions.
4 changes: 2 additions & 2 deletions airflow/config_templates/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -338,8 +338,8 @@
``store_serialized_dags`` setting.
version_added: 1.10.10
type: string
example: ~
default: "%(store_serialized_dags)s"
example: "False"
default: ~
- name: max_num_rendered_ti_fields_per_task
description: |
Maximum number of Rendered Task Instance Fields (Template Fields) per task to store
Expand Down
3 changes: 2 additions & 1 deletion airflow/config_templates/default_airflow.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,8 @@ min_serialized_dag_update_interval = 30
# If set to True, Webserver reads file contents from DB instead of
# trying to access files in a DAG folder. Defaults to same as the
# ``store_serialized_dags`` setting.
store_dag_code = %(store_serialized_dags)s
# Example: store_dag_code = False
# store_dag_code =

# Maximum number of Rendered Task Instance Fields (Template Fields) per task to store
# in the Database.
Expand Down
2 changes: 1 addition & 1 deletion airflow/models/dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -1589,7 +1589,7 @@ def bulk_sync_to_db(cls, dags: Collection["DAG"], sync_time=None, session=None):
orm_dag.tags.append(dag_tag_orm)
session.add(dag_tag_orm)

if conf.getboolean('core', 'store_dag_code', fallback=False):
if settings.STORE_DAG_CODE:
DagCode.bulk_sync_to_db([dag.fileloc for dag in orm_dags])

session.commit()
Expand Down
4 changes: 2 additions & 2 deletions airflow/models/dagcode.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,9 @@

from sqlalchemy import BigInteger, Column, String, UnicodeText, and_, exists

from airflow.configuration import conf
from airflow.exceptions import AirflowException, DagCodeNotFound
from airflow.models import Base
from airflow.settings import STORE_DAG_CODE
from airflow.utils import timezone
from airflow.utils.file import correct_maybe_zipped, open_maybe_zipped
from airflow.utils.session import provide_session
Expand Down Expand Up @@ -181,7 +181,7 @@ def code(cls, fileloc) -> str:
:return: source code as string
"""
if conf.getboolean('core', 'store_dag_code', fallback=False):
if STORE_DAG_CODE:
return cls._get_code_from_db(fileloc)
else:
return cls._get_code_from_file(fileloc)
Expand Down
5 changes: 5 additions & 0 deletions airflow/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -340,6 +340,11 @@ def initialize():
MIN_SERIALIZED_DAG_UPDATE_INTERVAL = conf.getint(
'core', 'min_serialized_dag_update_interval', fallback=30)

# Whether to persist DAG files code in DB. If set to True, Webserver reads file contents
# from DB instead of trying to access files in a DAG folder.
# Defaults to same as the store_serialized_dags setting.
STORE_DAG_CODE = conf.getboolean("core", "store_dag_code", fallback=STORE_SERIALIZED_DAGS)

# If donot_modify_handlers=True, we do not modify logging handlers in task_run command
# If the flag is set to False, we remove all handlers from the root logger
# and add all handlers from 'airflow.task' logger to the root Logger. This is done
Expand Down
4 changes: 2 additions & 2 deletions airflow/utils/dag_processing.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@
from airflow.exceptions import AirflowException
from airflow.models import errors
from airflow.models.taskinstance import SimpleTaskInstance, TaskInstance
from airflow.settings import STORE_SERIALIZED_DAGS
from airflow.settings import STORE_DAG_CODE, STORE_SERIALIZED_DAGS
from airflow.stats import Stats
from airflow.utils import timezone
from airflow.utils.file import list_py_file_paths
Expand Down Expand Up @@ -631,7 +631,7 @@ def __init__(self,
conf.getint('scheduler', 'scheduler_zombie_task_threshold'))

# Should store dag file source in a database?
self.store_dag_code = conf.getboolean('core', 'store_dag_code', fallback=False)
self.store_dag_code = STORE_DAG_CODE
# Map from file path to the processor
self._processors: Dict[str, AbstractDagFileProcessorProcess] = {}

Expand Down
19 changes: 19 additions & 0 deletions tests/test_configuration.py
Original file line number Diff line number Diff line change
Expand Up @@ -599,3 +599,22 @@ def test_run_command(self):

def test_confirm_unittest_mod(self):
self.assertTrue(conf.get('core', 'unit_test_mode'))

@conf_vars({("core", "store_serialized_dags"): "True"})
def test_store_dag_code_default_config(self):
store_serialized_dags = conf.getboolean('core', 'store_serialized_dags', fallback=False)
store_dag_code = conf.getboolean("core", "store_dag_code", fallback=store_serialized_dags)
self.assertFalse(conf.has_option("core", "store_dag_code"))
self.assertTrue(store_serialized_dags)
self.assertTrue(store_dag_code)

@conf_vars({
("core", "store_serialized_dags"): "True",
("core", "store_dag_code"): "False"
})
def test_store_dag_code_config_when_set(self):
store_serialized_dags = conf.getboolean('core', 'store_serialized_dags', fallback=False)
store_dag_code = conf.getboolean("core", "store_dag_code", fallback=store_serialized_dags)
self.assertTrue(conf.has_option("core", "store_dag_code"))
self.assertTrue(store_serialized_dags)
self.assertFalse(store_dag_code)

0 comments on commit 57c722b

Please sign in to comment.