
DatabricksSQLOperator struggles with parsing the data #36838

Closed
1 of 2 tasks
aru-trackunit opened this issue Jan 17, 2024 · 14 comments
Labels
area:providers, Can't Reproduce (the problem cannot be reproduced), good first issue, kind:bug (this is clearly a bug), pending-response, provider:databricks, stale (stale PRs per the .github/workflows/stale.yml policy file)

Comments

@aru-trackunit
Contributor

Apache Airflow Provider(s)

databricks

Versions of Apache Airflow Providers

The error was not present in apache-airflow-providers-databricks==4.7.0.
After upgrading to the latest version, apache-airflow-providers-databricks==6.0.0, the error appears.

Apache Airflow version

2.8.0

Operating System

Debian GNU/Linux 11 (bullseye)

Deployment

Official Apache Airflow Helm Chart

Deployment details

No response

What happened

[2024-01-16, 18:54:30 CET] {client.py:200} INFO - Successfully opened session XXXXXX-4c73-1765-b68b-b96c52b08745
[2024-01-16, 18:54:30 CET] {sql.py:450} INFO - Running statement: Select count(*) FROM catalog.schema.table_test parameters: None
[2024-01-16, 18:54:30 CET] {taskinstance.py:2699} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/models/taskinstance.py", line 433, in _execute_task
    result = execute_callable(context=context, **execute_callable_kwargs)
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/providers/common/sql/operators/sql.py", line 282, in execute
    output = hook.run(
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/providers/databricks/hooks/databricks_sql.py", line 256, in run
    result = self._make_common_data_structure(raw_result)
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/providers/databricks/hooks/databricks_sql.py", line 286, in _make_common_data_structure
    rows_object = namedtuple("Row", rows_fields)  # type: ignore[misc]
  File "/usr/local/lib/python3.10/collections/__init__.py", line 373, in namedtuple
    raise ValueError('Type names and field names must be valid '
ValueError: Type names and field names must be valid identifiers: 'count(1)'

For investigation purposes I edited the ./hooks/databricks_sql.py file and added some print statements in the _make_common_data_structure method:
the result variable is of type list with the value [Row(count(1)=9714)]
rows_fields has the value <Row('count(1)')>
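
For context, the failure is reproducible with the standard library alone. A minimal sketch, using the field name from the log above:

from collections import namedtuple

# The hook builds a namedtuple from the result's column names, but
# "count(1)" is not a valid Python identifier, so namedtuple rejects it:
try:
    namedtuple("Row", ["count(1)"])
except ValueError as exc:
    print(exc)
    # Type names and field names must be valid identifiers: 'count(1)'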

What you think should happen instead

No response

How to reproduce

from airflow.providers.databricks.operators.databricks_sql import DatabricksSqlOperator

test = DatabricksSqlOperator(
    task_id="count_query",
    databricks_conn_id="databricks-sp",
    sql_endpoint_name="endpoint_name",
    sql="SELECT count(*) FROM catalog.schema.table_test;",
)

Anything else

No response

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Code of Conduct

  • I agree to follow this project's Code of Conduct

@aru-trackunit added the area:providers, kind:bug, and needs-triage labels on Jan 17, 2024
@aru-trackunit
Contributor Author

There is also another ticket related to a similar code path:
#36839

@RNHTTR added the provider:databricks label and removed the needs-triage label on Jan 20, 2024
@bolkedebruin
Contributor

@Joffreybvn any idea?

@Joffreybvn
Contributor

Joffreybvn commented Jan 22, 2024

A namedtuple uses the field name (count(1) in this case) to create attributes, but attribute names can't contain special characters such as parentheses. Two solutions:

  • Edit the field name to exclude those characters; you'd get something like count1 instead.
  • Go back to a plain tuple instead of a namedtuple.

I guess the second choice is premature, and we switched from tuple to namedtuple after previous discussions. Thus I'll open a PR with the first choice; a sketch follows below.
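
A minimal sketch of that first option (the sanitize helper below is hypothetical, not the provider's actual implementation):

import re
from collections import namedtuple

def sanitize(field: str) -> str:
    # Drop every character that is not valid in a Python identifier,
    # so "count(1)" becomes "count1".
    return re.sub(r"\W", "", field)

Row = namedtuple("Row", [sanitize("count(1)")])
print(Row(9714))  # Row(count1=9714)

For comparison, namedtuple(..., rename=True) would also avoid the crash, but it replaces invalid names with positional names like _0, losing the column name entirely.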

@aru-trackunit
Contributor Author

@Joffreybvn Just retested with Databricks provider 6.1.0 on Airflow 2.8.1, and the issue still persists with a different stack trace:

[2024-01-29, 13:41:39 CET] {taskinstance.py:1956} INFO - Dependencies all met for dep_context=non-requeueable deps ti=<TaskInstance: task_1.count manual__2024-01-29T13:41:35+01:00 [queued]>
[2024-01-29, 13:41:39 CET] {taskinstance.py:1956} INFO - Dependencies all met for dep_context=requeueable deps ti=<TaskInstance: task_1.count manual__2024-01-29T13:41:35+01:00 [queued]>
[2024-01-29, 13:41:39 CET] {taskinstance.py:2170} INFO - Starting attempt 1 of 1
[2024-01-29, 13:41:39 CET] {taskinstance.py:2191} INFO - Executing <Task(DatabricksSqlOperator): count> on 2024-01-29 12:41:35+00:00
[2024-01-29, 13:41:39 CET] {standard_task_runner.py:60} INFO - Started process 118 to run task
[2024-01-29, 13:41:39 CET] {standard_task_runner.py:87} INFO - Running: ['airflow', 'tasks', 'run', 'task_1', 'count', 'manual__2024-01-29T13:41:35+01:00', '--job-id', '5', '--raw', '--subdir', 'DAGS_FOLDER/dag-wn-equipment.py', '--cfg-path', '/tmp/tmpp4t8f52h']
[2024-01-29, 13:41:39 CET] {standard_task_runner.py:88} INFO - Job 5: Subtask count
[2024-01-29, 13:41:39 CET] {task_command.py:423} INFO - Running <TaskInstance: task_1.count manual__2024-01-29T13:41:35+01:00 [running]> on host 33e1fb1e4ed5
[2024-01-29, 13:41:39 CET] {taskinstance.py:2480} INFO - Exporting env vars: AIRFLOW_CTX_DAG_OWNER='team_analytics' AIRFLOW_CTX_DAG_ID='task_1' AIRFLOW_CTX_TASK_ID='count' AIRFLOW_CTX_EXECUTION_DATE='2024-01-29T12:41:35+00:00' AIRFLOW_CTX_TRY_NUMBER='1' AIRFLOW_CTX_DAG_RUN_ID='manual__2024-01-29T13:41:35+01:00'
[2024-01-29, 13:41:39 CET] {sql.py:276} INFO - Executing: SELECT count(*) FROM catalog.schema.test_table;
[2024-01-29, 13:41:39 CET] {base.py:83} INFO - Using connection ID 'tu-databricks-sp' for task execution.
[2024-01-29, 13:41:39 CET] {databricks_base.py:514} INFO - Using Service Principal Token.
[2024-01-29, 13:41:39 CET] {databricks_base.py:223} INFO - Existing Service Principal token is expired, or going to expire soon. Refreshing...
[2024-01-29, 13:41:39 CET] {databricks_base.py:514} INFO - Using Service Principal Token.
[2024-01-29, 13:41:40 CET] {client.py:200} INFO - Successfully opened session 01eebea3-bfcf-14ed-8b50-a14cc9a61a35
[2024-01-29, 13:41:40 CET] {sql.py:450} INFO - Running statement: SELECT count(*) FROM catalog.schema.test_table, parameters: None
[2024-01-29, 13:41:52 CET] {taskinstance.py:2698} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/models/taskinstance.py", line 433, in _execute_task
    result = execute_callable(context=context, **execute_callable_kwargs)
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/providers/common/sql/operators/sql.py", line 282, in execute
    output = hook.run(
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/providers/databricks/hooks/databricks_sql.py", line 254, in run
    raw_result = handler(cur)
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/providers/common/sql/hooks/sql.py", line 91, in fetch_all_handler
    return cursor.fetchall()
  File "/home/airflow/.local/lib/python3.10/site-packages/databricks/sql/client.py", line 670, in fetchall
    return self.active_result_set.fetchall()
  File "/home/airflow/.local/lib/python3.10/site-packages/databricks/sql/client.py", line 944, in fetchall
    return self._convert_arrow_table(self.fetchall_arrow())
  File "/home/airflow/.local/lib/python3.10/site-packages/databricks/sql/client.py", line 884, in _convert_arrow_table
    res = df.to_numpy(na_value=None)
  File "/home/airflow/.local/lib/python3.10/site-packages/pandas/core/frame.py", line 1981, in to_numpy
    result = self._mgr.as_array(dtype=dtype, copy=copy, na_value=na_value)
  File "/home/airflow/.local/lib/python3.10/site-packages/pandas/core/internals/managers.py", line 1702, in as_array
    arr[isna(arr)] = na_value
TypeError: int() argument must be a string, a bytes-like object or a real number, not 'NoneType'
[2024-01-29, 13:41:52 CET] {taskinstance.py:1138} INFO - Marking task as FAILED. dag_id=task_1, task_id=count, execution_date=20240129T124135, start_date=20240129T124139, end_date=20240129T124152
[2024-01-29, 13:41:52 CET] {standard_task_runner.py:107} ERROR - Failed to execute job 5 for task count (int() argument must be a string, a bytes-like object or a real number, not 'NoneType'; 118)
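
For what it's worth, this second traceback fails inside pandas rather than the provider: databricks-sql-connector converts the Arrow result with df.to_numpy(na_value=None), and pandas then runs arr[isna(arr)] = na_value on the integer result column. A minimal sketch of that failing step, assuming numpy attempts the dtype cast even when the mask selects nothing:

import numpy as np

arr = np.array([9714])                  # the int64 count(*) result column
mask = np.zeros(arr.shape, dtype=bool)  # isna() of an int column: no NULLs
# numpy tries to cast None to int64 before applying the (empty) mask:
arr[mask] = None
# TypeError: int() argument must be a string, a bytes-like object or a real
# number, not 'NoneType'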

@Joffreybvn
Contributor

I cannot reproduce this error. Could you give me more info about your data? Maybe a screenshot (like below) of what you have in the Databricks UI?

[screenshot: example of a query result in the Databricks UI]

@amoralca16

Using pandas-2.1.4 solved the issue for me (Airflow 2.8.1)

@w0ut0
Contributor

w0ut0 commented Feb 21, 2024

We have the same issue on Airflow 2.7.1 and 2.8.1, on both provider 6.1.0 and 6.2.0.
Upgrading pandas to 2.1.4 did not solve the issue for us.
Downgrading the provider to 5.0.1, however, did solve it.
In 5.0.1, the result of a merge statement looks like this, and the same DAG fails to render it on provider version 6.x:
[screenshot: merge statement result rendered by provider 5.0.1]
In the Databricks UI, the results look similar:
[screenshot: the same merge statement result in the Databricks UI]

@aru-trackunit
Contributor Author

aru-trackunit commented Feb 26, 2024

Retested with the newly released Airflow:

  • Airflow 2.8.2
  • Databricks provider 6.2.0
  • pandas 2.2.1

The issue still persists.

@potiuk
Member

potiuk commented Mar 9, 2024

@aru-trackunit - it seems that @Joffreybvn cannot reproduce the issue. Can you please provide a bit more information, as he asked in #36838 (comment)?

@aru-trackunit
Contributor Author

aru-trackunit commented Mar 11, 2024

I discussed it with him on Airflow's Slack, but it's a good idea to have visibility over it here as well.
I have attached a DatabricksSqlOperator that is ready to test against. The SQL returns one row, as shown in the screenshot below.
[screenshot, 2024-03-11: single-row result of the count(*) query]

from airflow.providers.databricks.operators.databricks_sql import DatabricksSqlOperator

test = DatabricksSqlOperator(
    task_id="count_query",
    databricks_conn_id="databricks-sp",
    sql_endpoint_name="endpoint_name",
    sql="SELECT count(*) FROM catalog.schema.table_test;",
)


This issue has been automatically marked as stale because it has been open for 14 days with no response from the author. It will be closed in the next 7 days if no further activity occurs from the issue author.

github-actions bot added the stale label on Mar 26, 2024
@aru-trackunit
Contributor Author

I think it's related to this issue: databricks/databricks-sql-python#326

github-actions bot removed the stale label on Mar 27, 2024

This issue has been automatically marked as stale because it has been open for 14 days with no response from the author. It will be closed in the next 7 days if no further activity occurs from the issue author.

github-actions bot added the stale label on Apr 12, 2024

This issue has been closed because it has not received a response from the issue author.

github-actions bot closed this as not planned (won't fix, can't repro, duplicate, stale) on Apr 19, 2024
Projects
None yet
Development

No branches or pull requests

8 participants