
MySQLtoGCSOperator unable to save parquet files with date/datetime fields #17538

Closed
cmsouza opened this issue Aug 10, 2021 · 6 comments

Labels: area:providers, kind:bug, pending-response, provider:google, stale

Comments


cmsouza commented Aug 10, 2021

Apache Airflow version: 2.1.2

Apache Airflow Provider versions:
apache-airflow-providers-celery==2.0.0
apache-airflow-providers-ftp==2.0.0
apache-airflow-providers-google==4.0.0
apache-airflow-providers-imap==2.0.0
apache-airflow-providers-mysql==2.0.0
apache-airflow-providers-postgres==2.0.0
apache-airflow-providers-sqlite==2.0.0

What happened:

When exporting data containing dates/datetimes from MySQL to a parquet file on GCS, the following error occurs:

[2021-08-09 14:30:36,760] {taskinstance.py:1501} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/opt/airflow/env/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1157, in _run_raw_task
    self._prepare_and_execute_task_with_callbacks(context, task)
  File "/opt/airflow/env/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1331, in _prepare_and_execute_task_with_callbacks
    result = self._execute_task(context, task_copy)
  File "/opt/airflow/env/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1361, in _execute_task
    result = task_copy.execute(context=context)
  File "/opt/airflow/env/lib/python3.7/site-packages/airflow/providers/google/cloud/transfers/sql_to_gcs.py", line 154, in execute
    files_to_upload = self._write_local_data_files(cursor)
  File "/opt/airflow/env/lib/python3.7/site-packages/airflow/providers/google/cloud/transfers/sql_to_gcs.py", line 223, in _write_local_data_files
    tbl = pa.Table.from_pydict(row_pydic, parquet_schema)
  File "pyarrow/table.pxi", line 1576, in pyarrow.lib.Table.from_pydict
  File "pyarrow/array.pxi", line 331, in pyarrow.lib.asarray
  File "pyarrow/array.pxi", line 305, in pyarrow.lib.array
  File "pyarrow/array.pxi", line 39, in pyarrow.lib._sequence_to_array
  File "pyarrow/error.pxi", line 122, in pyarrow.lib.pyarrow_internal_check_status
TypeError: an integer is required (got type str)

This is a result of the following code casting the data to strings before storing the row:

# airflow/providers/google/cloud/transfers/sql_to_gcs.py@210
    for row in cursor:
      # Convert datetime objects to utc seconds, and decimals to floats.
      # Convert binary type object to string encoded with base64.
      row = self.convert_types(schema, col_type_dict, row)

# which calls MySQLToGCSOperator.convert_type
# airflow/providers/google/cloud/transfers/mysql_to_gcs.py@120
  if isinstance(value, datetime):
    value = str(value)
  elif isinstance(value, timedelta):
    value = str((datetime.min + value).time())

This is fine for string-based formats such as CSV/JSON, but pyarrow should receive the unconverted datetime object to be able to store the row correctly.
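
For instance, a minimal sketch of the mismatch (assuming pyarrow behaves as in the traceback above; newer pyarrow releases may handle string-to-timestamp conversion differently):

import pyarrow as pa
from datetime import datetime

ts_type = pa.timestamp('s')

# A datetime object converts cleanly to a timestamp column:
pa.array([datetime(2021, 8, 10, 11, 4, 6)], type=ts_type)

# The stringified value produced by convert_types does not, and raises
# "TypeError: an integer is required (got type str)":
pa.array(['2021-08-10 11:04:06'], type=ts_type)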

I think the easiest fix is simply not to convert the row before passing it to pyarrow, like this:

diff --git a/airflow/providers/google/cloud/transfers/sql_to_gcs.py b/airflow/providers/google/cloud/transfers/sql_to_gcs.py
index 091c22f01..85cc3c98f 100644
--- a/airflow/providers/google/cloud/transfers/sql_to_gcs.py
+++ b/airflow/providers/google/cloud/transfers/sql_to_gcs.py
@@ -210,12 +210,12 @@ class BaseSQLToGCSOperator(BaseOperator):
         for row in cursor:
             # Convert datetime objects to utc seconds, and decimals to floats.
             # Convert binary type object to string encoded with base64.
-            row = self.convert_types(schema, col_type_dict, row)
+            str_row = self.convert_types(schema, col_type_dict, row)

             if self.export_format == 'csv':
                 if self.null_marker is not None:
-                    row = [value if value is not None else self.null_marker for value in row]
-                csv_writer.writerow(row)
+                    str_row = [value if value is not None else self.null_marker for value in str_row]
+                csv_writer.writerow(str_row)
             elif self.export_format == 'parquet':
                 if self.null_marker is not None:
                     row = [value if value is not None else self.null_marker for value in row]
@@ -223,7 +223,7 @@ class BaseSQLToGCSOperator(BaseOperator):
                 tbl = pa.Table.from_pydict(row_pydic, parquet_schema)
                 parquet_writer.write_table(tbl)
             else:
-                row_dict = dict(zip(schema, row))
+                row_dict = dict(zip(schema, str_row))

                 tmp_file_handle.write(
                     json.dumps(row_dict, sort_keys=True, ensure_ascii=False).encode("utf-8")

That said, I'm not sure whether it would cause any regressions.

What you expected to happen:

The parquet file should be written to disk and uploaded to GCS

How to reproduce it:
The following code reproduces it 100% of the time. It's a simplified version of the code in the actual Operator and fails for the exact same reason:

import pyarrow as pa
import pyarrow.parquet as pq

parquet_schema = pa.schema([
    ('id', pa.int64()),
    ('date_added', pa.timestamp('s')),
])

parquet_writer = pq.ParquetWriter('test.parquet', parquet_schema)

# the stringified datetime, as produced by convert_types
row_pydic = {
    'id': [26417],
    'date_added': ['2021-08-10 11:04:06'],
}

tbl = pa.Table.from_pydict(row_pydic, parquet_schema)
parquet_writer.write_table(tbl)

parquet_writer.close()

And raises the following error:

TypeError                                 Traceback (most recent call last)
/tmp/ipykernel_377852/1020037443.py in <module>
     17 }
     18 
---> 19 tbl = pa.Table.from_pydict(row_pydic, parquet_schema)
     20 parquet_writer.write_table(tbl)
     21 

~/export/env/lib/python3.9/site-packages/pyarrow/table.pxi in pyarrow.lib.Table.from_pydict()
~/export/env/lib/python3.9/site-packages/pyarrow/array.pxi in pyarrow.lib.asarray()
~/export/env/lib/python3.9/site-packages/pyarrow/array.pxi in pyarrow.lib.array()
~/export/env/lib/python3.9/site-packages/pyarrow/array.pxi in pyarrow.lib._sequence_to_array()
~/export/env/lib/python3.9/site-packages/pyarrow/error.pxi in pyarrow.lib.pyarrow_internal_check_status()
TypeError: an integer is required (got type str)

While this works:

from datetime import datetime
import pyarrow as pa
import pyarrow.parquet as pq

parquet_schema = pa.schema([
    ('id', pa.int64()),
    ('date_added', pa.timestamp('s')),
])

parquet_writer = pq.ParquetWriter('test.parquet', parquet_schema)

# the same row, but with an actual datetime object
row_pydic = {
    'id': [26417],
    'date_added': [datetime(2021, 8, 10, 11, 4, 6)],
}

tbl = pa.Table.from_pydict(row_pydic, parquet_schema)
parquet_writer.write_table(tbl)

parquet_writer.close()
cmsouza added the kind:bug label Aug 10, 2021
boring-cyborg bot commented Aug 10, 2021

Thanks for opening your first issue here! Be sure to follow the issue template!

mik-laj (Member) commented Aug 11, 2021

Can you look at this PR? Is it related? #15026

eladkal added the area:providers, pending-response, provider:google labels Aug 19, 2021
github-actions bot commented

This issue has been automatically marked as stale because it has been open for 30 days with no response from the author. It will be closed in next 7 days if no further activity occurs from the issue author.

github-actions bot added the stale label Sep 19, 2021
github-actions bot commented

This issue has been closed because it has not received response from the issue author.

caupetit-itf commented

Hello, I still have the same error.
From issue #26248 it seems that I'm not the only one
(using Airflow 2.8.4 and apache-airflow-providers-google==10.16.0).

I can't save dates to parquet.
Is it possible to re-open this issue?


PanJ commented Oct 1, 2024

@caupetit-itf I don't know why, but this seemed to fix my issue:

[screenshot of the code change; image not preserved]

I had to create a custom Operator that inherits MySQLToGCSOperator, override the convert_type function, and add the above line of code.
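
Since the screenshot is not preserved here, a minimal sketch of that kind of override (the subclass name and the exact type guard are assumptions, not the screenshot's contents; the convert_type signature follows recent provider versions):

from datetime import date, datetime

from airflow.providers.google.cloud.transfers.mysql_to_gcs import MySQLToGCSOperator


class MySQLToGCSParquetOperator(MySQLToGCSOperator):
    """Hypothetical subclass: keep temporal values as Python objects so
    pyarrow can build timestamp columns, instead of str()-converting them."""

    def convert_type(self, value, schema_type, **kwargs):
        if isinstance(value, (date, datetime)):
            return value  # hand pyarrow the object, not its string form
        return super().convert_type(value, schema_type, **kwargs)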
