Skip to content

Commit

Permalink
fix(trigger): add next_kwargs to StartTriggerArgs
Browse files Browse the repository at this point in the history
  • Loading branch information
Lee-W committed Jun 21, 2024
1 parent 06eabdd commit b222b9a
Show file tree
Hide file tree
Showing 2 changed files with 5 additions and 3 deletions.
6 changes: 3 additions & 3 deletions airflow/models/taskinstance.py
Original file line number Diff line number Diff line change
Expand Up @@ -1606,15 +1606,15 @@ def _defer_task(

if exception is not None:
trigger_row = Trigger.from_object(exception.trigger)
trigger_kwargs = exception.kwargs
next_method = exception.method_name
next_kwargs = exception.kwargs
timeout = exception.timeout
elif ti.task is not None and ti.task.start_trigger_args is not None:
trigger_row = Trigger(
classpath=ti.task.start_trigger_args.trigger_cls,
kwargs=ti.task.start_trigger_args.trigger_kwargs or {},
)
trigger_kwargs = ti.task.start_trigger_args.trigger_kwargs
next_kwargs = ti.task.start_trigger_args.next_kwargs
next_method = ti.task.start_trigger_args.next_method
timeout = ti.task.start_trigger_args.timeout
else:
Expand All @@ -1635,7 +1635,7 @@ def _defer_task(
ti.state = TaskInstanceState.DEFERRED
ti.trigger_id = trigger_row.id
ti.next_method = next_method
ti.next_kwargs = trigger_kwargs or {}
ti.next_kwargs = next_kwargs or {}

# Calculate timeout too if it was passed
if timeout is not None:
Expand Down
2 changes: 2 additions & 0 deletions airflow/triggers/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,15 @@ class StartTriggerArgs:
trigger_cls: str
next_method: str
trigger_kwargs: dict[str, Any] | None = None
next_kwargs: dict[str, Any] | None = None
timeout: timedelta | None = None

def serialize(self):
return {
"trigger_cls": self.trigger_cls,
"trigger_kwargs": self.trigger_kwargs,
"next_method": self.next_method,
"next_kwargs": self.next_kwargs,
"timeout": self.timeout,
}

Expand Down

0 comments on commit b222b9a

Please sign in to comment.