Sync v2-10-stable with v2-10-test to release python client v2.10.0 #41610

Merged 18 commits on Aug 22, 2024. The diff below shows changes from all commits.

Commits
b211936
Enable pull requests to be run from v*test branches (#41474) (#41476)
potiuk Aug 14, 2024
5934272
Prevent provider lowest-dependency tests to run in non-main branch (#…
potiuk Aug 14, 2024
bf0d412
Make PROD image building works in non-main PRs (#41480) (#41484)
potiuk Aug 14, 2024
5f8230b
Add WebEncoder for trigger page rendering to avoid render failure (#4…
bbovenzi Aug 15, 2024
30cfb73
Incorrect try number subtraction producing invalid span id for OTEL a…
potiuk Aug 16, 2024
fa4ee68
Fix failing pydantic v1 tests (#41534) (#41541)
potiuk Aug 16, 2024
e36e521
Fix Non-DB test calculation for main builds (#41499) (#41543)
potiuk Aug 16, 2024
3ea764f
Add changelog for airflow python client 2.10.0 (#41583) (#41584)
utkarsharma2 Aug 19, 2024
6441563
Make all test pass in Database Isolation mode (#41567)
potiuk Aug 19, 2024
e76dba6
Upgrade build and chart dependencies (#41570) (#41588)
utkarsharma2 Aug 19, 2024
01e6677
Limit watchtower as depenendcy as 3.3.0 breaks moin. (#41612)
potiuk Aug 20, 2024
ba6ee08
Merge branch 'v2-10-stable' into Sync-v2_10_test
utkarsharma2 Aug 21, 2024
44190fa
Enable running Pull Requests against v2-10-stable branch (#41624)
potiuk Aug 20, 2024
055b81d
Fix tests/models/test_variable.py for database isolation mode (#41414)
jscheffl Aug 13, 2024
3dfe19b
Make latest botocore tests green (#41626)
potiuk Aug 21, 2024
3b09b27
Simpler task retrieval for taskinstance test (#41389)
potiuk Aug 12, 2024
15bb0aa
Skip database isolation case for task mapping taskinstance tests (#4…
potiuk Aug 14, 2024
4e21742
Skipping tests for db isolation because similar tests were skipped (#…
bugraoz93 Aug 13, 2024
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
@@ -23,7 +23,7 @@ on:  # yamllint disable-line rule:truthy
push:
branches: ['v[0-9]+-[0-9]+-test']
pull_request:
branches: ['main']
branches: ['main', 'v[0-9]+-[0-9]+-test', 'v[0-9]+-[0-9]+-stable']
workflow_dispatch:
permissions:
# All other permissions are set to none
6 changes: 6 additions & 0 deletions .github/workflows/run-unit-tests.yml
@@ -104,6 +104,11 @@ on:  # yamllint disable-line rule:truthy
required: false
default: "true"
type: string
database-isolation:
description: "Whether to enable database isolattion or not (true/false)"
required: false
default: "false"
type: string
force-lowest-dependencies:
description: "Whether to force lowest dependencies for the tests or not (true/false)"
required: false
@@ -152,6 +157,7 @@ jobs:
PYTHON_MAJOR_MINOR_VERSION: "${{ matrix.python-version }}"
UPGRADE_BOTO: "${{ inputs.upgrade-boto }}"
AIRFLOW_MONITOR_DELAY_TIME_IN_SECONDS: "${{inputs.monitor-delay-time-in-seconds}}"
DATABASE_ISOLATION: "${{ inputs.database-isolation }}"
VERBOSE: "true"
steps:
- name: "Cleanup repo"
23 changes: 23 additions & 0 deletions .github/workflows/special-tests.yml
@@ -193,6 +193,29 @@ jobs:
run-coverage: ${{ inputs.run-coverage }}
debug-resources: ${{ inputs.debug-resources }}

tests-database-isolation:
name: "Database isolation test"
uses: ./.github/workflows/run-unit-tests.yml
permissions:
contents: read
packages: read
secrets: inherit
with:
runs-on-as-json-default: ${{ inputs.runs-on-as-json-default }}
enable-aip-44: "true"
database-isolation: "true"
test-name: "DatabaseIsolation-Postgres"
test-scope: "DB"
backend: "postgres"
image-tag: ${{ inputs.image-tag }}
python-versions: "['${{ inputs.default-python-version }}']"
backend-versions: "['${{ inputs.default-postgres-version }}']"
excludes: "[]"
parallel-test-types-list-as-string: ${{ inputs.parallel-test-types-list-as-string }}
include-success-outputs: ${{ needs.build-info.outputs.include-success-outputs }}
run-coverage: ${{ inputs.run-coverage }}
debug-resources: ${{ inputs.debug-resources }}

tests-quarantined:
name: "Quarantined test"
uses: ./.github/workflows/run-unit-tests.yml
2 changes: 1 addition & 1 deletion Dockerfile
@@ -50,7 +50,7 @@ ARG AIRFLOW_VERSION="2.9.3"
ARG PYTHON_BASE_IMAGE="python:3.8-slim-bookworm"

ARG AIRFLOW_PIP_VERSION=24.2
ARG AIRFLOW_UV_VERSION=0.2.34
ARG AIRFLOW_UV_VERSION=0.2.37
ARG AIRFLOW_USE_UV="false"
ARG UV_HTTP_TIMEOUT="300"
ARG AIRFLOW_IMAGE_REPOSITORY="https://github.com/apache/airflow"
16 changes: 9 additions & 7 deletions Dockerfile.ci
@@ -1022,16 +1022,17 @@ function check_boto_upgrade() {
echo "${COLOR_BLUE}Upgrading boto3, botocore to latest version to run Amazon tests with them${COLOR_RESET}"
echo
# shellcheck disable=SC2086
${PACKAGING_TOOL_CMD} uninstall ${EXTRA_UNINSTALL_FLAGS} aiobotocore s3fs yandexcloud || true
${PACKAGING_TOOL_CMD} uninstall ${EXTRA_UNINSTALL_FLAGS} aiobotocore s3fs yandexcloud opensearch-py || true
# We need to include few dependencies to pass pip check with other dependencies:
# * oss2 as dependency as otherwise jmespath will be bumped (sync with alibaba provider)
# * gcloud-aio-auth limit is needed to be included as it bumps cryptography (sync with google provider)
# * cryptography is kept for snowflake-connector-python limitation (sync with snowflake provider)
# * requests needs to be limited to be compatible with apache beam (sync with apache-beam provider)
# * yandexcloud requirements for requests does not match those of apache.beam and latest botocore
# Both requests and yandexcloud exclusion above might be removed after
# https://github.com/apache/beam/issues/32080 is addressed
# When you remove yandexcloud from the above list, also remove it from "test_example_dags.py"
# in "tests/always".
# This is already addressed and planned for 2.59.0 release.
# When you remove yandexcloud and opensearch from the above list, you can also remove the
# optional providers_dependencies exclusions from "test_example_dags.py" in "tests/always".
set -x
# shellcheck disable=SC2086
${PACKAGING_TOOL_CMD} install ${EXTRA_INSTALL_FLAGS} --upgrade boto3 botocore \
@@ -1068,8 +1069,9 @@ function check_pydantic() {
echo
echo "${COLOR_YELLOW}Downgrading Pydantic to < 2${COLOR_RESET}"
echo
# Pydantic 1.10.17/1.10.15 conflicts with aws-sam-translator so we need to exclude it
# shellcheck disable=SC2086
${PACKAGING_TOOL_CMD} install ${EXTRA_INSTALL_FLAGS} --upgrade "pydantic<2.0.0"
${PACKAGING_TOOL_CMD} install ${EXTRA_INSTALL_FLAGS} --upgrade "pydantic<2.0.0,!=1.10.17,!=1.10.15"
pip check
else
echo
@@ -1310,7 +1312,7 @@ ARG DEFAULT_CONSTRAINTS_BRANCH="constraints-main"
ARG AIRFLOW_CI_BUILD_EPOCH="10"
ARG AIRFLOW_PRE_CACHED_PIP_PACKAGES="true"
ARG AIRFLOW_PIP_VERSION=24.2
ARG AIRFLOW_UV_VERSION=0.2.34
ARG AIRFLOW_UV_VERSION=0.2.37
ARG AIRFLOW_USE_UV="true"
# Setup PIP
# By default PIP install run without cache to make image smaller
@@ -1334,7 +1336,7 @@ ARG AIRFLOW_VERSION=""
ARG ADDITIONAL_PIP_INSTALL_FLAGS=""

ARG AIRFLOW_PIP_VERSION=24.2
ARG AIRFLOW_UV_VERSION=0.2.34
ARG AIRFLOW_UV_VERSION=0.2.37
ARG AIRFLOW_USE_UV="true"

ENV AIRFLOW_REPO=${AIRFLOW_REPO}\
15 changes: 8 additions & 7 deletions airflow/api_internal/endpoints/rpc_api_endpoint.py
@@ -126,9 +126,9 @@ def initialize_method_map() -> dict[str, Callable]:
# XCom.get_many, # Not supported because it returns query
XCom.clear,
XCom.set,
Variable.set,
Variable.update,
Variable.delete,
Variable._set,
Variable._update,
Variable._delete,
DAG.fetch_callback,
DAG.fetch_dagrun,
DagRun.fetch_task_instances,
@@ -228,19 +228,20 @@ def internal_airflow_api(body: dict[str, Any]) -> APIResponse:
except Exception:
return log_and_build_error_response(message="Error deserializing parameters.", status=400)

log.info("Calling method %s\nparams: %s", method_name, params)
log.debug("Calling method %s\nparams: %s", method_name, params)
try:
# Session must be created there as it may be needed by serializer for lazy-loaded fields.
with create_session() as session:
output = handler(**params, session=session)
output_json = BaseSerialization.serialize(output, use_pydantic_models=True)
response = json.dumps(output_json) if output_json is not None else None
log.info("Sending response: %s", response)
log.debug("Sending response: %s", response)
return Response(response=response, headers={"Content-Type": "application/json"})
except AirflowException as e: # In case of AirflowException transport the exception class back to caller
# In case of AirflowException or other selective known types, transport the exception class back to caller
except (KeyError, AttributeError, AirflowException) as e:
exception_json = BaseSerialization.serialize(e, use_pydantic_models=True)
response = json.dumps(exception_json)
log.info("Sending exception response: %s", response)
log.debug("Sending exception response: %s", response)
return Response(response=response, headers={"Content-Type": "application/json"})
except Exception:
return log_and_build_error_response(message=f"Error executing method '{method_name}'.", status=500)
2 changes: 1 addition & 1 deletion airflow/api_internal/internal_api_call.py
@@ -159,7 +159,7 @@ def wrapper(*args, **kwargs):
if result is None or result == b"":
return None
result = BaseSerialization.deserialize(json.loads(result), use_pydantic_models=True)
if isinstance(result, AirflowException):
if isinstance(result, (KeyError, AttributeError, AirflowException)):
raise result
return result

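The two files above widen the set of exceptions that survive the internal-API round trip: the endpoint now serializes `KeyError` and `AttributeError` in addition to `AirflowException`, and the client-side wrapper re-raises any of those three after deserializing the response. A minimal sketch of that client-side behaviour, with the transport and serialization layers stubbed out (the names below are illustrative stand-ins, not Airflow's actual API):

```python
class AirflowException(Exception):
    pass


def _post_to_internal_api(method_name: str, params: dict) -> object:
    # Placeholder for the HTTP call to the internal API RPC endpoint plus
    # BaseSerialization.deserialize of the JSON body it returns.
    return KeyError("variable not found")  # pretend the server raised this


def internal_api_call(method_name: str, params: dict) -> object:
    result = _post_to_internal_api(method_name, params)
    # Mirrors the widened isinstance check: selected exception types are
    # re-raised locally so callers behave as if they had a direct DB session.
    if isinstance(result, (KeyError, AttributeError, AirflowException)):
        raise result
    return result


try:
    internal_api_call("Variable._delete", {"key": "missing"})
except KeyError as exc:
    print(f"Propagated from the internal API: {exc!r}")
```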
2 changes: 1 addition & 1 deletion airflow/executors/base_executor.py
@@ -467,7 +467,7 @@ def success(self, key: TaskInstanceKey, info=None) -> None:
span.set_attribute("dag_id", key.dag_id)
span.set_attribute("run_id", key.run_id)
span.set_attribute("task_id", key.task_id)
span.set_attribute("try_number", key.try_number - 1)
span.set_attribute("try_number", key.try_number)

self.change_state(key, TaskInstanceState.SUCCESS, info)

2 changes: 1 addition & 1 deletion airflow/executors/local_executor.py
@@ -277,7 +277,7 @@ def execute_async(
span.set_attribute("dag_id", key.dag_id)
span.set_attribute("run_id", key.run_id)
span.set_attribute("task_id", key.task_id)
span.set_attribute("try_number", key.try_number - 1)
span.set_attribute("try_number", key.try_number)
span.set_attribute("commands_to_run", str(command))

local_worker = LocalWorker(self.executor.result_queue, key=key, command=command)
2 changes: 1 addition & 1 deletion airflow/executors/sequential_executor.py
@@ -76,7 +76,7 @@ def execute_async(
span.set_attribute("dag_id", key.dag_id)
span.set_attribute("run_id", key.run_id)
span.set_attribute("task_id", key.task_id)
span.set_attribute("try_number", key.try_number - 1)
span.set_attribute("try_number", key.try_number)
span.set_attribute("commands_to_run", str(self.commands_to_run))

def sync(self) -> None:
2 changes: 1 addition & 1 deletion airflow/jobs/scheduler_job_runner.py
@@ -837,7 +837,7 @@ def _process_executor_events(self, executor: BaseExecutor, session: Session) ->
span.set_attribute("hostname", ti.hostname)
span.set_attribute("log_url", ti.log_url)
span.set_attribute("operator", str(ti.operator))
span.set_attribute("try_number", ti.try_number - 1)
span.set_attribute("try_number", ti.try_number)
span.set_attribute("executor_state", state)
span.set_attribute("job_id", ti.job_id)
span.set_attribute("pool", ti.pool)
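The four `try_number` edits above drop the same off-by-one adjustment. Since Airflow 2.10 a task instance's `try_number` is no longer decremented after the task finishes, so the stored value already names the attempt being reported, and subtracting one produced wrong span attributes (and, per the commit message, invalid span ids). A hedged sketch of the corrected tagging, assuming the `opentelemetry-api` package is available:

```python
from opentelemetry import trace

tracer = trace.get_tracer(__name__)

try_number = 1  # hypothetical: first (and only) attempt of a task instance

with tracer.start_as_current_span("airflow.task.finished") as span:
    # With 2.10 semantics the raw value already identifies the attempt that
    # ran, so no "- 1" correction is applied before tagging the span.
    span.set_attribute("try_number", try_number)
```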
3 changes: 2 additions & 1 deletion airflow/models/baseoperator.py
@@ -1516,10 +1516,11 @@ def run(
data_interval=info.data_interval,
)
ti = TaskInstance(self, run_id=dr.run_id)
session.add(ti)
ti.dag_run = dr
session.add(dr)
session.flush()

session.commit()
ti.run(
mark_success=mark_success,
ignore_depends_on_past=ignore_depends_on_past,
66 changes: 63 additions & 3 deletions airflow/models/variable.py
@@ -154,7 +154,6 @@ def get(

@staticmethod
@provide_session
@internal_api_call
def set(
key: str,
value: Any,
@@ -167,6 +166,35 @@ def set(

This operation overwrites an existing variable.

:param key: Variable Key
:param value: Value to set for the Variable
:param description: Description of the Variable
:param serialize_json: Serialize the value to a JSON string
:param session: Session
"""
Variable._set(
key=key, value=value, description=description, serialize_json=serialize_json, session=session
)
# invalidate key in cache for faster propagation
# we cannot save the value set because it's possible that it's shadowed by a custom backend
# (see call to check_for_write_conflict above)
SecretCache.invalidate_variable(key)

@staticmethod
@provide_session
@internal_api_call
def _set(
key: str,
value: Any,
description: str | None = None,
serialize_json: bool = False,
session: Session = None,
) -> None:
"""
Set a value for an Airflow Variable with a given Key.

This operation overwrites an existing variable.

:param key: Variable Key
:param value: Value to set for the Variable
:param description: Description of the Variable
@@ -190,7 +218,6 @@

@staticmethod
@provide_session
@internal_api_call
def update(
key: str,
value: Any,
@@ -200,6 +227,27 @@
"""
Update a given Airflow Variable with the Provided value.

:param key: Variable Key
:param value: Value to set for the Variable
:param serialize_json: Serialize the value to a JSON string
:param session: Session
"""
Variable._update(key=key, value=value, serialize_json=serialize_json, session=session)
# We need to invalidate the cache for internal API cases on the client side
SecretCache.invalidate_variable(key)

@staticmethod
@provide_session
@internal_api_call
def _update(
key: str,
value: Any,
serialize_json: bool = False,
session: Session = None,
) -> None:
"""
Update a given Airflow Variable with the Provided value.

:param key: Variable Key
:param value: Value to set for the Variable
:param serialize_json: Serialize the value to a JSON string
@@ -219,11 +267,23 @@

@staticmethod
@provide_session
@internal_api_call
def delete(key: str, session: Session = None) -> int:
"""
Delete an Airflow Variable for a given key.

:param key: Variable Keys
"""
rows = Variable._delete(key=key, session=session)
SecretCache.invalidate_variable(key)
return rows

@staticmethod
@provide_session
@internal_api_call
def _delete(key: str, session: Session = None) -> int:
"""
Delete an Airflow Variable for a given key.

:param key: Variable Keys
"""
rows = session.execute(delete(Variable).where(Variable.key == key)).rowcount
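The refactor above splits each Variable mutation into a thin public wrapper and an underscore-prefixed internal method: only the internal method carries `@internal_api_call`, so under database isolation the DB write can be forwarded to the internal API while the `SecretCache` invalidation in the public wrapper still runs in the calling process. A simplified sketch of the pattern (the decorator and cache below are stand-ins, not the real Airflow implementations):

```python
from __future__ import annotations

from typing import Any, Callable


def internal_api_call(func: Callable) -> Callable:
    """Stand-in for airflow.api_internal.internal_api_call.

    In database-isolation mode the real decorator forwards the call to the
    internal API server instead of executing it locally.
    """

    def wrapper(*args: Any, **kwargs: Any) -> Any:
        return func(*args, **kwargs)

    return wrapper


class SecretCache:
    _store: dict[str, Any] = {}

    @classmethod
    def invalidate_variable(cls, key: str) -> None:
        cls._store.pop(key, None)


class Variable:
    @staticmethod
    def set(key: str, value: Any) -> None:
        Variable._set(key=key, value=value)
        # Always executed on the caller, even when _set ran remotely.
        SecretCache.invalidate_variable(key)

    @staticmethod
    @internal_api_call
    def _set(key: str, value: Any) -> None:
        ...  # the actual database write (may run on the internal API server)
```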
2 changes: 1 addition & 1 deletion airflow/providers/amazon/provider.yaml
@@ -101,7 +101,7 @@ dependencies:
- botocore>=1.34.90
- inflection>=0.5.1
# Allow a wider range of watchtower versions for flexibility among users
- watchtower>=3.0.0,<4
- watchtower>=3.0.0,!=3.3.0,<4
- jsonpath_ng>=1.5.3
- redshift_connector>=2.0.918
- sqlalchemy_redshift>=0.8.6
1 change: 1 addition & 0 deletions airflow/serialization/enums.py
@@ -46,6 +46,7 @@ class DagAttributeTypes(str, Enum):
RELATIVEDELTA = "relativedelta"
BASE_TRIGGER = "base_trigger"
AIRFLOW_EXC_SER = "airflow_exc_ser"
BASE_EXC_SER = "base_exc_ser"
DICT = "dict"
SET = "set"
TUPLE = "tuple"
16 changes: 14 additions & 2 deletions airflow/serialization/serialized_objects.py
@@ -692,6 +692,15 @@ def serialize(
),
type_=DAT.AIRFLOW_EXC_SER,
)
elif isinstance(var, (KeyError, AttributeError)):
return cls._encode(
cls.serialize(
{"exc_cls_name": var.__class__.__name__, "args": [var.args], "kwargs": {}},
use_pydantic_models=use_pydantic_models,
strict=strict,
),
type_=DAT.BASE_EXC_SER,
)
elif isinstance(var, BaseTrigger):
return cls._encode(
cls.serialize(var.serialize(), use_pydantic_models=use_pydantic_models, strict=strict),
@@ -834,13 +843,16 @@ def deserialize(cls, encoded_var: Any, use_pydantic_models=False) -> Any:
return decode_timezone(var)
elif type_ == DAT.RELATIVEDELTA:
return decode_relativedelta(var)
elif type_ == DAT.AIRFLOW_EXC_SER:
elif type_ == DAT.AIRFLOW_EXC_SER or type_ == DAT.BASE_EXC_SER:
deser = cls.deserialize(var, use_pydantic_models=use_pydantic_models)
exc_cls_name = deser["exc_cls_name"]
args = deser["args"]
kwargs = deser["kwargs"]
del deser
exc_cls = import_string(exc_cls_name)
if type_ == DAT.AIRFLOW_EXC_SER:
exc_cls = import_string(exc_cls_name)
else:
exc_cls = import_string(f"builtins.{exc_cls_name}")
return exc_cls(*args, **kwargs)
elif type_ == DAT.BASE_TRIGGER:
tr_cls_name, kwargs = cls.deserialize(var, use_pydantic_models=use_pydantic_models)
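These serializer changes pair with the new `BASE_EXC_SER` enum member added above: `KeyError` and `AttributeError` are encoded as a class name plus constructor args, and on deserialization the class is resolved from the `builtins` module instead of a fully qualified import path. A simplified, self-contained sketch of that round trip (not Airflow's exact encoding, which also wraps the payload in the DAT type tag):

```python
from importlib import import_module
from typing import Any


def encode_builtin_exc(exc: BaseException) -> dict[str, Any]:
    # Reduce the exception to its class name and positional args.
    return {"exc_cls_name": exc.__class__.__name__, "args": list(exc.args), "kwargs": {}}


def decode_builtin_exc(data: dict[str, Any]) -> BaseException:
    # Same idea as import_string(f"builtins.{exc_cls_name}") in the diff.
    exc_cls = getattr(import_module("builtins"), data["exc_cls_name"])
    return exc_cls(*data["args"], **data["kwargs"])


roundtripped = decode_builtin_exc(encode_builtin_exc(KeyError("conn_id_missing")))
assert isinstance(roundtripped, KeyError) and roundtripped.args == ("conn_id_missing",)
```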