diff --git a/Dockerfile.ci b/Dockerfile.ci
index bed813a8dde635..8040679332f650 100644
--- a/Dockerfile.ci
+++ b/Dockerfile.ci
@@ -905,8 +905,7 @@ function environment_initialization() {
     if [[ ${DATABASE_ISOLATION=} == "true" ]]; then
         echo "${COLOR_BLUE}Force database isolation configuration:${COLOR_RESET}"
         export AIRFLOW__CORE__DATABASE_ACCESS_ISOLATION=True
-        export AIRFLOW__CORE__INTERNAL_API_URL=http://localhost:8080
-        export AIRFLOW__WEBSERVER_RUN_INTERNAL_API=True
+        export AIRFLOW__CORE__INTERNAL_API_URL=http://localhost:9080
     fi

     RUN_TESTS=${RUN_TESTS:="false"}
diff --git a/airflow/__main__.py b/airflow/__main__.py
index bfebc63946ef66..82a866c42a4781 100644
--- a/airflow/__main__.py
+++ b/airflow/__main__.py
@@ -55,6 +55,22 @@ def main():
     conf = write_default_airflow_configuration_if_needed()
     if args.subcommand in ["webserver", "internal-api", "worker"]:
         write_webserver_configuration_if_needed(conf)
+    if conf.getboolean("core", "database_access_isolation", fallback=False):
+        if args.subcommand in ["worker", "dag-processor", "triggerer", "run"]:
+            # Untrusted components
+            if "AIRFLOW__DATABASE__SQL_ALCHEMY_CONN" in os.environ:
+                # make sure that the DB is not available for the components that should not access it
+                del os.environ["AIRFLOW__DATABASE__SQL_ALCHEMY_CONN"]
+            conf.set("database", "sql_alchemy_conn", "none://")
+            from airflow.settings import force_traceback_session_for_untrusted_components
+
+            force_traceback_session_for_untrusted_components()
+        else:
+            # Trusted components
+            from airflow.api_internal.internal_api_call import InternalApiConfig
+
+            InternalApiConfig.force_database_direct_access("Running " + args.subcommand + " command")
+
     args.func(args)


diff --git a/airflow/api_internal/endpoints/rpc_api_endpoint.py b/airflow/api_internal/endpoints/rpc_api_endpoint.py
index d2b952c5b716d6..8c8fac4de30275 100644
--- a/airflow/api_internal/endpoints/rpc_api_endpoint.py
+++ b/airflow/api_internal/endpoints/rpc_api_endpoint.py
@@ -120,7 +120,6 @@ def _initialize_map() -> dict[str, Callable]:
         TaskInstance._get_dagrun,
         TaskInstance._set_state,
         TaskInstance.save_to_db,
-        TaskInstance._schedule_downstream_tasks,
         TaskInstance._clear_xcom_data,
         Trigger.from_object,
         Trigger.bulk_fetch,
diff --git a/airflow/api_internal/internal_api_call.py b/airflow/api_internal/internal_api_call.py
index c3a67d03ee18c1..fc2314578fbd50 100644
--- a/airflow/api_internal/internal_api_call.py
+++ b/airflow/api_internal/internal_api_call.py
@@ -46,15 +46,17 @@ class InternalApiConfig:
     _internal_api_endpoint = ""

     @staticmethod
-    def force_database_direct_access():
+    def force_database_direct_access(message: str):
         """
         Block current component from using Internal API.

-        All methods decorated with internal_api_call will always be executed locally.
-        This mode is needed for "trusted" components like Scheduler, Webserver or Internal Api server.
+        All methods decorated with internal_api_call will always be executed locally.
+        This mode is needed for "trusted" components like Scheduler, Webserver, Internal Api server.
         """
         InternalApiConfig._initialized = True
         InternalApiConfig._use_internal_api = False
+        if _ENABLE_AIP_44:
+            logger.info("Forcing database direct access. %s", message)

     @staticmethod
     def get_use_internal_api():
diff --git a/airflow/cli/commands/internal_api_command.py b/airflow/cli/commands/internal_api_command.py
index 8c25d1fa5ae588..4d377d56e09727 100644
--- a/airflow/cli/commands/internal_api_command.py
+++ b/airflow/cli/commands/internal_api_command.py
@@ -222,7 +222,7 @@ def create_app(config=None, testing=False):
     if "SQLALCHEMY_ENGINE_OPTIONS" not in flask_app.config:
         flask_app.config["SQLALCHEMY_ENGINE_OPTIONS"] = settings.prepare_engine_args()

-    InternalApiConfig.force_database_direct_access()
+    InternalApiConfig.force_database_direct_access("Gunicorn worker initialization")

     csrf = CSRFProtect()
     csrf.init_app(flask_app)
diff --git a/airflow/cli/commands/scheduler_command.py b/airflow/cli/commands/scheduler_command.py
index 2f97d3cae3251b..96cfe1e2852f54 100644
--- a/airflow/cli/commands/scheduler_command.py
+++ b/airflow/cli/commands/scheduler_command.py
@@ -24,7 +24,6 @@
 from multiprocessing import Process

 from airflow import settings
-from airflow.api_internal.internal_api_call import InternalApiConfig
 from airflow.cli.commands.daemon_utils import run_command_with_daemon_option
 from airflow.configuration import conf
 from airflow.executors.executor_loader import ExecutorLoader
@@ -44,7 +43,6 @@ def _run_scheduler_job(args) -> None:
         job=Job(), subdir=process_subdir(args.subdir), num_runs=args.num_runs, do_pickle=args.do_pickle
     )
     ExecutorLoader.validate_database_executor_compatibility(job_runner.job.executor.__class__)
-    InternalApiConfig.force_database_direct_access()
     enable_health_check = conf.getboolean("scheduler", "ENABLE_HEALTH_CHECK")
     with _serve_logs(args.skip_serve_logs), _serve_health_check(enable_health_check):
         run_job(job=job_runner.job, execute_callable=job_runner._execute)
diff --git a/airflow/cli/commands/task_command.py b/airflow/cli/commands/task_command.py
index 91f64c7cf4cbb6..6e0fc80fbb300d 100644
--- a/airflow/cli/commands/task_command.py
+++ b/airflow/cli/commands/task_command.py
@@ -466,13 +466,14 @@ def task_run(args, dag: DAG | None = None) -> TaskReturnCode | None:
     log.info("Running %s on host %s", ti, hostname)

-    # IMPORTANT, have to re-configure ORM with the NullPool, otherwise, each "run" command may leave
-    # behind multiple open sleeping connections while heartbeating, which could
-    # easily exceed the database connection limit when
-    # processing hundreds of simultaneous tasks.
-    # this should be last thing before running, to reduce likelihood of an open session
-    # which can cause trouble if running process in a fork.
-    settings.reconfigure_orm(disable_connection_pool=True)
+    if not InternalApiConfig.get_use_internal_api():
+        # IMPORTANT, have to re-configure ORM with the NullPool, otherwise, each "run" command may leave
+        # behind multiple open sleeping connections while heartbeating, which could
+        # easily exceed the database connection limit when
+        # processing hundreds of simultaneous tasks.
+        # this should be last thing before running, to reduce likelihood of an open session
+        # which can cause trouble if running process in a fork.
+        settings.reconfigure_orm(disable_connection_pool=True)
     task_return_code = None
     try:
         if args.interactive:
diff --git a/airflow/models/taskinstance.py b/airflow/models/taskinstance.py
index 27eb5c26c2fc42..393da63c74a8d6 100644
--- a/airflow/models/taskinstance.py
+++ b/airflow/models/taskinstance.py
@@ -3759,7 +3759,6 @@ def ti_selector_condition(cls, vals: Collection[str | tuple[str, int]]) -> Colum
         return or_(*filters)

     @classmethod
-    @internal_api_call
     @provide_session
     def _schedule_downstream_tasks(
         cls,
diff --git a/airflow/providers/fab/auth_manager/security_manager/override.py b/airflow/providers/fab/auth_manager/security_manager/override.py
index bc9a03966bb057..fd0dce0b625b60 100644
--- a/airflow/providers/fab/auth_manager/security_manager/override.py
+++ b/airflow/providers/fab/auth_manager/security_manager/override.py
@@ -949,8 +949,8 @@ def create_db(self):
             self.add_role(self.auth_role_public)
             if self.count_users() == 0 and self.auth_role_public != self.auth_role_admin:
                 log.warning(const.LOGMSG_WAR_SEC_NO_USER)
-        except Exception as e:
-            log.error(const.LOGMSG_ERR_SEC_CREATE_DB, e)
+        except Exception:
+            log.exception(const.LOGMSG_ERR_SEC_CREATE_DB)
             exit(1)

     def get_readable_dags(self, user) -> Iterable[DagModel]:
diff --git a/airflow/serialization/pydantic/taskinstance.py b/airflow/serialization/pydantic/taskinstance.py
index 829594dc70840c..2af5dcbecaf112 100644
--- a/airflow/serialization/pydantic/taskinstance.py
+++ b/airflow/serialization/pydantic/taskinstance.py
@@ -458,9 +458,9 @@ def schedule_downstream_tasks(self, session: Session | None = None, max_tis_per_

         :meta: private
         """
-        return TaskInstance._schedule_downstream_tasks(
-            ti=self, session=session, max_tis_per_query=max_tis_per_query
-        )
+        # we should not schedule downstream tasks with the Pydantic model because it will not be able to
+        # get the DAG object (we do not serialize it currently).
+        return

     def command_as_list(
         self,
diff --git a/airflow/settings.py b/airflow/settings.py
index dbe68bcb1356a2..5e797b268a47e2 100644
--- a/airflow/settings.py
+++ b/airflow/settings.py
@@ -314,13 +314,7 @@ def configure_orm(disable_connection_pool=False, pool_class=None):
     global Session
     global engine

-    from airflow.api_internal.internal_api_call import InternalApiConfig
-
-    if InternalApiConfig.get_use_internal_api():
-        Session = TracebackSession
-        engine = None
-        return
-    elif os.environ.get("_AIRFLOW_SKIP_DB_TESTS") == "true":
+    if os.environ.get("_AIRFLOW_SKIP_DB_TESTS") == "true":
         # Skip DB initialization in unit tests, if DB tests are skipped
         Session = SkipDBTestsSession
         engine = None
@@ -349,6 +343,14 @@ def configure_orm(disable_connection_pool=False, pool_class=None):
     )


+def force_traceback_session_for_untrusted_components():
+    global Session
+    global engine
+    dispose_orm()
+    Session = TracebackSession
+    engine = None
+
+
 DEFAULT_ENGINE_ARGS = {
     "postgresql": {
         "executemany_mode": "values_plus_batch",
diff --git a/airflow/utils/types.py b/airflow/utils/types.py
index e4c5c511acb9d8..2dce1cfdb3fa25 100644
--- a/airflow/utils/types.py
+++ b/airflow/utils/types.py
@@ -54,7 +54,7 @@ class AttributeRemoved:
     """

     def __getattr__(self, item):
-        raise RuntimeError("Attribute was removed on serialization and must be set again.")
+        raise RuntimeError(f"Attribute was removed on serialization and must be set again: {item}.")


 ATTRIBUTE_REMOVED = AttributeRemoved()
diff --git a/airflow/www/app.py b/airflow/www/app.py
index 78d51c64a6ab11..c496e314dc458f 100644
--- a/airflow/www/app.py
+++ b/airflow/www/app.py
@@ -137,7 +137,7 @@ def create_app(config=None, testing=False):
     flask_app.json_provider_class = AirflowJsonProvider
     flask_app.json = AirflowJsonProvider(flask_app)

-    InternalApiConfig.force_database_direct_access()
+    InternalApiConfig.force_database_direct_access("Gunicorn worker initialization")

     csrf.init_app(flask_app)

diff --git a/scripts/docker/entrypoint_ci.sh b/scripts/docker/entrypoint_ci.sh
index b64a43fcfda4a7..e5b0a23c6e39ce 100755
--- a/scripts/docker/entrypoint_ci.sh
+++ b/scripts/docker/entrypoint_ci.sh
@@ -123,8 +123,7 @@ function environment_initialization() {
     if [[ ${DATABASE_ISOLATION=} == "true" ]]; then
         echo "${COLOR_BLUE}Force database isolation configuration:${COLOR_RESET}"
         export AIRFLOW__CORE__DATABASE_ACCESS_ISOLATION=True
-        export AIRFLOW__CORE__INTERNAL_API_URL=http://localhost:8080
-        export AIRFLOW__WEBSERVER_RUN_INTERNAL_API=True
+        export AIRFLOW__CORE__INTERNAL_API_URL=http://localhost:9080
     fi

     RUN_TESTS=${RUN_TESTS:="false"}
diff --git a/tests/api_internal/test_internal_api_call.py b/tests/api_internal/test_internal_api_call.py
index c2adcc3df96284..9ac061181fef75 100644
--- a/tests/api_internal/test_internal_api_call.py
+++ b/tests/api_internal/test_internal_api_call.py
@@ -72,7 +72,7 @@ def test_get_use_internal_api_enabled(self):
         }
     )
     def test_force_database_direct_access(self):
-        InternalApiConfig.force_database_direct_access()
+        InternalApiConfig.force_database_direct_access("message")
         assert InternalApiConfig.get_use_internal_api() is False
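
For reference, a minimal sketch (not part of the patch) of the dispatch logic the airflow/__main__.py hunk above introduces: with database isolation enabled, untrusted entrypoints (worker, dag-processor, triggerer, run) lose their direct SQLAlchemy connection and get a traceback-only session, while all other entrypoints force direct database access. The helper name _configure_db_access is hypothetical; the calls it makes are exactly the ones added by this patch.

# Illustrative sketch only -- condensed from the airflow/__main__.py hunk above; not part of the patch.
# The helper name _configure_db_access is hypothetical.
import os

from airflow.configuration import conf


def _configure_db_access(subcommand: str) -> None:
    if not conf.getboolean("core", "database_access_isolation", fallback=False):
        return  # isolation disabled: every component keeps direct DB access
    if subcommand in ["worker", "dag-processor", "triggerer", "run"]:
        # Untrusted components: drop direct DB credentials and swap the session
        # factory for TracebackSession so accidental DB use fails loudly.
        os.environ.pop("AIRFLOW__DATABASE__SQL_ALCHEMY_CONN", None)
        conf.set("database", "sql_alchemy_conn", "none://")
        from airflow.settings import force_traceback_session_for_untrusted_components

        force_traceback_session_for_untrusted_components()
    else:
        # Trusted components (scheduler, webserver, internal-api, ...) always talk
        # to the database directly instead of going through the internal API.
        from airflow.api_internal.internal_api_call import InternalApiConfig

        InternalApiConfig.force_database_direct_access(f"Running {subcommand} command")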