From b211936a4c3117f8102798d5c460de9235c5c571 Mon Sep 17 00:00:00 2001 From: Jarek Potiuk Date: Wed, 14 Aug 2024 15:06:46 +0200 Subject: [PATCH 01/17] Enable pull requests to be run from v*test branches (#41474) (#41476) Since we switch from direct push of cherry-picking to open PRs against v*test branch, we should enable PRs to run for the target branch. (cherry picked from commit a9363e6a30d73a647ed7d45c92d46d1f6f98513f) --- .github/workflows/ci.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 68aa51bf860f5..a19879ea172a2 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -23,7 +23,7 @@ on: # yamllint disable-line rule:truthy push: branches: ['v[0-9]+-[0-9]+-test'] pull_request: - branches: ['main'] + branches: ['main', 'v[0-9]+-[0-9]+-test'] workflow_dispatch: permissions: # All other permissions are set to none From 59342723770ef3a4d681221d62e19ae88c759542 Mon Sep 17 00:00:00 2001 From: Jarek Potiuk Date: Wed, 14 Aug 2024 17:01:30 +0200 Subject: [PATCH 02/17] Prevent provider lowest-dependency tests to run in non-main branch (#41478) (#41481) When running tests in v2-10-test branch, lowest dependency tests are run for providers - because when calculating separate tests, the "skip_provider_tests" has not been used to filter them out. This PR fixes it. 
(cherry picked from commit 75da5074969ec874040ea094d5afe00b7f02be76) --- dev/breeze/src/airflow_breeze/utils/selective_checks.py | 4 ++++ dev/breeze/tests/test_selective_checks.py | 5 ++++- 2 files changed, 8 insertions(+), 1 deletion(-) diff --git a/dev/breeze/src/airflow_breeze/utils/selective_checks.py b/dev/breeze/src/airflow_breeze/utils/selective_checks.py index 62f5a20abc4e0..224e76c251921 100644 --- a/dev/breeze/src/airflow_breeze/utils/selective_checks.py +++ b/dev/breeze/src/airflow_breeze/utils/selective_checks.py @@ -861,6 +861,10 @@ def separate_test_types_list_as_string(self) -> str | None: if "Providers" in current_test_types: current_test_types.remove("Providers") current_test_types.update({f"Providers[{provider}]" for provider in get_available_packages()}) + if self.skip_provider_tests: + current_test_types = { + test_type for test_type in current_test_types if not test_type.startswith("Providers") + } return " ".join(sorted(current_test_types)) @cached_property diff --git a/dev/breeze/tests/test_selective_checks.py b/dev/breeze/tests/test_selective_checks.py index 7c0ca940949cd..6bee6bbc7e308 100644 --- a/dev/breeze/tests/test_selective_checks.py +++ b/dev/breeze/tests/test_selective_checks.py @@ -1075,7 +1075,7 @@ def test_full_test_needed_when_scripts_changes(files: tuple[str, ...], expected_ ), ( pytest.param( - ("INTHEWILD.md",), + ("INTHEWILD.md", "tests/providers/asana.py"), ("full tests needed",), "v2-7-stable", { @@ -1097,6 +1097,9 @@ def test_full_test_needed_when_scripts_changes(files: tuple[str, ...], expected_ "parallel-test-types-list-as-string": "API Always BranchExternalPython " "BranchPythonVenv CLI Core ExternalPython Operators Other PlainAsserts " "PythonVenv Serialization WWW", + "separate-test-types-list-as-string": "API Always BranchExternalPython " + "BranchPythonVenv CLI Core ExternalPython Operators Other PlainAsserts " + "PythonVenv Serialization WWW", "needs-mypy": "true", "mypy-folders": "['airflow', 'docs', 'dev']", }, 
From bf0d412531e099ecb918beb04287d30b72dc5682 Mon Sep 17 00:00:00 2001 From: Jarek Potiuk Date: Wed, 14 Aug 2024 19:12:49 +0200 Subject: [PATCH 03/17] Make PROD image building works in non-main PRs (#41480) (#41484) The PROD image building fails currently in non-main because it attempts to build source provider packages rather than use them from PyPI when PR is run against "v-test" branch. This PR fixes it: * PROD images in non-main-targeted build will pull providers from PyPI rather than build them * they use PyPI constraints to install the providers * they use UV - which should speed up building of the images (cherry picked from commit 4d5f1c42a7873329b1b6b8b9b39db2c3033b46df) --- .github/workflows/build-images.yml | 4 ++-- .github/workflows/ci.yml | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/.github/workflows/build-images.yml b/.github/workflows/build-images.yml index bd10e73aac65c..1256fd2f0da6e 100644 --- a/.github/workflows/build-images.yml +++ b/.github/workflows/build-images.yml @@ -241,14 +241,14 @@ jobs: pull-request-target: "true" is-committer-build: ${{ needs.build-info.outputs.is-committer-build }} push-image: "true" - use-uv: ${{ needs.build-info.outputs.default-branch == 'main' && 'true' || 'false' }} + use-uv: "true" image-tag: ${{ needs.build-info.outputs.image-tag }} platform: "linux/amd64" python-versions: ${{ needs.build-info.outputs.python-versions }} default-python-version: ${{ needs.build-info.outputs.default-python-version }} branch: ${{ needs.build-info.outputs.default-branch }} constraints-branch: ${{ needs.build-info.outputs.constraints-branch }} - build-provider-packages: "true" + build-provider-packages: ${{ needs.build-info.outputs.default-branch == 'main' }} upgrade-to-newer-dependencies: ${{ needs.build-info.outputs.upgrade-to-newer-dependencies }} chicken-egg-providers: ${{ needs.build-info.outputs.chicken-egg-providers }} docker-cache: ${{ needs.build-info.outputs.docker-cache }} diff --git 
a/.github/workflows/ci.yml b/.github/workflows/ci.yml index a19879ea172a2..4e1db2a14edfc 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -541,7 +541,7 @@ jobs: default-python-version: ${{ needs.build-info.outputs.default-python-version }} branch: ${{ needs.build-info.outputs.default-branch }} push-image: "true" - use-uv: ${{ needs.build-info.outputs.default-branch == 'main' && 'true' || 'false' }} + use-uv: "true" build-provider-packages: ${{ needs.build-info.outputs.default-branch == 'main' }} upgrade-to-newer-dependencies: ${{ needs.build-info.outputs.upgrade-to-newer-dependencies }} chicken-egg-providers: ${{ needs.build-info.outputs.chicken-egg-providers }} From 5f8230bbe38a5382ed5c8993731130aedf722e89 Mon Sep 17 00:00:00 2001 From: Brent Bovenzi Date: Thu, 15 Aug 2024 10:23:04 -0400 Subject: [PATCH 04/17] Add WebEncoder for trigger page rendering to avoid render failure (#41350) (#41485) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: M. 
Olcay Tercanlı --- airflow/www/views.py | 3 ++- tests/www/views/test_views_trigger_dag.py | 28 +++++++++++++++++++++++ 2 files changed, 30 insertions(+), 1 deletion(-) diff --git a/airflow/www/views.py b/airflow/www/views.py index 236beed4511a3..a485f84ed4b1c 100644 --- a/airflow/www/views.py +++ b/airflow/www/views.py @@ -2163,7 +2163,7 @@ def trigger(self, dag_id: str, session: Session = NEW_SESSION): .limit(num_recent_confs) ) recent_confs = { - run_id: json.dumps(run_conf) + run_id: json.dumps(run_conf, cls=utils_json.WebEncoder) for run_id, run_conf in ((run.run_id, run.conf) for run in recent_runs) if isinstance(run_conf, dict) and any(run_conf) } @@ -2198,6 +2198,7 @@ def trigger(self, dag_id: str, session: Session = NEW_SESSION): }, indent=4, ensure_ascii=False, + cls=utils_json.WebEncoder, ) except TypeError: flash("Could not pre-populate conf field due to non-JSON-serializable data-types") diff --git a/tests/www/views/test_views_trigger_dag.py b/tests/www/views/test_views_trigger_dag.py index c53213c3e68ea..9b2b2971982af 100644 --- a/tests/www/views/test_views_trigger_dag.py +++ b/tests/www/views/test_views_trigger_dag.py @@ -19,6 +19,7 @@ import datetime import json +from decimal import Decimal from urllib.parse import quote import pytest @@ -28,6 +29,7 @@ from airflow.operators.empty import EmptyOperator from airflow.security import permissions from airflow.utils import timezone +from airflow.utils.json import WebEncoder from airflow.utils.session import create_session from airflow.utils.types import DagRunType from tests.test_utils.api_connexion_utils import create_test_client @@ -92,6 +94,32 @@ def test_trigger_dag_conf(admin_client): assert run.conf == conf_dict +def test_trigger_dag_conf_serializable_fields(admin_client): + test_dag_id = "example_bash_operator" + time_now = timezone.utcnow() + conf_dict = { + "string": "Hello, World!", + "date_str": "2024-08-08T09:57:35.300858", + "datetime": time_now, + "decimal": Decimal(10.465), + } + 
expected_conf = { + "string": "Hello, World!", + "date_str": "2024-08-08T09:57:35.300858", + "datetime": time_now.isoformat(), + "decimal": 10.465, + } + + admin_client.post(f"dags/{test_dag_id}/trigger", data={"conf": json.dumps(conf_dict, cls=WebEncoder)}) + + with create_session() as session: + run = session.query(DagRun).filter(DagRun.dag_id == test_dag_id).first() + assert run is not None + assert DagRunType.MANUAL in run.run_id + assert run.run_type == DagRunType.MANUAL + assert run.conf == expected_conf + + def test_trigger_dag_conf_malformed(admin_client): test_dag_id = "example_bash_operator" From 30cfb7345ae793e3687597e6d499f1657ed8ece3 Mon Sep 17 00:00:00 2001 From: Jarek Potiuk Date: Fri, 16 Aug 2024 18:10:51 +0200 Subject: [PATCH 05/17] Incorrect try number subtraction producing invalid span id for OTEL airflow (issue #41501) (#41502) (#41535) * Fix for issue #39336 * removed unnecessary import (cherry picked from commit dd3c3a7a43102c967d76cdcfe1f2f8ebeef4e212) Co-authored-by: Howard Yoo <32691630+howardyoo@users.noreply.github.com> --- airflow/executors/base_executor.py | 2 +- airflow/executors/local_executor.py | 2 +- airflow/executors/sequential_executor.py | 2 +- airflow/jobs/scheduler_job_runner.py | 2 +- airflow/traces/utils.py | 7 +------ 5 files changed, 5 insertions(+), 10 deletions(-) diff --git a/airflow/executors/base_executor.py b/airflow/executors/base_executor.py index dd0b8a66d2857..57568af199710 100644 --- a/airflow/executors/base_executor.py +++ b/airflow/executors/base_executor.py @@ -467,7 +467,7 @@ def success(self, key: TaskInstanceKey, info=None) -> None: span.set_attribute("dag_id", key.dag_id) span.set_attribute("run_id", key.run_id) span.set_attribute("task_id", key.task_id) - span.set_attribute("try_number", key.try_number - 1) + span.set_attribute("try_number", key.try_number) self.change_state(key, TaskInstanceState.SUCCESS, info) diff --git a/airflow/executors/local_executor.py b/airflow/executors/local_executor.py index 
afa51b1d86bb4..32bba4208273b 100644 --- a/airflow/executors/local_executor.py +++ b/airflow/executors/local_executor.py @@ -277,7 +277,7 @@ def execute_async( span.set_attribute("dag_id", key.dag_id) span.set_attribute("run_id", key.run_id) span.set_attribute("task_id", key.task_id) - span.set_attribute("try_number", key.try_number - 1) + span.set_attribute("try_number", key.try_number) span.set_attribute("commands_to_run", str(command)) local_worker = LocalWorker(self.executor.result_queue, key=key, command=command) diff --git a/airflow/executors/sequential_executor.py b/airflow/executors/sequential_executor.py index 1b145892ebc7e..5e9542d9158b1 100644 --- a/airflow/executors/sequential_executor.py +++ b/airflow/executors/sequential_executor.py @@ -76,7 +76,7 @@ def execute_async( span.set_attribute("dag_id", key.dag_id) span.set_attribute("run_id", key.run_id) span.set_attribute("task_id", key.task_id) - span.set_attribute("try_number", key.try_number - 1) + span.set_attribute("try_number", key.try_number) span.set_attribute("commands_to_run", str(self.commands_to_run)) def sync(self) -> None: diff --git a/airflow/jobs/scheduler_job_runner.py b/airflow/jobs/scheduler_job_runner.py index 163bf5b71449b..ba5f90c68b772 100644 --- a/airflow/jobs/scheduler_job_runner.py +++ b/airflow/jobs/scheduler_job_runner.py @@ -837,7 +837,7 @@ def _process_executor_events(self, executor: BaseExecutor, session: Session) -> span.set_attribute("hostname", ti.hostname) span.set_attribute("log_url", ti.log_url) span.set_attribute("operator", str(ti.operator)) - span.set_attribute("try_number", ti.try_number - 1) + span.set_attribute("try_number", ti.try_number) span.set_attribute("executor_state", state) span.set_attribute("job_id", ti.job_id) span.set_attribute("pool", ti.pool) diff --git a/airflow/traces/utils.py b/airflow/traces/utils.py index afab2591d5146..9932c249f0772 100644 --- a/airflow/traces/utils.py +++ b/airflow/traces/utils.py @@ -22,7 +22,6 @@ from airflow.traces import 
NO_TRACE_ID from airflow.utils.hashlib_wrapper import md5 -from airflow.utils.state import TaskInstanceState if TYPE_CHECKING: from airflow.models import DagRun, TaskInstance @@ -75,12 +74,8 @@ def gen_dag_span_id(dag_run: DagRun, as_int: bool = False) -> str | int: def gen_span_id(ti: TaskInstance, as_int: bool = False) -> str | int: """Generate span id from the task instance.""" dag_run = ti.dag_run - if ti.state == TaskInstanceState.SUCCESS or ti.state == TaskInstanceState.FAILED: - try_number = ti.try_number - 1 - else: - try_number = ti.try_number return _gen_id( - [dag_run.dag_id, dag_run.run_id, ti.task_id, str(try_number)], + [dag_run.dag_id, dag_run.run_id, ti.task_id, str(ti.try_number)], as_int, SPAN_ID, ) From fa4ee68da1f7aea611f4f6820f8b767a31498150 Mon Sep 17 00:00:00 2001 From: Jarek Potiuk Date: Fri, 16 Aug 2024 20:08:56 +0200 Subject: [PATCH 06/17] Fix failing pydantic v1 tests (#41534) (#41541) We need to exclude some versions of Pydantic v1 because it conflicts with aws provider. 
(cherry picked from commit a033c5f15a033c751419506ea77ffdbacdd37705) --- Dockerfile.ci | 3 ++- scripts/docker/entrypoint_ci.sh | 3 ++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/Dockerfile.ci b/Dockerfile.ci index 14ccb669f62aa..06a7343c82e26 100644 --- a/Dockerfile.ci +++ b/Dockerfile.ci @@ -1068,8 +1068,9 @@ function check_pydantic() { echo echo "${COLOR_YELLOW}Downgrading Pydantic to < 2${COLOR_RESET}" echo + # Pydantic 1.10.17/1.10.15 conflicts with aws-sam-translator so we need to exclude it # shellcheck disable=SC2086 - ${PACKAGING_TOOL_CMD} install ${EXTRA_INSTALL_FLAGS} --upgrade "pydantic<2.0.0" + ${PACKAGING_TOOL_CMD} install ${EXTRA_INSTALL_FLAGS} --upgrade "pydantic<2.0.0,!=1.10.17,!=1.10.15" pip check else echo diff --git a/scripts/docker/entrypoint_ci.sh b/scripts/docker/entrypoint_ci.sh index 1ff3ef0cd2cce..da7ff309763aa 100755 --- a/scripts/docker/entrypoint_ci.sh +++ b/scripts/docker/entrypoint_ci.sh @@ -289,8 +289,9 @@ function check_pydantic() { echo echo "${COLOR_YELLOW}Downgrading Pydantic to < 2${COLOR_RESET}" echo + # Pydantic 1.10.17/1.10.15 conflicts with aws-sam-translator so we need to exclude it # shellcheck disable=SC2086 - ${PACKAGING_TOOL_CMD} install ${EXTRA_INSTALL_FLAGS} --upgrade "pydantic<2.0.0" + ${PACKAGING_TOOL_CMD} install ${EXTRA_INSTALL_FLAGS} --upgrade "pydantic<2.0.0,!=1.10.17,!=1.10.15" pip check else echo From e36e521dcb317548a159d98587aff18c979419f7 Mon Sep 17 00:00:00 2001 From: Jarek Potiuk Date: Fri, 16 Aug 2024 21:24:46 +0200 Subject: [PATCH 07/17] Fix Non-DB test calculation for main builds (#41499) (#41543) Pytest has a weird behaviour that it will not collect tests from parent folder when subfolder of it is specified after the parent folder. This caused some non-db tests from providers folder have been skipped during main build. 
The issue in Pytest 8.2 (used to work before) is tracked at https://github.com/pytest-dev/pytest/issues/12605 (cherry picked from commit d48982692c54d024d7c05e1efb7cd2adeb7d896c) --- dev/breeze/src/airflow_breeze/utils/run_tests.py | 14 +++++++++++--- .../tests/test_pytest_args_for_test_types.py | 13 +++++++++++++ 2 files changed, 24 insertions(+), 3 deletions(-) diff --git a/dev/breeze/src/airflow_breeze/utils/run_tests.py b/dev/breeze/src/airflow_breeze/utils/run_tests.py index fe099efbc024a..375dc568475b4 100644 --- a/dev/breeze/src/airflow_breeze/utils/run_tests.py +++ b/dev/breeze/src/airflow_breeze/utils/run_tests.py @@ -374,7 +374,7 @@ def generate_args_for_pytest( args.extend(get_excluded_provider_args(python_version)) if use_xdist: args.extend(["-n", str(parallelism) if parallelism else "auto"]) - # We have to disabke coverage for Python 3.12 because of the issue with coverage that takes too long, despite + # We have to disable coverage for Python 3.12 because of the issue with coverage that takes too long, despite # Using experimental support for Python 3.12 PEP 669. The coverage.py is not yet fully compatible with the # full scope of PEP-669. 
That will be fully done when https://github.com/nedbat/coveragepy/issues/1746 is # resolve for now we are disabling coverage for Python 3.12, and it causes slower execution and occasional @@ -417,5 +417,13 @@ def convert_parallel_types_to_folders( python_version=python_version, ) ) - # leave only folders, strip --pytest-args - return [arg for arg in args if arg.startswith("test")] + # leave only folders, strip --pytest-args that exclude some folders with `-' prefix + folders = [arg for arg in args if arg.startswith("test")] + # remove specific provider sub-folders if "tests/providers" is already in the list + # This workarounds pytest issues where it will only run tests from specific subfolders + # if both parent and child folders are in the list + # The issue in Pytest (changed behaviour in Pytest 8.2 is tracked here + # https://github.com/pytest-dev/pytest/issues/12605 + if "tests/providers" in folders: + folders = [folder for folder in folders if not folder.startswith("tests/providers/")] + return folders diff --git a/dev/breeze/tests/test_pytest_args_for_test_types.py b/dev/breeze/tests/test_pytest_args_for_test_types.py index a64dccbd06f2a..fbb3785949e85 100644 --- a/dev/breeze/tests/test_pytest_args_for_test_types.py +++ b/dev/breeze/tests/test_pytest_args_for_test_types.py @@ -329,6 +329,19 @@ def test_pytest_args_for_helm_test_types(helm_test_package: str, pytest_args: li ], True, ), + ( + "Core Providers[-amazon,google] Providers[amazon] Providers[google]", + [ + "tests/core", + "tests/executors", + "tests/jobs", + "tests/models", + "tests/ti_deps", + "tests/utils", + "tests/providers", + ], + False, + ), ], ) def test_folders_for_parallel_test_types( From 3ea764f802fddf16f4a12f39212f067e1abcbbf5 Mon Sep 17 00:00:00 2001 From: Utkarsh Sharma Date: Mon, 19 Aug 2024 16:30:03 +0530 Subject: [PATCH 08/17] Add changelog for airflow python client 2.10.0 (#41583) (#41584) * Add changelog for airflow python client 2.10.0 * Update client version (cherry picked from 
commit 317a28ed435960e7184e357a2f128806c34612fa) --- clients/python/CHANGELOG.md | 20 ++++++++++++++++++++ clients/python/version.txt | 2 +- 2 files changed, 21 insertions(+), 1 deletion(-) diff --git a/clients/python/CHANGELOG.md b/clients/python/CHANGELOG.md index d56d692d3c4e2..6a254029a4760 100644 --- a/clients/python/CHANGELOG.md +++ b/clients/python/CHANGELOG.md @@ -17,6 +17,26 @@ under the License. --> +# v2.10.0 + +## Major changes: + + - Add dag_stats rest api endpoint ([#41017](https://github.com/apache/airflow/pull/41017)) + - AIP-64: Add task instance history list endpoint ([#40988](https://github.com/apache/airflow/pull/40988)) + - Change DAG Audit log tab to Event Log ([#40967](https://github.com/apache/airflow/pull/40967)) + - AIP-64: Add REST API endpoints for TI try level details ([#40441](https://github.com/apache/airflow/pull/40441)) + - Make XCom display as react json ([#40640](https://github.com/apache/airflow/pull/40640)) + - Replace usages of task context logger with the log table ([#40867](https://github.com/apache/airflow/pull/40867)) + - Fix tasks API endpoint when DAG doesn't have `start_date` ([#40878](https://github.com/apache/airflow/pull/40878)) + - Add try_number to log table ([#40739](https://github.com/apache/airflow/pull/40739)) + - Add executor field to the task instance API ([#40034](https://github.com/apache/airflow/pull/40034)) + - Add task documentation to details tab in grid view. ([#39899](https://github.com/apache/airflow/pull/39899)) + - Add max_consecutive_failed_dag_runs in API spec ([#39830](https://github.com/apache/airflow/pull/39830)) + - Add task failed dependencies to details page. 
([#38449](https://github.com/apache/airflow/pull/38449)) + - Add dag re-parsing request endpoint ([#39138](https://github.com/apache/airflow/pull/39138)) + - Reorder OpenAPI Spec tags alphabetically ([#38717](https://github.com/apache/airflow/pull/38717)) + + # v2.9.1 ## Major changes: diff --git a/clients/python/version.txt b/clients/python/version.txt index dedcc7d4335da..10c2c0c3d6213 100644 --- a/clients/python/version.txt +++ b/clients/python/version.txt @@ -1 +1 @@ -2.9.1 +2.10.0 From 64415639b8b8be00bb737894fdbd1313f7d96680 Mon Sep 17 00:00:00 2001 From: Jarek Potiuk Date: Mon, 19 Aug 2024 14:03:14 +0200 Subject: [PATCH 09/17] Make all test pass in Database Isolation mode (#41567) This adds dedicated "DatabaseIsolation" test to airflow v2-10-test branch.. The DatabaseIsolation test will run all "db-tests" with enabled DB isolation mode and running `internal-api` component - groups of tests marked with "skip-if-database-isolation" will be skipped. --- .github/workflows/run-unit-tests.yml | 6 +++ .github/workflows/special-tests.yml | 23 +++++++++ .../endpoints/rpc_api_endpoint.py | 6 +-- airflow/models/baseoperator.py | 3 +- airflow/settings.py | 51 +++++++++++++------ .../commands/testing_commands.py | 2 + .../templates/CHANGELOG_TEMPLATE.rst.jinja2 | 6 +-- .../src/airflow_breeze/utils/run_tests.py | 4 +- .../cli/commands/test_internal_api_command.py | 3 ++ tests/cli/commands/test_webserver_command.py | 3 ++ tests/decorators/test_bash.py | 3 ++ .../decorators/test_branch_external_python.py | 3 +- tests/decorators/test_branch_python.py | 3 +- tests/decorators/test_branch_virtualenv.py | 3 +- tests/decorators/test_condition.py | 3 +- tests/decorators/test_external_python.py | 2 +- tests/decorators/test_python.py | 11 +++- tests/decorators/test_python_virtualenv.py | 4 +- tests/decorators/test_sensor.py | 3 +- tests/decorators/test_short_circuit.py | 2 +- tests/models/test_variable.py | 2 +- tests/operators/test_python.py | 3 ++ 
tests/serialization/test_dag_serialization.py | 2 + 23 files changed, 118 insertions(+), 33 deletions(-) diff --git a/.github/workflows/run-unit-tests.yml b/.github/workflows/run-unit-tests.yml index 7828e50ed7e95..2989a952d9ea2 100644 --- a/.github/workflows/run-unit-tests.yml +++ b/.github/workflows/run-unit-tests.yml @@ -104,6 +104,11 @@ on: # yamllint disable-line rule:truthy required: false default: "true" type: string + database-isolation: + description: "Whether to enable database isolattion or not (true/false)" + required: false + default: "false" + type: string force-lowest-dependencies: description: "Whether to force lowest dependencies for the tests or not (true/false)" required: false @@ -152,6 +157,7 @@ jobs: PYTHON_MAJOR_MINOR_VERSION: "${{ matrix.python-version }}" UPGRADE_BOTO: "${{ inputs.upgrade-boto }}" AIRFLOW_MONITOR_DELAY_TIME_IN_SECONDS: "${{inputs.monitor-delay-time-in-seconds}}" + DATABASE_ISOLATION: "${{ inputs.database-isolation }}" VERBOSE: "true" steps: - name: "Cleanup repo" diff --git a/.github/workflows/special-tests.yml b/.github/workflows/special-tests.yml index 000b5aa3d958b..e09b813acf916 100644 --- a/.github/workflows/special-tests.yml +++ b/.github/workflows/special-tests.yml @@ -193,6 +193,29 @@ jobs: run-coverage: ${{ inputs.run-coverage }} debug-resources: ${{ inputs.debug-resources }} + tests-database-isolation: + name: "Database isolation test" + uses: ./.github/workflows/run-unit-tests.yml + permissions: + contents: read + packages: read + secrets: inherit + with: + runs-on-as-json-default: ${{ inputs.runs-on-as-json-default }} + enable-aip-44: "true" + database-isolation: "true" + test-name: "DatabaseIsolation-Postgres" + test-scope: "DB" + backend: "postgres" + image-tag: ${{ inputs.image-tag }} + python-versions: "['${{ inputs.default-python-version }}']" + backend-versions: "['${{ inputs.default-postgres-version }}']" + excludes: "[]" + parallel-test-types-list-as-string: ${{ inputs.parallel-test-types-list-as-string 
}} + include-success-outputs: ${{ needs.build-info.outputs.include-success-outputs }} + run-coverage: ${{ inputs.run-coverage }} + debug-resources: ${{ inputs.debug-resources }} + tests-quarantined: name: "Quarantined test" uses: ./.github/workflows/run-unit-tests.yml diff --git a/airflow/api_internal/endpoints/rpc_api_endpoint.py b/airflow/api_internal/endpoints/rpc_api_endpoint.py index ad65157ef9415..a85964af4f64a 100644 --- a/airflow/api_internal/endpoints/rpc_api_endpoint.py +++ b/airflow/api_internal/endpoints/rpc_api_endpoint.py @@ -228,19 +228,19 @@ def internal_airflow_api(body: dict[str, Any]) -> APIResponse: except Exception: return log_and_build_error_response(message="Error deserializing parameters.", status=400) - log.info("Calling method %s\nparams: %s", method_name, params) + log.debug("Calling method %s\nparams: %s", method_name, params) try: # Session must be created there as it may be needed by serializer for lazy-loaded fields. with create_session() as session: output = handler(**params, session=session) output_json = BaseSerialization.serialize(output, use_pydantic_models=True) response = json.dumps(output_json) if output_json is not None else None - log.info("Sending response: %s", response) + log.debug("Sending response: %s", response) return Response(response=response, headers={"Content-Type": "application/json"}) except AirflowException as e: # In case of AirflowException transport the exception class back to caller exception_json = BaseSerialization.serialize(e, use_pydantic_models=True) response = json.dumps(exception_json) - log.info("Sending exception response: %s", response) + log.debug("Sending exception response: %s", response) return Response(response=response, headers={"Content-Type": "application/json"}) except Exception: return log_and_build_error_response(message=f"Error executing method '{method_name}'.", status=500) diff --git a/airflow/models/baseoperator.py b/airflow/models/baseoperator.py index 7ffa596ec67a1..f21db0c675fd3 
100644 --- a/airflow/models/baseoperator.py +++ b/airflow/models/baseoperator.py @@ -1516,10 +1516,11 @@ def run( data_interval=info.data_interval, ) ti = TaskInstance(self, run_id=dr.run_id) + session.add(ti) ti.dag_run = dr session.add(dr) session.flush() - + session.commit() ti.run( mark_success=mark_success, ignore_depends_on_past=ignore_depends_on_past, diff --git a/airflow/settings.py b/airflow/settings.py index 751bb3876037e..175a63f69d2f4 100644 --- a/airflow/settings.py +++ b/airflow/settings.py @@ -313,6 +313,8 @@ def remove(*args, **kwargs): AIRFLOW_SETTINGS_PATH = os.path.join(AIRFLOW_PATH, "airflow", "settings.py") AIRFLOW_UTILS_SESSION_PATH = os.path.join(AIRFLOW_PATH, "airflow", "utils", "session.py") AIRFLOW_MODELS_BASEOPERATOR_PATH = os.path.join(AIRFLOW_PATH, "airflow", "models", "baseoperator.py") +AIRFLOW_MODELS_DAG_PATH = os.path.join(AIRFLOW_PATH, "airflow", "models", "dag.py") +AIRFLOW_DB_UTILS_PATH = os.path.join(AIRFLOW_PATH, "airflow", "utils", "db.py") class TracebackSessionForTests: @@ -370,6 +372,9 @@ def is_called_from_test_code(self) -> tuple[bool, traceback.FrameSummary | None] :return: True if the object was created from test code, False otherwise. 
""" self.traceback = traceback.extract_stack() + if any(filename.endswith("_pytest/fixtures.py") for filename, _, _, _ in self.traceback): + # This is a fixture call + return True, None airflow_frames = [ tb for tb in self.traceback @@ -378,24 +383,30 @@ def is_called_from_test_code(self) -> tuple[bool, traceback.FrameSummary | None] and not tb.filename == AIRFLOW_UTILS_SESSION_PATH ] if any( - filename.endswith("conftest.py") or filename.endswith("tests/test_utils/db.py") - for filename, _, _, _ in airflow_frames + filename.endswith("conftest.py") + or filename.endswith("tests/test_utils/db.py") + or (filename.startswith(AIRFLOW_TESTS_PATH) and name in ("setup_method", "teardown_method")) + for filename, _, name, _ in airflow_frames ): # This is a fixture call or testing utilities return True, None - if ( - len(airflow_frames) >= 2 - and airflow_frames[-2].filename.startswith(AIRFLOW_TESTS_PATH) - and airflow_frames[-1].filename == AIRFLOW_MODELS_BASEOPERATOR_PATH - and airflow_frames[-1].name == "run" - ): - # This is baseoperator run method that is called directly from the test code and this is - # usual pattern where we create a session in the test code to create dag_runs for tests. - # If `run` code will be run inside a real "airflow" code the stack trace would be longer - # and it would not be directly called from the test code. Also if subsequently any of the - # run_task() method called later from the task code will attempt to execute any DB - # method, the stack trace will be longer and we will catch it as "illegal" call. 
- return True, None + if len(airflow_frames) >= 2 and airflow_frames[-2].filename.startswith(AIRFLOW_TESTS_PATH): + # Let's look at what we are calling directly from the test code + current_filename, current_method_name = airflow_frames[-1].filename, airflow_frames[-1].name + if (current_filename, current_method_name) in ( + (AIRFLOW_MODELS_BASEOPERATOR_PATH, "run"), + (AIRFLOW_MODELS_DAG_PATH, "create_dagrun"), + ): + # This is baseoperator run method that is called directly from the test code and this is + # usual pattern where we create a session in the test code to create dag_runs for tests. + # If `run` code will be run inside a real "airflow" code the stack trace would be longer + # and it would not be directly called from the test code. Also if subsequently any of the + # run_task() method called later from the task code will attempt to execute any DB + # method, the stack trace will be longer and we will catch it as "illegal" call. + return True, None + if current_filename == AIRFLOW_DB_UTILS_PATH: + # This is a util method called directly from the test code + return True, None for tb in airflow_frames[::-1]: if tb.filename.startswith(AIRFLOW_PATH): if tb.filename.startswith(AIRFLOW_TESTS_PATH): @@ -407,6 +418,16 @@ def is_called_from_test_code(self) -> tuple[bool, traceback.FrameSummary | None] # The traceback line will be always 3rd (two bottom ones are Airflow) return False, self.traceback[-2] + def get_bind( + self, + mapper=None, + clause=None, + bind=None, + _sa_skip_events=None, + _sa_skip_for_implicit_returning=False, + ): + pass + def _is_sqlite_db_path_relative(sqla_conn_str: str) -> bool: """Determine whether the database connection URI specifies a relative path.""" diff --git a/dev/breeze/src/airflow_breeze/commands/testing_commands.py b/dev/breeze/src/airflow_breeze/commands/testing_commands.py index 51ca4bea5d636..cef51d975219e 100644 --- a/dev/breeze/src/airflow_breeze/commands/testing_commands.py +++ 
b/dev/breeze/src/airflow_breeze/commands/testing_commands.py @@ -206,6 +206,7 @@ def _run_test( helm_test_package=None, keep_env_variables=shell_params.keep_env_variables, no_db_cleanup=shell_params.no_db_cleanup, + database_isolation=shell_params.database_isolation, ) ) run_cmd.extend(list(extra_pytest_args)) @@ -968,6 +969,7 @@ def helm_tests( helm_test_package=helm_test_package, keep_env_variables=False, no_db_cleanup=False, + database_isolation=False, ) cmd = ["docker", "compose", "run", "--service-ports", "--rm", "airflow", *pytest_args, *extra_pytest_args] result = run_command(cmd, check=False, env=env, output_outside_the_group=True) diff --git a/dev/breeze/src/airflow_breeze/templates/CHANGELOG_TEMPLATE.rst.jinja2 b/dev/breeze/src/airflow_breeze/templates/CHANGELOG_TEMPLATE.rst.jinja2 index b8a966448c07b..e51939c57571c 100644 --- a/dev/breeze/src/airflow_breeze/templates/CHANGELOG_TEMPLATE.rst.jinja2 +++ b/dev/breeze/src/airflow_breeze/templates/CHANGELOG_TEMPLATE.rst.jinja2 @@ -40,7 +40,7 @@ Features {%- endif %} -{%- if classified_changes.fixes %} +{%- if classified_changes and classified_changes.fixes %} Bug Fixes ~~~~~~~~~ @@ -50,7 +50,7 @@ Bug Fixes {%- endif %} -{%- if classified_changes.misc %} +{%- if classified_changes and classified_changes.misc %} Misc ~~~~ @@ -62,7 +62,7 @@ Misc .. Below changes are excluded from the changelog. Move them to appropriate section above if needed. 
Do not delete the lines(!): -{%- if classified_changes.other %} +{%- if classified_changes and classified_changes.other %} {%- for other in classified_changes.other %} * ``{{ other.message_without_backticks | safe }}`` {%- endfor %} diff --git a/dev/breeze/src/airflow_breeze/utils/run_tests.py b/dev/breeze/src/airflow_breeze/utils/run_tests.py index 375dc568475b4..73cbb430817cc 100644 --- a/dev/breeze/src/airflow_breeze/utils/run_tests.py +++ b/dev/breeze/src/airflow_breeze/utils/run_tests.py @@ -311,6 +311,7 @@ def generate_args_for_pytest( helm_test_package: str | None, keep_env_variables: bool, no_db_cleanup: bool, + database_isolation: bool, ): result_log_file, warnings_file, coverage_file = test_paths(test_type, backend, helm_test_package) if skip_db_tests: @@ -327,12 +328,13 @@ def generate_args_for_pytest( helm_test_package=helm_test_package, python_version=python_version, ) + max_fail = 50 args.extend( [ "--verbosity=0", "--strict-markers", "--durations=100", - "--maxfail=50", + f"--maxfail={max_fail}", "--color=yes", f"--junitxml={result_log_file}", # timeouts in seconds for individual tests diff --git a/tests/cli/commands/test_internal_api_command.py b/tests/cli/commands/test_internal_api_command.py index 99992e6266861..a1aaf2daca604 100644 --- a/tests/cli/commands/test_internal_api_command.py +++ b/tests/cli/commands/test_internal_api_command.py @@ -83,6 +83,9 @@ def test_ready_prefix_on_cmdline_dead_process(self): assert self.monitor._get_num_ready_workers_running() == 0 +# Those tests are skipped in isolation mode because they interfere with the internal API +# server already running in the background in the isolation mode. 
+@pytest.mark.skip_if_database_isolation_mode @pytest.mark.db_test @pytest.mark.skipif(not _ENABLE_AIP_44, reason="AIP-44 is disabled") class TestCliInternalAPI(_ComonCLIGunicornTestClass): diff --git a/tests/cli/commands/test_webserver_command.py b/tests/cli/commands/test_webserver_command.py index 07d95a9e5f75a..fa2e58af9efe4 100644 --- a/tests/cli/commands/test_webserver_command.py +++ b/tests/cli/commands/test_webserver_command.py @@ -226,6 +226,9 @@ def test_ready_prefix_on_cmdline_dead_process(self): assert self.monitor._get_num_ready_workers_running() == 0 +# Those tests are skipped in isolation mode because they interfere with the internal API +# server already running in the background in the isolation mode. +@pytest.mark.skip_if_database_isolation_mode @pytest.mark.db_test class TestCliWebServer(_ComonCLIGunicornTestClass): main_process_regexp = r"airflow webserver" diff --git a/tests/decorators/test_bash.py b/tests/decorators/test_bash.py index ba8948936eda1..9fa7999e83476 100644 --- a/tests/decorators/test_bash.py +++ b/tests/decorators/test_bash.py @@ -33,6 +33,9 @@ DEFAULT_DATE = timezone.datetime(2023, 1, 1) +# TODO(potiuk) see why this test hangs in DB isolation mode +pytestmark = pytest.mark.skip_if_database_isolation_mode + @pytest.mark.db_test class TestBashDecorator: diff --git a/tests/decorators/test_branch_external_python.py b/tests/decorators/test_branch_external_python.py index d991f22cd55e4..d2466365bef8d 100644 --- a/tests/decorators/test_branch_external_python.py +++ b/tests/decorators/test_branch_external_python.py @@ -24,7 +24,8 @@ from airflow.decorators import task from airflow.utils.state import State -pytestmark = pytest.mark.db_test +# TODO: (potiuk) - AIP-44 - check why this test hangs +pytestmark = [pytest.mark.db_test, pytest.mark.skip_if_database_isolation_mode] class Test_BranchExternalPythonDecoratedOperator: diff --git a/tests/decorators/test_branch_python.py b/tests/decorators/test_branch_python.py index 
58bb216246049..3cd95b8d2a4ad 100644 --- a/tests/decorators/test_branch_python.py +++ b/tests/decorators/test_branch_python.py @@ -22,7 +22,8 @@ from airflow.decorators import task from airflow.utils.state import State -pytestmark = pytest.mark.db_test +# TODO: (potiuk) - AIP-44 - check why this test hangs +pytestmark = [pytest.mark.db_test, pytest.mark.skip_if_database_isolation_mode] class Test_BranchPythonDecoratedOperator: diff --git a/tests/decorators/test_branch_virtualenv.py b/tests/decorators/test_branch_virtualenv.py index a5c23de392de3..6cdfa1ddff25e 100644 --- a/tests/decorators/test_branch_virtualenv.py +++ b/tests/decorators/test_branch_virtualenv.py @@ -22,7 +22,8 @@ from airflow.decorators import task from airflow.utils.state import State -pytestmark = pytest.mark.db_test +# TODO: (potiuk) - AIP-44 - check why this test hangs +pytestmark = [pytest.mark.db_test, pytest.mark.skip_if_database_isolation_mode] class TestBranchPythonVirtualenvDecoratedOperator: diff --git a/tests/decorators/test_condition.py b/tests/decorators/test_condition.py index 315db6bfe0d12..28e0f0bf8fee0 100644 --- a/tests/decorators/test_condition.py +++ b/tests/decorators/test_condition.py @@ -28,7 +28,8 @@ from airflow.models.taskinstance import TaskInstance from airflow.utils.context import Context -pytestmark = pytest.mark.db_test +# TODO(potiuk) see why this test hangs in DB isolation mode +pytestmark = [pytest.mark.db_test, pytest.mark.skip_if_database_isolation_mode] @pytest.mark.skip_if_database_isolation_mode # Test is broken in db isolation mode diff --git a/tests/decorators/test_external_python.py b/tests/decorators/test_external_python.py index 5ed5874e3a55a..0d9a439aa2371 100644 --- a/tests/decorators/test_external_python.py +++ b/tests/decorators/test_external_python.py @@ -29,7 +29,7 @@ from airflow.decorators import setup, task, teardown from airflow.utils import timezone -pytestmark = pytest.mark.db_test +pytestmark = [pytest.mark.db_test, 
pytest.mark.need_serialized_dag] DEFAULT_DATE = timezone.datetime(2016, 1, 1) diff --git a/tests/decorators/test_python.py b/tests/decorators/test_python.py index 9d2b9a14c82b4..067beff3abbff 100644 --- a/tests/decorators/test_python.py +++ b/tests/decorators/test_python.py @@ -41,7 +41,7 @@ from airflow.utils.xcom import XCOM_RETURN_KEY from tests.operators.test_python import BasePythonTest -pytestmark = pytest.mark.db_test +pytestmark = [pytest.mark.db_test, pytest.mark.need_serialized_dag] if TYPE_CHECKING: @@ -281,6 +281,8 @@ class Test: def add_number(self, num: int) -> int: return self.num + num + # TODO(potiuk) see why this test hangs in DB isolation mode + @pytest.mark.skip_if_database_isolation_mode def test_fail_multiple_outputs_key_type(self): @task_decorator(multiple_outputs=True) def add_number(num: int): @@ -293,6 +295,8 @@ def add_number(num: int): with pytest.raises(AirflowException): ret.operator.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE) + # TODO(potiuk) see why this test hangs in DB isolation mode + @pytest.mark.skip_if_database_isolation_mode def test_fail_multiple_outputs_no_dict(self): @task_decorator(multiple_outputs=True) def add_number(num: int): @@ -541,6 +545,8 @@ def add_2(number: int): assert "add_2" in self.dag_non_serialized.task_ids + # TODO(potiuk) see why this test hangs in DB isolation mode + @pytest.mark.skip_if_database_isolation_mode def test_dag_task_multiple_outputs(self): """Tests dag.task property to generate task with multiple outputs""" @@ -863,6 +869,7 @@ def org_test_func(): assert decorated_test_func.__wrapped__ is org_test_func, "__wrapped__ attr is not the original function" +@pytest.mark.need_serialized_dag(False) @pytest.mark.skip_if_database_isolation_mode # Test is broken in db isolation mode def test_upstream_exception_produces_none_xcom(dag_maker, session): from airflow.exceptions import AirflowSkipException @@ -900,6 +907,7 @@ def down(a, b): assert result == "'example' None" 
+@pytest.mark.need_serialized_dag(False) @pytest.mark.skip_if_database_isolation_mode # Test is broken in db isolation mode @pytest.mark.parametrize("multiple_outputs", [True, False]) def test_multiple_outputs_produces_none_xcom_when_task_is_skipped(dag_maker, session, multiple_outputs): @@ -958,6 +966,7 @@ def other(x): ... assert caplog.messages == [] +@pytest.mark.need_serialized_dag(False) @pytest.mark.skip_if_database_isolation_mode # Test is broken in db isolation mode def test_task_decorator_dataset(dag_maker, session): from airflow.datasets import Dataset diff --git a/tests/decorators/test_python_virtualenv.py b/tests/decorators/test_python_virtualenv.py index 554b33ceb9b77..57a096ef192c7 100644 --- a/tests/decorators/test_python_virtualenv.py +++ b/tests/decorators/test_python_virtualenv.py @@ -30,7 +30,7 @@ from airflow.utils import timezone from airflow.utils.state import TaskInstanceState -pytestmark = pytest.mark.db_test +pytestmark = [pytest.mark.db_test, pytest.mark.need_serialized_dag] DEFAULT_DATE = timezone.datetime(2016, 1, 1) PYTHON_VERSION = f"{sys.version_info.major}{sys.version_info.minor}" @@ -373,6 +373,8 @@ def f(): assert teardown_task.on_failure_fail_dagrun is on_failure_fail_dagrun ret.operator.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE) + # TODO(potiuk) see why this test hangs in DB isolation mode + @pytest.mark.skip_if_database_isolation_mode def test_invalid_annotation(self, dag_maker): import uuid diff --git a/tests/decorators/test_sensor.py b/tests/decorators/test_sensor.py index e970894a38ed8..a283ed871ba1d 100644 --- a/tests/decorators/test_sensor.py +++ b/tests/decorators/test_sensor.py @@ -26,7 +26,7 @@ from airflow.sensors.base import PokeReturnValue from airflow.utils.state import State -pytestmark = pytest.mark.db_test +pytestmark = [pytest.mark.db_test, pytest.mark.need_serialized_dag] class TestSensorDecorator: @@ -52,6 +52,7 @@ def dummy_f(): sf >> df dr = dag_maker.create_dagrun() + 
sf.operator.run(start_date=dr.execution_date, end_date=dr.execution_date, ignore_ti_state=True) tis = dr.get_task_instances() assert len(tis) == 2 diff --git a/tests/decorators/test_short_circuit.py b/tests/decorators/test_short_circuit.py index 1d43de68421f9..1c8349b6c9c86 100644 --- a/tests/decorators/test_short_circuit.py +++ b/tests/decorators/test_short_circuit.py @@ -24,7 +24,7 @@ from airflow.utils.state import State from airflow.utils.trigger_rule import TriggerRule -pytestmark = pytest.mark.db_test +pytestmark = [pytest.mark.db_test, pytest.mark.skip_if_database_isolation_mode] DEFAULT_DATE = datetime(2022, 8, 17) diff --git a/tests/models/test_variable.py b/tests/models/test_variable.py index 3ec2691e5af95..e3d5c023a24ab 100644 --- a/tests/models/test_variable.py +++ b/tests/models/test_variable.py @@ -30,7 +30,7 @@ from tests.test_utils import db from tests.test_utils.config import conf_vars -pytestmark = pytest.mark.db_test +pytestmark = [pytest.mark.db_test, pytest.mark.skip_if_database_isolation_mode] class TestVariable: diff --git a/tests/operators/test_python.py b/tests/operators/test_python.py index 993d70cad3340..f24281275126d 100644 --- a/tests/operators/test_python.py +++ b/tests/operators/test_python.py @@ -1306,6 +1306,9 @@ def f(a): "AssertRewritingHook including captured stdout and we need to run " "it with `--assert=plain` pytest option and PYTEST_PLAIN_ASSERTS=true .", ) + # TODO(potiuk) check if this can be fixed in the future - for now we are skipping tests with venv + # and airflow context in DB isolation mode as they are passing None as DAG. 
+ @pytest.mark.skip_if_database_isolation_mode def test_airflow_context(self, serializer): def f( # basic diff --git a/tests/serialization/test_dag_serialization.py b/tests/serialization/test_dag_serialization.py index e9c8ceaf03979..d1c6787db39b6 100644 --- a/tests/serialization/test_dag_serialization.py +++ b/tests/serialization/test_dag_serialization.py @@ -399,6 +399,8 @@ def timetable_plugin(monkeypatch): ) +# TODO: (potiuk) - AIP-44 - check why this test hangs +@pytest.mark.skip_if_database_isolation_mode class TestStringifiedDAGs: """Unit tests for stringified DAGs.""" From e76dba61b92eadcee9e3193640ab8db6a9466528 Mon Sep 17 00:00:00 2001 From: Utkarsh Sharma Date: Mon, 19 Aug 2024 18:30:42 +0530 Subject: [PATCH 10/17] Upgrade build and chart dependencies (#41570) (#41588) (cherry picked from commit c88192c466cb91842310f82a61eaa48b39439bef) Co-authored-by: Jarek Potiuk --- Dockerfile | 2 +- Dockerfile.ci | 4 ++-- chart/values.schema.json | 2 +- chart/values.yaml | 2 +- 4 files changed, 5 insertions(+), 5 deletions(-) diff --git a/Dockerfile b/Dockerfile index 8c4a43274fd33..5eb2b3355695f 100644 --- a/Dockerfile +++ b/Dockerfile @@ -50,7 +50,7 @@ ARG AIRFLOW_VERSION="2.9.3" ARG PYTHON_BASE_IMAGE="python:3.8-slim-bookworm" ARG AIRFLOW_PIP_VERSION=24.2 -ARG AIRFLOW_UV_VERSION=0.2.34 +ARG AIRFLOW_UV_VERSION=0.2.37 ARG AIRFLOW_USE_UV="false" ARG UV_HTTP_TIMEOUT="300" ARG AIRFLOW_IMAGE_REPOSITORY="https://github.com/apache/airflow" diff --git a/Dockerfile.ci b/Dockerfile.ci index 06a7343c82e26..58a403a20a9ce 100644 --- a/Dockerfile.ci +++ b/Dockerfile.ci @@ -1311,7 +1311,7 @@ ARG DEFAULT_CONSTRAINTS_BRANCH="constraints-main" ARG AIRFLOW_CI_BUILD_EPOCH="10" ARG AIRFLOW_PRE_CACHED_PIP_PACKAGES="true" ARG AIRFLOW_PIP_VERSION=24.2 -ARG AIRFLOW_UV_VERSION=0.2.34 +ARG AIRFLOW_UV_VERSION=0.2.37 ARG AIRFLOW_USE_UV="true" # Setup PIP # By default PIP install run without cache to make image smaller @@ -1335,7 +1335,7 @@ ARG AIRFLOW_VERSION="" ARG 
ADDITIONAL_PIP_INSTALL_FLAGS="" ARG AIRFLOW_PIP_VERSION=24.2 -ARG AIRFLOW_UV_VERSION=0.2.34 +ARG AIRFLOW_UV_VERSION=0.2.37 ARG AIRFLOW_USE_UV="true" ENV AIRFLOW_REPO=${AIRFLOW_REPO}\ diff --git a/chart/values.schema.json b/chart/values.schema.json index 5f6a3b55d89f5..4ba434c3417bf 100644 --- a/chart/values.schema.json +++ b/chart/values.schema.json @@ -671,7 +671,7 @@ "tag": { "description": "The StatsD image tag.", "type": "string", - "default": "v0.26.1" + "default": "v0.27.1" }, "pullPolicy": { "description": "The StatsD image pull policy.", diff --git a/chart/values.yaml b/chart/values.yaml index 76130e55bb5a0..a28c3c6d54033 100644 --- a/chart/values.yaml +++ b/chart/values.yaml @@ -105,7 +105,7 @@ images: pullPolicy: IfNotPresent statsd: repository: quay.io/prometheus/statsd-exporter - tag: v0.26.1 + tag: v0.27.1 pullPolicy: IfNotPresent redis: repository: redis From 01e6677d3e9ff2aa28961d792663b189628dd8b3 Mon Sep 17 00:00:00 2001 From: Jarek Potiuk Date: Tue, 20 Aug 2024 15:05:00 +0200 Subject: [PATCH 11/17] Limit watchtower as depenendcy as 3.3.0 breaks moin. 
(#41612) (cherry picked from commit 1b602d50266184d118db52a674baeab29b1f5688) --- airflow/providers/amazon/provider.yaml | 2 +- generated/provider_dependencies.json | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/airflow/providers/amazon/provider.yaml b/airflow/providers/amazon/provider.yaml index c69542ef334dc..1c6a86f1805fa 100644 --- a/airflow/providers/amazon/provider.yaml +++ b/airflow/providers/amazon/provider.yaml @@ -101,7 +101,7 @@ dependencies: - botocore>=1.34.90 - inflection>=0.5.1 # Allow a wider range of watchtower versions for flexibility among users - - watchtower>=3.0.0,<4 + - watchtower>=3.0.0,!=3.3.0,<4 - jsonpath_ng>=1.5.3 - redshift_connector>=2.0.918 - sqlalchemy_redshift>=0.8.6 diff --git a/generated/provider_dependencies.json b/generated/provider_dependencies.json index 58fb34b2adf02..1ac11366c643d 100644 --- a/generated/provider_dependencies.json +++ b/generated/provider_dependencies.json @@ -41,7 +41,7 @@ "jsonpath_ng>=1.5.3", "redshift_connector>=2.0.918", "sqlalchemy_redshift>=0.8.6", - "watchtower>=3.0.0,<4" + "watchtower>=3.0.0,!=3.3.0,<4" ], "devel-deps": [ "aiobotocore>=2.13.0", From 44190faa0c22bb530fce0419e8fee5cb7d63cf50 Mon Sep 17 00:00:00 2001 From: Jarek Potiuk Date: Tue, 20 Aug 2024 23:51:37 +0200 Subject: [PATCH 12/17] Enable running Pull Requests against v2-10-stable branch (#41624) (cherry picked from commit e306e7f7bc1ef12aeab0fc09e018accda3684a2f) --- .github/workflows/ci.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 4e1db2a14edfc..866f8f253d401 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -23,7 +23,7 @@ on: # yamllint disable-line rule:truthy push: branches: ['v[0-9]+-[0-9]+-test'] pull_request: - branches: ['main', 'v[0-9]+-[0-9]+-test'] + branches: ['main', 'v[0-9]+-[0-9]+-test', 'v[0-9]+-[0-9]+-stable'] workflow_dispatch: permissions: # All other permissions are set to none From 
055b81d4f85521430e795fe4d0c09abc94e9c721 Mon Sep 17 00:00:00 2001 From: Jens Scheffler <95105677+jscheffl@users.noreply.github.com> Date: Tue, 13 Aug 2024 18:44:08 +0200 Subject: [PATCH 13/17] Fix tests/models/test_variable.py for database isolation mode (#41414) * Fix tests/models/test_variable.py for database isolation mode * Review feedback (cherry picked from commit 736ebfe3fe2bd67406d5a50dacbfa1e43767d4ce) --- .../endpoints/rpc_api_endpoint.py | 9 +-- airflow/api_internal/internal_api_call.py | 2 +- airflow/models/variable.py | 66 ++++++++++++++++++- airflow/serialization/enums.py | 1 + airflow/serialization/serialized_objects.py | 16 ++++- tests/models/test_variable.py | 8 ++- 6 files changed, 90 insertions(+), 12 deletions(-) diff --git a/airflow/api_internal/endpoints/rpc_api_endpoint.py b/airflow/api_internal/endpoints/rpc_api_endpoint.py index a85964af4f64a..c3d8b671fbb08 100644 --- a/airflow/api_internal/endpoints/rpc_api_endpoint.py +++ b/airflow/api_internal/endpoints/rpc_api_endpoint.py @@ -126,9 +126,9 @@ def initialize_method_map() -> dict[str, Callable]: # XCom.get_many, # Not supported because it returns query XCom.clear, XCom.set, - Variable.set, - Variable.update, - Variable.delete, + Variable._set, + Variable._update, + Variable._delete, DAG.fetch_callback, DAG.fetch_dagrun, DagRun.fetch_task_instances, @@ -237,7 +237,8 @@ def internal_airflow_api(body: dict[str, Any]) -> APIResponse: response = json.dumps(output_json) if output_json is not None else None log.debug("Sending response: %s", response) return Response(response=response, headers={"Content-Type": "application/json"}) - except AirflowException as e: # In case of AirflowException transport the exception class back to caller + # In case of AirflowException or other selective known types, transport the exception class back to caller + except (KeyError, AttributeError, AirflowException) as e: exception_json = BaseSerialization.serialize(e, use_pydantic_models=True) response = 
json.dumps(exception_json) log.debug("Sending exception response: %s", response) diff --git a/airflow/api_internal/internal_api_call.py b/airflow/api_internal/internal_api_call.py index fc0945b3c0fe0..8838377877bec 100644 --- a/airflow/api_internal/internal_api_call.py +++ b/airflow/api_internal/internal_api_call.py @@ -159,7 +159,7 @@ def wrapper(*args, **kwargs): if result is None or result == b"": return None result = BaseSerialization.deserialize(json.loads(result), use_pydantic_models=True) - if isinstance(result, AirflowException): + if isinstance(result, (KeyError, AttributeError, AirflowException)): raise result return result diff --git a/airflow/models/variable.py b/airflow/models/variable.py index 63b71303bc803..563cac46e8c84 100644 --- a/airflow/models/variable.py +++ b/airflow/models/variable.py @@ -154,7 +154,6 @@ def get( @staticmethod @provide_session - @internal_api_call def set( key: str, value: Any, @@ -167,6 +166,35 @@ def set( This operation overwrites an existing variable. + :param key: Variable Key + :param value: Value to set for the Variable + :param description: Description of the Variable + :param serialize_json: Serialize the value to a JSON string + :param session: Session + """ + Variable._set( + key=key, value=value, description=description, serialize_json=serialize_json, session=session + ) + # invalidate key in cache for faster propagation + # we cannot save the value set because it's possible that it's shadowed by a custom backend + # (see call to check_for_write_conflict above) + SecretCache.invalidate_variable(key) + + @staticmethod + @provide_session + @internal_api_call + def _set( + key: str, + value: Any, + description: str | None = None, + serialize_json: bool = False, + session: Session = None, + ) -> None: + """ + Set a value for an Airflow Variable with a given Key. + + This operation overwrites an existing variable. 
+ :param key: Variable Key :param value: Value to set for the Variable :param description: Description of the Variable @@ -190,7 +218,6 @@ def set( @staticmethod @provide_session - @internal_api_call def update( key: str, value: Any, @@ -200,6 +227,27 @@ def update( """ Update a given Airflow Variable with the Provided value. + :param key: Variable Key + :param value: Value to set for the Variable + :param serialize_json: Serialize the value to a JSON string + :param session: Session + """ + Variable._update(key=key, value=value, serialize_json=serialize_json, session=session) + # We need to invalidate the cache for internal API cases on the client side + SecretCache.invalidate_variable(key) + + @staticmethod + @provide_session + @internal_api_call + def _update( + key: str, + value: Any, + serialize_json: bool = False, + session: Session = None, + ) -> None: + """ + Update a given Airflow Variable with the Provided value. + :param key: Variable Key :param value: Value to set for the Variable :param serialize_json: Serialize the value to a JSON string @@ -219,11 +267,23 @@ def update( @staticmethod @provide_session - @internal_api_call def delete(key: str, session: Session = None) -> int: """ Delete an Airflow Variable for a given key. + :param key: Variable Keys + """ + rows = Variable._delete(key=key, session=session) + SecretCache.invalidate_variable(key) + return rows + + @staticmethod + @provide_session + @internal_api_call + def _delete(key: str, session: Session = None) -> int: + """ + Delete an Airflow Variable for a given key. 
+ :param key: Variable Keys """ rows = session.execute(delete(Variable).where(Variable.key == key)).rowcount diff --git a/airflow/serialization/enums.py b/airflow/serialization/enums.py index a5bd5e3646e83..f216ce7316103 100644 --- a/airflow/serialization/enums.py +++ b/airflow/serialization/enums.py @@ -46,6 +46,7 @@ class DagAttributeTypes(str, Enum): RELATIVEDELTA = "relativedelta" BASE_TRIGGER = "base_trigger" AIRFLOW_EXC_SER = "airflow_exc_ser" + BASE_EXC_SER = "base_exc_ser" DICT = "dict" SET = "set" TUPLE = "tuple" diff --git a/airflow/serialization/serialized_objects.py b/airflow/serialization/serialized_objects.py index d110271c3da08..a3886aa49acef 100644 --- a/airflow/serialization/serialized_objects.py +++ b/airflow/serialization/serialized_objects.py @@ -692,6 +692,15 @@ def serialize( ), type_=DAT.AIRFLOW_EXC_SER, ) + elif isinstance(var, (KeyError, AttributeError)): + return cls._encode( + cls.serialize( + {"exc_cls_name": var.__class__.__name__, "args": [var.args], "kwargs": {}}, + use_pydantic_models=use_pydantic_models, + strict=strict, + ), + type_=DAT.BASE_EXC_SER, + ) elif isinstance(var, BaseTrigger): return cls._encode( cls.serialize(var.serialize(), use_pydantic_models=use_pydantic_models, strict=strict), @@ -834,13 +843,16 @@ def deserialize(cls, encoded_var: Any, use_pydantic_models=False) -> Any: return decode_timezone(var) elif type_ == DAT.RELATIVEDELTA: return decode_relativedelta(var) - elif type_ == DAT.AIRFLOW_EXC_SER: + elif type_ == DAT.AIRFLOW_EXC_SER or type_ == DAT.BASE_EXC_SER: deser = cls.deserialize(var, use_pydantic_models=use_pydantic_models) exc_cls_name = deser["exc_cls_name"] args = deser["args"] kwargs = deser["kwargs"] del deser - exc_cls = import_string(exc_cls_name) + if type_ == DAT.AIRFLOW_EXC_SER: + exc_cls = import_string(exc_cls_name) + else: + exc_cls = import_string(f"builtins.{exc_cls_name}") return exc_cls(*args, **kwargs) elif type_ == DAT.BASE_TRIGGER: tr_cls_name, kwargs = cls.deserialize(var, 
use_pydantic_models=use_pydantic_models) diff --git a/tests/models/test_variable.py b/tests/models/test_variable.py index e3d5c023a24ab..6fb6fa15f214c 100644 --- a/tests/models/test_variable.py +++ b/tests/models/test_variable.py @@ -47,6 +47,7 @@ def setup_test_cases(self): db.clear_db_variables() crypto._fernet = None + @pytest.mark.skip_if_database_isolation_mode # Does not work in db isolation mode, internal API has other fernet @conf_vars({("core", "fernet_key"): "", ("core", "unit_test_mode"): "True"}) def test_variable_no_encryption(self, session): """ @@ -60,6 +61,7 @@ def test_variable_no_encryption(self, session): # should mask anything. That logic is tested in test_secrets_masker.py self.mask_secret.assert_called_once_with("value", "key") + @pytest.mark.skip_if_database_isolation_mode # Does not work in db isolation mode, internal API has other fernet @conf_vars({("core", "fernet_key"): Fernet.generate_key().decode()}) def test_variable_with_encryption(self, session): """ @@ -70,6 +72,7 @@ def test_variable_with_encryption(self, session): assert test_var.is_encrypted assert test_var.val == "value" + @pytest.mark.skip_if_database_isolation_mode # Does not work in db isolation mode, internal API has other fernet @pytest.mark.parametrize("test_value", ["value", ""]) def test_var_with_encryption_rotate_fernet_key(self, test_value, session): """ @@ -152,6 +155,7 @@ def test_variable_update(self, session): Variable.update(key="test_key", value="value2", session=session) assert "value2" == Variable.get("test_key") + @pytest.mark.skip_if_database_isolation_mode # Does not work in db isolation mode, API server has other ENV def test_variable_update_fails_on_non_metastore_variable(self, session): with mock.patch.dict("os.environ", AIRFLOW_VAR_KEY="env-value"): with pytest.raises(AttributeError): @@ -281,6 +285,7 @@ def test_caching_caches(self, mock_ensure_secrets: mock.Mock): mock_backend.get_variable.assert_called_once() # second call was not made because of 
cache assert first == second + @pytest.mark.skip_if_database_isolation_mode # Does not work in db isolation mode, internal API has other env def test_cache_invalidation_on_set(self, session): with mock.patch.dict("os.environ", AIRFLOW_VAR_KEY="from_env"): a = Variable.get("key") # value is saved in cache @@ -316,7 +321,7 @@ def test_masking_only_secret_values(variable_value, deserialize_json, expected_m val=variable_value, ) session.add(var) - session.flush() + session.commit() # Make sure we re-load it, not just get the cached object back session.expunge(var) _secrets_masker().patterns = set() @@ -326,5 +331,4 @@ def test_masking_only_secret_values(variable_value, deserialize_json, expected_m for expected_masked_value in expected_masked_values: assert expected_masked_value in _secrets_masker().patterns finally: - session.rollback() db.clear_db_variables() From 3dfe19b9555138e6ed9f6e7974db68adcf847cb3 Mon Sep 17 00:00:00 2001 From: Jarek Potiuk Date: Wed, 21 Aug 2024 11:17:55 +0200 Subject: [PATCH 14/17] Make latest botocore tests green (#41626) The latest botocore tests are conflicting with a few requirements and until apache-beam upcoming version is released we need to do some manual exclusions. Those exclusions should make latest botocore test green again. 
(cherry picked from commit a13ccbbdec8e59f30218f604fca8cbb999fcb757) --- Dockerfile.ci | 9 +++++---- pyproject.toml | 3 +++ scripts/docker/entrypoint_ci.sh | 9 +++++---- tests/always/test_example_dags.py | 20 ++++++------------- tests/providers/opensearch/conftest.py | 11 ++++++++-- .../opensearch/hooks/test_opensearch.py | 3 ++- .../opensearch/operators/test_opensearch.py | 3 +++ 7 files changed, 33 insertions(+), 25 deletions(-) diff --git a/Dockerfile.ci b/Dockerfile.ci index 58a403a20a9ce..f46890cd8145c 100644 --- a/Dockerfile.ci +++ b/Dockerfile.ci @@ -1022,16 +1022,17 @@ function check_boto_upgrade() { echo "${COLOR_BLUE}Upgrading boto3, botocore to latest version to run Amazon tests with them${COLOR_RESET}" echo # shellcheck disable=SC2086 - ${PACKAGING_TOOL_CMD} uninstall ${EXTRA_UNINSTALL_FLAGS} aiobotocore s3fs yandexcloud || true + ${PACKAGING_TOOL_CMD} uninstall ${EXTRA_UNINSTALL_FLAGS} aiobotocore s3fs yandexcloud opensearch-py || true # We need to include few dependencies to pass pip check with other dependencies: # * oss2 as dependency as otherwise jmespath will be bumped (sync with alibaba provider) - # * gcloud-aio-auth limit is needed to be included as it bumps cryptography (sync with google provider) + # * cryptography is kept for snowflake-connector-python limitation (sync with snowflake provider) # * requests needs to be limited to be compatible with apache beam (sync with apache-beam provider) # * yandexcloud requirements for requests does not match those of apache.beam and latest botocore # Both requests and yandexcloud exclusion above might be removed after # https://github.com/apache/beam/issues/32080 is addressed - # When you remove yandexcloud from the above list, also remove it from "test_example_dags.py" - # in "tests/always". + # This is already addressed and planned for 2.59.0 release. 
+ # When you remove yandexcloud and opensearch from the above list, you can also remove the + # optional providers_dependencies exclusions from "test_example_dags.py" in "tests/always". set -x # shellcheck disable=SC2086 ${PACKAGING_TOOL_CMD} install ${EXTRA_INSTALL_FLAGS} --upgrade boto3 botocore \ diff --git a/pyproject.toml b/pyproject.toml index 621a9e48b8e7d..194c4b268c88c 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -405,6 +405,9 @@ combine-as-imports = true "tests/providers/google/cloud/operators/vertex_ai/test_generative_model.py" = ["E402"] "tests/providers/google/cloud/triggers/test_vertex_ai.py" = ["E402"] "tests/providers/openai/hooks/test_openai.py" = ["E402"] +"tests/providers/opensearch/conftest.py" = ["E402"] +"tests/providers/opensearch/hooks/test_opensearch.py" = ["E402"] +"tests/providers/opensearch/operators/test_opensearch.py" = ["E402"] "tests/providers/openai/operators/test_openai.py" = ["E402"] "tests/providers/qdrant/hooks/test_qdrant.py" = ["E402"] "tests/providers/qdrant/operators/test_qdrant.py" = ["E402"] diff --git a/scripts/docker/entrypoint_ci.sh b/scripts/docker/entrypoint_ci.sh index da7ff309763aa..6e9b36507ba68 100755 --- a/scripts/docker/entrypoint_ci.sh +++ b/scripts/docker/entrypoint_ci.sh @@ -242,16 +242,17 @@ function check_boto_upgrade() { echo "${COLOR_BLUE}Upgrading boto3, botocore to latest version to run Amazon tests with them${COLOR_RESET}" echo # shellcheck disable=SC2086 - ${PACKAGING_TOOL_CMD} uninstall ${EXTRA_UNINSTALL_FLAGS} aiobotocore s3fs yandexcloud || true + ${PACKAGING_TOOL_CMD} uninstall ${EXTRA_UNINSTALL_FLAGS} aiobotocore s3fs yandexcloud opensearch-py || true # We need to include few dependencies to pass pip check with other dependencies: # * oss2 as dependency as otherwise jmespath will be bumped (sync with alibaba provider) - # * gcloud-aio-auth limit is needed to be included as it bumps cryptography (sync with google provider) + # * cryptography is kept for snowflake-connector-python limitation 
(sync with snowflake provider) # * requests needs to be limited to be compatible with apache beam (sync with apache-beam provider) # * yandexcloud requirements for requests does not match those of apache.beam and latest botocore # Both requests and yandexcloud exclusion above might be removed after # https://github.com/apache/beam/issues/32080 is addressed - # When you remove yandexcloud from the above list, also remove it from "test_example_dags.py" - # in "tests/always". + # This is already addressed and planned for 2.59.0 release. + # When you remove yandexcloud and opensearch from the above list, you can also remove the + # optional providers_dependencies exclusions from "test_example_dags.py" in "tests/always". set -x # shellcheck disable=SC2086 ${PACKAGING_TOOL_CMD} install ${EXTRA_INSTALL_FLAGS} --upgrade boto3 botocore \ diff --git a/tests/always/test_example_dags.py b/tests/always/test_example_dags.py index 2b5f37631a427..b8e3edb99b82d 100644 --- a/tests/always/test_example_dags.py +++ b/tests/always/test_example_dags.py @@ -17,6 +17,7 @@ from __future__ import annotations import os +import re import sys from glob import glob from importlib import metadata as importlib_metadata @@ -39,8 +40,11 @@ # Some examples or system tests may depend on additional packages # that are not included in certain CI checks. # The format of the dictionary is as follows: - # key: the prefix of the file to be excluded, + # key: the regexp matching the file to be excluded, # value: a dictionary containing package distributions with an optional version specifier, e.g., >=2.3.4 + ".*example_bedrock_retrieve_and_generate.py": {"opensearch-py": None}, + ".*example_opensearch.py": {"opensearch-py": None}, + r".*example_yandexcloud.*\.py": {"yandexcloud": None}, } IGNORE_AIRFLOW_PROVIDER_DEPRECATION_WARNING: tuple[str, ...] = ( # Certain examples or system tests may trigger AirflowProviderDeprecationWarnings. 
@@ -124,13 +128,6 @@ def example_not_excluded_dags(xfail_db_exception: bool = False): for prefix in PROVIDERS_PREFIXES for provider in suspended_providers_folders ] - temporary_excluded_upgrade_boto_providers_folders = [ - AIRFLOW_SOURCES_ROOT.joinpath(prefix, provider).as_posix() - for prefix in PROVIDERS_PREFIXES - # TODO - remove me when https://github.com/apache/beam/issues/32080 is addressed - # and we bring back yandex to be run in case of upgrade boto - for provider in ["yandex"] - ] current_python_excluded_providers_folders = [ AIRFLOW_SOURCES_ROOT.joinpath(prefix, provider).as_posix() for prefix in PROVIDERS_PREFIXES @@ -146,18 +143,13 @@ def example_not_excluded_dags(xfail_db_exception: bool = False): if candidate.startswith(tuple(suspended_providers_folders)): param_marks.append(pytest.mark.skip(reason="Suspended provider")) - if os.environ.get("UPGRADE_BOTO", "false") == "true" and candidate.startswith( - tuple(temporary_excluded_upgrade_boto_providers_folders) - ): - param_marks.append(pytest.mark.skip(reason="Temporary excluded upgrade boto provider")) - if candidate.startswith(tuple(current_python_excluded_providers_folders)): param_marks.append( pytest.mark.skip(reason=f"Not supported for Python {CURRENT_PYTHON_VERSION}") ) for optional, dependencies in OPTIONAL_PROVIDERS_DEPENDENCIES.items(): - if candidate.endswith(optional): + if re.match(optional, candidate): for distribution_name, specifier in dependencies.items(): result, reason = match_optional_dependencies(distribution_name, specifier) if not result: diff --git a/tests/providers/opensearch/conftest.py b/tests/providers/opensearch/conftest.py index 47a447188ecdd..934bbd642ad58 100644 --- a/tests/providers/opensearch/conftest.py +++ b/tests/providers/opensearch/conftest.py @@ -19,12 +19,19 @@ from typing import Any import pytest -from opensearchpy import OpenSearch +from airflow.hooks.base import BaseHook from airflow.models import Connection -from airflow.providers.opensearch.hooks.opensearch 
import OpenSearchHook from airflow.utils import db +try: + from opensearchpy import OpenSearch + + from airflow.providers.opensearch.hooks.opensearch import OpenSearchHook +except ImportError: + OpenSearch = None # type: ignore[assignment, misc] + OpenSearchHook = BaseHook # type: ignore[assignment,misc] + # TODO: FIXME - those Mocks have overrides that are not used but they also do not make Mypy Happy # mypy: disable-error-code="override" diff --git a/tests/providers/opensearch/hooks/test_opensearch.py b/tests/providers/opensearch/hooks/test_opensearch.py index 84360ae73f46a..43075e8532210 100644 --- a/tests/providers/opensearch/hooks/test_opensearch.py +++ b/tests/providers/opensearch/hooks/test_opensearch.py @@ -18,8 +18,9 @@ from unittest import mock -import opensearchpy import pytest + +opensearchpy = pytest.importorskip("opensearchpy") from opensearchpy import Urllib3HttpConnection from airflow.exceptions import AirflowException diff --git a/tests/providers/opensearch/operators/test_opensearch.py b/tests/providers/opensearch/operators/test_opensearch.py index 706112fef65b3..63ad7eafe48de 100644 --- a/tests/providers/opensearch/operators/test_opensearch.py +++ b/tests/providers/opensearch/operators/test_opensearch.py @@ -17,6 +17,9 @@ from __future__ import annotations import pytest + +opensearchpy = pytest.importorskip("opensearchpy") + from opensearchpy import Document, Keyword, Text from airflow.models import DAG From 3b09b27e624641bba12b119aa0d1196946afd7df Mon Sep 17 00:00:00 2001 From: Jarek Potiuk Date: Mon, 12 Aug 2024 08:42:49 +0200 Subject: [PATCH 15/17] Simpler task retrieval for taskinstance test (#41389) The test has been updated for DB isolation but the retrieval of task was not intuitive and it could lead to flaky tests possibly (cherry picked from commit f25adf14ad486bac818fe3fdcd61eb3355e8ec9b) --- tests/models/test_taskinstance.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/tests/models/test_taskinstance.py 
b/tests/models/test_taskinstance.py index c2993e9ce8d95..2d3da6a413e2a 100644 --- a/tests/models/test_taskinstance.py +++ b/tests/models/test_taskinstance.py @@ -1539,8 +1539,9 @@ def do_something_else(i): monkeypatch.setattr(_UpstreamTIStates, "calculate", lambda *_: upstream_states) ti = dr.get_task_instance("do_something_else", session=session) ti.map_index = 0 + base_task = ti.task for map_index in range(1, 5): - ti = TaskInstance(dr.task_instances[-1].task, run_id=dr.run_id, map_index=map_index) + ti = TaskInstance(base_task, run_id=dr.run_id, map_index=map_index) session.add(ti) ti.dag_run = dr session.flush() From 15bb0aaa83902167fb9c6fc071877eca2e2daf25 Mon Sep 17 00:00:00 2001 From: Jarek Potiuk Date: Wed, 14 Aug 2024 16:24:36 +0200 Subject: [PATCH 16/17] Skip database isolation case for task mapping taskinstance tests (#41471) Related: #41067 (cherry picked from commit 7718bd7a6ed7fb476e4920315b6d11f1ac465f44) --- tests/models/test_taskinstance.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/tests/models/test_taskinstance.py b/tests/models/test_taskinstance.py index 2d3da6a413e2a..4516686a671f7 100644 --- a/tests/models/test_taskinstance.py +++ b/tests/models/test_taskinstance.py @@ -1443,7 +1443,10 @@ def test_check_task_dependencies( # Parameterized tests to check for the correct firing # of the trigger_rule under various circumstances of mapped task # Numeric fields are in order: - # successes, skipped, failed, upstream_failed, done,removed + # successes, skipped, failed, upstream_failed, done, removed + # Does not work for database isolation mode because there is local test monkeypatching of upstream_failed + # that never gets propagated to internal_api + @pytest.mark.skip_if_database_isolation_mode @pytest.mark.parametrize( "trigger_rule, upstream_states, flag_upstream_failed, expect_state, expect_completed", [ @@ -1540,6 +1543,7 @@ def do_something_else(i): ti = dr.get_task_instance("do_something_else", session=session) 
ti.map_index = 0 base_task = ti.task + for map_index in range(1, 5): ti = TaskInstance(base_task, run_id=dr.run_id, map_index=map_index) session.add(ti) From 4e2174243a04cc104f264a5c1e9f22d44b22eeba Mon Sep 17 00:00:00 2001 From: Bugra Ozturk Date: Wed, 14 Aug 2024 01:47:11 +0200 Subject: [PATCH 17/17] Skipping tests for db isolation because similar tests were skipped (#41450) (cherry picked from commit e94b508b946471420488cc466d92301b54b4c5ae) --- tests/sensors/test_external_task_sensor.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/tests/sensors/test_external_task_sensor.py b/tests/sensors/test_external_task_sensor.py index fbebd3d120156..e7a5991963e47 100644 --- a/tests/sensors/test_external_task_sensor.py +++ b/tests/sensors/test_external_task_sensor.py @@ -809,6 +809,7 @@ def test_catch_invalid_allowed_states(self): dag=self.dag, ) + @pytest.mark.skip_if_database_isolation_mode # Test is broken in db isolation mode def test_external_task_sensor_waits_for_task_check_existence(self): op = ExternalTaskSensor( task_id="test_external_task_sensor_check", @@ -821,6 +822,7 @@ def test_external_task_sensor_waits_for_task_check_existence(self): with pytest.raises(AirflowException): op.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True) + @pytest.mark.skip_if_database_isolation_mode # Test is broken in db isolation mode def test_external_task_sensor_waits_for_dag_check_existence(self): op = ExternalTaskSensor( task_id="test_external_task_sensor_check",