From 21b950fafa2e4aee14abaa1fcd746ce57649dbab Mon Sep 17 00:00:00 2001
From: Jarek Potiuk
Date: Sat, 27 Jul 2024 20:15:24 +0200
Subject: [PATCH] Remove Database Isolation feature flag and run DB isolation
 tests

This PR removes the AIP-44 feature flag and replaces the "in-progress-disabled"
test job with a dedicated "DatabaseIsolation" one.

The DatabaseIsolation test runs all "db-tests" with DB isolation mode enabled
and the `internal-api` component running; groups of tests marked with
"skip-if-database-isolation" are skipped.
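For example, a test module that cannot run under DB isolation opts out with the
dedicated marker (illustrative snippet only, not part of this diff):

    import pytest

    pytestmark = pytest.mark.skip_if_database_isolation_mode

A sketch of how this marker can be tied to the DATABASE_ISOLATION environment
variable exported by run-unit-tests.yml follows the diff at the end of this patch.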
"Whether to enable AIP-44 or not (true/false)" + database-isolation: + description: "Whether to enable database isolattion or not (true/false)" required: false - default: "true" + default: "false" type: string force-lowest-dependencies: description: "Whether to force lowest dependencies for the tests or not (true/false)" @@ -129,8 +129,6 @@ jobs: backend-version: "${{fromJSON(inputs.backend-versions)}}" exclude: "${{fromJSON(inputs.excludes)}}" env: - # yamllint disable rule:line-length - AIRFLOW_ENABLE_AIP_44: "${{ inputs.enable-aip-44 }}" BACKEND: "${{ inputs.backend }}" BACKEND_VERSION: "${{ matrix.backend-version }}" DB_RESET: "true" @@ -152,6 +150,7 @@ jobs: PYTHON_MAJOR_MINOR_VERSION: "${{ matrix.python-version }}" UPGRADE_BOTO: "${{ inputs.upgrade-boto }}" AIRFLOW_MONITOR_DELAY_TIME_IN_SECONDS: "${{inputs.monitor-delay-time-in-seconds}}" + DATABASE_ISOLATION: "${{ inputs.database-isolation }}" VERBOSE: "true" steps: - name: "Cleanup repo" diff --git a/.github/workflows/special-tests.yml b/.github/workflows/special-tests.yml index 000b5aa3d958b9..5ee7b3437381f7 100644 --- a/.github/workflows/special-tests.yml +++ b/.github/workflows/special-tests.yml @@ -171,8 +171,8 @@ jobs: run-coverage: ${{ inputs.run-coverage }} debug-resources: ${{ inputs.debug-resources }} - tests-in-progress-disabled: - name: "In progress disabled test" + tests-database-isolation: + name: "Database isolation test" uses: ./.github/workflows/run-unit-tests.yml permissions: contents: read @@ -180,9 +180,9 @@ jobs: secrets: inherit with: runs-on-as-json-default: ${{ inputs.runs-on-as-json-default }} - enable-aip-44: "false" - test-name: "InProgressDisabled-Postgres" - test-scope: "All" + database-isolation: "true" + test-name: "DatabaseIsolation-Postgres" + test-scope: "DB" backend: "postgres" image-tag: ${{ inputs.image-tag }} python-versions: "['${{ inputs.default-python-version }}']" diff --git a/airflow/api_internal/endpoints/rpc_api_endpoint.py b/airflow/api_internal/endpoints/rpc_api_endpoint.py index e4a5069b29bcc0..c3d8b671fbb089 100644 --- a/airflow/api_internal/endpoints/rpc_api_endpoint.py +++ b/airflow/api_internal/endpoints/rpc_api_endpoint.py @@ -228,20 +228,20 @@ def internal_airflow_api(body: dict[str, Any]) -> APIResponse: except Exception: return log_and_build_error_response(message="Error deserializing parameters.", status=400) - log.info("Calling method %s\nparams: %s", method_name, params) + log.debug("Calling method %s\nparams: %s", method_name, params) try: # Session must be created there as it may be needed by serializer for lazy-loaded fields. 
with create_session() as session: output = handler(**params, session=session) output_json = BaseSerialization.serialize(output, use_pydantic_models=True) response = json.dumps(output_json) if output_json is not None else None - log.info("Sending response: %s", response) + log.debug("Sending response: %s", response) return Response(response=response, headers={"Content-Type": "application/json"}) # In case of AirflowException or other selective known types, transport the exception class back to caller except (KeyError, AttributeError, AirflowException) as e: exception_json = BaseSerialization.serialize(e, use_pydantic_models=True) response = json.dumps(exception_json) - log.info("Sending exception response: %s", response) + log.debug("Sending exception response: %s", response) return Response(response=response, headers={"Content-Type": "application/json"}) except Exception: return log_and_build_error_response(message=f"Error executing method '{method_name}'.", status=500) diff --git a/airflow/api_internal/internal_api_call.py b/airflow/api_internal/internal_api_call.py index 8838377877becc..f93334dfda4ccb 100644 --- a/airflow/api_internal/internal_api_call.py +++ b/airflow/api_internal/internal_api_call.py @@ -30,7 +30,7 @@ from airflow.configuration import conf from airflow.exceptions import AirflowConfigException, AirflowException -from airflow.settings import _ENABLE_AIP_44, force_traceback_session_for_untrusted_components +from airflow.settings import force_traceback_session_for_untrusted_components from airflow.typing_compat import ParamSpec from airflow.utils.jwt_signer import JWTSigner @@ -55,8 +55,6 @@ def set_use_database_access(component: str): This mode is needed for "trusted" components like Scheduler, Webserver, Internal Api server """ InternalApiConfig._use_internal_api = False - if not _ENABLE_AIP_44: - raise RuntimeError("The AIP_44 is not enabled so you cannot use it. ") logger.info( "DB isolation mode. But this is a trusted component and DB connection is set. " "Using database direct access when running %s.", @@ -65,8 +63,6 @@ def set_use_database_access(component: str): @staticmethod def set_use_internal_api(component: str, allow_tests_to_use_db: bool = False): - if not _ENABLE_AIP_44: - raise RuntimeError("The AIP_44 is not enabled so you cannot use it. 
") internal_api_url = conf.get("core", "internal_api_url") url_conf = urlparse(internal_api_url) api_path = url_conf.path diff --git a/airflow/cli/cli_config.py b/airflow/cli/cli_config.py index 269916548401de..2a9b539926dc64 100644 --- a/airflow/cli/cli_config.py +++ b/airflow/cli/cli_config.py @@ -31,7 +31,6 @@ from airflow import settings from airflow.cli.commands.legacy_commands import check_legacy_command from airflow.configuration import conf -from airflow.settings import _ENABLE_AIP_44 from airflow.utils.cli import ColorMode from airflow.utils.module_loading import import_string from airflow.utils.state import DagRunState, JobState @@ -2080,34 +2079,30 @@ class GroupCommand(NamedTuple): func=lazy_load_command("airflow.cli.commands.standalone_command.standalone"), args=(), ), -] - -if _ENABLE_AIP_44: - core_commands.append( - ActionCommand( - name="internal-api", - help="Start a Airflow Internal API instance", - func=lazy_load_command("airflow.cli.commands.internal_api_command.internal_api"), - args=( - ARG_INTERNAL_API_PORT, - ARG_INTERNAL_API_WORKERS, - ARG_INTERNAL_API_WORKERCLASS, - ARG_INTERNAL_API_WORKER_TIMEOUT, - ARG_INTERNAL_API_HOSTNAME, - ARG_PID, - ARG_DAEMON, - ARG_STDOUT, - ARG_STDERR, - ARG_INTERNAL_API_ACCESS_LOGFILE, - ARG_INTERNAL_API_ERROR_LOGFILE, - ARG_INTERNAL_API_ACCESS_LOGFORMAT, - ARG_LOG_FILE, - ARG_SSL_CERT, - ARG_SSL_KEY, - ARG_DEBUG, - ), + ActionCommand( + name="internal-api", + help="Start a Airflow Internal API instance", + func=lazy_load_command("airflow.cli.commands.internal_api_command.internal_api"), + args=( + ARG_INTERNAL_API_PORT, + ARG_INTERNAL_API_WORKERS, + ARG_INTERNAL_API_WORKERCLASS, + ARG_INTERNAL_API_WORKER_TIMEOUT, + ARG_INTERNAL_API_HOSTNAME, + ARG_PID, + ARG_DAEMON, + ARG_STDOUT, + ARG_STDERR, + ARG_INTERNAL_API_ACCESS_LOGFILE, + ARG_INTERNAL_API_ERROR_LOGFILE, + ARG_INTERNAL_API_ACCESS_LOGFORMAT, + ARG_LOG_FILE, + ARG_SSL_CERT, + ARG_SSL_KEY, + ARG_DEBUG, ), - ) + ), +] def _remove_dag_id_opt(command: ActionCommand): diff --git a/airflow/models/baseoperator.py b/airflow/models/baseoperator.py index ea100cd4e2abf8..4a866147b0b8b1 100644 --- a/airflow/models/baseoperator.py +++ b/airflow/models/baseoperator.py @@ -1512,10 +1512,11 @@ def run( data_interval=info.data_interval, ) ti = TaskInstance(self, run_id=dr.run_id) + session.add(ti) ti.dag_run = dr session.add(dr) session.flush() - + session.commit() ti.run( mark_success=mark_success, ignore_depends_on_past=ignore_depends_on_past, diff --git a/airflow/operators/python.py b/airflow/operators/python.py index 09b2644beeee99..eaa9cef0b9ce34 100644 --- a/airflow/operators/python.py +++ b/airflow/operators/python.py @@ -49,7 +49,6 @@ from airflow.models.taskinstance import _CURRENT_CONTEXT from airflow.models.variable import Variable from airflow.operators.branch import BranchMixIn -from airflow.settings import _ENABLE_AIP_44 from airflow.typing_compat import Literal from airflow.utils import hashlib_wrapper from airflow.utils.context import context_copy_partial, context_get_outlet_events, context_merge @@ -552,8 +551,8 @@ def _execute_python_callable_in_subprocess(self, python_path: Path): self._write_args(input_path) self._write_string_args(string_args_path) - if self.use_airflow_context and (not is_pydantic_2_installed() or not _ENABLE_AIP_44): - error_msg = "`get_current_context()` needs to be used with Pydantic 2 and AIP-44 enabled." + if self.use_airflow_context and not is_pydantic_2_installed(): + error_msg = "`get_current_context()` needs to be used with Pydantic 2." 
raise AirflowException(error_msg) jinja_context = { diff --git a/airflow/serialization/serialized_objects.py b/airflow/serialization/serialized_objects.py index 4004b83a991bd3..05624e0bb181f7 100644 --- a/airflow/serialization/serialized_objects.py +++ b/airflow/serialization/serialized_objects.py @@ -72,7 +72,7 @@ from airflow.serialization.pydantic.taskinstance import TaskInstancePydantic from airflow.serialization.pydantic.tasklog import LogTemplatePydantic from airflow.serialization.pydantic.trigger import TriggerPydantic -from airflow.settings import _ENABLE_AIP_44, DAGS_FOLDER, json +from airflow.settings import DAGS_FOLDER, json from airflow.task.priority_strategy import ( PriorityWeightStrategy, airflow_priority_weight_strategies, @@ -627,11 +627,6 @@ def serialize( :meta private: """ - if use_pydantic_models and not _ENABLE_AIP_44: - raise RuntimeError( - "Setting use_pydantic_models = True requires AIP-44 (in progress) feature flag to be true. " - "This parameter will be removed eventually when new serialization is used by AIP-44" - ) if cls._is_primitive(var): # enum.IntEnum is an int instance, it causes json dumps error so we use its value. if isinstance(var, enum.Enum): @@ -758,7 +753,7 @@ def serialize( obj = cls.serialize(v, strict=strict, use_pydantic_models=use_pydantic_models) d[str(k)] = obj return cls._encode(d, type_=DAT.TASK_CONTEXT) - elif use_pydantic_models and _ENABLE_AIP_44: + elif use_pydantic_models: def _pydantic_model_dump(model_cls: type[BaseModel], var: Any) -> dict[str, Any]: return model_cls.model_validate(var).model_dump(mode="json") # type: ignore[attr-defined] @@ -790,11 +785,6 @@ def deserialize(cls, encoded_var: Any, use_pydantic_models=False) -> Any: :meta private: """ # JSON primitives (except for dict) are not encoded. - if use_pydantic_models and not _ENABLE_AIP_44: - raise RuntimeError( - "Setting use_pydantic_models = True requires AIP-44 (in progress) feature flag to be true. " - "This parameter will be removed eventually when new serialization is used by AIP-44" - ) if cls._is_primitive(encoded_var): return encoded_var elif isinstance(encoded_var, list): @@ -886,7 +876,7 @@ def deserialize(cls, encoded_var: Any, use_pydantic_models=False) -> Any: return SlaCallbackRequest.from_json(var) elif type_ == DAT.TASK_INSTANCE_KEY: return TaskInstanceKey(**var) - elif use_pydantic_models and _ENABLE_AIP_44: + elif use_pydantic_models: return _type_to_class[type_][0].model_validate(var) elif type_ == DAT.ARG_NOT_SET: return NOTSET diff --git a/airflow/settings.py b/airflow/settings.py index c780ccd6c5d0bb..07d4329cd59f64 100644 --- a/airflow/settings.py +++ b/airflow/settings.py @@ -312,6 +312,8 @@ def remove(*args, **kwargs): AIRFLOW_SETTINGS_PATH = os.path.join(AIRFLOW_PATH, "airflow", "settings.py") AIRFLOW_UTILS_SESSION_PATH = os.path.join(AIRFLOW_PATH, "airflow", "utils", "session.py") AIRFLOW_MODELS_BASEOPERATOR_PATH = os.path.join(AIRFLOW_PATH, "airflow", "models", "baseoperator.py") +AIRFLOW_MODELS_DAG_PATH = os.path.join(AIRFLOW_PATH, "airflow", "models", "dag.py") +AIRFLOW_DB_UTILS_PATH = os.path.join(AIRFLOW_PATH, "airflow", "utils", "db.py") class TracebackSessionForTests: @@ -369,6 +371,9 @@ def is_called_from_test_code(self) -> tuple[bool, traceback.FrameSummary | None] :return: True if the object was created from test code, False otherwise. 
""" self.traceback = traceback.extract_stack() + if any(filename.endswith("_pytest/fixtures.py") for filename, _, _, _ in self.traceback): + # This is a fixture call + return True, None airflow_frames = [ tb for tb in self.traceback @@ -377,24 +382,30 @@ def is_called_from_test_code(self) -> tuple[bool, traceback.FrameSummary | None] and not tb.filename == AIRFLOW_UTILS_SESSION_PATH ] if any( - filename.endswith("conftest.py") or filename.endswith("tests/test_utils/db.py") - for filename, _, _, _ in airflow_frames + filename.endswith("conftest.py") + or filename.endswith("tests/test_utils/db.py") + or (filename.startswith(AIRFLOW_TESTS_PATH) and name in ("setup_method", "teardown_method")) + for filename, _, name, _ in airflow_frames ): # This is a fixture call or testing utilities return True, None - if ( - len(airflow_frames) >= 2 - and airflow_frames[-2].filename.startswith(AIRFLOW_TESTS_PATH) - and airflow_frames[-1].filename == AIRFLOW_MODELS_BASEOPERATOR_PATH - and airflow_frames[-1].name == "run" - ): - # This is baseoperator run method that is called directly from the test code and this is - # usual pattern where we create a session in the test code to create dag_runs for tests. - # If `run` code will be run inside a real "airflow" code the stack trace would be longer - # and it would not be directly called from the test code. Also if subsequently any of the - # run_task() method called later from the task code will attempt to execute any DB - # method, the stack trace will be longer and we will catch it as "illegal" call. - return True, None + if len(airflow_frames) >= 2 and airflow_frames[-2].filename.startswith(AIRFLOW_TESTS_PATH): + # Let's look at what we are calling directly from the test code + current_filename, current_method_name = airflow_frames[-1].filename, airflow_frames[-1].name + if (current_filename, current_method_name) in ( + (AIRFLOW_MODELS_BASEOPERATOR_PATH, "run"), + (AIRFLOW_MODELS_DAG_PATH, "create_dagrun"), + ): + # This is baseoperator run method that is called directly from the test code and this is + # usual pattern where we create a session in the test code to create dag_runs for tests. + # If `run` code will be run inside a real "airflow" code the stack trace would be longer + # and it would not be directly called from the test code. Also if subsequently any of the + # run_task() method called later from the task code will attempt to execute any DB + # method, the stack trace will be longer and we will catch it as "illegal" call. 
+ return True, None + if current_filename == AIRFLOW_DB_UTILS_PATH: + # This is a util method called directly from the test code + return True, None for tb in airflow_frames[::-1]: if tb.filename.startswith(AIRFLOW_PATH): if tb.filename.startswith(AIRFLOW_TESTS_PATH): @@ -406,6 +417,16 @@ def is_called_from_test_code(self) -> tuple[bool, traceback.FrameSummary | None] # The traceback line will be always 3rd (two bottom ones are Airflow) return False, self.traceback[-2] + def get_bind( + self, + mapper=None, + clause=None, + bind=None, + _sa_skip_events=None, + _sa_skip_for_implicit_returning=False, + ): + pass + def _is_sqlite_db_path_relative(sqla_conn_str: str) -> bool: """Determine whether the database connection URI specifies a relative path.""" @@ -858,13 +879,3 @@ def is_usage_data_collection_enabled() -> bool: AIRFLOW_MOVED_TABLE_PREFIX = "_airflow_moved" DAEMON_UMASK: str = conf.get("core", "daemon_umask", fallback="0o077") - -# AIP-44: internal_api (experimental) -# This feature is not complete yet, so we disable it by default. -_ENABLE_AIP_44: bool = os.environ.get("AIRFLOW_ENABLE_AIP_44", "false").lower() in { - "true", - "t", - "yes", - "y", - "1", -} diff --git a/airflow/www/app.py b/airflow/www/app.py index 93c4e91d6d2a72..170941fdde4141 100644 --- a/airflow/www/app.py +++ b/airflow/www/app.py @@ -33,7 +33,6 @@ from airflow.exceptions import AirflowConfigException, RemovedInAirflow3Warning from airflow.logging_config import configure_logging from airflow.models import import_all_models -from airflow.settings import _ENABLE_AIP_44 from airflow.utils.json import AirflowJsonProvider from airflow.www.extensions.init_appbuilder import init_appbuilder from airflow.www.extensions.init_appbuilder_links import init_appbuilder_links @@ -171,8 +170,6 @@ def create_app(config=None, testing=False): init_error_handlers(flask_app) init_api_connexion(flask_app) if conf.getboolean("webserver", "run_internal_api", fallback=False): - if not _ENABLE_AIP_44: - raise RuntimeError("The AIP_44 is not enabled so you cannot use it.") init_api_internal(flask_app) init_api_auth_provider(flask_app) init_api_error_handlers(flask_app) # needs to be after all api inits to let them add their path first diff --git a/dev/breeze/src/airflow_breeze/commands/testing_commands.py b/dev/breeze/src/airflow_breeze/commands/testing_commands.py index 51ca4bea5d6368..cef51d975219e0 100644 --- a/dev/breeze/src/airflow_breeze/commands/testing_commands.py +++ b/dev/breeze/src/airflow_breeze/commands/testing_commands.py @@ -206,6 +206,7 @@ def _run_test( helm_test_package=None, keep_env_variables=shell_params.keep_env_variables, no_db_cleanup=shell_params.no_db_cleanup, + database_isolation=shell_params.database_isolation, ) ) run_cmd.extend(list(extra_pytest_args)) @@ -968,6 +969,7 @@ def helm_tests( helm_test_package=helm_test_package, keep_env_variables=False, no_db_cleanup=False, + database_isolation=False, ) cmd = ["docker", "compose", "run", "--service-ports", "--rm", "airflow", *pytest_args, *extra_pytest_args] result = run_command(cmd, check=False, env=env, output_outside_the_group=True) diff --git a/dev/breeze/src/airflow_breeze/params/shell_params.py b/dev/breeze/src/airflow_breeze/params/shell_params.py index a0504ee7a3df3a..b9cc37a5f10ae6 100644 --- a/dev/breeze/src/airflow_breeze/params/shell_params.py +++ b/dev/breeze/src/airflow_breeze/params/shell_params.py @@ -481,7 +481,6 @@ def env_variables_for_docker_commands(self) -> dict[str, str]: _set_var(_env, "AIRFLOW_CONSTRAINTS_LOCATION", 
self.airflow_constraints_location) _set_var(_env, "AIRFLOW_CONSTRAINTS_MODE", self.airflow_constraints_mode) _set_var(_env, "AIRFLOW_CONSTRAINTS_REFERENCE", self.airflow_constraints_reference) - _set_var(_env, "AIRFLOW_ENABLE_AIP_44", None, "true") _set_var(_env, "AIRFLOW_ENV", "development") _set_var(_env, "AIRFLOW_EXTRAS", self.airflow_extras) _set_var(_env, "AIRFLOW_SKIP_CONSTRAINTS", self.airflow_skip_constraints) diff --git a/dev/breeze/src/airflow_breeze/utils/run_tests.py b/dev/breeze/src/airflow_breeze/utils/run_tests.py index 51d7529eb45f5a..c52b7903b4c558 100644 --- a/dev/breeze/src/airflow_breeze/utils/run_tests.py +++ b/dev/breeze/src/airflow_breeze/utils/run_tests.py @@ -310,6 +310,7 @@ def generate_args_for_pytest( helm_test_package: str | None, keep_env_variables: bool, no_db_cleanup: bool, + database_isolation: bool, ): result_log_file, warnings_file, coverage_file = test_paths(test_type, backend, helm_test_package) if skip_db_tests: @@ -326,12 +327,16 @@ def generate_args_for_pytest( helm_test_package=helm_test_package, python_version=python_version, ) + + max_fail = 50 + if database_isolation: + max_fail = 1000 args.extend( [ "--verbosity=0", "--strict-markers", "--durations=100", - "--maxfail=50", + f"--maxfail={max_fail}", "--color=yes", f"--junitxml={result_log_file}", # timeouts in seconds for individual tests diff --git a/scripts/ci/docker-compose/devcontainer.env b/scripts/ci/docker-compose/devcontainer.env index 2b7cddb47eb6a5..927369ccbb64d7 100644 --- a/scripts/ci/docker-compose/devcontainer.env +++ b/scripts/ci/docker-compose/devcontainer.env @@ -17,7 +17,6 @@ HOME= AIRFLOW_CI_IMAGE="ghcr.io/apache/airflow/main/ci/python3.8:latest" ANSWER= -AIRFLOW_ENABLE_AIP_44="true" AIRFLOW_ENV="development" PYTHON_MAJOR_MINOR_VERSION="3.8" AIRFLOW_EXTRAS= diff --git a/tests/api_internal/endpoints/test_rpc_api_endpoint.py b/tests/api_internal/endpoints/test_rpc_api_endpoint.py index f12e0ae087bbbe..16dbcbeeea8365 100644 --- a/tests/api_internal/endpoints/test_rpc_api_endpoint.py +++ b/tests/api_internal/endpoints/test_rpc_api_endpoint.py @@ -30,7 +30,6 @@ from airflow.operators.empty import EmptyOperator from airflow.serialization.pydantic.taskinstance import TaskInstancePydantic from airflow.serialization.serialized_objects import BaseSerialization -from airflow.settings import _ENABLE_AIP_44 from airflow.utils.jwt_signer import JWTSigner from airflow.utils.state import State from airflow.www import app @@ -72,7 +71,6 @@ def equals(a, b) -> bool: return a == b -@pytest.mark.skipif(not _ENABLE_AIP_44, reason="AIP-44 is disabled") class TestRpcApiEndpoint: @pytest.fixture def setup_attrs(self, minimal_app_for_internal_api: Flask) -> Generator: diff --git a/tests/api_internal/test_internal_api_call.py b/tests/api_internal/test_internal_api_call.py index d779b504ea479a..784c0b4d4c9c4a 100644 --- a/tests/api_internal/test_internal_api_call.py +++ b/tests/api_internal/test_internal_api_call.py @@ -32,7 +32,6 @@ from airflow.models.taskinstance import TaskInstance from airflow.operators.empty import EmptyOperator from airflow.serialization.serialized_objects import BaseSerialization -from airflow.settings import _ENABLE_AIP_44 from airflow.utils.state import State from tests.test_utils.config import conf_vars @@ -61,7 +60,6 @@ def reset_init_api_config(): settings.SQL_ALCHEMY_CONN = old_conn -@pytest.mark.skipif(not _ENABLE_AIP_44, reason="AIP-44 is disabled") class TestInternalApiConfig: @conf_vars( { @@ -97,7 +95,6 @@ def test_force_database_direct_access(self): assert 
InternalApiConfig.get_use_internal_api() is False


-@pytest.mark.skipif(not _ENABLE_AIP_44, reason="AIP-44 is disabled")
 class TestInternalApiCall:
     @staticmethod
     @internal_api_call

diff --git a/tests/cli/commands/test_internal_api_command.py b/tests/cli/commands/test_internal_api_command.py
index 99992e62668618..37e83b8d4fabe2 100644
--- a/tests/cli/commands/test_internal_api_command.py
+++ b/tests/cli/commands/test_internal_api_command.py
@@ -30,7 +30,6 @@
 from airflow.cli import cli_parser
 from airflow.cli.commands import internal_api_command
 from airflow.cli.commands.internal_api_command import GunicornMonitor
-from airflow.settings import _ENABLE_AIP_44
 from tests.cli.commands._common_cli_classes import _ComonCLIGunicornTestClass
 from tests.test_utils.config import conf_vars
@@ -83,8 +82,10 @@ def test_ready_prefix_on_cmdline_dead_process(self):
         assert self.monitor._get_num_ready_workers_running() == 0


+# These tests are skipped in isolation mode because they interfere with the internal API
+# server already running in the background in isolation mode.
+@pytest.mark.skip_if_database_isolation_mode
 @pytest.mark.db_test
-@pytest.mark.skipif(not _ENABLE_AIP_44, reason="AIP-44 is disabled")
 class TestCliInternalAPI(_ComonCLIGunicornTestClass):
     main_process_regexp = r"airflow internal-api"

diff --git a/tests/cli/commands/test_webserver_command.py b/tests/cli/commands/test_webserver_command.py
index 07d95a9e5f75a2..fa2e58af9efe42 100644
--- a/tests/cli/commands/test_webserver_command.py
+++ b/tests/cli/commands/test_webserver_command.py
@@ -226,6 +226,9 @@ def test_ready_prefix_on_cmdline_dead_process(self):
         assert self.monitor._get_num_ready_workers_running() == 0


+# These tests are skipped in isolation mode because they interfere with the internal API
+# server already running in the background in isolation mode.
+@pytest.mark.skip_if_database_isolation_mode @pytest.mark.db_test class TestCliWebServer(_ComonCLIGunicornTestClass): main_process_regexp = r"airflow webserver" diff --git a/tests/conftest.py b/tests/conftest.py index 065895d5c45699..11cede8a12716e 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -118,7 +118,6 @@ os.environ["AIRFLOW__CORE__UNIT_TEST_MODE"] = "True" os.environ["AWS_DEFAULT_REGION"] = os.environ.get("AWS_DEFAULT_REGION") or "us-east-1" os.environ["CREDENTIALS_DIR"] = os.environ.get("CREDENTIALS_DIR") or "/files/airflow-breeze-config/keys" -os.environ["AIRFLOW_ENABLE_AIP_44"] = os.environ.get("AIRFLOW_ENABLE_AIP_44") or "true" if platform.system() == "Darwin": # mocks from unittest.mock work correctly in subprocesses only if they are created by "fork" method diff --git a/tests/core/test_settings.py b/tests/core/test_settings.py index d05344bfa91d89..c3c07abef0b75c 100644 --- a/tests/core/test_settings.py +++ b/tests/core/test_settings.py @@ -31,7 +31,7 @@ from airflow.api_internal.internal_api_call import InternalApiConfig from airflow.configuration import conf from airflow.exceptions import AirflowClusterPolicyViolation, AirflowConfigException -from airflow.settings import _ENABLE_AIP_44, TracebackSession, is_usage_data_collection_enabled +from airflow.settings import TracebackSession, is_usage_data_collection_enabled from airflow.utils.session import create_session from tests.test_utils.config import conf_vars @@ -301,7 +301,6 @@ def test_encoding_absent_in_v2(is_v1, mock_conf): assert "encoding" not in engine_args -@pytest.mark.skipif(not _ENABLE_AIP_44, reason="AIP-44 is disabled") @conf_vars( { ("core", "database_access_isolation"): "true", @@ -309,7 +308,7 @@ def test_encoding_absent_in_v2(is_v1, mock_conf): ("database", "sql_alchemy_conn"): "none://", } ) -def test_get_traceback_session_if_aip_44_enabled(clear_internal_api): +def test_get_traceback_session(clear_internal_api): configure_internal_api(Namespace(subcommand="worker"), conf) assert InternalApiConfig.get_use_internal_api() is True @@ -326,7 +325,6 @@ def test_get_traceback_session_if_aip_44_enabled(clear_internal_api): session.execute() -@pytest.mark.skipif(not _ENABLE_AIP_44, reason="AIP-44 is disabled") @conf_vars( { ("core", "database_access_isolation"): "true", diff --git a/tests/decorators/test_bash.py b/tests/decorators/test_bash.py index ba8948936eda1e..9fa7999e834769 100644 --- a/tests/decorators/test_bash.py +++ b/tests/decorators/test_bash.py @@ -33,6 +33,9 @@ DEFAULT_DATE = timezone.datetime(2023, 1, 1) +# TODO(potiuk) see why this test hangs in DB isolation mode +pytestmark = pytest.mark.skip_if_database_isolation_mode + @pytest.mark.db_test class TestBashDecorator: diff --git a/tests/decorators/test_branch_external_python.py b/tests/decorators/test_branch_external_python.py index d991f22cd55e4d..d2466365bef8d2 100644 --- a/tests/decorators/test_branch_external_python.py +++ b/tests/decorators/test_branch_external_python.py @@ -24,7 +24,8 @@ from airflow.decorators import task from airflow.utils.state import State -pytestmark = pytest.mark.db_test +# TODO: (potiuk) - AIP-44 - check why this test hangs +pytestmark = [pytest.mark.db_test, pytest.mark.skip_if_database_isolation_mode] class Test_BranchExternalPythonDecoratedOperator: diff --git a/tests/decorators/test_branch_python.py b/tests/decorators/test_branch_python.py index 58bb2162460498..3cd95b8d2a4ad1 100644 --- a/tests/decorators/test_branch_python.py +++ b/tests/decorators/test_branch_python.py @@ -22,7 +22,8 @@ from 
airflow.decorators import task from airflow.utils.state import State -pytestmark = pytest.mark.db_test +# TODO: (potiuk) - AIP-44 - check why this test hangs +pytestmark = [pytest.mark.db_test, pytest.mark.skip_if_database_isolation_mode] class Test_BranchPythonDecoratedOperator: diff --git a/tests/decorators/test_branch_virtualenv.py b/tests/decorators/test_branch_virtualenv.py index a5c23de392de39..6cdfa1ddff25e5 100644 --- a/tests/decorators/test_branch_virtualenv.py +++ b/tests/decorators/test_branch_virtualenv.py @@ -22,7 +22,8 @@ from airflow.decorators import task from airflow.utils.state import State -pytestmark = pytest.mark.db_test +# TODO: (potiuk) - AIP-44 - check why this test hangs +pytestmark = [pytest.mark.db_test, pytest.mark.skip_if_database_isolation_mode] class TestBranchPythonVirtualenvDecoratedOperator: diff --git a/tests/decorators/test_condition.py b/tests/decorators/test_condition.py index 315db6bfe0d128..28e0f0bf8fee0a 100644 --- a/tests/decorators/test_condition.py +++ b/tests/decorators/test_condition.py @@ -28,7 +28,8 @@ from airflow.models.taskinstance import TaskInstance from airflow.utils.context import Context -pytestmark = pytest.mark.db_test +# TODO(potiuk) see why this test hangs in DB isolation mode +pytestmark = [pytest.mark.db_test, pytest.mark.skip_if_database_isolation_mode] @pytest.mark.skip_if_database_isolation_mode # Test is broken in db isolation mode diff --git a/tests/decorators/test_external_python.py b/tests/decorators/test_external_python.py index 5ed5874e3a55a2..0d9a439aa23719 100644 --- a/tests/decorators/test_external_python.py +++ b/tests/decorators/test_external_python.py @@ -29,7 +29,7 @@ from airflow.decorators import setup, task, teardown from airflow.utils import timezone -pytestmark = pytest.mark.db_test +pytestmark = [pytest.mark.db_test, pytest.mark.need_serialized_dag] DEFAULT_DATE = timezone.datetime(2016, 1, 1) diff --git a/tests/decorators/test_python.py b/tests/decorators/test_python.py index 60d18fbddc601e..c83ee6b4c1a46c 100644 --- a/tests/decorators/test_python.py +++ b/tests/decorators/test_python.py @@ -41,7 +41,7 @@ from airflow.utils.xcom import XCOM_RETURN_KEY from tests.operators.test_python import BasePythonTest -pytestmark = pytest.mark.db_test +pytestmark = [pytest.mark.db_test, pytest.mark.need_serialized_dag] if TYPE_CHECKING: @@ -281,6 +281,8 @@ class Test: def add_number(self, num: int) -> int: return self.num + num + # TODO(potiuk) see why this test hangs in DB isolation mode + @pytest.mark.skip_if_database_isolation_mode def test_fail_multiple_outputs_key_type(self): @task_decorator(multiple_outputs=True) def add_number(num: int): @@ -293,6 +295,8 @@ def add_number(num: int): with pytest.raises(AirflowException): ret.operator.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE) + # TODO(potiuk) see why this test hangs in DB isolation mode + @pytest.mark.skip_if_database_isolation_mode def test_fail_multiple_outputs_no_dict(self): @task_decorator(multiple_outputs=True) def add_number(num: int): @@ -541,6 +545,8 @@ def add_2(number: int): assert "add_2" in self.dag_non_serialized.task_ids + # TODO(potiuk) see why this test hangs in DB isolation mode + @pytest.mark.skip_if_database_isolation_mode def test_dag_task_multiple_outputs(self): """Tests dag.task property to generate task with multiple outputs""" @@ -863,6 +869,7 @@ def org_test_func(): assert decorated_test_func.__wrapped__ is org_test_func, "__wrapped__ attr is not the original function" +@pytest.mark.need_serialized_dag(False) 
@pytest.mark.skip_if_database_isolation_mode # Test is broken in db isolation mode def test_upstream_exception_produces_none_xcom(dag_maker, session): from airflow.exceptions import AirflowSkipException @@ -900,6 +907,7 @@ def down(a, b): assert result == "'example' None" +@pytest.mark.need_serialized_dag(False) @pytest.mark.skip_if_database_isolation_mode # Test is broken in db isolation mode @pytest.mark.parametrize("multiple_outputs", [True, False]) def test_multiple_outputs_produces_none_xcom_when_task_is_skipped(dag_maker, session, multiple_outputs): @@ -958,6 +966,7 @@ def other(x): ... assert caplog.messages == [] +@pytest.mark.need_serialized_dag(False) @pytest.mark.skip_if_database_isolation_mode # Test is broken in db isolation mode def test_task_decorator_dataset(dag_maker, session): from airflow.datasets import Dataset diff --git a/tests/decorators/test_python_virtualenv.py b/tests/decorators/test_python_virtualenv.py index 554b33ceb9b77b..57a096ef192c7c 100644 --- a/tests/decorators/test_python_virtualenv.py +++ b/tests/decorators/test_python_virtualenv.py @@ -30,7 +30,7 @@ from airflow.utils import timezone from airflow.utils.state import TaskInstanceState -pytestmark = pytest.mark.db_test +pytestmark = [pytest.mark.db_test, pytest.mark.need_serialized_dag] DEFAULT_DATE = timezone.datetime(2016, 1, 1) PYTHON_VERSION = f"{sys.version_info.major}{sys.version_info.minor}" @@ -373,6 +373,8 @@ def f(): assert teardown_task.on_failure_fail_dagrun is on_failure_fail_dagrun ret.operator.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE) + # TODO(potiuk) see why this test hangs in DB isolation mode + @pytest.mark.skip_if_database_isolation_mode def test_invalid_annotation(self, dag_maker): import uuid diff --git a/tests/decorators/test_sensor.py b/tests/decorators/test_sensor.py index e970894a38ed82..a283ed871ba1d1 100644 --- a/tests/decorators/test_sensor.py +++ b/tests/decorators/test_sensor.py @@ -26,7 +26,7 @@ from airflow.sensors.base import PokeReturnValue from airflow.utils.state import State -pytestmark = pytest.mark.db_test +pytestmark = [pytest.mark.db_test, pytest.mark.need_serialized_dag] class TestSensorDecorator: @@ -52,6 +52,7 @@ def dummy_f(): sf >> df dr = dag_maker.create_dagrun() + sf.operator.run(start_date=dr.execution_date, end_date=dr.execution_date, ignore_ti_state=True) tis = dr.get_task_instances() assert len(tis) == 2 diff --git a/tests/decorators/test_short_circuit.py b/tests/decorators/test_short_circuit.py index 1d43de68421f9f..1c8349b6c9c86a 100644 --- a/tests/decorators/test_short_circuit.py +++ b/tests/decorators/test_short_circuit.py @@ -24,7 +24,7 @@ from airflow.utils.state import State from airflow.utils.trigger_rule import TriggerRule -pytestmark = pytest.mark.db_test +pytestmark = [pytest.mark.db_test, pytest.mark.skip_if_database_isolation_mode] DEFAULT_DATE = datetime(2022, 8, 17) diff --git a/tests/operators/test_python.py b/tests/operators/test_python.py index 107adcc12c7d3b..2d35fc98840817 100644 --- a/tests/operators/test_python.py +++ b/tests/operators/test_python.py @@ -60,7 +60,6 @@ _PythonVersionInfo, get_current_context, ) -from airflow.settings import _ENABLE_AIP_44 from airflow.utils import timezone from airflow.utils.context import AirflowContextDeprecationWarning, Context from airflow.utils.pydantic import is_pydantic_2_installed @@ -89,9 +88,7 @@ CLOUDPICKLE_MARKER = pytest.mark.skipif(not CLOUDPICKLE_INSTALLED, reason="`cloudpickle` is not installed") HAS_PYDANTIC_2 = is_pydantic_2_installed() -USE_AIRFLOW_CONTEXT_MARKER = 
pytest.mark.skipif( - not HAS_PYDANTIC_2 or not _ENABLE_AIP_44, reason="`pydantic<2` or AIP-44 is not enabled" -) +USE_AIRFLOW_CONTEXT_MARKER = pytest.mark.skipif(not HAS_PYDANTIC_2, reason="`pydantic<2`") class BasePythonTest: @@ -1017,6 +1014,7 @@ def f(): assert task.execute_callable() == "EFGHI" @USE_AIRFLOW_CONTEXT_MARKER + @pytest.mark.skip_if_database_isolation_mode def test_current_context(self): def f(): from airflow.operators.python import get_current_context @@ -1068,6 +1066,7 @@ def f(): ) @USE_AIRFLOW_CONTEXT_MARKER + @pytest.mark.skip_if_database_isolation_mode def test_use_airflow_context_touch_other_variables(self): def f(): from airflow.operators.python import get_current_context @@ -1093,19 +1092,7 @@ def f(): get_current_context() return [] - error_msg = "`get_current_context()` needs to be used with Pydantic 2 and AIP-44 enabled." - with pytest.raises(AirflowException, match=re.escape(error_msg)): - self.run_as_task(f, return_ti=True, multiple_outputs=False, use_airflow_context=True) - - @pytest.mark.skipif(_ENABLE_AIP_44, reason="AIP-44 is enabled") - def test_use_airflow_context_without_aip_44_error(self): - def f(): - from airflow.operators.python import get_current_context - - get_current_context() - return [] - - error_msg = "`get_current_context()` needs to be used with Pydantic 2 and AIP-44 enabled." + error_msg = "`get_current_context()` needs to be used with Pydantic 2." with pytest.raises(AirflowException, match=re.escape(error_msg)): self.run_as_task(f, return_ti=True, multiple_outputs=False, use_airflow_context=True) @@ -1410,6 +1397,9 @@ def f(a): "AssertRewritingHook including captured stdout and we need to run " "it with `--assert=plain` pytest option and PYTEST_PLAIN_ASSERTS=true .", ) + # TODO(potiuk) check if this can be fixed in the future - for now we are skipping tests with venv + # and airflow context in DB isolation mode as they are passing None as DAG. 
+ @pytest.mark.skip_if_database_isolation_mode def test_airflow_context(self, serializer): def f( # basic @@ -1531,6 +1521,7 @@ def f( self.run_as_task(f, serializer=serializer, system_site_packages=False, requirements=None) @USE_AIRFLOW_CONTEXT_MARKER + @pytest.mark.skip_if_database_isolation_mode def test_current_context_system_site_packages(self, session): def f(): from airflow.operators.python import get_current_context @@ -1874,6 +1865,7 @@ def default_kwargs(*, python_version=DEFAULT_PYTHON_VERSION, **kwargs): return kwargs @USE_AIRFLOW_CONTEXT_MARKER + @pytest.mark.skip_if_database_isolation_mode def test_current_context_system_site_packages(self, session): def f(): from airflow.operators.python import get_current_context diff --git a/tests/serialization/test_dag_serialization.py b/tests/serialization/test_dag_serialization.py index 5e45351f3dbebd..9808944c9e9652 100644 --- a/tests/serialization/test_dag_serialization.py +++ b/tests/serialization/test_dag_serialization.py @@ -398,6 +398,8 @@ def timetable_plugin(monkeypatch): ) +# TODO: (potiuk) - AIP-44 - check why this test hangs +@pytest.mark.skip_if_database_isolation_mode class TestStringifiedDAGs: """Unit tests for stringified DAGs.""" diff --git a/tests/serialization/test_pydantic_models.py b/tests/serialization/test_pydantic_models.py index a71fbf54397108..2b872df5999c4c 100644 --- a/tests/serialization/test_pydantic_models.py +++ b/tests/serialization/test_pydantic_models.py @@ -40,7 +40,7 @@ from airflow.serialization.pydantic.job import JobPydantic from airflow.serialization.pydantic.taskinstance import TaskInstancePydantic from airflow.serialization.serialized_objects import BaseSerialization -from airflow.settings import _ENABLE_AIP_44, TracebackSessionForTests +from airflow.settings import TracebackSessionForTests from airflow.utils import timezone from airflow.utils.state import State from airflow.utils.types import AttributeRemoved, DagRunType @@ -51,7 +51,6 @@ pytest.importorskip("pydantic", minversion="2.0.0") -@pytest.mark.skipif(not _ENABLE_AIP_44, reason="AIP-44 is disabled") def test_serializing_pydantic_task_instance(session, create_task_instance): dag_id = "test-dag" ti = create_task_instance(dag_id=dag_id, session=session) @@ -72,7 +71,6 @@ def test_serializing_pydantic_task_instance(session, create_task_instance): assert deserialized_model.next_kwargs == {"foo": "bar"} -@pytest.mark.skipif(not _ENABLE_AIP_44, reason="AIP-44 is disabled") def test_deserialize_ti_mapped_op_reserialized_with_refresh_from_task(session, dag_maker): op_class_dict_expected = { "_needs_expansion": True, @@ -148,7 +146,6 @@ def target(val=None): assert desered.task.downstream_task_ids == set() -@pytest.mark.skipif(not _ENABLE_AIP_44, reason="AIP-44 is disabled") def test_serializing_pydantic_dagrun(session, create_task_instance): dag_id = "test-dag" ti = create_task_instance(dag_id=dag_id, session=session) @@ -213,7 +210,6 @@ def test_serializing_pydantic_local_task_job(session, create_task_instance): # This test should not be run in DB isolation mode as it accesses the database directly - deliberately @pytest.mark.skip_if_database_isolation_mode -@pytest.mark.skipif(not _ENABLE_AIP_44, reason="AIP-44 is disabled") def test_serializing_pydantic_dataset_event(session, create_task_instance, create_dummy_dag): ds1 = DatasetModel(id=1, uri="one", extra={"foo": "bar"}) ds2 = DatasetModel(id=2, uri="two") diff --git a/tests/serialization/test_serialized_objects.py b/tests/serialization/test_serialized_objects.py index 
661ecbf5dcb7a9..ee6b3d48d51a9a 100644 --- a/tests/serialization/test_serialized_objects.py +++ b/tests/serialization/test_serialized_objects.py @@ -57,7 +57,6 @@ from airflow.serialization.pydantic.taskinstance import TaskInstancePydantic from airflow.serialization.pydantic.tasklog import LogTemplatePydantic from airflow.serialization.serialized_objects import BaseSerialization -from airflow.settings import _ENABLE_AIP_44 from airflow.triggers.base import BaseTrigger from airflow.utils import timezone from airflow.utils.context import OutletEventAccessor, OutletEventAccessors @@ -333,7 +332,6 @@ def test_backcompat_deserialize_connection(conn_uri): } -@pytest.mark.skipif(not _ENABLE_AIP_44, reason="AIP-44 is disabled") @pytest.mark.parametrize( "input, pydantic_class, encoded_type, cmp_func", [ @@ -416,8 +414,6 @@ def test_serialize_deserialize_pydantic(input, pydantic_class, encoded_type, cmp def test_all_pydantic_models_round_trip(): pytest.importorskip("pydantic", minversion="2.0.0") - if not _ENABLE_AIP_44: - pytest.skip("AIP-44 is disabled") classes = set() mods_folder = REPO_ROOT / "airflow/serialization/pydantic" for p in mods_folder.iterdir():
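
Note: the tests above rely on the `skip_if_database_isolation_mode` marker being
honored at collection time. Below is a minimal conftest.py sketch of how such a
marker can be tied to the DATABASE_ISOLATION environment variable exported by
run-unit-tests.yml - assumed wiring for illustration only; the repository's
actual hook may differ:

    import os

    import pytest


    def pytest_collection_modifyitems(config, items):
        # Only act when the CI workflow exports DATABASE_ISOLATION="true"
        # (see run-unit-tests.yml above); otherwise all tests run as usual.
        if os.environ.get("DATABASE_ISOLATION", "false").lower() != "true":
            return
        skip_marker = pytest.mark.skip(reason="Test skipped in database isolation mode")
        for item in items:
            if item.get_closest_marker("skip_if_database_isolation_mode"):
                item.add_marker(skip_marker)

Module-level `pytestmark` assignments work the same way: the marker applies to
every test in the file, so a hook like the one above skips the whole module.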