Skip to content

Commit

Permalink
Add XCom.get_one() method back (apache#9580)
Browse files Browse the repository at this point in the history
  • Loading branch information
kaxil authored Jun 30, 2020
1 parent af14fb2 commit d0e010f
Show file tree
Hide file tree
Showing 3 changed files with 97 additions and 4 deletions.
4 changes: 0 additions & 4 deletions UPDATING.md
Original file line number Diff line number Diff line change
Expand Up @@ -729,10 +729,6 @@ with redirect_stdout(StreamLogWriter(logger, logging.INFO)), \
print("I Love Airflow")
```

### Removal of XCom.get_one()

This one is superseded by `XCom.get_many().first()` which will return the same result.

### Changes to SQLSensor

SQLSensor now consistent with python `bool()` function and the `allow_null` parameter has been removed.
Expand Down
41 changes: 41 additions & 0 deletions airflow/models/xcom.py
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,47 @@ def set(

session.commit()

@classmethod
@provide_session
def get_one(cls,
execution_date: pendulum.DateTime,
key: Optional[str] = None,
task_id: Optional[Union[str, Iterable[str]]] = None,
dag_id: Optional[Union[str, Iterable[str]]] = None,
include_prior_dates: bool = False,
session: Session = None) -> Optional[Any]:
"""
Retrieve an XCom value, optionally meeting certain criteria. Returns None
of there are no results.
:param execution_date: Execution date for the task
:type execution_date: pendulum.datetime
:param key: A key for the XCom. If provided, only XComs with matching
keys will be returned. To remove the filter, pass key=None.
:type key: str
:param task_id: Only XComs from task with matching id will be
pulled. Can pass None to remove the filter.
:type task_id: str
:param dag_id: If provided, only pulls XCom from this DAG.
If None (default), the DAG of the calling task is used.
:type dag_id: str
:param include_prior_dates: If False, only XCom from the current
execution_date are returned. If True, XCom from previous dates
are returned as well.
:type include_prior_dates: bool
:param session: database session
:type session: sqlalchemy.orm.session.Session
"""
result = cls.get_many(execution_date=execution_date,
key=key,
task_ids=task_id,
dag_ids=dag_id,
include_prior_dates=include_prior_dates,
session=session).first()
if result:
return result.value
return None

@classmethod
@provide_session
def get_many(cls,
Expand Down
56 changes: 56 additions & 0 deletions tests/models/test_cleartasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -265,6 +265,34 @@ def test_xcom_disable_pickle_type(self):

self.assertEqual(ret_value, json_obj)

@conf_vars({("core", "enable_xcom_pickling"): "False"})
def test_xcom_get_one_disable_pickle_type(self):
json_obj = {"key": "value"}
execution_date = timezone.utcnow()
key = "xcom_test1"
dag_id = "test_dag1"
task_id = "test_task1"
XCom.set(key=key,
value=json_obj,
dag_id=dag_id,
task_id=task_id,
execution_date=execution_date)

ret_value = XCom.get_one(key=key,
dag_id=dag_id,
task_id=task_id,
execution_date=execution_date)

self.assertEqual(ret_value, json_obj)

session = settings.Session()
ret_value = session.query(XCom).filter(XCom.key == key, XCom.dag_id == dag_id,
XCom.task_id == task_id,
XCom.execution_date == execution_date
).first().value

self.assertEqual(ret_value, json_obj)

@conf_vars({("core", "enable_xcom_pickling"): "True"})
def test_xcom_enable_pickle_type(self):
json_obj = {"key": "value"}
Expand Down Expand Up @@ -293,6 +321,34 @@ def test_xcom_enable_pickle_type(self):

self.assertEqual(ret_value, json_obj)

@conf_vars({("core", "enable_xcom_pickling"): "True"})
def test_xcom_get_one_enable_pickle_type(self):
json_obj = {"key": "value"}
execution_date = timezone.utcnow()
key = "xcom_test3"
dag_id = "test_dag"
task_id = "test_task3"
XCom.set(key=key,
value=json_obj,
dag_id=dag_id,
task_id=task_id,
execution_date=execution_date)

ret_value = XCom.get_one(key=key,
dag_id=dag_id,
task_id=task_id,
execution_date=execution_date)

self.assertEqual(ret_value, json_obj)

session = settings.Session()
ret_value = session.query(XCom).filter(XCom.key == key, XCom.dag_id == dag_id,
XCom.task_id == task_id,
XCom.execution_date == execution_date
).first().value

self.assertEqual(ret_value, json_obj)

@conf_vars({("core", "xcom_enable_pickling"): "False"})
def test_xcom_disable_pickle_type_fail_on_non_json(self):
class PickleRce:
Expand Down

0 comments on commit d0e010f

Please sign in to comment.