Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix tests/models/test_taskinstance.py for Database Isolation Tests #41344

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 20 additions & 4 deletions airflow/models/taskinstance.py
Original file line number Diff line number Diff line change
Expand Up @@ -1404,6 +1404,25 @@ def _get_previous_execution_date(
return pendulum.instance(prev_ti.execution_date) if prev_ti and prev_ti.execution_date else None


def _get_previous_start_date(
*,
task_instance: TaskInstance | TaskInstancePydantic,
state: DagRunState | None,
session: Session,
) -> pendulum.DateTime | None:
"""
Return the start date from property previous_ti_success.

:param task_instance: the task instance
:param state: If passed, it only take into account instances of a specific state.
:param session: SQLAlchemy ORM Session
"""
log.debug("previous_start_date was called")
prev_ti = task_instance.get_previous_ti(state=state, session=session)
# prev_ti may not exist and prev_ti.start_date may be None.
return pendulum.instance(prev_ti.start_date) if prev_ti and prev_ti.start_date else None


def _email_alert(
*, task_instance: TaskInstance | TaskInstancePydantic, exception, task: BaseOperator
) -> None:
Expand Down Expand Up @@ -2533,10 +2552,7 @@ def get_previous_start_date(
:param state: If passed, it only take into account instances of a specific state.
:param session: SQLAlchemy ORM Session
"""
self.log.debug("previous_start_date was called")
prev_ti = self.get_previous_ti(state=state, session=session)
# prev_ti may not exist and prev_ti.start_date may be None.
return pendulum.instance(prev_ti.start_date) if prev_ti and prev_ti.start_date else None
return _get_previous_start_date(task_instance=self, state=state, session=session)

@property
def previous_start_date_success(self) -> pendulum.DateTime | None:
Expand Down
15 changes: 15 additions & 0 deletions airflow/serialization/pydantic/taskinstance.py
Original file line number Diff line number Diff line change
Expand Up @@ -381,6 +381,21 @@ def get_previous_execution_date(

return _get_previous_execution_date(task_instance=self, state=state, session=session)

def get_previous_start_date(
self,
state: DagRunState | None = None,
session: Session | None = None,
) -> pendulum.DateTime | None:
"""
Return the execution date from property previous_ti_success.

:param state: If passed, it only take into account instances of a specific state.
:param session: SQLAlchemy ORM Session
"""
from airflow.models.taskinstance import _get_previous_start_date

return _get_previous_start_date(task_instance=self, state=state, session=session)

def email_alert(self, exception, task: BaseOperator) -> None:
"""
Send alert email with exception information.
Expand Down
35 changes: 34 additions & 1 deletion tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -1082,6 +1082,7 @@ def create_task_instance(dag_maker, create_dummy_dag):

Uses ``create_dummy_dag`` to create the dag structure.
"""
from airflow.operators.empty import EmptyOperator

def maker(
execution_date=None,
Expand All @@ -1091,14 +1092,46 @@ def maker(
run_type=None,
data_interval=None,
external_executor_id=None,
dag_id="dag",
task_id="op1",
task_display_name=None,
max_active_tis_per_dag=16,
max_active_tis_per_dagrun=None,
pool="default_pool",
executor_config=None,
trigger_rule="all_done",
on_success_callback=None,
on_execute_callback=None,
on_failure_callback=None,
on_retry_callback=None,
email=None,
map_index=-1,
**kwargs,
) -> TaskInstance:
if execution_date is None:
from airflow.utils import timezone

execution_date = timezone.utcnow()
_, task = create_dummy_dag(with_dagrun_type=None, **kwargs)
with dag_maker(dag_id, **kwargs):
op_kwargs = {}
from tests.test_utils.compat import AIRFLOW_V_2_9_PLUS

if AIRFLOW_V_2_9_PLUS:
op_kwargs["task_display_name"] = task_display_name
task = EmptyOperator(
task_id=task_id,
max_active_tis_per_dag=max_active_tis_per_dag,
max_active_tis_per_dagrun=max_active_tis_per_dagrun,
executor_config=executor_config or {},
on_success_callback=on_success_callback,
on_execute_callback=on_execute_callback,
on_failure_callback=on_failure_callback,
on_retry_callback=on_retry_callback,
email=email,
pool=pool,
trigger_rule=trigger_rule,
**op_kwargs,
)

dagrun_kwargs = {"execution_date": execution_date, "state": dagrun_state}
if run_id is not None:
Expand Down
Loading