AIP-44 make database isolation mode work in Breeze (#40894)
With this PR, it is possible to get a working "DB isolation" setup with the
Celery executor. In my tests it runs comparably fast to the non-isolated
(direct DB access) mode.

Things changed here:

* remove the "schedule_downstream_tasks" endpoint. It currently cannot work
  over the internal API, because the DAG object is removed during
  serialization, and that DAG is what this method uses to calculate which
  tasks to schedule

* when we force direct DB access in DB isolation mode, we log a message
  that the appropriate component is switching to using the DB. We also
  make sure the DB configuration is removed from untrusted components in
  case it is set (this allows tests to run in the Breeze environment with
  more certainty)

* the detection of whether to force direct DB access happens in _main -
  this way regular commands run in Breeze (migrate, users, etc.) can use
  the DB while initializing the environment, and actions can be logged to
  the DB or via RPC calls (see the sketch after this list)

* improved diagnostics
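
A condensed sketch of the routing this adds to airflow/__main__.py (for
illustration only; the full change is in the diff below, and the helper name
`configure_db_access` is made up here, not part of the actual code):

```python
# Sketch of the trusted/untrusted split performed in _main (see the airflow/__main__.py diff below).
# Assumes `conf` is the Airflow config object and `subcommand` is the parsed CLI subcommand.
import os

UNTRUSTED_SUBCOMMANDS = {"worker", "dag-processor", "triggerer", "run"}


def configure_db_access(conf, subcommand: str) -> None:
    if not conf.getboolean("core", "database_access_isolation", fallback=False):
        return  # DB isolation disabled - nothing to do
    if subcommand in UNTRUSTED_SUBCOMMANDS:
        # Untrusted components: hide the DB connection entirely and block accidental ORM use.
        os.environ.pop("AIRFLOW__DATABASE__SQL_ALCHEMY_CONN", None)
        conf.set("database", "sql_alchemy_conn", "none://")
        from airflow.settings import force_traceback_session_for_untrusted_components

        force_traceback_session_for_untrusted_components()
    else:
        # Trusted components (scheduler, webserver, internal-api, migrations, ...) use the DB directly.
        from airflow.api_internal.internal_api_call import InternalApiConfig

        InternalApiConfig.force_database_direct_access(f"Running {subcommand} command")
```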

Co-authored-by: Vincent <[email protected]>
potiuk and vincbeck authored Jul 20, 2024
1 parent f099bea commit 6684481
Showing 15 changed files with 49 additions and 34 deletions.
3 changes: 1 addition & 2 deletions Dockerfile.ci
@@ -905,8 +905,7 @@ function environment_initialization() {
    if [[ ${DATABASE_ISOLATION=} == "true" ]]; then
        echo "${COLOR_BLUE}Force database isolation configuration:${COLOR_RESET}"
        export AIRFLOW__CORE__DATABASE_ACCESS_ISOLATION=True
        export AIRFLOW__CORE__INTERNAL_API_URL=http://localhost:8080
        export AIRFLOW__WEBSERVER_RUN_INTERNAL_API=True
        export AIRFLOW__CORE__INTERNAL_API_URL=http://localhost:9080
    fi

    RUN_TESTS=${RUN_TESTS:="false"}
16 changes: 16 additions & 0 deletions airflow/__main__.py
@@ -55,6 +55,22 @@ def main():
        conf = write_default_airflow_configuration_if_needed()
        if args.subcommand in ["webserver", "internal-api", "worker"]:
            write_webserver_configuration_if_needed(conf)
        if conf.getboolean("core", "database_access_isolation", fallback=False):
            if args.subcommand in ["worker", "dag-processor", "triggerer", "run"]:
                # Untrusted components
                if "AIRFLOW__DATABASE__SQL_ALCHEMY_CONN" in os.environ:
                    # make sure that the DB is not available for the components that should not access it
                    del os.environ["AIRFLOW__DATABASE__SQL_ALCHEMY_CONN"]
                conf.set("database", "sql_alchemy_conn", "none://")
                from airflow.settings import force_traceback_session_for_untrusted_components

                force_traceback_session_for_untrusted_components()
            else:
                # Trusted components
                from airflow.api_internal.internal_api_call import InternalApiConfig

                InternalApiConfig.force_database_direct_access("Running " + args.subcommand + " command")

    args.func(args)


1 change: 0 additions & 1 deletion airflow/api_internal/endpoints/rpc_api_endpoint.py
@@ -120,7 +120,6 @@ def _initialize_map() -> dict[str, Callable]:
        TaskInstance._get_dagrun,
        TaskInstance._set_state,
        TaskInstance.save_to_db,
        TaskInstance._schedule_downstream_tasks,
        TaskInstance._clear_xcom_data,
        Trigger.from_object,
        Trigger.bulk_fetch,
8 changes: 5 additions & 3 deletions airflow/api_internal/internal_api_call.py
@@ -46,15 +46,17 @@ class InternalApiConfig:
    _internal_api_endpoint = ""

    @staticmethod
    def force_database_direct_access():
    def force_database_direct_access(message: str):
        """
        Block current component from using Internal API.
        All methods decorated with internal_api_call will always be executed locally.
        This mode is needed for "trusted" components like Scheduler, Webserver or Internal Api server.
        All methods decorated with internal_api_call will always be executed locally.`
        This mode is needed for "trusted" components like Scheduler, Webserver, Internal Api server
        """
        InternalApiConfig._initialized = True
        InternalApiConfig._use_internal_api = False
        if _ENABLE_AIP_44:
            logger.info("Forcing database direct access. %s", message)

    @staticmethod
    def get_use_internal_api():
2 changes: 1 addition & 1 deletion airflow/cli/commands/internal_api_command.py
@@ -222,7 +222,7 @@ def create_app(config=None, testing=False):
    if "SQLALCHEMY_ENGINE_OPTIONS" not in flask_app.config:
        flask_app.config["SQLALCHEMY_ENGINE_OPTIONS"] = settings.prepare_engine_args()

    InternalApiConfig.force_database_direct_access()
    InternalApiConfig.force_database_direct_access("Gunicorn worker initialization")

    csrf = CSRFProtect()
    csrf.init_app(flask_app)
2 changes: 0 additions & 2 deletions airflow/cli/commands/scheduler_command.py
@@ -24,7 +24,6 @@
from multiprocessing import Process

from airflow import settings
from airflow.api_internal.internal_api_call import InternalApiConfig
from airflow.cli.commands.daemon_utils import run_command_with_daemon_option
from airflow.configuration import conf
from airflow.executors.executor_loader import ExecutorLoader
@@ -44,7 +43,6 @@ def _run_scheduler_job(args) -> None:
        job=Job(), subdir=process_subdir(args.subdir), num_runs=args.num_runs, do_pickle=args.do_pickle
    )
    ExecutorLoader.validate_database_executor_compatibility(job_runner.job.executor.__class__)
    InternalApiConfig.force_database_direct_access()
    enable_health_check = conf.getboolean("scheduler", "ENABLE_HEALTH_CHECK")
    with _serve_logs(args.skip_serve_logs), _serve_health_check(enable_health_check):
        run_job(job=job_runner.job, execute_callable=job_runner._execute)
15 changes: 8 additions & 7 deletions airflow/cli/commands/task_command.py
@@ -466,13 +466,14 @@ def task_run(args, dag: DAG | None = None) -> TaskReturnCode | None:

    log.info("Running %s on host %s", ti, hostname)

    # IMPORTANT, have to re-configure ORM with the NullPool, otherwise, each "run" command may leave
    # behind multiple open sleeping connections while heartbeating, which could
    # easily exceed the database connection limit when
    # processing hundreds of simultaneous tasks.
    # this should be last thing before running, to reduce likelihood of an open session
    # which can cause trouble if running process in a fork.
    settings.reconfigure_orm(disable_connection_pool=True)
    if not InternalApiConfig.get_use_internal_api():
        # IMPORTANT, have to re-configure ORM with the NullPool, otherwise, each "run" command may leave
        # behind multiple open sleeping connections while heartbeating, which could
        # easily exceed the database connection limit when
        # processing hundreds of simultaneous tasks.
        # this should be last thing before running, to reduce likelihood of an open session
        # which can cause trouble if running process in a fork.
        settings.reconfigure_orm(disable_connection_pool=True)
    task_return_code = None
    try:
        if args.interactive:
1 change: 0 additions & 1 deletion airflow/models/taskinstance.py
@@ -3759,7 +3759,6 @@ def ti_selector_condition(cls, vals: Collection[str | tuple[str, int]]) -> Colum
        return or_(*filters)

    @classmethod
    @internal_api_call
    @provide_session
    def _schedule_downstream_tasks(
        cls,
@@ -949,8 +949,8 @@ def create_db(self):
            self.add_role(self.auth_role_public)
            if self.count_users() == 0 and self.auth_role_public != self.auth_role_admin:
                log.warning(const.LOGMSG_WAR_SEC_NO_USER)
        except Exception as e:
            log.error(const.LOGMSG_ERR_SEC_CREATE_DB, e)
        except Exception:
            log.exception(const.LOGMSG_ERR_SEC_CREATE_DB)
            exit(1)

    def get_readable_dags(self, user) -> Iterable[DagModel]:
6 changes: 3 additions & 3 deletions airflow/serialization/pydantic/taskinstance.py
@@ -458,9 +458,9 @@ def schedule_downstream_tasks(self, session: Session | None = None, max_tis_per_
        :meta: private
        """
        return TaskInstance._schedule_downstream_tasks(
            ti=self, session=session, max_tis_per_query=max_tis_per_query
        )
        # we should not schedule downstream tasks with Pydantic model because it will not be able to
        # get the DAG object (we do not serialize it currently).
        return

    def command_as_list(
        self,
16 changes: 9 additions & 7 deletions airflow/settings.py
@@ -314,13 +314,7 @@ def configure_orm(disable_connection_pool=False, pool_class=None):

    global Session
    global engine
    from airflow.api_internal.internal_api_call import InternalApiConfig

    if InternalApiConfig.get_use_internal_api():
        Session = TracebackSession
        engine = None
        return
    elif os.environ.get("_AIRFLOW_SKIP_DB_TESTS") == "true":
    if os.environ.get("_AIRFLOW_SKIP_DB_TESTS") == "true":
        # Skip DB initialization in unit tests, if DB tests are skipped
        Session = SkipDBTestsSession
        engine = None
@@ -349,6 +343,14 @@ def configure_orm(disable_connection_pool=False, pool_class=None):
    )


def force_traceback_session_for_untrusted_components():
    global Session
    global engine
    dispose_orm()
    Session = TracebackSession
    engine = None


DEFAULT_ENGINE_ARGS = {
    "postgresql": {
        "executemany_mode": "values_plus_batch",
2 changes: 1 addition & 1 deletion airflow/utils/types.py
@@ -54,7 +54,7 @@ class AttributeRemoved:
"""

    def __getattr__(self, item):
        raise RuntimeError("Attribute was removed on serialization and must be set again.")
        raise RuntimeError(f"Attribute was removed on serialization and must be set again: {item}.")


ATTRIBUTE_REMOVED = AttributeRemoved()
2 changes: 1 addition & 1 deletion airflow/www/app.py
@@ -137,7 +137,7 @@ def create_app(config=None, testing=False):
    flask_app.json_provider_class = AirflowJsonProvider
    flask_app.json = AirflowJsonProvider(flask_app)

    InternalApiConfig.force_database_direct_access()
    InternalApiConfig.force_database_direct_access("Gunicorn worker initialization")

    csrf.init_app(flask_app)

3 changes: 1 addition & 2 deletions scripts/docker/entrypoint_ci.sh
@@ -123,8 +123,7 @@ function environment_initialization() {
    if [[ ${DATABASE_ISOLATION=} == "true" ]]; then
        echo "${COLOR_BLUE}Force database isolation configuration:${COLOR_RESET}"
        export AIRFLOW__CORE__DATABASE_ACCESS_ISOLATION=True
        export AIRFLOW__CORE__INTERNAL_API_URL=http://localhost:8080
        export AIRFLOW__WEBSERVER_RUN_INTERNAL_API=True
        export AIRFLOW__CORE__INTERNAL_API_URL=http://localhost:9080
    fi

    RUN_TESTS=${RUN_TESTS:="false"}
2 changes: 1 addition & 1 deletion tests/api_internal/test_internal_api_call.py
@@ -72,7 +72,7 @@ def test_get_use_internal_api_enabled(self):
        }
    )
    def test_force_database_direct_access(self):
        InternalApiConfig.force_database_direct_access()
        InternalApiConfig.force_database_direct_access("message")
        assert InternalApiConfig.get_use_internal_api() is False
