Remove Database Isolation feature flag and run DB isolation tests
This PR removes the AIP-44 feature flag and replaces the "in-progress-disabled"
test with a dedicated "DatabaseIsolation" one.

The DatabaseIsolation test runs all "db-tests" with DB isolation mode enabled
and the `internal-api` component running; groups of tests marked with
"skip-if-database-isolation" are skipped.
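For illustration, a hedged sketch of how such a test group might look; `db_test` is a real marker in the Airflow test suite, but the exact skip-marker spelling is an assumption derived from the "skip-if-database-isolation" wording above:

```python
# Hedged sketch - the skip marker name is assumed from the description
# above, not verified against the test suite.
import pytest

pytestmark = pytest.mark.db_test  # part of the "db-tests" group


@pytest.mark.skip_if_database_isolation_mode  # skipped when DATABASE_ISOLATION=true
def test_direct_db_access():
    ...  # test code that must touch the metadata DB directly
```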
potiuk committed Aug 16, 2024
1 parent 4361596 commit 21b950f
Showing 36 changed files with 135 additions and 148 deletions.
6 changes: 0 additions & 6 deletions .github/workflows/basic-tests.yml
@@ -52,12 +52,6 @@ on: # yamllint disable-line rule:truthy
description: "Whether to run only latest version checks (true/false)"
required: true
type: string
enable-aip-44:
description: "Whether to enable AIP-44 (true/false)"
required: true
type: string
env:
AIRFLOW_ENABLE_AIP_44: "${{ inputs.enable-aip-44 }}"
jobs:
run-breeze-tests:
timeout-minutes: 10
1 change: 0 additions & 1 deletion .github/workflows/ci.yml
@@ -175,7 +175,6 @@ jobs:
skip-pre-commits: ${{needs.build-info.outputs.skip-pre-commits}}
canary-run: ${{needs.build-info.outputs.canary-run}}
latest-versions-only: ${{needs.build-info.outputs.latest-versions-only}}
enable-aip-44: "false"

build-ci-images:
name: >
9 changes: 4 additions & 5 deletions .github/workflows/run-unit-tests.yml
@@ -99,10 +99,10 @@ on: # yamllint disable-line rule:truthy
required: false
default: "false"
type: string
enable-aip-44:
description: "Whether to enable AIP-44 or not (true/false)"
database-isolation:
description: "Whether to enable database isolattion or not (true/false)"
required: false
default: "true"
default: "false"
type: string
force-lowest-dependencies:
description: "Whether to force lowest dependencies for the tests or not (true/false)"
@@ -129,8 +129,6 @@ jobs:
backend-version: "${{fromJSON(inputs.backend-versions)}}"
exclude: "${{fromJSON(inputs.excludes)}}"
env:
# yamllint disable rule:line-length
AIRFLOW_ENABLE_AIP_44: "${{ inputs.enable-aip-44 }}"
BACKEND: "${{ inputs.backend }}"
BACKEND_VERSION: "${{ matrix.backend-version }}"
DB_RESET: "true"
@@ -152,6 +150,7 @@ jobs:
PYTHON_MAJOR_MINOR_VERSION: "${{ matrix.python-version }}"
UPGRADE_BOTO: "${{ inputs.upgrade-boto }}"
AIRFLOW_MONITOR_DELAY_TIME_IN_SECONDS: "${{inputs.monitor-delay-time-in-seconds}}"
DATABASE_ISOLATION: "${{ inputs.database-isolation }}"
VERBOSE: "true"
steps:
- name: "Cleanup repo"
10 changes: 5 additions & 5 deletions .github/workflows/special-tests.yml
@@ -171,18 +171,18 @@ jobs:
run-coverage: ${{ inputs.run-coverage }}
debug-resources: ${{ inputs.debug-resources }}

tests-in-progress-disabled:
name: "In progress disabled test"
tests-database-isolation:
name: "Database isolation test"
uses: ./.github/workflows/run-unit-tests.yml
permissions:
contents: read
packages: read
secrets: inherit
with:
runs-on-as-json-default: ${{ inputs.runs-on-as-json-default }}
enable-aip-44: "false"
test-name: "InProgressDisabled-Postgres"
test-scope: "All"
database-isolation: "true"
test-name: "DatabaseIsolation-Postgres"
test-scope: "DB"
backend: "postgres"
image-tag: ${{ inputs.image-tag }}
python-versions: "['${{ inputs.default-python-version }}']"
6 changes: 3 additions & 3 deletions airflow/api_internal/endpoints/rpc_api_endpoint.py
@@ -228,20 +228,20 @@ def internal_airflow_api(body: dict[str, Any]) -> APIResponse:
except Exception:
return log_and_build_error_response(message="Error deserializing parameters.", status=400)

log.info("Calling method %s\nparams: %s", method_name, params)
log.debug("Calling method %s\nparams: %s", method_name, params)
try:
# Session must be created there as it may be needed by serializer for lazy-loaded fields.
with create_session() as session:
output = handler(**params, session=session)
output_json = BaseSerialization.serialize(output, use_pydantic_models=True)
response = json.dumps(output_json) if output_json is not None else None
log.info("Sending response: %s", response)
log.debug("Sending response: %s", response)
return Response(response=response, headers={"Content-Type": "application/json"})
# In case of AirflowException or other selective known types, transport the exception class back to caller
except (KeyError, AttributeError, AirflowException) as e:
exception_json = BaseSerialization.serialize(e, use_pydantic_models=True)
response = json.dumps(exception_json)
log.info("Sending exception response: %s", response)
log.debug("Sending exception response: %s", response)
return Response(response=response, headers={"Content-Type": "application/json"})
except Exception:
return log_and_build_error_response(message=f"Error executing method '{method_name}'.", status=500)
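The endpoint above transports results and selected exception types back to the caller as serialized JSON. A minimal round-trip sketch, outside the endpoint (that `deserialize()` rebuilds the exception instance is an assumption here):

```python
# Minimal round-trip sketch (not the PR's code): serialize a known
# exception type to JSON, then rebuild it on the caller's side.
import json

from airflow.exceptions import AirflowException
from airflow.serialization.serialized_objects import BaseSerialization

exc = AirflowException("task failed")
payload = json.dumps(BaseSerialization.serialize(exc, use_pydantic_models=True))
restored = BaseSerialization.deserialize(json.loads(payload), use_pydantic_models=True)
assert isinstance(restored, AirflowException)
```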
6 changes: 1 addition & 5 deletions airflow/api_internal/internal_api_call.py
@@ -30,7 +30,7 @@

from airflow.configuration import conf
from airflow.exceptions import AirflowConfigException, AirflowException
from airflow.settings import _ENABLE_AIP_44, force_traceback_session_for_untrusted_components
from airflow.settings import force_traceback_session_for_untrusted_components
from airflow.typing_compat import ParamSpec
from airflow.utils.jwt_signer import JWTSigner

@@ -55,8 +55,6 @@ def set_use_database_access(component: str):
This mode is needed for "trusted" components like Scheduler, Webserver, Internal Api server
"""
InternalApiConfig._use_internal_api = False
if not _ENABLE_AIP_44:
raise RuntimeError("The AIP_44 is not enabled so you cannot use it. ")
logger.info(
"DB isolation mode. But this is a trusted component and DB connection is set. "
"Using database direct access when running %s.",
@@ -65,8 +63,6 @@ def set_use_database_access(component: str):

@staticmethod
def set_use_internal_api(component: str, allow_tests_to_use_db: bool = False):
if not _ENABLE_AIP_44:
raise RuntimeError("The AIP_44 is not enabled so you cannot use it. ")
internal_api_url = conf.get("core", "internal_api_url")
url_conf = urlparse(internal_api_url)
api_path = url_conf.path
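With the flag gone, choosing the access mode is a plain method call. A hedged usage sketch based on the method names visible in this diff (the component strings are illustrative):

```python
from airflow.api_internal.internal_api_call import InternalApiConfig

# Trusted components (scheduler, webserver, internal-api server) keep
# direct DB access even when DB isolation mode is on:
InternalApiConfig.set_use_database_access("scheduler")

# Untrusted components would instead route DB calls over the internal API
# (requires [core] internal_api_url to be configured):
# InternalApiConfig.set_use_internal_api("worker")
```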
51 changes: 23 additions & 28 deletions airflow/cli/cli_config.py
@@ -31,7 +31,6 @@
from airflow import settings
from airflow.cli.commands.legacy_commands import check_legacy_command
from airflow.configuration import conf
from airflow.settings import _ENABLE_AIP_44
from airflow.utils.cli import ColorMode
from airflow.utils.module_loading import import_string
from airflow.utils.state import DagRunState, JobState
@@ -2080,34 +2079,30 @@ class GroupCommand(NamedTuple):
func=lazy_load_command("airflow.cli.commands.standalone_command.standalone"),
args=(),
),
]

if _ENABLE_AIP_44:
core_commands.append(
ActionCommand(
name="internal-api",
help="Start a Airflow Internal API instance",
func=lazy_load_command("airflow.cli.commands.internal_api_command.internal_api"),
args=(
ARG_INTERNAL_API_PORT,
ARG_INTERNAL_API_WORKERS,
ARG_INTERNAL_API_WORKERCLASS,
ARG_INTERNAL_API_WORKER_TIMEOUT,
ARG_INTERNAL_API_HOSTNAME,
ARG_PID,
ARG_DAEMON,
ARG_STDOUT,
ARG_STDERR,
ARG_INTERNAL_API_ACCESS_LOGFILE,
ARG_INTERNAL_API_ERROR_LOGFILE,
ARG_INTERNAL_API_ACCESS_LOGFORMAT,
ARG_LOG_FILE,
ARG_SSL_CERT,
ARG_SSL_KEY,
ARG_DEBUG,
),
ActionCommand(
name="internal-api",
help="Start a Airflow Internal API instance",
func=lazy_load_command("airflow.cli.commands.internal_api_command.internal_api"),
args=(
ARG_INTERNAL_API_PORT,
ARG_INTERNAL_API_WORKERS,
ARG_INTERNAL_API_WORKERCLASS,
ARG_INTERNAL_API_WORKER_TIMEOUT,
ARG_INTERNAL_API_HOSTNAME,
ARG_PID,
ARG_DAEMON,
ARG_STDOUT,
ARG_STDERR,
ARG_INTERNAL_API_ACCESS_LOGFILE,
ARG_INTERNAL_API_ERROR_LOGFILE,
ARG_INTERNAL_API_ACCESS_LOGFORMAT,
ARG_LOG_FILE,
ARG_SSL_CERT,
ARG_SSL_KEY,
ARG_DEBUG,
),
)
),
]


def _remove_dag_id_opt(command: ActionCommand):
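Since the command table is now built unconditionally, a quick hedged sanity check is that `internal-api` is always registered, with no feature-flag env var set:

```python
# Hedged sketch: "internal-api" should now always appear in core_commands.
from airflow.cli.cli_config import core_commands

assert any(cmd.name == "internal-api" for cmd in core_commands)
```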
3 changes: 2 additions & 1 deletion airflow/models/baseoperator.py
@@ -1512,10 +1512,11 @@ def run(
data_interval=info.data_interval,
)
ti = TaskInstance(self, run_id=dr.run_id)
session.add(ti)
ti.dag_run = dr
session.add(dr)
session.flush()

session.commit()
ti.run(
mark_success=mark_success,
ignore_depends_on_past=ignore_depends_on_past,
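The switch from `session.flush()` to `session.commit()` matters under DB isolation: flushed rows are visible only inside the writer's open transaction, while a separate session (such as one serving the `internal-api` component) sees only committed rows. A standalone illustration in plain SQLAlchemy, not the PR's code:

```python
import tempfile

from sqlalchemy import Column, Integer, create_engine
from sqlalchemy.orm import Session, declarative_base

Base = declarative_base()


class Marker(Base):
    __tablename__ = "markers"
    id = Column(Integer, primary_key=True)


engine = create_engine(f"sqlite:///{tempfile.mkstemp(suffix='.db')[1]}")
Base.metadata.create_all(engine)

with Session(engine) as writer:
    writer.add(Marker(id=1))
    writer.flush()  # visible only inside writer's transaction
    with Session(engine) as reader:
        assert reader.query(Marker).count() == 0  # not committed yet
    writer.commit()  # now visible to any session

with Session(engine) as reader:
    assert reader.query(Marker).count() == 1
```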
5 changes: 2 additions & 3 deletions airflow/operators/python.py
@@ -49,7 +49,6 @@
from airflow.models.taskinstance import _CURRENT_CONTEXT
from airflow.models.variable import Variable
from airflow.operators.branch import BranchMixIn
from airflow.settings import _ENABLE_AIP_44
from airflow.typing_compat import Literal
from airflow.utils import hashlib_wrapper
from airflow.utils.context import context_copy_partial, context_get_outlet_events, context_merge
@@ -552,8 +551,8 @@ def _execute_python_callable_in_subprocess(self, python_path: Path):
self._write_args(input_path)
self._write_string_args(string_args_path)

if self.use_airflow_context and (not is_pydantic_2_installed() or not _ENABLE_AIP_44):
error_msg = "`get_current_context()` needs to be used with Pydantic 2 and AIP-44 enabled."
if self.use_airflow_context and not is_pydantic_2_installed():
error_msg = "`get_current_context()` needs to be used with Pydantic 2."
raise AirflowException(error_msg)

jinja_context = {
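For context, a hedged sketch of the feature the guard protects: asking a virtualenv-based operator to rebuild the Airflow context in its subprocess. The `use_airflow_context` constructor parameter is assumed to match the `self.use_airflow_context` attribute in the diff:

```python
# Hedged sketch; the exact operator signature is an assumption.
from airflow.operators.python import PythonVirtualenvOperator


def my_callable():
    from airflow.operators.python import get_current_context

    print(get_current_context()["ds"])


task = PythonVirtualenvOperator(
    task_id="ctx_in_venv",
    python_callable=my_callable,
    requirements=["pydantic>=2"],  # Pydantic 2 is required by the guard above
    use_airflow_context=True,
)
```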
16 changes: 3 additions & 13 deletions airflow/serialization/serialized_objects.py
@@ -72,7 +72,7 @@
from airflow.serialization.pydantic.taskinstance import TaskInstancePydantic
from airflow.serialization.pydantic.tasklog import LogTemplatePydantic
from airflow.serialization.pydantic.trigger import TriggerPydantic
from airflow.settings import _ENABLE_AIP_44, DAGS_FOLDER, json
from airflow.settings import DAGS_FOLDER, json
from airflow.task.priority_strategy import (
PriorityWeightStrategy,
airflow_priority_weight_strategies,
@@ -627,11 +627,6 @@ def serialize(
:meta private:
"""
if use_pydantic_models and not _ENABLE_AIP_44:
raise RuntimeError(
"Setting use_pydantic_models = True requires AIP-44 (in progress) feature flag to be true. "
"This parameter will be removed eventually when new serialization is used by AIP-44"
)
if cls._is_primitive(var):
# enum.IntEnum is an int instance, it causes json dumps error so we use its value.
if isinstance(var, enum.Enum):
@@ -758,7 +753,7 @@ def serialize(
obj = cls.serialize(v, strict=strict, use_pydantic_models=use_pydantic_models)
d[str(k)] = obj
return cls._encode(d, type_=DAT.TASK_CONTEXT)
elif use_pydantic_models and _ENABLE_AIP_44:
elif use_pydantic_models:

def _pydantic_model_dump(model_cls: type[BaseModel], var: Any) -> dict[str, Any]:
return model_cls.model_validate(var).model_dump(mode="json") # type: ignore[attr-defined]
@@ -790,11 +785,6 @@ def deserialize(cls, encoded_var: Any, use_pydantic_models=False) -> Any:
:meta private:
"""
# JSON primitives (except for dict) are not encoded.
if use_pydantic_models and not _ENABLE_AIP_44:
raise RuntimeError(
"Setting use_pydantic_models = True requires AIP-44 (in progress) feature flag to be true. "
"This parameter will be removed eventually when new serialization is used by AIP-44"
)
if cls._is_primitive(encoded_var):
return encoded_var
elif isinstance(encoded_var, list):
@@ -886,7 +876,7 @@ def deserialize(cls, encoded_var: Any, use_pydantic_models=False) -> Any:
return SlaCallbackRequest.from_json(var)
elif type_ == DAT.TASK_INSTANCE_KEY:
return TaskInstanceKey(**var)
elif use_pydantic_models and _ENABLE_AIP_44:
elif use_pydantic_models:
return _type_to_class[type_][0].model_validate(var)
elif type_ == DAT.ARG_NOT_SET:
return NOTSET
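A hedged sketch of the serializer behavior this diff touches: primitives pass through unchanged, and (per the comment in `serialize`) Enum members are reduced to their value so they survive `json.dumps`:

```python
import enum

from airflow.serialization.serialized_objects import BaseSerialization


class Priority(enum.IntEnum):
    HIGH = 1


assert BaseSerialization.serialize("plain string") == "plain string"
assert BaseSerialization.serialize(Priority.HIGH) == 1  # value, not the member
```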
61 changes: 36 additions & 25 deletions airflow/settings.py
@@ -312,6 +312,8 @@ def remove(*args, **kwargs):
AIRFLOW_SETTINGS_PATH = os.path.join(AIRFLOW_PATH, "airflow", "settings.py")
AIRFLOW_UTILS_SESSION_PATH = os.path.join(AIRFLOW_PATH, "airflow", "utils", "session.py")
AIRFLOW_MODELS_BASEOPERATOR_PATH = os.path.join(AIRFLOW_PATH, "airflow", "models", "baseoperator.py")
AIRFLOW_MODELS_DAG_PATH = os.path.join(AIRFLOW_PATH, "airflow", "models", "dag.py")
AIRFLOW_DB_UTILS_PATH = os.path.join(AIRFLOW_PATH, "airflow", "utils", "db.py")


class TracebackSessionForTests:
@@ -369,6 +371,9 @@ def is_called_from_test_code(self) -> tuple[bool, traceback.FrameSummary | None]:
:return: True if the object was created from test code, False otherwise.
"""
self.traceback = traceback.extract_stack()
if any(filename.endswith("_pytest/fixtures.py") for filename, _, _, _ in self.traceback):
# This is a fixture call
return True, None
airflow_frames = [
tb
for tb in self.traceback
@@ -377,24 +382,30 @@ def is_called_from_test_code(self) -> tuple[bool, traceback.FrameSummary | None]:
and not tb.filename == AIRFLOW_UTILS_SESSION_PATH
]
if any(
filename.endswith("conftest.py") or filename.endswith("tests/test_utils/db.py")
for filename, _, _, _ in airflow_frames
filename.endswith("conftest.py")
or filename.endswith("tests/test_utils/db.py")
or (filename.startswith(AIRFLOW_TESTS_PATH) and name in ("setup_method", "teardown_method"))
for filename, _, name, _ in airflow_frames
):
# This is a fixture call or testing utilities
return True, None
if (
len(airflow_frames) >= 2
and airflow_frames[-2].filename.startswith(AIRFLOW_TESTS_PATH)
and airflow_frames[-1].filename == AIRFLOW_MODELS_BASEOPERATOR_PATH
and airflow_frames[-1].name == "run"
):
# This is baseoperator run method that is called directly from the test code and this is
# usual pattern where we create a session in the test code to create dag_runs for tests.
# If `run` code will be run inside a real "airflow" code the stack trace would be longer
# and it would not be directly called from the test code. Also if subsequently any of the
# run_task() method called later from the task code will attempt to execute any DB
# method, the stack trace will be longer and we will catch it as "illegal" call.
return True, None
if len(airflow_frames) >= 2 and airflow_frames[-2].filename.startswith(AIRFLOW_TESTS_PATH):
# Let's look at what we are calling directly from the test code
current_filename, current_method_name = airflow_frames[-1].filename, airflow_frames[-1].name
if (current_filename, current_method_name) in (
(AIRFLOW_MODELS_BASEOPERATOR_PATH, "run"),
(AIRFLOW_MODELS_DAG_PATH, "create_dagrun"),
):
# This is baseoperator run method that is called directly from the test code and this is
# usual pattern where we create a session in the test code to create dag_runs for tests.
# If `run` code will be run inside a real "airflow" code the stack trace would be longer
# and it would not be directly called from the test code. Also if subsequently any of the
# run_task() method called later from the task code will attempt to execute any DB
# method, the stack trace will be longer and we will catch it as "illegal" call.
return True, None
if current_filename == AIRFLOW_DB_UTILS_PATH:
# This is a util method called directly from the test code
return True, None
for tb in airflow_frames[::-1]:
if tb.filename.startswith(AIRFLOW_PATH):
if tb.filename.startswith(AIRFLOW_TESTS_PATH):
Expand All @@ -406,6 +417,16 @@ def is_called_from_test_code(self) -> tuple[bool, traceback.FrameSummary | None]
# The traceback line will be always 3rd (two bottom ones are Airflow)
return False, self.traceback[-2]

def get_bind(
self,
mapper=None,
clause=None,
bind=None,
_sa_skip_events=None,
_sa_skip_for_implicit_returning=False,
):
pass


def _is_sqlite_db_path_relative(sqla_conn_str: str) -> bool:
"""Determine whether the database connection URI specifies a relative path."""
@@ -858,13 +879,3 @@ def is_usage_data_collection_enabled() -> bool:
AIRFLOW_MOVED_TABLE_PREFIX = "_airflow_moved"

DAEMON_UMASK: str = conf.get("core", "daemon_umask", fallback="0o077")

# AIP-44: internal_api (experimental)
# This feature is not complete yet, so we disable it by default.
_ENABLE_AIP_44: bool = os.environ.get("AIRFLOW_ENABLE_AIP_44", "false").lower() in {
"true",
"t",
"yes",
"y",
"1",
}
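A toy illustration (not the PR's code) of the stack-walking idea behind `is_called_from_test_code` above: inspect `traceback.extract_stack()` frames and allow-list calls that originate from known test locations, such as the new `_pytest/fixtures.py` early return:

```python
import traceback


def called_from(path_suffix: str) -> bool:
    return any(f.filename.endswith(path_suffix) for f in traceback.extract_stack())


# Inside a pytest fixture this mirrors the new early return:
# called_from("_pytest/fixtures.py")  -> True
print(called_from(".py"))  # trivially True for any Python caller
```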
3 changes: 0 additions & 3 deletions airflow/www/app.py
@@ -33,7 +33,6 @@
from airflow.exceptions import AirflowConfigException, RemovedInAirflow3Warning
from airflow.logging_config import configure_logging
from airflow.models import import_all_models
from airflow.settings import _ENABLE_AIP_44
from airflow.utils.json import AirflowJsonProvider
from airflow.www.extensions.init_appbuilder import init_appbuilder
from airflow.www.extensions.init_appbuilder_links import init_appbuilder_links
@@ -171,8 +170,6 @@ def create_app(config=None, testing=False):
init_error_handlers(flask_app)
init_api_connexion(flask_app)
if conf.getboolean("webserver", "run_internal_api", fallback=False):
if not _ENABLE_AIP_44:
raise RuntimeError("The AIP_44 is not enabled so you cannot use it.")
init_api_internal(flask_app)
init_api_auth_provider(flask_app)
init_api_error_handlers(flask_app) # needs to be after all api inits to let them add their path first
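After this change, enabling the in-webserver internal API is purely a matter of ordinary Airflow configuration. A minimal sketch mirroring the `conf` check in `create_app`; the env-var spelling follows Airflow's standard `AIRFLOW__SECTION__KEY` convention:

```python
import os

os.environ["AIRFLOW__WEBSERVER__RUN_INTERNAL_API"] = "true"

from airflow.configuration import conf

assert conf.getboolean("webserver", "run_internal_api", fallback=False)
```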