Sync v2-10-stable with v2-10-test to release python client v2.10.0 (#41610)

* Enable pull requests to be run from v*test branches (#41474) (#41476)

Since we switched from directly pushing cherry-picks to opening PRs
against the v*test branch, we should enable PRs to run for the target
branch.

(cherry picked from commit a9363e6)

* Prevent provider lowest-dependency tests from running in non-main branch (#41478) (#41481)

When running tests in the v2-10-test branch, lowest-dependency tests
are run for providers, because when calculating the separate tests,
"skip_provider_tests" has not been used to filter them out.

This PR fixes it.

(cherry picked from commit 75da507)
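
A minimal sketch of the filtering this implies is below; the helper name, the flag handling, and the "Providers[...]" labels are illustrative assumptions, not the actual selective-checks code.

```python
# Hypothetical sketch, not the real Breeze/selective-checks implementation:
# when skip_provider_tests is set (non-main branches), no per-provider
# test types should be produced at all.
def provider_test_types(skip_provider_tests: bool, providers: list[str]) -> list[str]:
    if skip_provider_tests:
        return []  # nothing provider-specific to run outside the main branch
    return [f"Providers[{name}]" for name in providers]


print(provider_test_types(True, ["amazon", "google"]))   # []
print(provider_test_types(False, ["amazon", "google"]))  # ['Providers[amazon]', 'Providers[google]']
```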

* Make PROD image building work in non-main PRs (#41480) (#41484)

PROD image building currently fails in non-main PRs because it
attempts to build provider packages from sources rather than use them
from PyPI when the PR is run against a "v*test" branch.

This PR fixes it:

* PROD images in builds not targeting main will pull providers from
  PyPI rather than build them
* they use PyPI constraints to install the providers
* they use uv, which should speed up building of the images (see the
  sketch below)

(cherry picked from commit 4d5f1c4)
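
A rough sketch of what the resulting install step can look like; the provider name, the Python version in the constraints URL, and the exact invocation are illustrative assumptions, not the actual PROD Dockerfile logic.

```python
# Illustrative only - not the actual PROD image build code. Installs a released
# provider from PyPI using Airflow's published constraints file, via uv.
import subprocess
import urllib.request

AIRFLOW_VERSION = "2.10.0"   # assumed target version for the v2-10 branches
PYTHON_VERSION = "3.8"       # assumed; use the image's Python version
CONSTRAINTS_URL = (
    "https://raw.githubusercontent.com/apache/airflow/"
    f"constraints-{AIRFLOW_VERSION}/constraints-{PYTHON_VERSION}.txt"
)

# Fetch the constraints file locally, then let uv resolve against it.
urllib.request.urlretrieve(CONSTRAINTS_URL, "constraints.txt")
subprocess.run(
    [
        "uv", "pip", "install",
        "apache-airflow-providers-amazon",   # example provider, pulled from PyPI
        "--constraint", "constraints.txt",
    ],
    check=True,
)
```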

* Add WebEncoder for trigger page rendering to avoid render failure (#41350) (#41485)

Co-authored-by: M. Olcay Tercanlı <[email protected]>

* Incorrect try number subtraction producing invalid span id for OTEL airflow (issue #41501) (#41502) (#41535)

* Fix for issue #39336

* removed unnecessary import

(cherry picked from commit dd3c3a7)

Co-authored-by: Howard Yoo <[email protected]>

* Fix failing pydantic v1 tests (#41534) (#41541)

We need to exclude some versions of Pydantic v1 because they conflict
with the aws provider.

(cherry picked from commit a033c5f)

* Fix Non-DB test calculation for main builds (#41499) (#41543)

Pytest has a weird behaviour: it will not collect tests from a parent
folder when one of its subfolders is specified after the parent folder.
This caused some non-DB tests from the providers folder to be skipped
during main builds.

The issue in pytest 8.2 (it used to work before) is tracked at
pytest-dev/pytest#12605

(cherry picked from commit d489826)
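
One way to avoid the problem is to never pass a subfolder together with its parent. A hypothetical helper illustrating that normalisation (not necessarily how the fix was implemented):

```python
# Drop any test path already covered by a parent folder in the list, so pytest
# never sees a "parent ... subfolder-of-parent" ordering that triggers the
# collection problem in pytest 8.2.
from pathlib import PurePosixPath


def dedupe_test_paths(paths: list[str]) -> list[str]:
    kept: list[str] = []
    for candidate in paths:
        c = PurePosixPath(candidate)
        if not any(PurePosixPath(k) in c.parents or k == candidate for k in kept):
            kept.append(candidate)
    return kept


print(dedupe_test_paths(["tests/providers", "tests/providers/amazon", "tests/models"]))
# ['tests/providers', 'tests/models']
```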

* Add changelog for airflow python client 2.10.0 (#41583) (#41584)

* Add changelog for airflow python client 2.10.0

* Update client version

(cherry picked from commit 317a28e)

* Make all tests pass in Database Isolation mode (#41567)

This adds a dedicated "DatabaseIsolation" test job to the airflow
v2-10-test branch.

The DatabaseIsolation job will run all "db-tests" with DB isolation
mode enabled and the `internal-api` component running - groups
of tests marked with "skip-if-database-isolation" will be skipped.
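
A hedged sketch of how such a "skip-if-database-isolation" marker can be honoured in a conftest.py; the environment variable name and hook wiring are assumptions, while the marker name mirrors the `pytest.mark.skip_if_database_isolation_mode` decorators visible in the diffs below.

```python
# Sketch only - the detection of database isolation mode via an environment
# variable is an assumption, not necessarily how Airflow's test harness does it.
import os

import pytest


def pytest_collection_modifyitems(config, items):
    if os.environ.get("AIRFLOW_TESTS_DB_ISOLATION") != "true":
        return  # regular DB tests: run everything
    skip_marker = pytest.mark.skip(reason="not supported in database isolation mode")
    for item in items:
        if "skip_if_database_isolation_mode" in item.keywords:
            item.add_marker(skip_marker)
```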

* Upgrade build and chart dependencies (#41570) (#41588)

(cherry picked from commit c88192c)

Co-authored-by: Jarek Potiuk <[email protected]>

* Limit watchtower as dependency as 3.3.0 breaks moto. (#41612)

(cherry picked from commit 1b602d5)

* Enable running Pull Requests against v2-10-stable branch (#41624)

(cherry picked from commit e306e7f)

* Fix tests/models/test_variable.py for database isolation mode (#41414)

* Fix tests/models/test_variable.py for database isolation mode

* Review feedback

(cherry picked from commit 736ebfe)

* Make latest botocore tests green (#41626)

The latest botocore tests are conflicting with a few requirements,
and until the upcoming apache-beam version is released we need to do
some manual exclusions. Those exclusions should make the latest
botocore tests green again.

(cherry picked from commit a13ccbb)

* Simpler task retrieval for taskinstance test (#41389)

The test has been updated for DB isolation, but the retrieval of the
task was not intuitive and could possibly lead to flaky tests.

(cherry picked from commit f25adf1)

* Skip database isolation case for task mapping taskinstance tests (#41471)

Related: #41067
(cherry picked from commit 7718bd7)

* Skipping tests for db isolation because similar tests were skipped (#41450)

(cherry picked from commit e94b508)

---------

Co-authored-by: Jarek Potiuk <[email protected]>
Co-authored-by: Brent Bovenzi <[email protected]>
Co-authored-by: M. Olcay Tercanlı <[email protected]>
Co-authored-by: Howard Yoo <[email protected]>
Co-authored-by: Jens Scheffler <[email protected]>
Co-authored-by: Bugra Ozturk <[email protected]>
7 people committed Aug 30, 2024
1 parent 7fa94ae commit 2d0974e
Showing 8 changed files with 99 additions and 14 deletions.
9 changes: 5 additions & 4 deletions airflow/api_internal/endpoints/rpc_api_endpoint.py
@@ -126,9 +126,9 @@ def initialize_method_map() -> dict[str, Callable]:
# XCom.get_many, # Not supported because it returns query
XCom.clear,
XCom.set,
Variable.set,
Variable.update,
Variable.delete,
Variable._set,
Variable._update,
Variable._delete,
DAG.fetch_callback,
DAG.fetch_dagrun,
DagRun.fetch_task_instances,
@@ -237,7 +237,8 @@ def internal_airflow_api(body: dict[str, Any]) -> APIResponse:
response = json.dumps(output_json) if output_json is not None else None
log.debug("Sending response: %s", response)
return Response(response=response, headers={"Content-Type": "application/json"})
except AirflowException as e: # In case of AirflowException transport the exception class back to caller
# In case of AirflowException or other selective known types, transport the exception class back to caller
except (KeyError, AttributeError, AirflowException) as e:
exception_json = BaseSerialization.serialize(e, use_pydantic_models=True)
response = json.dumps(exception_json)
log.debug("Sending exception response: %s", response)
2 changes: 1 addition & 1 deletion airflow/api_internal/internal_api_call.py
@@ -159,7 +159,7 @@ def wrapper(*args, **kwargs):
if result is None or result == b"":
return None
result = BaseSerialization.deserialize(json.loads(result), use_pydantic_models=True)
if isinstance(result, AirflowException):
if isinstance(result, (KeyError, AttributeError, AirflowException)):
raise result
return result

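
Taken together, the changes above to rpc_api_endpoint.py and internal_api_call.py mean a KeyError or AttributeError raised on the internal-api side is serialized, shipped back, and re-raised on the client. A standalone round-trip sketch using the same BaseSerialization calls (it assumes an Airflow installation that already includes this change):

```python
import json

from airflow.serialization.serialized_objects import BaseSerialization

try:
    raise KeyError("missing_variable_key")   # e.g. raised inside an internal-api handler
except (KeyError, AttributeError) as exc:
    payload = json.dumps(BaseSerialization.serialize(exc, use_pydantic_models=True))

# ... payload travels over HTTP back to the caller ...

restored = BaseSerialization.deserialize(json.loads(payload), use_pydantic_models=True)
assert isinstance(restored, KeyError)        # the caller re-raises the original type
raise restored
```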
66 changes: 63 additions & 3 deletions airflow/models/variable.py
@@ -154,7 +154,6 @@ def get(

@staticmethod
@provide_session
@internal_api_call
def set(
key: str,
value: Any,
@@ -167,6 +166,35 @@ def set(
This operation overwrites an existing variable.
:param key: Variable Key
:param value: Value to set for the Variable
:param description: Description of the Variable
:param serialize_json: Serialize the value to a JSON string
:param session: Session
"""
Variable._set(
key=key, value=value, description=description, serialize_json=serialize_json, session=session
)
# invalidate key in cache for faster propagation
# we cannot save the value set because it's possible that it's shadowed by a custom backend
# (see call to check_for_write_conflict above)
SecretCache.invalidate_variable(key)

@staticmethod
@provide_session
@internal_api_call
def _set(
key: str,
value: Any,
description: str | None = None,
serialize_json: bool = False,
session: Session = None,
) -> None:
"""
Set a value for an Airflow Variable with a given Key.
This operation overwrites an existing variable.
:param key: Variable Key
:param value: Value to set for the Variable
:param description: Description of the Variable
@@ -190,7 +218,6 @@ def set(

@staticmethod
@provide_session
@internal_api_call
def update(
key: str,
value: Any,
@@ -200,6 +227,27 @@ def update(
"""
Update a given Airflow Variable with the Provided value.
:param key: Variable Key
:param value: Value to set for the Variable
:param serialize_json: Serialize the value to a JSON string
:param session: Session
"""
Variable._update(key=key, value=value, serialize_json=serialize_json, session=session)
# We need to invalidate the cache for internal API cases on the client side
SecretCache.invalidate_variable(key)

@staticmethod
@provide_session
@internal_api_call
def _update(
key: str,
value: Any,
serialize_json: bool = False,
session: Session = None,
) -> None:
"""
Update a given Airflow Variable with the Provided value.
:param key: Variable Key
:param value: Value to set for the Variable
:param serialize_json: Serialize the value to a JSON string
@@ -219,11 +267,23 @@ def update(

@staticmethod
@provide_session
@internal_api_call
def delete(key: str, session: Session = None) -> int:
"""
Delete an Airflow Variable for a given key.
:param key: Variable Keys
"""
rows = Variable._delete(key=key, session=session)
SecretCache.invalidate_variable(key)
return rows

@staticmethod
@provide_session
@internal_api_call
def _delete(key: str, session: Session = None) -> int:
"""
Delete an Airflow Variable for a given key.
:param key: Variable Keys
"""
rows = session.execute(delete(Variable).where(Variable.key == key)).rowcount
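
The net effect of the refactoring above: the public Variable.set / update / delete stay thin wrappers that invalidate the SecretCache on the caller's side, while the underscore variants carry the `@internal_api_call` decorator and may execute remotely. A minimal usage sketch (assuming a configured Airflow metadata DB or internal API):

```python
from airflow.models import Variable

# Callers keep using the public API unchanged; cache invalidation now happens
# on the caller side even when _set/_update/_delete run over the internal API.
Variable.set(key="example_key", value={"retries": 1}, serialize_json=True)
print(Variable.get("example_key", deserialize_json=True))   # {'retries': 1}

Variable.update(key="example_key", value={"retries": 3}, serialize_json=True)
Variable.delete(key="example_key")
```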
1 change: 1 addition & 0 deletions airflow/serialization/enums.py
@@ -46,6 +46,7 @@ class DagAttributeTypes(str, Enum):
RELATIVEDELTA = "relativedelta"
BASE_TRIGGER = "base_trigger"
AIRFLOW_EXC_SER = "airflow_exc_ser"
BASE_EXC_SER = "base_exc_ser"
DICT = "dict"
SET = "set"
TUPLE = "tuple"
16 changes: 14 additions & 2 deletions airflow/serialization/serialized_objects.py
@@ -692,6 +692,15 @@ def serialize(
),
type_=DAT.AIRFLOW_EXC_SER,
)
elif isinstance(var, (KeyError, AttributeError)):
return cls._encode(
cls.serialize(
{"exc_cls_name": var.__class__.__name__, "args": [var.args], "kwargs": {}},
use_pydantic_models=use_pydantic_models,
strict=strict,
),
type_=DAT.BASE_EXC_SER,
)
elif isinstance(var, BaseTrigger):
return cls._encode(
cls.serialize(var.serialize(), use_pydantic_models=use_pydantic_models, strict=strict),
@@ -834,13 +843,16 @@ def deserialize(cls, encoded_var: Any, use_pydantic_models=False) -> Any:
return decode_timezone(var)
elif type_ == DAT.RELATIVEDELTA:
return decode_relativedelta(var)
elif type_ == DAT.AIRFLOW_EXC_SER:
elif type_ == DAT.AIRFLOW_EXC_SER or type_ == DAT.BASE_EXC_SER:
deser = cls.deserialize(var, use_pydantic_models=use_pydantic_models)
exc_cls_name = deser["exc_cls_name"]
args = deser["args"]
kwargs = deser["kwargs"]
del deser
exc_cls = import_string(exc_cls_name)
if type_ == DAT.AIRFLOW_EXC_SER:
exc_cls = import_string(exc_cls_name)
else:
exc_cls = import_string(f"builtins.{exc_cls_name}")
return exc_cls(*args, **kwargs)
elif type_ == DAT.BASE_TRIGGER:
tr_cls_name, kwargs = cls.deserialize(var, use_pydantic_models=use_pydantic_models)
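
The new deserialize branch re-imports builtin exception classes through the `builtins` module, since a bare class name is not an importable path. A tiny illustration:

```python
from airflow.utils.module_loading import import_string

exc_cls = import_string("builtins.KeyError")   # resolves the builtin class
assert exc_cls is KeyError
# import_string("KeyError") would raise ImportError (no module path), which is
# why the BASE_EXC_SER branch prefixes the class name with "builtins.".
```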
9 changes: 7 additions & 2 deletions tests/models/test_taskinstance.py
@@ -1444,7 +1444,10 @@ def test_check_task_dependencies(
# Parameterized tests to check for the correct firing
# of the trigger_rule under various circumstances of mapped task
# Numeric fields are in order:
# successes, skipped, failed, upstream_failed, done,removed
# successes, skipped, failed, upstream_failed, done,remove
# Does not work for database isolation mode because there is local test monkeypatching of upstream_failed
# That never gets propagated to internal_api
@pytest.mark.skip_if_database_isolation_mode
@pytest.mark.parametrize(
"trigger_rule, upstream_states, flag_upstream_failed, expect_state, expect_completed",
[
@@ -1540,8 +1543,10 @@ def do_something_else(i):
monkeypatch.setattr(_UpstreamTIStates, "calculate", lambda *_: upstream_states)
ti = dr.get_task_instance("do_something_else", session=session)
ti.map_index = 0
base_task = ti.task

for map_index in range(1, 5):
ti = TaskInstance(dr.task_instances[-1].task, run_id=dr.run_id, map_index=map_index)
ti = TaskInstance(base_task, run_id=dr.run_id, map_index=map_index)
session.add(ti)
ti.dag_run = dr
session.flush()
8 changes: 6 additions & 2 deletions tests/models/test_variable.py
@@ -47,6 +47,7 @@ def setup_test_cases(self):
db.clear_db_variables()
crypto._fernet = None

@pytest.mark.skip_if_database_isolation_mode # Does not work in db isolation mode, internal API has other fernet
@conf_vars({("core", "fernet_key"): "", ("core", "unit_test_mode"): "True"})
def test_variable_no_encryption(self, session):
"""
@@ -60,6 +61,7 @@ def test_variable_no_encryption(self, session):
# should mask anything. That logic is tested in test_secrets_masker.py
self.mask_secret.assert_called_once_with("value", "key")

@pytest.mark.skip_if_database_isolation_mode # Does not work in db isolation mode, internal API has other fernet
@conf_vars({("core", "fernet_key"): Fernet.generate_key().decode()})
def test_variable_with_encryption(self, session):
"""
@@ -70,6 +72,7 @@ def test_variable_with_encryption(self, session):
assert test_var.is_encrypted
assert test_var.val == "value"

@pytest.mark.skip_if_database_isolation_mode # Does not work in db isolation mode, internal API has other fernet
@pytest.mark.parametrize("test_value", ["value", ""])
def test_var_with_encryption_rotate_fernet_key(self, test_value, session):
"""
@@ -152,6 +155,7 @@ def test_variable_update(self, session):
Variable.update(key="test_key", value="value2", session=session)
assert "value2" == Variable.get("test_key")

@pytest.mark.skip_if_database_isolation_mode # Does not work in db isolation mode, API server has other ENV
def test_variable_update_fails_on_non_metastore_variable(self, session):
with mock.patch.dict("os.environ", AIRFLOW_VAR_KEY="env-value"):
with pytest.raises(AttributeError):
@@ -281,6 +285,7 @@ def test_caching_caches(self, mock_ensure_secrets: mock.Mock):
mock_backend.get_variable.assert_called_once() # second call was not made because of cache
assert first == second

@pytest.mark.skip_if_database_isolation_mode # Does not work in db isolation mode, internal API has other env
def test_cache_invalidation_on_set(self, session):
with mock.patch.dict("os.environ", AIRFLOW_VAR_KEY="from_env"):
a = Variable.get("key") # value is saved in cache
@@ -316,7 +321,7 @@ def test_masking_only_secret_values(variable_value, deserialize_json, expected_m
val=variable_value,
)
session.add(var)
session.flush()
session.commit()
# Make sure we re-load it, not just get the cached object back
session.expunge(var)
_secrets_masker().patterns = set()
@@ -326,5 +331,4 @@ def test_masking_only_secret_values(variable_value, deserialize_json, expected_m
for expected_masked_value in expected_masked_values:
assert expected_masked_value in _secrets_masker().patterns
finally:
session.rollback()
db.clear_db_variables()
2 changes: 2 additions & 0 deletions tests/sensors/test_external_task_sensor.py
@@ -809,6 +809,7 @@ def test_catch_invalid_allowed_states(self):
dag=self.dag,
)

@pytest.mark.skip_if_database_isolation_mode # Test is broken in db isolation mode
def test_external_task_sensor_waits_for_task_check_existence(self):
op = ExternalTaskSensor(
task_id="test_external_task_sensor_check",
@@ -821,6 +822,7 @@ def test_external_task_sensor_waits_for_task_check_existence(self):
with pytest.raises(AirflowException):
op.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)

@pytest.mark.skip_if_database_isolation_mode # Test is broken in db isolation mode
def test_external_task_sensor_waits_for_dag_check_existence(self):
op = ExternalTaskSensor(
task_id="test_external_task_sensor_check",
