Skip to content

Commit

Permalink
Only allow passing JSON Serializable conf to TriggerDagRunOperator (#…
Browse files Browse the repository at this point in the history
…13964)

closes #13414

(cherry picked from commit b4885b2)
  • Loading branch information
kaxil committed Jan 29, 2021
1 parent 7b02edd commit d0a42c1
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 0 deletions.
6 changes: 6 additions & 0 deletions airflow/operators/trigger_dagrun.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
# under the License.

import datetime
import json
import time
from typing import Dict, List, Optional, Union

Expand Down Expand Up @@ -108,6 +109,11 @@ def __init__(

self.execution_date: Optional[datetime.datetime] = execution_date # type: ignore

try:
json.dumps(self.conf)
except TypeError:
raise AirflowException("conf parameter should be JSON Serializable")

def execute(self, context: Dict):
if isinstance(self.execution_date, datetime.datetime):
execution_date = self.execution_date
Expand Down
11 changes: 11 additions & 0 deletions tests/operators/test_trigger_dagrun.py
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,17 @@ def test_trigger_dagrun_operator_conf(self):
assert len(dagruns) == 1
assert dagruns[0].conf, {"foo": "bar"}

def test_trigger_dagrun_operator_templated_invalid_conf(self):
"""Test passing a conf that is not JSON Serializable raise error."""

with pytest.raises(AirflowException, match="^conf parameter should be JSON Serializable$"):
TriggerDagRunOperator(
task_id="test_trigger_dagrun_with_invalid_conf",
trigger_dag_id=TRIGGERED_DAG_ID,
conf={"foo": "{{ dag.dag_id }}", "datetime": timezone.utcnow()},
dag=self.dag,
)

def test_trigger_dagrun_operator_templated_conf(self):
"""Test passing a templated conf to the triggered DagRun."""
task = TriggerDagRunOperator(
Expand Down

0 comments on commit d0a42c1

Please sign in to comment.