Skip to content

Commit

Permalink
Fix tests/models/test_taskinstance.py for Database Isolation Tests (a…
Browse files Browse the repository at this point in the history
  • Loading branch information
jscheffl committed Aug 9, 2024
1 parent 527bcaf commit c6bdd1f
Show file tree
Hide file tree
Showing 4 changed files with 184 additions and 19 deletions.
24 changes: 20 additions & 4 deletions airflow/models/taskinstance.py
Original file line number Diff line number Diff line change
Expand Up @@ -1404,6 +1404,25 @@ def _get_previous_execution_date(
return pendulum.instance(prev_ti.execution_date) if prev_ti and prev_ti.execution_date else None


def _get_previous_start_date(
*,
task_instance: TaskInstance | TaskInstancePydantic,
state: DagRunState | None,
session: Session,
) -> pendulum.DateTime | None:
"""
Return the start date from property previous_ti_success.
:param task_instance: the task instance
:param state: If passed, it only take into account instances of a specific state.
:param session: SQLAlchemy ORM Session
"""
log.debug("previous_start_date was called")
prev_ti = task_instance.get_previous_ti(state=state, session=session)
# prev_ti may not exist and prev_ti.start_date may be None.
return pendulum.instance(prev_ti.start_date) if prev_ti and prev_ti.start_date else None


def _email_alert(
*, task_instance: TaskInstance | TaskInstancePydantic, exception, task: BaseOperator
) -> None:
Expand Down Expand Up @@ -2533,10 +2552,7 @@ def get_previous_start_date(
:param state: If passed, it only take into account instances of a specific state.
:param session: SQLAlchemy ORM Session
"""
self.log.debug("previous_start_date was called")
prev_ti = self.get_previous_ti(state=state, session=session)
# prev_ti may not exist and prev_ti.start_date may be None.
return pendulum.instance(prev_ti.start_date) if prev_ti and prev_ti.start_date else None
return _get_previous_start_date(task_instance=self, state=state, session=session)

@property
def previous_start_date_success(self) -> pendulum.DateTime | None:
Expand Down
15 changes: 15 additions & 0 deletions airflow/serialization/pydantic/taskinstance.py
Original file line number Diff line number Diff line change
Expand Up @@ -381,6 +381,21 @@ def get_previous_execution_date(

return _get_previous_execution_date(task_instance=self, state=state, session=session)

def get_previous_start_date(
self,
state: DagRunState | None = None,
session: Session | None = None,
) -> pendulum.DateTime | None:
"""
Return the execution date from property previous_ti_success.
:param state: If passed, it only take into account instances of a specific state.
:param session: SQLAlchemy ORM Session
"""
from airflow.models.taskinstance import _get_previous_start_date

return _get_previous_start_date(task_instance=self, state=state, session=session)

def email_alert(self, exception, task: BaseOperator) -> None:
"""
Send alert email with exception information.
Expand Down
35 changes: 34 additions & 1 deletion tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -1082,6 +1082,7 @@ def create_task_instance(dag_maker, create_dummy_dag):
Uses ``create_dummy_dag`` to create the dag structure.
"""
from airflow.operators.empty import EmptyOperator

def maker(
execution_date=None,
Expand All @@ -1091,14 +1092,46 @@ def maker(
run_type=None,
data_interval=None,
external_executor_id=None,
dag_id="dag",
task_id="op1",
task_display_name=None,
max_active_tis_per_dag=16,
max_active_tis_per_dagrun=None,
pool="default_pool",
executor_config=None,
trigger_rule="all_done",
on_success_callback=None,
on_execute_callback=None,
on_failure_callback=None,
on_retry_callback=None,
email=None,
map_index=-1,
**kwargs,
) -> TaskInstance:
if execution_date is None:
from airflow.utils import timezone

execution_date = timezone.utcnow()
_, task = create_dummy_dag(with_dagrun_type=None, **kwargs)
with dag_maker(dag_id, **kwargs):
op_kwargs = {}
from tests.test_utils.compat import AIRFLOW_V_2_9_PLUS

if AIRFLOW_V_2_9_PLUS:
op_kwargs["task_display_name"] = task_display_name
task = EmptyOperator(
task_id=task_id,
max_active_tis_per_dag=max_active_tis_per_dag,
max_active_tis_per_dagrun=max_active_tis_per_dagrun,
executor_config=executor_config or {},
on_success_callback=on_success_callback,
on_execute_callback=on_execute_callback,
on_failure_callback=on_failure_callback,
on_retry_callback=on_retry_callback,
email=email,
pool=pool,
trigger_rule=trigger_rule,
**op_kwargs,
)

dagrun_kwargs = {"execution_date": execution_date, "state": dagrun_state}
if run_id is not None:
Expand Down
Loading

0 comments on commit c6bdd1f

Please sign in to comment.