Skip to content

Commit

Permalink
Remove Database Isolation feature flag and run DB isolation tests
Browse files Browse the repository at this point in the history
This PR removes AIP-44 feature flag and replaces "in-progress-disabled"
test with dedicated "DatabaseIsolation" one.

The DatabaseIsolation test will run all "db-tests" with enabled
DB isolation mode and running `internal-api` component - groups
of tests marked with "skip-if-database-isolation" will be skipped.
  • Loading branch information
potiuk committed Jul 27, 2024
1 parent 4e5ad66 commit d2fc06f
Show file tree
Hide file tree
Showing 22 changed files with 53 additions and 111 deletions.
6 changes: 0 additions & 6 deletions .github/workflows/basic-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -52,12 +52,6 @@ on: # yamllint disable-line rule:truthy
description: "Whether to run only latest version checks (true/false)"
required: true
type: string
enable-aip-44:
description: "Whether to enable AIP-44 (true/false)"
required: true
type: string
env:
AIRFLOW_ENABLE_AIP_44: "${{ inputs.enable-aip-44 }}"
jobs:
run-breeze-tests:
timeout-minutes: 10
Expand Down
1 change: 0 additions & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,6 @@ jobs:
skip-pre-commits: ${{needs.build-info.outputs.skip-pre-commits}}
canary-run: ${{needs.build-info.outputs.canary-run}}
latest-versions-only: ${{needs.build-info.outputs.latest-versions-only}}
enable-aip-44: "false"

build-ci-images:
name: >
Expand Down
6 changes: 2 additions & 4 deletions .github/workflows/run-unit-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -99,8 +99,8 @@ on: # yamllint disable-line rule:truthy
required: false
default: "false"
type: string
enable-aip-44:
description: "Whether to enable AIP-44 or not (true/false)"
database-isolation:
description: "Whether to enable database isolattion or not (true/false)"
required: false
default: "true"
type: string
Expand Down Expand Up @@ -129,8 +129,6 @@ jobs:
backend-version: "${{fromJSON(inputs.backend-versions)}}"
exclude: "${{fromJSON(inputs.excludes)}}"
env:
# yamllint disable rule:line-length
AIRFLOW_ENABLE_AIP_44: "${{ inputs.enable-aip-44 }}"
BACKEND: "${{ inputs.backend }}"
BACKEND_VERSION: "${{ matrix.backend-version }}"
DB_RESET: "true"
Expand Down
10 changes: 5 additions & 5 deletions .github/workflows/special-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -171,18 +171,18 @@ jobs:
run-coverage: ${{ inputs.run-coverage }}
debug-resources: ${{ inputs.debug-resources }}

tests-in-progress-disabled:
name: "In progress disabled test"
tests-database-isolation:
name: "Database isolatoin test"
uses: ./.github/workflows/run-unit-tests.yml
permissions:
contents: read
packages: read
secrets: inherit
with:
runs-on-as-json-default: ${{ inputs.runs-on-as-json-default }}
enable-aip-44: "false"
test-name: "InProgressDisabled-Postgres"
test-scope: "All"
database-isolation: "true"
test-name: "DatabaseIsolation-Postgres"
test-scope: "DB"
backend: "postgres"
image-tag: ${{ inputs.image-tag }}
python-versions: "['${{ inputs.default-python-version }}']"
Expand Down
1 change: 0 additions & 1 deletion airflow/api_internal/endpoints/rpc_api_endpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,6 @@ def initialize_method_map() -> dict[str, Callable]:
DagRun.get_previous_scheduled_dagrun,
DagRun.fetch_task_instance,
DagRun._get_log_template,
DagRun._get_task_instances,
RenderedTaskInstanceFields._update_runtime_evaluated_template_fields,
SerializedDagModel.get_serialized_dag,
SerializedDagModel.remove_deleted_dags,
Expand Down
6 changes: 1 addition & 5 deletions airflow/api_internal/internal_api_call.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@

