diff --git a/airflow/config_templates/config.yml b/airflow/config_templates/config.yml index 62eb79336e7042..cbe6bcdd46451f 100644 --- a/airflow/config_templates/config.yml +++ b/airflow/config_templates/config.yml @@ -464,6 +464,15 @@ core: sensitive: true default: ~ example: '{"some_param": "some_value"}' + strict_dataset_uri_validation: + description: | + Whether Dataset URI validation should raise an exception when a URI is not compliant with AIP-60. + By default this configuration is false, meaning that Airflow 2.x only warns the user. + In Airflow 3, this configuration will be enabled by default. + default: "False" + example: ~ + version_added: 2.9.2 + type: boolean database_access_isolation: description: (experimental) Whether components should use Airflow Internal API for DB connectivity. version_added: 2.6.0 diff --git a/airflow/datasets/__init__.py b/airflow/datasets/__init__.py index 2507c69d01b438..49ff5885cc5ab0 100644 --- a/airflow/datasets/__init__.py +++ b/airflow/datasets/__init__.py @@ -27,6 +27,9 @@ if TYPE_CHECKING: from urllib.parse import SplitResult + +from airflow.configuration import conf + __all__ = ["Dataset", "DatasetAll", "DatasetAny"] @@ -78,7 +81,18 @@ def _sanitize_uri(uri: str) -> str: fragment="", # Ignore any fragments. ) if (normalizer := _get_uri_normalizer(normalized_scheme)) is not None: - parsed = normalizer(parsed) + try: + parsed = normalizer(parsed) + except ValueError as exception: + if conf.getboolean("core", "strict_dataset_uri_validation", fallback=False): + raise exception + else: + warnings.warn( + f"The dataset URI {uri} is not AIP-60 compliant. " + f"In Airflow 3, this will raise an exception. 
More information: {repr(exception)}", + UserWarning, + stacklevel=3, + ) return urllib.parse.urlunsplit(parsed) diff --git a/tests/datasets/test_dataset.py b/tests/datasets/test_dataset.py index c1d68c0933cbfd..2b299e2293f8e2 100644 --- a/tests/datasets/test_dataset.py +++ b/tests/datasets/test_dataset.py @@ -20,15 +20,17 @@ import os from collections import defaultdict from typing import Callable +from unittest.mock import patch import pytest from sqlalchemy.sql import select -from airflow.datasets import BaseDatasetEventInput, Dataset, DatasetAll, DatasetAny +from airflow.datasets import BaseDatasetEventInput, Dataset, DatasetAll, DatasetAny, _sanitize_uri from airflow.models.dataset import DatasetDagRunQueue, DatasetModel from airflow.models.serialized_dag import SerializedDagModel from airflow.operators.empty import EmptyOperator from airflow.serialization.serialized_objects import BaseSerialization, SerializedDAG +from tests.test_utils.config import conf_vars @pytest.fixture @@ -441,3 +443,31 @@ def test_datasets_expression_error(expression: Callable[[], None], error: str) - with pytest.raises(TypeError) as info: expression() assert str(info.value) == error + + +def mock_get_uri_normalizer(normalized_scheme): + def normalizer(uri): + raise ValueError("Incorrect URI format") + + return normalizer + + +@patch("airflow.datasets._get_uri_normalizer", mock_get_uri_normalizer) +@patch("airflow.datasets.warnings.warn") +def test__sanitize_uri_raises_warning(mock_warn): + _sanitize_uri("postgres://localhost:5432/database.schema.table") + msg = mock_warn.call_args.args[0] + assert "The dataset URI postgres://localhost:5432/database.schema.table is not AIP-60 compliant." in msg + assert ( + "In Airflow 3, this will raise an exception. 
More information: ValueError('Incorrect URI format')" + in msg + ) + + +@patch("airflow.datasets._get_uri_normalizer", mock_get_uri_normalizer) +@conf_vars({("core", "strict_dataset_uri_validation"): "True"}) +def test__sanitize_uri_raises_exception(): + with pytest.raises(ValueError) as e_info: + _sanitize_uri("postgres://localhost:5432/database.schema.table") + assert isinstance(e_info.value, ValueError) + assert str(e_info.value) == "Incorrect URI format"