From 9ac093cf712c2a37e8ddbf29039135e7fef1e967 Mon Sep 17 00:00:00 2001 From: Daniel Standish <15932138+dstandish@users.noreply.github.com> Date: Tue, 20 Sep 2022 12:17:44 -0700 Subject: [PATCH 1/4] Add fixture for CLI tests requiring sample dags I noticed that various CLI tests were failing (specifically in test_task_command.py) when run locally but not when run in breeze. I figured out the cause is that breeze has ('core', 'load_examples') set to true but I had it false locally. To fix this we can add a fixture that patches the conf settings. But we have to go one step further. If we grab the conf value in the default for the argument in DagBag, this means the value is fixed at the time the class gets defined. So the value cannot be changed later in runtime! The solution is to resolve the default on the fly each time an instance is created. We do the same for safe_mode while we're at it, because why not. --- airflow/models/dagbag.py | 6 +++++- tests/cli/conftest.py | 7 +++++++ 2 files changed, 12 insertions(+), 1 deletion(-) diff --git a/airflow/models/dagbag.py b/airflow/models/dagbag.py index 8a5557c2c22645..f68394a66d510a 100644 --- a/airflow/models/dagbag.py +++ b/airflow/models/dagbag.py @@ -54,6 +54,7 @@ from airflow.utils.retries import MAX_DB_RETRIES, run_with_db_retries from airflow.utils.session import provide_session from airflow.utils.timeout import timeout +from airflow.utils.types import NOTSET if TYPE_CHECKING: import pathlib @@ -92,7 +93,7 @@ class DagBag(LoggingMixin): def __init__( self, dag_folder: str | pathlib.Path | None = None, - include_examples: bool = conf.getboolean('core', 'LOAD_EXAMPLES'), + include_examples: bool = NOTSET, safe_mode: bool = conf.getboolean('core', 'DAG_DISCOVERY_SAFE_MODE'), read_dags_from_db: bool = False, store_serialized_dags: bool | None = None, @@ -103,6 +104,9 @@ def __init__( super().__init__() + if include_examples is NOTSET: + include_examples = conf.getboolean('core', 'LOAD_EXAMPLES') + if store_serialized_dags: warnings.warn( "The store_serialized_dags parameter has been deprecated. " diff --git a/tests/cli/conftest.py b/tests/cli/conftest.py index 0a2c062c66436e..67de520c38b5df 100644 --- a/tests/cli/conftest.py +++ b/tests/cli/conftest.py @@ -24,6 +24,7 @@ from airflow import models from airflow.cli import cli_parser from airflow.executors import celery_executor, celery_kubernetes_executor +from tests.test_utils.config import conf_vars # Create custom executors here because conftest is imported first custom_executor_module = type(sys)('custom_executor') @@ -36,6 +37,12 @@ sys.modules['custom_executor'] = custom_executor_module +@pytest.fixture(autouse=True) +def load_examples(): + with conf_vars({('core', 'load_examples'): 'True'}): + yield + + @pytest.fixture(scope="session") def dagbag(): return models.DagBag(include_examples=True) From 0004357098c7df8e6705581effaeddd33c342e52 Mon Sep 17 00:00:00 2001 From: Daniel Standish <15932138+dstandish@users.noreply.github.com> Date: Tue, 20 Sep 2022 12:23:43 -0700 Subject: [PATCH 2/4] do for safe mode --- airflow/models/dagbag.py | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/airflow/models/dagbag.py b/airflow/models/dagbag.py index f68394a66d510a..a1f3bb0af32f5a 100644 --- a/airflow/models/dagbag.py +++ b/airflow/models/dagbag.py @@ -54,7 +54,7 @@ from airflow.utils.retries import MAX_DB_RETRIES, run_with_db_retries from airflow.utils.session import provide_session from airflow.utils.timeout import timeout -from airflow.utils.types import NOTSET +from airflow.utils.types import NOTSET, ArgNotSet if TYPE_CHECKING: import pathlib @@ -93,8 +93,8 @@ class DagBag(LoggingMixin): def __init__( self, dag_folder: str | pathlib.Path | None = None, - include_examples: bool = NOTSET, - safe_mode: bool = conf.getboolean('core', 'DAG_DISCOVERY_SAFE_MODE'), + include_examples: bool | ArgNotSet = NOTSET, + safe_mode: bool | ArgNotSet = NOTSET, read_dags_from_db: bool = False, store_serialized_dags: bool | None = None, load_op_links: bool = True, @@ -107,6 +107,9 @@ def __init__( if include_examples is NOTSET: include_examples = conf.getboolean('core', 'LOAD_EXAMPLES') + if safe_mode is NOTSET: + safe_mode = conf.getboolean('core', 'DAG_DISCOVERY_SAFE_MODE') + if store_serialized_dags: warnings.warn( "The store_serialized_dags parameter has been deprecated. " From 9928797bf5699a64e92d1eae1770b8131487e8c3 Mon Sep 17 00:00:00 2001 From: Daniel Standish <15932138+dstandish@users.noreply.github.com> Date: Tue, 20 Sep 2022 13:18:41 -0700 Subject: [PATCH 3/4] fix tests -- they were dependent on the old non-dynamic default behavior --- tests/jobs/test_scheduler_job.py | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/tests/jobs/test_scheduler_job.py b/tests/jobs/test_scheduler_job.py index 4284bf02d9695e..81fd6a30177973 100644 --- a/tests/jobs/test_scheduler_job.py +++ b/tests/jobs/test_scheduler_job.py @@ -104,6 +104,12 @@ def dagbag(): return DagBag(read_dags_from_db=True) +@pytest.fixture +def load_examples(): + with conf_vars({('core', 'load_examples'): 'True'}): + yield + + @pytest.mark.usefixtures("disable_load_example") @pytest.mark.need_serialized_dag class TestSchedulerJob: @@ -4014,7 +4020,7 @@ def test_find_zombies_nothing(self): self.scheduler_job.executor.callback_sink.send.assert_not_called() - def test_find_zombies(self): + def test_find_zombies(self, load_examples): dagbag = DagBag(TEST_DAG_FOLDER, read_dags_from_db=False) with create_session() as session: session.query(LocalTaskJob).delete() @@ -4072,7 +4078,7 @@ def test_find_zombies(self): session.query(TaskInstance).delete() session.query(LocalTaskJob).delete() - def test_zombie_message(self): + def test_zombie_message(self, load_examples): """ Check that the zombie message comes out as expected """ From 01896699ba09d2d29924f014f2d4241d29ae3e43 Mon Sep 17 00:00:00 2001 From: Daniel Standish <15932138+dstandish@users.noreply.github.com> Date: Tue, 20 Sep 2022 17:32:46 -0700 Subject: [PATCH 4/4] fix mypy --- airflow/models/dagbag.py | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/airflow/models/dagbag.py b/airflow/models/dagbag.py index a1f3bb0af32f5a..cabc142f31708e 100644 --- a/airflow/models/dagbag.py +++ b/airflow/models/dagbag.py @@ -104,11 +104,14 @@ def __init__( super().__init__() - if include_examples is NOTSET: - include_examples = conf.getboolean('core', 'LOAD_EXAMPLES') - - if safe_mode is NOTSET: - safe_mode = conf.getboolean('core', 'DAG_DISCOVERY_SAFE_MODE') + include_examples = ( + include_examples + if isinstance(include_examples, bool) + else conf.getboolean('core', 'LOAD_EXAMPLES') + ) + safe_mode = ( + safe_mode if isinstance(safe_mode, bool) else conf.getboolean('core', 'DAG_DISCOVERY_SAFE_MODE') + ) if store_serialized_dags: warnings.warn(