from airflow.configuration import conf
from airflow.exceptions import AirflowConfigException, AirflowException
from airflow.settings import _ENABLE_AIP_44, force_traceback_session_for_untrusted_components
from airflow.settings import force_traceback_session_for_untrusted_components
from airflow.typing_compat import ParamSpec
from airflow.utils.jwt_signer import JWTSigner

Expand All @@ -55,8 +55,6 @@ def set_use_database_access(component: str):
This mode is needed for "trusted" components like Scheduler, Webserver, Internal Api server
"""
InternalApiConfig._use_internal_api = False
if not _ENABLE_AIP_44:
raise RuntimeError("The AIP_44 is not enabled so you cannot use it. ")
logger.info(
"DB isolation mode. But this is a trusted component and DB connection is set. "
"Using database direct access when running %s.",
Expand All @@ -65,8 +63,6 @@ def set_use_database_access(component: str):

@staticmethod
def set_use_internal_api(component: str, allow_tests_to_use_db: bool = False):
if not _ENABLE_AIP_44:
raise RuntimeError("The AIP_44 is not enabled so you cannot use it. ")
internal_api_url = conf.get("core", "internal_api_url")
url_conf = urlparse(internal_api_url)
api_path = url_conf.path
Expand Down
51 changes: 23 additions & 28 deletions airflow/cli/cli_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
from airflow import settings
from airflow.cli.commands.legacy_commands import check_legacy_command
from airflow.configuration import conf
from airflow.settings import _ENABLE_AIP_44
from airflow.utils.cli import ColorMode
from airflow.utils.module_loading import import_string
from airflow.utils.state import DagRunState, JobState
Expand Down Expand Up @@ -2096,34 +2095,30 @@ class GroupCommand(NamedTuple):
func=lazy_load_command("airflow.cli.commands.standalone_command.standalone"),
args=(),
),
]

if _ENABLE_AIP_44:
core_commands.append(
ActionCommand(
name="internal-api",
help="Start a Airflow Internal API instance",
func=lazy_load_command("airflow.cli.commands.internal_api_command.internal_api"),
args=(
ARG_INTERNAL_API_PORT,
ARG_INTERNAL_API_WORKERS,
ARG_INTERNAL_API_WORKERCLASS,
ARG_INTERNAL_API_WORKER_TIMEOUT,
ARG_INTERNAL_API_HOSTNAME,
ARG_PID,
ARG_DAEMON,
ARG_STDOUT,
ARG_STDERR,
ARG_INTERNAL_API_ACCESS_LOGFILE,
ARG_INTERNAL_API_ERROR_LOGFILE,
ARG_INTERNAL_API_ACCESS_LOGFORMAT,
ARG_LOG_FILE,
ARG_SSL_CERT,
ARG_SSL_KEY,
ARG_DEBUG,
),
ActionCommand(
name="internal-api",
help="Start a Airflow Internal API instance",
func=lazy_load_command("airflow.cli.commands.internal_api_command.internal_api"),
args=(
ARG_INTERNAL_API_PORT,
ARG_INTERNAL_API_WORKERS,
ARG_INTERNAL_API_WORKERCLASS,
ARG_INTERNAL_API_WORKER_TIMEOUT,
ARG_INTERNAL_API_HOSTNAME,
ARG_PID,
ARG_DAEMON,
ARG_STDOUT,
ARG_STDERR,
ARG_INTERNAL_API_ACCESS_LOGFILE,
ARG_INTERNAL_API_ERROR_LOGFILE,
ARG_INTERNAL_API_ACCESS_LOGFORMAT,
ARG_LOG_FILE,
ARG_SSL_CERT,
ARG_SSL_KEY,
ARG_DEBUG,
),
)
),
]


def _remove_dag_id_opt(command: ActionCommand):
Expand Down
16 changes: 3 additions & 13 deletions airflow/serialization/serialized_objects.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@
from airflow.serialization.pydantic.job import JobPydantic
from airflow.serialization.pydantic.taskinstance import TaskInstancePydantic
from airflow.serialization.pydantic.tasklog import LogTemplatePydantic
from airflow.settings import _ENABLE_AIP_44, DAGS_FOLDER, json
from airflow.settings import DAGS_FOLDER, json
from airflow.task.priority_strategy import (
PriorityWeightStrategy,
airflow_priority_weight_strategies,
Expand Down Expand Up @@ -615,11 +615,6 @@ def serialize(
:meta private:
"""
if use_pydantic_models and not _ENABLE_AIP_44:
raise RuntimeError(
"Setting use_pydantic_models = True requires AIP-44 (in progress) feature flag to be true. "
"This parameter will be removed eventually when new serialization is used by AIP-44"
)
if cls._is_primitive(var):
# enum.IntEnum is an int instance, it causes json dumps error so we use its value.
if isinstance(var, enum.Enum):
Expand Down Expand Up @@ -732,7 +727,7 @@ def serialize(
obj = cls.serialize(v, strict=strict, use_pydantic_models=use_pydantic_models)
d[str(k)] = obj
return cls._encode(d, type_=DAT.TASK_CONTEXT)
elif use_pydantic_models and _ENABLE_AIP_44:
elif use_pydantic_models:

def _pydantic_model_dump(model_cls: type[BaseModel], var: Any) -> dict[str, Any]:
return model_cls.model_validate(var).model_dump(mode="json") # type: ignore[attr-defined]
Expand Down Expand Up @@ -764,11 +759,6 @@ def deserialize(cls, encoded_var: Any, use_pydantic_models=False) -> Any:
:meta private:
"""
# JSON primitives (except for dict) are not encoded.
if use_pydantic_models and not _ENABLE_AIP_44:
raise RuntimeError(
"Setting use_pydantic_models = True requires AIP-44 (in progress) feature flag to be true. "
"This parameter will be removed eventually when new serialization is used by AIP-44"
)
if cls._is_primitive(encoded_var):
return encoded_var
elif isinstance(encoded_var, list):
Expand Down Expand Up @@ -849,7 +839,7 @@ def deserialize(cls, encoded_var: Any, use_pydantic_models=False) -> Any:
return DagCallbackRequest.from_json(var)
elif type_ == DAT.SLA_CALLBACK_REQUEST:
return SlaCallbackRequest.from_json(var)
elif use_pydantic_models and _ENABLE_AIP_44:
elif use_pydantic_models:
return _type_to_class[type_][0].model_validate(var)
elif type_ == DAT.ARG_NOT_SET:
return NOTSET
Expand Down
10 changes: 0 additions & 10 deletions airflow/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -790,13 +790,3 @@ def is_usage_data_collection_enabled() -> bool:
AIRFLOW_MOVED_TABLE_PREFIX = "_airflow_moved"

DAEMON_UMASK: str = conf.get("core", "daemon_umask", fallback="0o077")

# AIP-44: internal_api (experimental)
# This feature is not complete yet, so we disable it by default.
_ENABLE_AIP_44: bool = os.environ.get("AIRFLOW_ENABLE_AIP_44", "false").lower() in {
"true",
"t",
"yes",
"y",
"1",
}
3 changes: 0 additions & 3 deletions airflow/www/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@
from airflow.exceptions import AirflowConfigException, RemovedInAirflow3Warning
from airflow.logging_config import configure_logging
from airflow.models import import_all_models
from airflow.settings import _ENABLE_AIP_44
from airflow.utils.json import AirflowJsonProvider
from airflow.www.extensions.init_appbuilder import init_appbuilder
from airflow.www.extensions.init_appbuilder_links import init_appbuilder_links
Expand Down Expand Up @@ -172,8 +171,6 @@ def create_app(config=None, testing=False):
init_error_handlers(flask_app)
init_api_connexion(flask_app)
if conf.getboolean("webserver", "run_internal_api", fallback=False):
if not _ENABLE_AIP_44:
raise RuntimeError("The AIP_44 is not enabled so you cannot use it.")
init_api_internal(flask_app)
init_api_experimental(flask_app)
init_api_auth_provider(flask_app)
Expand Down
2 changes: 2 additions & 0 deletions dev/breeze/src/airflow_breeze/commands/testing_commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,7 @@ def _run_test(
skip_provider_tests=shell_params.skip_provider_tests,
skip_db_tests=shell_params.skip_db_tests,
run_db_tests_only=shell_params.run_db_tests_only,
run_database_isolation_tests_only=shell_params.database_isolation,
backend=shell_params.backend,
use_xdist=shell_params.use_xdist,
enable_coverage=shell_params.enable_coverage,
Expand Down Expand Up @@ -958,6 +959,7 @@ def helm_tests(
skip_provider_tests=True,
skip_db_tests=False,
run_db_tests_only=False,
run_database_isolation_tests_only=False,
backend="none",
use_xdist=use_xdist,
enable_coverage=False,
Expand Down
1 change: 0 additions & 1 deletion dev/breeze/src/airflow_breeze/params/shell_params.py
Original file line number Diff line number Diff line change
Expand Up @@ -481,7 +481,6 @@ def env_variables_for_docker_commands(self) -> dict[str, str]:
_set_var(_env, "AIRFLOW_CONSTRAINTS_LOCATION", self.airflow_constraints_location)
_set_var(_env, "AIRFLOW_CONSTRAINTS_MODE", self.airflow_constraints_mode)
_set_var(_env, "AIRFLOW_CONSTRAINTS_REFERENCE", self.airflow_constraints_reference)
_set_var(_env, "AIRFLOW_ENABLE_AIP_44", None, "true")
_set_var(_env, "AIRFLOW_ENV", "development")
_set_var(_env, "AIRFLOW_EXTRAS", self.airflow_extras)
_set_var(_env, "AIRFLOW_SKIP_CONSTRAINTS", self.airflow_skip_constraints)
Expand Down
3 changes: 3 additions & 0 deletions dev/breeze/src/airflow_breeze/utils/run_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -301,6 +301,7 @@ def generate_args_for_pytest(
skip_provider_tests: bool,
skip_db_tests: bool,
run_db_tests_only: bool,
run_database_isolation_tests_only: bool,
backend: str,
use_xdist: bool,
enable_coverage: bool,
Expand Down Expand Up @@ -360,6 +361,8 @@ def generate_args_for_pytest(
args.append("--skip-db-tests")
if run_db_tests_only:
args.append("--run-db-tests-only")
if run_database_isolation_tests_only:
args.append("--run-database-isolation-tests-only")
if test_type != "System":
args.append(f"--ignore={SYSTEM_TESTS}")
if test_type != "Integration":
Expand Down
1 change: 0 additions & 1 deletion scripts/ci/docker-compose/devcontainer.env
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
HOME=
AIRFLOW_CI_IMAGE="ghcr.io/apache/airflow/main/ci/python3.8:latest"
ANSWER=
AIRFLOW_ENABLE_AIP_44="true"
AIRFLOW_ENV="development"
PYTHON_MAJOR_MINOR_VERSION="3.8"
AIRFLOW_EXTRAS=
Expand Down
2 changes: 0 additions & 2 deletions tests/api_internal/endpoints/test_rpc_api_endpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
from airflow.operators.empty import EmptyOperator
from airflow.serialization.pydantic.taskinstance import TaskInstancePydantic
from airflow.serialization.serialized_objects import BaseSerialization
from airflow.settings import _ENABLE_AIP_44
from airflow.utils.jwt_signer import JWTSigner
from airflow.utils.state import State
from airflow.www import app
Expand Down Expand Up @@ -69,7 +68,6 @@ def equals(a, b) -> bool:
return a == b


@pytest.mark.skipif(not _ENABLE_AIP_44, reason="AIP-44 is disabled")
class TestRpcApiEndpoint:
@pytest.fixture(autouse=True)
def setup_attrs(self, minimal_app_for_internal_api: Flask) -> Generator:
Expand Down
3 changes: 0 additions & 3 deletions tests/api_internal/test_internal_api_call.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@
from airflow.models.taskinstance import TaskInstance
from airflow.operators.empty import EmptyOperator
from airflow.serialization.serialized_objects import BaseSerialization
from airflow.settings import _ENABLE_AIP_44
from airflow.utils.state import State
from tests.test_utils.config import conf_vars

Expand Down Expand Up @@ -61,7 +60,6 @@ def reset_init_api_config():
settings.SQL_ALCHEMY_CONN = old_conn


@pytest.mark.skipif(not _ENABLE_AIP_44, reason="AIP-44 is disabled")
class TestInternalApiConfig:
@conf_vars(
{
Expand Down Expand Up @@ -97,7 +95,6 @@ def test_force_database_direct_access(self):
assert InternalApiConfig.get_use_internal_api() is False


@pytest.mark.skipif(not _ENABLE_AIP_44, reason="AIP-44 is disabled")
class TestInternalApiCall:
@staticmethod
@internal_api_call
Expand Down
2 changes: 0 additions & 2 deletions tests/cli/commands/test_internal_api_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
from airflow.cli import cli_parser
from airflow.cli.commands import internal_api_command
from airflow.cli.commands.internal_api_command import GunicornMonitor
from airflow.settings import _ENABLE_AIP_44
from tests.cli.commands._common_cli_classes import _ComonCLIGunicornTestClass
from tests.test_utils.config import conf_vars

Expand Down Expand Up @@ -84,7 +83,6 @@ def test_ready_prefix_on_cmdline_dead_process(self):


@pytest.mark.db_test
@pytest.mark.skipif(not _ENABLE_AIP_44, reason="AIP-44 is disabled")
class TestCliInternalAPI(_ComonCLIGunicornTestClass):
main_process_regexp = r"airflow internal-api"

Expand Down
9 changes: 4 additions & 5 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,6 @@
os.environ["AIRFLOW__CORE__UNIT_TEST_MODE"] = "True"
os.environ["AWS_DEFAULT_REGION"] = os.environ.get("AWS_DEFAULT_REGION") or "us-east-1"
os.environ["CREDENTIALS_DIR"] = os.environ.get("CREDENTIALS_DIR") or "/files/airflow-breeze-config/keys"
os.environ["AIRFLOW_ENABLE_AIP_44"] = os.environ.get("AIRFLOW_ENABLE_AIP_44") or "true"

if platform.system() == "Darwin":
# mocks from unittest.mock work correctly in subprocesses only if they are created by "fork" method
Expand Down Expand Up @@ -235,9 +234,9 @@ def set_db_isolation_mode():
InternalApiConfig.set_use_internal_api("tests", allow_tests_to_use_db=True)


def skip_if_database_isolation_mode(item):
def skip_if_database_isolation(item):
if os.environ.get("RUN_TESTS_WITH_DATABASE_ISOLATION", "false").lower() == "true":
for _ in item.iter_markers(name="skip_if_database_isolation_mode"):
for _ in item.iter_markers(name="skip_if_database_isolation"):
pytest.skip("This test is skipped because it is not allowed in database isolation mode.")


Expand Down Expand Up @@ -464,7 +463,7 @@ def pytest_configure(config: pytest.Config) -> None:
"external_python_operator: external python operator tests are 'long', we should run them separately",
)
config.addinivalue_line("markers", "enable_redact: do not mock redact secret masker")
config.addinivalue_line("markers", "skip_if_database_isolation_mode: skip if DB isolation is enabled")
config.addinivalue_line("markers", "skip_if_database_isolation: skip if DB isolation is enabled")

os.environ["_AIRFLOW__SKIP_DATABASE_EXECUTOR_COMPATIBILITY_CHECK"] = "1"

Expand Down Expand Up @@ -704,7 +703,7 @@ def pytest_runtest_setup(item):
skip_if_platform_doesnt_match(marker)
for marker in item.iter_markers(name="backend"):
skip_if_wrong_backend(marker, item)
skip_if_database_isolation_mode(item)
skip_if_database_isolation(item)
selected_backend = item.config.option.backend
if selected_backend:
skip_if_not_marked_with_backend(selected_backend, item)
Expand Down
Loading

0 comments on commit d2fc06f

Please sign in to comment.