Skip to content

Commit

Permalink
Add XCom.deserialize_value to Airflow 1.10.13 (#12328)
Browse files Browse the repository at this point in the history
closes #11988
  • Loading branch information
kaxil authored and potiuk committed Nov 13, 2020
1 parent 91a1305 commit 1f3ce62
Showing 1 changed file with 23 additions and 10 deletions.
33 changes: 23 additions & 10 deletions airflow/models/xcom.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,17 +67,13 @@ class BaseXCom(Base, LoggingMixin):
"""
@reconstructor
def init_on_load(self):
enable_pickling = conf.getboolean('core', 'enable_xcom_pickling')
if enable_pickling:
try:
self.value = self.deserialize_value(self)
except (UnicodeEncodeError, ValueError):
# For backward-compatibility.
# Preventing errors in webserver
# due to XComs mixed with pickled and unpickled.
self.value = pickle.loads(self.value)
else:
try:
self.value = json.loads(self.value.decode('UTF-8'))
except (UnicodeEncodeError, ValueError):
# For backward-compatibility.
# Preventing errors in webserver
# due to XComs mixed with pickled and unpickled.
self.value = pickle.loads(self.value)

def __repr__(self):
return '<XCom "{key}" ({task_id} @ {execution_date})>'.format(
Expand Down Expand Up @@ -233,6 +229,23 @@ def serialize_value(value):
"support for XCOM in your airflow config.")
raise

@staticmethod
def deserialize_value(result):
# TODO: "pickling" has been deprecated and JSON is preferred.
# "pickling" will be removed in Airflow 2.0.
enable_pickling = conf.getboolean('core', 'enable_xcom_pickling')
if enable_pickling:
return pickle.loads(result.value)

try:
return json.loads(result.value.decode('UTF-8'))
except ValueError:
log.error("Could not deserialize the XCOM value from JSON. "
"If you are using pickles instead of JSON "
"for XCOM, then you need to enable pickle "
"support for XCOM in your airflow config.")
raise


def resolve_xcom_backend():
"""Resolves custom XCom class"""
Expand Down

0 comments on commit 1f3ce62

Please sign in to comment.