
DatabricksSQLOperator struggles with parsing the data #36839

Closed
aru-trackunit opened this issue Jan 17, 2024 · 6 comments · Fixed by #37168


@aru-trackunit
Contributor

Apache Airflow Provider(s)

databricks

Versions of Apache Airflow Providers

The error was not present in apache-airflow-providers-databricks==4.7.0.
After upgrading to the latest release, apache-airflow-providers-databricks==6.0.0, the error appears.

Apache Airflow version

2.8.0

Operating System

Debian GNU/Linux 11 (bullseye)

Deployment

Official Apache Airflow Helm Chart

Deployment details

No response

What happened

e9b5cf0d8cb8
*** Found local files:
***   * /opt/airflow/logs/dag_id=dag_id/run_id=manual__2024-01-17T11:39:05+01:00/task_id=read/attempt=1.log
[2024-01-17, 11:39:11 CET] {taskinstance.py:1957} INFO - Dependencies all met for dep_context=non-requeueable deps ti=<TaskInstance: dag_id.read manual__2024-01-17T11:39:05+01:00 [queued]>
[2024-01-17, 11:39:11 CET] {taskinstance.py:1957} INFO - Dependencies all met for dep_context=requeueable deps ti=<TaskInstance: dag_id.read manual__2024-01-17T11:39:05+01:00 [queued]>
[2024-01-17, 11:39:11 CET] {taskinstance.py:2171} INFO - Starting attempt 1 of 1
[2024-01-17, 11:39:11 CET] {taskinstance.py:2192} INFO - Executing <Task(DatabricksSqlOperator): read> on 2024-01-17 10:39:05+00:00
[2024-01-17, 11:39:11 CET] {standard_task_runner.py:60} INFO - Started process 3561 to run task
[2024-01-17, 11:39:11 CET] {standard_task_runner.py:87} INFO - Running: ['airflow', 'tasks', 'run', 'dag_id', 'read', 'manual__2024-01-17T11:39:05+01:00', '--job-id', '23', '--raw', '--subdir', 'DAGS_FOLDER/file_uploads/dag-wn-equipment.py', '--cfg-path', '/tmp/tmp49oxl6yk']
[2024-01-17, 11:39:11 CET] {standard_task_runner.py:88} INFO - Job 23: Subtask read
[2024-01-17, 11:39:11 CET] {task_command.py:423} INFO - Running <TaskInstance: dag_id.read manual__2024-01-17T11:39:05+01:00 [running]> on host e9b5cf0d8cb8
[2024-01-17, 11:39:11 CET] {taskinstance.py:2481} INFO - Exporting env vars: AIRFLOW_CTX_DAG_OWNER='team_analytics_foundation' AIRFLOW_CTX_DAG_ID='dag_id' AIRFLOW_CTX_TASK_ID='read' AIRFLOW_CTX_EXECUTION_DATE='2024-01-17T10:39:05+00:00' AIRFLOW_CTX_TRY_NUMBER='1' AIRFLOW_CTX_DAG_RUN_ID='manual__2024-01-17T11:39:05+01:00'
[2024-01-17, 11:39:11 CET] {sql.py:276} INFO - Executing: SELECT * FROM catalog.schema.test_table LIMIT 10;
[2024-01-17, 11:39:11 CET] {base.py:83} INFO - Using connection ID 'tu-databricks-sp' for task execution.
[2024-01-17, 11:39:11 CET] {databricks_base.py:514} INFO - Using Service Principal Token.
[2024-01-17, 11:39:11 CET] {databricks_base.py:223} INFO - Existing Service Principal token is expired, or going to expire soon. Refreshing...
[2024-01-17, 11:39:12 CET] {databricks_base.py:514} INFO - Using Service Principal Token.
[2024-01-17, 11:39:12 CET] {client.py:200} INFO - Successfully opened session 01eeb524-XXXX-1b54-9b2e-b16859209198
[2024-01-17, 11:39:12 CET] {sql.py:450} INFO - Running statement: SELECT * FROM catalog.schema.test_table LIMIT 10, parameters: None
[2024-01-17, 11:39:12 CET] {client.py:258} INFO - Closing session 01eeb524-XXX-1b54-9b2e-b16859209198
[2024-01-17, 11:39:12 CET] {xcom.py:664} ERROR - Object of type tuple is not JSON serializable. If you are using pickle instead of JSON for XCom, then you need to enable pickle support for XCom in your airflow config or make sure to decorate your object with attr.
[2024-01-17, 11:39:12 CET] {taskinstance.py:2699} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/utils/json.py", line 91, in default
    return serialize(o)
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/serialization/serde.py", line 147, in serialize
    return encode(classname, version, serialize(data, depth + 1))
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/serialization/serde.py", line 126, in serialize
    return [serialize(d, depth + 1) for d in o]
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/serialization/serde.py", line 126, in <listcomp>
    return [serialize(d, depth + 1) for d in o]
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/serialization/serde.py", line 126, in serialize
    return [serialize(d, depth + 1) for d in o]
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/serialization/serde.py", line 126, in <listcomp>
    return [serialize(d, depth + 1) for d in o]
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/serialization/serde.py", line 180, in serialize
    raise TypeError(f"cannot serialize object of type {cls}")
TypeError: cannot serialize object of type <class 'airflow.providers.databricks.hooks.databricks_sql.Row'>
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/models/taskinstance.py", line 440, in _execute_task
    task_instance.xcom_push(key=XCOM_RETURN_KEY, value=xcom_value, session=session)
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/utils/session.py", line 76, in wrapper
    return func(*args, **kwargs)
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/models/taskinstance.py", line 2981, in xcom_push
    XCom.set(
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/utils/session.py", line 76, in wrapper
    return func(*args, **kwargs)
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/models/xcom.py", line 247, in set
    value = cls.serialize_value(
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/models/xcom.py", line 662, in serialize_value
    return json.dumps(value, cls=XComEncoder).encode("UTF-8")
  File "/usr/local/lib/python3.10/json/__init__.py", line 238, in dumps
    **kw).encode(obj)
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/utils/json.py", line 102, in encode
    o = self.default(o)
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/utils/json.py", line 93, in default
    return super().default(o)
  File "/usr/local/lib/python3.10/json/encoder.py", line 179, in default
    raise TypeError(f'Object of type {o.__class__.__name__} '
TypeError: Object of type tuple is not JSON serializable
[2024-01-17, 11:39:12 CET] {taskinstance.py:1138} INFO - Marking task as FAILED. dag_id=dag_id, task_id=read, execution_date=20240117T103905, start_date=20240117T103911, end_date=20240117T103912
[2024-01-17, 11:39:12 CET] {standard_task_runner.py:107} ERROR - Failed to execute job 23 for task read (Object of type tuple is not JSON serializable; 3561)
[2024-01-17, 11:39:12 CET] {local_task_job_runner.py:234} INFO - Task exited with return code 1
[2024-01-17, 11:39:12 CET] {taskinstance.py:3281} INFO - 0 downstream tasks scheduled from follow-on schedule check

I did a little investigation: I edited the hooks/databricks_sql.py file and added print statements in the _make_common_data_structure method:

result variable value:

[Row(currency_code='AUD', date_eom=datetime.date(2022, 11, 30), exchange_rate_avg=4.813328450816598, exchange_rate_closing=4.821134521880065, currency_date_code='AUD_2022-11-30'), Row(currency_code='BRL', date_eom=datetime.date(2018, 5, 31), exchange_rate_avg=1.7359416398794039, exchange_rate_closing=1.706230229679549, currency_date_code='BRL_2018-05-31'), Row(currency_code='CHF', date_eom=datetime.date(2018, 11, 30), exchange_rate_avg=6.558196767265858, exchange_rate_closing=6.580423280423281, currency_date_code='CHF_2018-11-30'), Row(currency_code='CHF', date_eom=datetime.date(2021, 2, 28), exchange_rate_avg=6.849691568142963, exchange_rate_closing=6.768705625341343, currency_date_code='CHF_2021-02-28'), Row(currency_code='CNY', date_eom=datetime.date(2008, 8, 31), exchange_rate_avg=0.7273119209984404, exchange_rate_closing=0.7405127390432313, currency_date_code='CNY_2008-08-31'), Row(currency_code='CYP', date_eom=datetime.date(2001, 1, 31), exchange_rate_avg=12.92422064597834, exchange_rate_closing=12.891303972216942, currency_date_code='CYP_2001-01-31'), Row(currency_code='CYP', date_eom=datetime.date(2001, 10, 31), exchange_rate_avg=12.94928151413741, exchange_rate_closing=12.961950370047887, currency_date_code='CYP_2001-10-31'), Row(currency_code='CZK', date_eom=datetime.date(2010, 6, 30), exchange_rate_avg=0.28863360451335074, exchange_rate_closing=0.289938110622397, currency_date_code='CZK_2010-06-30'), Row(currency_code='CZK', date_eom=datetime.date(2010, 12, 31), exchange_rate_avg=0.2960584828246455, exchange_rate_closing=0.29741430908583055, currency_date_code='CZK_2010-12-31'), Row(currency_code='DKK', date_eom=datetime.date(2017, 5, 31), exchange_rate_avg=1.0, exchange_rate_closing=1.0, currency_date_code='DKK_2017-05-31')]

row_fields variable value:

row_fields <Row('currency_code', 'date_eom', 'exchange_rate_avg', 'exchange_rate_closing', 'currency_date_code')>
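
For context, here is a rough sketch (not the provider's actual code) of the kind of plain, JSON-friendly structure I would expect _make_common_data_structure to hand over to XCom, assuming each Row iterates over its values and exposes __fields__ like a namedtuple:

def rows_to_dicts(rows):
    """Illustrative only: flatten namedtuple-like Row objects into plain dicts.

    Assumes row.__fields__ yields the column names and iterating the row
    yields the corresponding values, as in the debug output above.
    """
    return [dict(zip(row.__fields__, row)) for row in rows]

# rows_to_dicts(result)[0] would then look roughly like
# {'currency_code': 'AUD', 'date_eom': datetime.date(2022, 11, 30), ...}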

What you think should happen instead

No response

How to reproduce

test2 = DatabricksSqlOperator(
    task_id="read",
    databricks_conn_id="tu-databricks-sp",
    sql_endpoint_name="sql_endpoint_name",
    sql="SELECT * FROM catalog.schema.test_table LIMIT 10;",
)
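
As a side note, the failure only occurs when the operator's return value is pushed to XCom, so a possible interim workaround (untested here) is to disable the XCom push via the standard do_xcom_push operator argument:

test2 = DatabricksSqlOperator(
    task_id="read",
    databricks_conn_id="tu-databricks-sp",
    sql_endpoint_name="sql_endpoint_name",
    sql="SELECT * FROM catalog.schema.test_table LIMIT 10;",
    do_xcom_push=False,  # skip serializing the returned Row objects to XCom
)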

Anything else

No response

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Code of Conduct

  • I agree to follow this project's Code of Conduct

@Joffreybvn
Contributor

Joffreybvn commented Jan 22, 2024

Indeed, a Databricks Row's __fields__ is itself another Row object when the row is created in two steps. A fix for that is also coming in #36949.
Thanks for the debug print!
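
Roughly, the two-step creation looks like this (a sketch only; it assumes the connector's Row behaves like PySpark's Row, where a Row of field names is called with the values):

from databricks.sql.types import Row  # assumption: the connector exposes a PySpark-style Row here

fields = Row("currency_code", "exchange_rate_avg")   # step 1: a Row holding only the field names
row = fields("AUD", 4.813328450816598)               # step 2: calling it with values yields the data row

# Per the debug output above, row.__fields__ on such a row is itself a Row
# of field names rather than a plain list, which is what trips
# _make_common_data_structure.
print(row.__fields__)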

@aru-trackunit
Contributor Author

@Joffreybvn Just retested with Databricks provider 6.1.0 on Airflow 2.8.1 and the issue still persists, with a different stack trace:

[2024-01-29, 13:41:53 CET] {taskinstance.py:1956} INFO - Dependencies all met for dep_context=non-requeueable deps ti=<TaskInstance: task_1.read manual__2024-01-29T13:41:35+01:00 [queued]>
[2024-01-29, 13:41:53 CET] {taskinstance.py:1956} INFO - Dependencies all met for dep_context=requeueable deps ti=<TaskInstance: task_1.read manual__2024-01-29T13:41:35+01:00 [queued]>
[2024-01-29, 13:41:53 CET] {taskinstance.py:2170} INFO - Starting attempt 1 of 1
[2024-01-29, 13:41:53 CET] {taskinstance.py:2191} INFO - Executing <Task(DatabricksSqlOperator): read> on 2024-01-29 12:41:35+00:00
[2024-01-29, 13:41:53 CET] {standard_task_runner.py:60} INFO - Started process 131 to run task
[2024-01-29, 13:41:53 CET] {standard_task_runner.py:87} INFO - Running: ['airflow', 'tasks', 'run', 'task_1', 'read', 'manual__2024-01-29T13:41:35+01:00', '--job-id', '6', '--raw', '--subdir', 'DAGS_FOLDER/dag-wn-equipment.py', '--cfg-path', '/tmp/tmpq_30xj_j']
[2024-01-29, 13:41:53 CET] {standard_task_runner.py:88} INFO - Job 6: Subtask read
[2024-01-29, 13:41:53 CET] {task_command.py:423} INFO - Running <TaskInstance: task_1.read manual__2024-01-29T13:41:35+01:00 [running]> on host 33e1fb1e4ed5
[2024-01-29, 13:41:53 CET] {taskinstance.py:2480} INFO - Exporting env vars: AIRFLOW_CTX_DAG_OWNER='team_analytics' AIRFLOW_CTX_DAG_ID='task_1' AIRFLOW_CTX_TASK_ID='read' AIRFLOW_CTX_EXECUTION_DATE='2024-01-29T12:41:35+00:00' AIRFLOW_CTX_TRY_NUMBER='1' AIRFLOW_CTX_DAG_RUN_ID='manual__2024-01-29T13:41:35+01:00'
[2024-01-29, 13:41:53 CET] {sql.py:276} INFO - Executing: SELECT * FROM catalog.schema.test_table LIMIT 10;
[2024-01-29, 13:41:53 CET] {base.py:83} INFO - Using connection ID 'tu-databricks-sp' for task execution.
[2024-01-29, 13:41:54 CET] {databricks_base.py:514} INFO - Using Service Principal Token.
[2024-01-29, 13:41:54 CET] {databricks_base.py:223} INFO - Existing Service Principal token is expired, or going to expire soon. Refreshing...
[2024-01-29, 13:41:54 CET] {databricks_base.py:514} INFO - Using Service Principal Token.
[2024-01-29, 13:41:55 CET] {client.py:200} INFO - Successfully opened session 01eebea3-c8a3-1606-b957-b38c0426a2d7
[2024-01-29, 13:41:55 CET] {sql.py:450} INFO - Running statement: SELECT * FROM catalog.schema.test_table LIMIT 10, parameters: None
[2024-01-29, 13:41:57 CET] {client.py:258} INFO - Closing session 01eebea3-c8a3-1606-b957-b38c0426a2d7
[2024-01-29, 13:41:57 CET] {xcom.py:664} ERROR - Object of type tuple is not JSON serializable. If you are using pickle instead of JSON for XCom, then you need to enable pickle support for XCom in your airflow config or make sure to decorate your object with attr.
[2024-01-29, 13:41:57 CET] {taskinstance.py:2698} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/utils/json.py", line 91, in default
    return serialize(o)
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/serialization/serde.py", line 147, in serialize
    return encode(classname, version, serialize(data, depth + 1))
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/serialization/serde.py", line 126, in serialize
    return [serialize(d, depth + 1) for d in o]
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/serialization/serde.py", line 126, in <listcomp>
    return [serialize(d, depth + 1) for d in o]
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/serialization/serde.py", line 126, in serialize
    return [serialize(d, depth + 1) for d in o]
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/serialization/serde.py", line 126, in <listcomp>
    return [serialize(d, depth + 1) for d in o]
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/serialization/serde.py", line 180, in serialize
    raise TypeError(f"cannot serialize object of type {cls}")
TypeError: cannot serialize object of type <class 'airflow.providers.databricks.hooks.databricks_sql.Row'>
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/models/taskinstance.py", line 440, in _execute_task
    task_instance.xcom_push(key=XCOM_RETURN_KEY, value=xcom_value, session=session)
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/utils/session.py", line 76, in wrapper
    return func(*args, **kwargs)
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/models/taskinstance.py", line 2980, in xcom_push
    XCom.set(
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/utils/session.py", line 76, in wrapper
    return func(*args, **kwargs)
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/models/xcom.py", line 247, in set
    value = cls.serialize_value(
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/models/xcom.py", line 662, in serialize_value
    return json.dumps(value, cls=XComEncoder).encode("UTF-8")
  File "/usr/local/lib/python3.10/json/__init__.py", line 238, in dumps
    **kw).encode(obj)
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/utils/json.py", line 102, in encode
    o = self.default(o)
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/utils/json.py", line 93, in default
    return super().default(o)
  File "/usr/local/lib/python3.10/json/encoder.py", line 179, in default
    raise TypeError(f'Object of type {o.__class__.__name__} '
TypeError: Object of type tuple is not JSON serializable
[2024-01-29, 13:41:57 CET] {taskinstance.py:1138} INFO - Marking task as FAILED. dag_id=task_1, task_id=read, execution_date=20240129T124135, start_date=20240129T124153, end_date=20240129T124157
[2024-01-29, 13:41:57 CET] {standard_task_runner.py:107} ERROR - Failed to execute job 6 for task read (Object of type tuple is not JSON serializable; 131)

@Joffreybvn
Contributor

Joffreybvn commented Jan 29, 2024

I could not reproduce your error. But I found another bug where a single namedtuple 'Row' fails to be serialized, while a list of those namedtuples works fine. Is your query returning only one result?
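
For the record, here is a minimal stand-in for what I mean, using collections.namedtuple instead of the real Row (illustration only):

from collections import namedtuple

Row = namedtuple("Row", ["currency_code", "exchange_rate_avg"])  # stand-in for the connector's Row

single_row = Row("AUD", 4.81)                     # a single row: this shape fails to serialize
row_list = [Row("AUD", 4.81), Row("BRL", 1.74)]   # a list of rows: this serializes fine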

@odykstra

odykstra commented Feb 1, 2024

Hi there. I'm hitting the same issue: I'm running a merge statement that returns only one row, with four columns.

Appreciate your attention to this.

Old versions: no error
apache-airflow-providers-databricks -- 5.0.1
Airflow -- v2.5.0

New versions: getting error
apache-airflow-providers-databricks -- 6.1.0
Airflow -- v2.7.3

[2024-02-01, 15:28:06 UTC] {client.py:258} INFO - Closing session 01eec116-6bad-1ab1-9d48-941cb79ab654
[2024-02-01, 15:28:07 UTC] {xcom.py:661} ERROR - Object of type tuple is not JSON serializable. If you are using pickle instead of JSON for XCom, then you need to enable pickle support for XCom in your airflow config or make sure to decorate your object with attr.
[2024-02-01, 15:28:07 UTC] {base.py:73} INFO - Using connection ID 'databricks' for task execution.
[2024-02-01, 15:28:07 UTC] {taskinstance.py:1937} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/usr/local/lib/python3.10/site-packages/airflow/utils/json.py", line 91, in default
    return serialize(o)
  File "/usr/local/lib/python3.10/site-packages/airflow/serialization/serde.py", line 145, in serialize
    return encode(classname, version, serialize(data, depth + 1))
  File "/usr/local/lib/python3.10/site-packages/airflow/serialization/serde.py", line 124, in serialize
    return [serialize(d, depth + 1) for d in o]
  File "/usr/local/lib/python3.10/site-packages/airflow/serialization/serde.py", line 124, in <listcomp>
    return [serialize(d, depth + 1) for d in o]
  File "/usr/local/lib/python3.10/site-packages/airflow/serialization/serde.py", line 124, in serialize
    return [serialize(d, depth + 1) for d in o]
  File "/usr/local/lib/python3.10/site-packages/airflow/serialization/serde.py", line 124, in <listcomp>
    return [serialize(d, depth + 1) for d in o]
  File "/usr/local/lib/python3.10/site-packages/airflow/serialization/serde.py", line 178, in serialize
    raise TypeError(f"cannot serialize object of type {cls}")
TypeError: cannot serialize object of type <class 'airflow.providers.databricks.hooks.databricks_sql.Row'>
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
  File "/usr/local/lib/python3.10/site-packages/airflow/utils/session.py", line 76, in wrapper
    return func(*args, **kwargs)
  File "/usr/local/lib/python3.10/site-packages/airflow/models/taskinstance.py", line 2479, in xcom_push
    XCom.set(
  File "/usr/local/lib/python3.10/site-packages/airflow/utils/session.py", line 76, in wrapper
    return func(*args, **kwargs)
  File "/usr/local/lib/python3.10/site-packages/airflow/models/xcom.py", line 244, in set
    value = cls.serialize_value(
  File "/usr/local/lib/python3.10/site-packages/airflow/models/xcom.py", line 659, in serialize_value
    return json.dumps(value, cls=XComEncoder).encode("UTF-8")
  File "/usr/local/lib/python3.10/json/__init__.py", line 238, in dumps
    **kw).encode(obj)
  File "/usr/local/lib/python3.10/site-packages/airflow/utils/json.py", line 102, in encode
    o = self.default(o)
  File "/usr/local/lib/python3.10/site-packages/airflow/utils/json.py", line 93, in default
    return super().default(o)
  File "/usr/local/lib/python3.10/json/encoder.py", line 179, in default
    raise TypeError(f'Object of type {o.__class__.__name__} '
TypeError: Object of type tuple is not JSON serializable
[2024-02-01, 15:28:07 UTC] {taskinstance.py:1400} INFO - Marking task as FAILED. dag_id=Aggregates_Hit, task_id=update_tm_hit_perc, execution_date=20240201T152717, start_date=20240201T152732, end_date=20240201T152807
[2024-02-01, 15:28:07 UTC] {standard_task_runner.py:104} ERROR - Failed to execute job 2054213 for task update_tm_hit_perc (Object of type tuple is not JSON serializable; 31)
[2024-02-01, 15:28:07 UTC] {local_task_job_runner.py:228} INFO - Task exited with return code 1
[2024-02-01, 15:28:07 UTC] {taskinstance.py:2778} INFO - 0 downstream tasks scheduled from follow-on schedule check

@avivshafir

@odykstra any resolution on this issue?
I'm also getting this error

@odykstra

@avivshafir The newest versions should have fixed this issue, so check your versions and upgrade if possible. The other option is to pin your Databricks provider version explicitly in your requirements.txt file.

I have my version pinned like so:
apache-airflow-providers-databricks==6.6.0

but I know that any 5.x version does not have the issue either.
