[BUG] fastparquet_compatibility_test fails on dataproc #9603

Closed
jlowe opened this issue Nov 1, 2023 · 2 comments · Fixed by #9607
Labels: bug (Something isn't working), test (Only impacts tests)
jlowe commented Nov 1, 2023

fastparquet_compatibility_test failed in a recent nightly Dataproc build:

[2023-11-01T16:49:29.309Z] FAILED ../../src/main/python/fastparquet_compatibility_test.py::test_reading_file_rewritten_with_fastparquet[Date(not_null)-int961] - IsADirectoryError: [Errno 21] Is a directory: '/tmp/pyspark_tests//rapids-i...
[2023-11-01T16:49:29.309Z] FAILED ../../src/main/python/fastparquet_compatibility_test.py::test_reading_file_rewritten_with_fastparquet[Date-int96][INJECT_OOM] - IsADirectoryError: [Errno 21] Is a directory: '/tmp/pyspark_tests//rapids-i...
[2023-11-01T16:49:29.309Z] FAILED ../../src/main/python/fastparquet_compatibility_test.py::test_reading_file_rewritten_with_fastparquet[Timestamp(not_null)-int960] - IsADirectoryError: [Errno 21] Is a directory: '/tmp/pyspark_tests//rapids-i...
[2023-11-01T16:49:29.309Z] FAILED ../../src/main/python/fastparquet_compatibility_test.py::test_reading_file_rewritten_with_fastparquet[Timestamp-int96] - IsADirectoryError: [Errno 21] Is a directory: '/tmp/pyspark_tests//rapids-i...
[2023-11-01T16:49:29.309Z] FAILED ../../src/main/python/fastparquet_compatibility_test.py::test_reading_file_rewritten_with_fastparquet[Struct(not_null)(('first', Integer(not_null)))-int96][INJECT_OOM] - IsADirectoryError: [Errno 21] Is a directory: '/tmp/pyspark_tests//rapids-i...
Details for one of the test failures:
[2023-11-01T16:49:29.050Z] =================================== FAILURES ===================================
[2023-11-01T16:49:29.050Z] _____ test_reading_file_rewritten_with_fastparquet[Date(not_null)-int961] ______
[2023-11-01T16:49:29.050Z] 
[2023-11-01T16:49:29.050Z] self = <[AttributeError("'ParquetFile' object has no attribute '_schema'") raised in repr()] ParquetFile object at 0x7fdf20583d00>
[2023-11-01T16:49:29.050Z] fn = '/tmp/pyspark_tests//rapids-it-dataproc-20-ubuntu18-322-m-main-11369-1843522649/_local/FASTPARQUET_WRITE_PATH_tmp'
[2023-11-01T16:49:29.050Z] verify = False, open_with = <function default_open at 0x7fdf309460d0>
[2023-11-01T16:49:29.050Z] root = False, sep = None
[2023-11-01T16:49:29.050Z] 
[2023-11-01T16:49:29.050Z]     def __init__(self, fn, verify=False, open_with=default_open,
[2023-11-01T16:49:29.050Z]                  root=False, sep=None):
[2023-11-01T16:49:29.050Z]         if isinstance(fn, (tuple, list)):
[2023-11-01T16:49:29.050Z]             basepath, fmd = metadata_from_many(fn, verify_schema=verify,
[2023-11-01T16:49:29.050Z]                                                open_with=open_with, root=root)
[2023-11-01T16:49:29.050Z]             if basepath:
[2023-11-01T16:49:29.050Z]                 self.fn = join_path(basepath, '_metadata')  # effective file
[2023-11-01T16:49:29.050Z]             else:
[2023-11-01T16:49:29.050Z]                 self.fn = '_metadata'
[2023-11-01T16:49:29.050Z]             self.fmd = fmd
[2023-11-01T16:49:29.050Z]             self._set_attrs()
[2023-11-01T16:49:29.050Z]         elif hasattr(fn, 'read'):
[2023-11-01T16:49:29.050Z]             # file-like
[2023-11-01T16:49:29.050Z]             self._parse_header(fn, verify)
[2023-11-01T16:49:29.050Z]             if self.file_scheme not in ['simple', 'empty']:
[2023-11-01T16:49:29.050Z]                 raise ValueError('Cannot use file-like input '
[2023-11-01T16:49:29.050Z]                                  'with multi-file data')
[2023-11-01T16:49:29.050Z]             open_with = lambda *args, **kwargs: fn
[2023-11-01T16:49:29.050Z]             self.fn = None
[2023-11-01T16:49:29.050Z]         else:
[2023-11-01T16:49:29.050Z]             try:
[2023-11-01T16:49:29.050Z]                 fn2 = join_path(fn, '_metadata')
[2023-11-01T16:49:29.050Z]                 self.fn = fn2
[2023-11-01T16:49:29.050Z] >               with open_with(fn2, 'rb') as f:
[2023-11-01T16:49:29.050Z] 
[2023-11-01T16:49:29.050Z] /opt/conda/default/lib/python3.8/site-packages/fastparquet/api.py:110: 
[2023-11-01T16:49:29.050Z] _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
[2023-11-01T16:49:29.050Z] 
[2023-11-01T16:49:29.050Z] f = '/tmp/pyspark_tests//rapids-it-dataproc-20-ubuntu18-322-m-main-11369-1843522649/_local/FASTPARQUET_WRITE_PATH_tmp/_metadata'
[2023-11-01T16:49:29.050Z] mode = 'rb'
[2023-11-01T16:49:29.050Z] 
[2023-11-01T16:49:29.050Z]     def default_open(f, mode='rb'):
[2023-11-01T16:49:29.050Z] >       return open(f, mode)
[2023-11-01T16:49:29.050Z] E       FileNotFoundError: [Errno 2] No such file or directory: '/tmp/pyspark_tests//rapids-it-dataproc-20-ubuntu18-322-m-main-11369-1843522649/_local/FASTPARQUET_WRITE_PATH_tmp/_metadata'
[2023-11-01T16:49:29.050Z] 
[2023-11-01T16:49:29.050Z] /opt/conda/default/lib/python3.8/site-packages/fastparquet/util.py:39: FileNotFoundError
[2023-11-01T16:49:29.050Z] 
[2023-11-01T16:49:29.050Z] During handling of the above exception, another exception occurred:
[2023-11-01T16:49:29.050Z] 
[2023-11-01T16:49:29.050Z] column_gen = Date(not_null), time_format = 'int96'
[2023-11-01T16:49:29.050Z] spark_tmp_path = '/tmp/pyspark_tests//rapids-it-dataproc-20-ubuntu18-322-m-main-11369-1843522649/'
[2023-11-01T16:49:29.050Z] 
[2023-11-01T16:49:29.050Z]     @pytest.mark.skipif(condition=fastparquet_unavailable(),
[2023-11-01T16:49:29.050Z]                         reason="fastparquet is required for testing fastparquet compatibility")
[2023-11-01T16:49:29.050Z]     @pytest.mark.parametrize('column_gen, time_format', [
[2023-11-01T16:49:29.050Z]         pytest.param(
[2023-11-01T16:49:29.050Z]             DateGen(nullable=False,
[2023-11-01T16:49:29.050Z]                      start=pandas_min_date,
[2023-11-01T16:49:29.050Z]                      end=pandas_max_date), 'int64',
[2023-11-01T16:49:29.050Z]             marks=pytest.mark.xfail(reason="Apache Spark and the plugin both have problems reading dates written via "
[2023-11-01T16:49:29.050Z]                                            "fastparquet, if written in int64: "
[2023-11-01T16:49:29.050Z]                                            "\"Illegal Parquet type: INT64 (TIMESTAMP(NANOS,false)).\"")),
[2023-11-01T16:49:29.050Z]         pytest.param(
[2023-11-01T16:49:29.050Z]             DateGen(nullable=False), 'int96',
[2023-11-01T16:49:29.050Z]             marks=pytest.mark.xfail(reason="fastparquet does not support int96RebaseModeInWrite, for dates before "
[2023-11-01T16:49:29.050Z]                                            "1582-10-15 or timestamps before 1900-01-01T00:00:00Z. "
[2023-11-01T16:49:29.050Z]                                            "This messes up reads from Apache Spark and the plugin.")),
[2023-11-01T16:49:29.050Z]         (DateGen(nullable=False,
[2023-11-01T16:49:29.050Z]                  start=date(year=2000, month=1, day=1),
[2023-11-01T16:49:29.050Z]                  end=date(year=2020, month=12, day=31)), 'int96'),
[2023-11-01T16:49:29.050Z]         (DateGen(nullable=True,
[2023-11-01T16:49:29.050Z]                 start=date(year=2000, month=1, day=1),
[2023-11-01T16:49:29.050Z]                 end=date(year=2020, month=12, day=31)), 'int96'),
[2023-11-01T16:49:29.050Z]         pytest.param(
[2023-11-01T16:49:29.050Z]             TimestampGen(nullable=False,
[2023-11-01T16:49:29.050Z]                 start=datetime(2000, 1, 1, tzinfo=timezone.utc),
[2023-11-01T16:49:29.050Z]                 end=datetime(2200, 12, 31, tzinfo=timezone.utc)), 'int64',
[2023-11-01T16:49:29.050Z]             marks=pytest.mark.xfail(reason="Apache Spark and the plugin both have problems reading timestamps written via "
[2023-11-01T16:49:29.050Z]                                            "fastparquet, if written in int64: "
[2023-11-01T16:49:29.050Z]                                            "\"Illegal Parquet type: INT64 (TIMESTAMP(NANOS,false)).\"")),
[2023-11-01T16:49:29.050Z]         (TimestampGen(nullable=False,
[2023-11-01T16:49:29.050Z]                      start=datetime(2000, 1, 1, tzinfo=timezone.utc),
[2023-11-01T16:49:29.050Z]                      end=datetime(2200, 12, 31, tzinfo=timezone.utc)), 'int96'),
[2023-11-01T16:49:29.050Z]         (TimestampGen(nullable=True,
[2023-11-01T16:49:29.050Z]                       start=datetime(2000, 1, 1, tzinfo=timezone.utc),
[2023-11-01T16:49:29.050Z]                       end=datetime(2200, 12, 31, tzinfo=timezone.utc)), 'int96'),
[2023-11-01T16:49:29.050Z]         pytest.param(
[2023-11-01T16:49:29.050Z]             TimestampGen(nullable=False), 'int96',
[2023-11-01T16:49:29.050Z]             marks=pytest.mark.xfail(reason="fastparquet does not support int96RebaseModeInWrite, for dates before "
[2023-11-01T16:49:29.051Z]                                            "1582-10-15 or timestamps before 1900-01-01T00:00:00Z. "
[2023-11-01T16:49:29.051Z]                                            "This messes up reads from Apache Spark and the plugin.")),
[2023-11-01T16:49:29.051Z]         pytest.param(
[2023-11-01T16:49:29.051Z]             ArrayGen(nullable=False, child_gen=IntegerGen(nullable=False)), 'int96',
[2023-11-01T16:49:29.051Z]             marks=pytest.mark.xfail(reason="fastparquet fails to serialize array elements with any available encoding. "
[2023-11-01T16:49:29.051Z]                                            "E.g. \"Error converting column 'a' to bytes using encoding JSON. "
[2023-11-01T16:49:29.051Z]                                            "Original error: Object of type int32 is not JSON serializable\".")),
[2023-11-01T16:49:29.051Z]         (StructGen(nullable=False, children=[('first', IntegerGen(nullable=False))]), 'int96'),
[2023-11-01T16:49:29.051Z]         pytest.param(
[2023-11-01T16:49:29.051Z]             StructGen(nullable=True, children=[('first', IntegerGen(nullable=False))]), 'int96',
[2023-11-01T16:49:29.051Z]             marks=pytest.mark.xfail(reason="fastparquet fails to read nullable Struct columns written from Apache Spark. "
[2023-11-01T16:49:29.051Z]                                            "It fails the rewrite to parquet, thereby failing the test.")),
[2023-11-01T16:49:29.051Z]     ], ids=idfn)
[2023-11-01T16:49:29.051Z]     def test_reading_file_rewritten_with_fastparquet(column_gen, time_format, spark_tmp_path):
[2023-11-01T16:49:29.051Z]         """
[2023-11-01T16:49:29.051Z]         This test is a workaround to test data-types that have problems being converted
[2023-11-01T16:49:29.051Z]         from Spark dataframes to Pandas dataframes.
[2023-11-01T16:49:29.051Z]         For instance, sparkDF.toPandas() incorrectly converts ARRAY<INT> columns into
[2023-11-01T16:49:29.051Z]         STRING columns.
[2023-11-01T16:49:29.051Z]         This test writes the Spark dataframe into a temporary file, and then uses
[2023-11-01T16:49:29.051Z]         `fastparquet` to read and write the file again, to the final destination.
[2023-11-01T16:49:29.051Z]         The final file should be in the correct format, with the right datatypes.
[2023-11-01T16:49:29.051Z]         This is then checked for read-accuracy, via CPU and GPU.
[2023-11-01T16:49:29.051Z]         """
[2023-11-01T16:49:29.051Z]         suffix = "/FASTPARQUET_WRITE_PATH"
[2023-11-01T16:49:29.051Z]         hdfs_data_path = spark_tmp_path + suffix
[2023-11-01T16:49:29.051Z]         local_base_path = (spark_tmp_path + "_local")
[2023-11-01T16:49:29.051Z]         local_data_path = local_base_path + suffix
[2023-11-01T16:49:29.051Z]     
[2023-11-01T16:49:29.051Z]         def rewrite_with_fastparquet(spark, data_gen):
[2023-11-01T16:49:29.051Z]             """
[2023-11-01T16:49:29.051Z]             This helper function (eventually) writes data generated from `data_gen` to the local filesystem,
[2023-11-01T16:49:29.051Z]             via fastparquet.
[2023-11-01T16:49:29.051Z]             To preserve data types from data_gen, the writes are done first through Spark, thus:
[2023-11-01T16:49:29.051Z]               1. Write to HDFS with Spark. (This preserves data types.)
[2023-11-01T16:49:29.051Z]               2. Copy the data to local FS (so that fastparquet can read it).
[2023-11-01T16:49:29.051Z]               3. Read data with fastparquet, write it back out to local FS with fastparquet.
[2023-11-01T16:49:29.051Z]               4. Copy the fastparquet output back to HDFS, for reads with Spark.
[2023-11-01T16:49:29.051Z]             """
[2023-11-01T16:49:29.051Z]             import fastparquet
[2023-11-01T16:49:29.051Z]             hdfs_tmp_data_path = hdfs_data_path + "_tmp"
[2023-11-01T16:49:29.051Z]             spark_df = gen_df(spark, data_gen, 2048)
[2023-11-01T16:49:29.051Z]             # 1. Write to HDFS with Spark.
[2023-11-01T16:49:29.051Z]             spark_df.repartition(1).write.mode("overwrite").parquet(hdfs_tmp_data_path)
[2023-11-01T16:49:29.051Z]             # Make local base directory.
[2023-11-01T16:49:29.051Z]             os.makedirs(name=local_base_path, exist_ok=True)
[2023-11-01T16:49:29.051Z]             # 2. Copy Spark-written data to local filesystem, for read with Parquet.
[2023-11-01T16:49:29.051Z]             local_tmp_data_path = local_data_path + "_tmp"
[2023-11-01T16:49:29.051Z]             copy_to_local(spark=spark, hdfs_source=hdfs_tmp_data_path, local_target=local_tmp_data_path)
[2023-11-01T16:49:29.051Z]             # 3. Read local tmp data with fastparquet, rewrite to final local path, with fastparquet.
[2023-11-01T16:49:29.051Z]             pandas_df = fastparquet.ParquetFile(local_tmp_data_path).to_pandas()
[2023-11-01T16:49:29.051Z]             fastparquet.write(local_data_path, pandas_df, times=time_format)
[2023-11-01T16:49:29.051Z]             # 4. Copy fastparquet-written data back to HDFS, so that Spark can read it.
[2023-11-01T16:49:29.051Z]             copy_from_local(spark=spark, local_source=local_data_path, hdfs_target=hdfs_data_path)
[2023-11-01T16:49:29.051Z]             # Local data can now be cleaned up.
[2023-11-01T16:49:29.051Z]             delete_local_directory(local_base_path)
[2023-11-01T16:49:29.051Z]     
[2023-11-01T16:49:29.051Z]         gen = StructGen([('a', column_gen),
[2023-11-01T16:49:29.051Z]                          ('part', IntegerGen(nullable=False))], nullable=False)
[2023-11-01T16:49:29.051Z]         # Write data with CPU session.
[2023-11-01T16:49:29.051Z] >       with_cpu_session(
[2023-11-01T16:49:29.051Z]             lambda spark: rewrite_with_fastparquet(spark, gen)
[2023-11-01T16:49:29.051Z]         )
[2023-11-01T16:49:29.051Z] 
[2023-11-01T16:49:29.051Z] ../../src/main/python/fastparquet_compatibility_test.py:428: 
[2023-11-01T16:49:29.051Z] _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
[2023-11-01T16:49:29.051Z] ../../src/main/python/spark_session.py:116: in with_cpu_session
[2023-11-01T16:49:29.051Z]     return with_spark_session(func, conf=copy)
[2023-11-01T16:49:29.051Z] ../../src/main/python/spark_session.py:100: in with_spark_session
[2023-11-01T16:49:29.051Z]     ret = func(_spark)
[2023-11-01T16:49:29.051Z] ../../src/main/python/fastparquet_compatibility_test.py:429: in <lambda>
[2023-11-01T16:49:29.051Z]     lambda spark: rewrite_with_fastparquet(spark, gen)
[2023-11-01T16:49:29.051Z] ../../src/main/python/fastparquet_compatibility_test.py:418: in rewrite_with_fastparquet
[2023-11-01T16:49:29.051Z]     pandas_df = fastparquet.ParquetFile(local_tmp_data_path).to_pandas()
[2023-11-01T16:49:29.051Z] /opt/conda/default/lib/python3.8/site-packages/fastparquet/api.py:115: in __init__
[2023-11-01T16:49:29.051Z]     with open_with(fn, 'rb') as f:
[2023-11-01T16:49:29.051Z] _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
[2023-11-01T16:49:29.051Z] 
[2023-11-01T16:49:29.051Z] f = '/tmp/pyspark_tests//rapids-it-dataproc-20-ubuntu18-322-m-main-11369-1843522649/_local/FASTPARQUET_WRITE_PATH_tmp'
[2023-11-01T16:49:29.051Z] mode = 'rb'
[2023-11-01T16:49:29.051Z] 
[2023-11-01T16:49:29.051Z]     def default_open(f, mode='rb'):
[2023-11-01T16:49:29.051Z] >       return open(f, mode)
[2023-11-01T16:49:29.051Z] E       IsADirectoryError: [Errno 21] Is a directory: '/tmp/pyspark_tests//rapids-it-dataproc-20-ubuntu18-322-m-main-11369-1843522649/_local/FASTPARQUET_WRITE_PATH_tmp'
[2023-11-01T16:49:29.051Z] 
[2023-11-01T16:49:29.051Z] /opt/conda/default/lib/python3.8/site-packages/fastparquet/util.py:39: IsADirectoryError
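
The traceback pins down the failure mode: `ParquetFile.__init__` first tries `<path>/_metadata` (the FileNotFoundError), then falls back to opening the directory path itself (the IsADirectoryError). Per the `isinstance(fn, (tuple, list))` branch visible above, the same constructor accepts an explicit list of part files, so a possible workaround, sketched here with a hypothetical path and not the fix that was ultimately adopted, would be to enumerate the files first:

```python
import glob

import fastparquet

# Hypothetical stand-in for the test's local tmp directory.
local_tmp_data_path = "/tmp/pyspark_tests/example/_local/FASTPARQUET_WRITE_PATH_tmp"

# Old fastparquet releases cannot open a directory directly, but a list of
# part files routes through the metadata_from_many() branch shown above.
part_files = sorted(glob.glob(local_tmp_data_path + "/*.parquet"))
pandas_df = fastparquet.ParquetFile(part_files).to_pandas()
```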
jlowe added the bug and "? - Needs Triage" labels Nov 1, 2023
mythrocks self-assigned this Nov 1, 2023
mythrocks commented Nov 2, 2023

Hmm. This is odd. I just tested the fastparquet tests manually against Spark 3.3.2, and the tests appear to run correctly:

fastparquet-test-m:~/work/spark-rapids$ ./integration_tests/run_pyspark_from_build.sh -k fastparquet
...
== 19 passed, 29 skipped, 21944 deselected, 12 xfailed, 51 warnings in 37.86s ==

I wonder if this is version-specific. I'll try again with an older version of Dataproc, to match Spark version 3.1.3, as in the reported failure.

Edit: Appears to be version-specific. I have a repro on 3.1.3. More as it develops.
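
(A quick way to check which fastparquet a node actually picked up is to print the version; a minimal check, with the 0.5.0 figure coming from the diagnosis below:)

```python
import fastparquet

# Per the diagnosis below, the failing Dataproc 2.0 nodes report 0.5.0,
# which predates support for reading a directory of part files.
print(fastparquet.__version__)
```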

mythrocks added the test label and removed the "? - Needs Triage" label Nov 2, 2023
mythrocks added a commit to mythrocks/spark-rapids that referenced this issue Nov 2, 2023

Fixes NVIDIA#9603.

This commit changes the integration test setup to specifically install
fastparquet-0.8.3.

Prior to this change, when the fastparquet version is not specified, the pip
install caused 0.5.0 to be installed on some nodes, e.g. on Dataproc 2.0
(with Spark 3.1.1).
The older fastparquet versions do not support reading the contents of input
directories recursively, causing the tests to fail.

Note that this change doesn't bump the version all the way to 2023.8.0, so as
to preserve compatibility with Dataproc 2.0. v0.8.3 seems to have the broadest
support.

Signed-off-by: MithunR <[email protected]>
mythrocks commented
The problem is the fastparquet version. On Dataproc 2.0, when one does `pip install fastparquet`, version 0.5.0 is installed. This version has no support for recursively reading directory contents.

There is a fix posted in #9607.
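
The fix pins `fastparquet==0.8.3` at install time. As an illustrative sketch only, not the code from #9607, a test module could also skip itself when the installed version is too old:

```python
import fastparquet
from packaging.version import Version

# Illustrative guard, assuming 0.8.3 (the version pinned by the fix) as the
# minimum; the names here are hypothetical, not taken from #9607.
def fastparquet_too_old(minimum="0.8.3"):
    return Version(fastparquet.__version__) < Version(minimum)
```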

pxLi closed this as completed in #9607 Nov 3, 2023
pxLi pushed a commit that referenced this issue Nov 3, 2023
Integration tests: Install specific fastparquet version (#9607)

* Integration tests: Install specific fastparquet version. Fixes #9603.

* Switching to one specific fastparquet version.

* Added requirements.txt to the IT tar.gz package.

This should allow the installation of the right `fastparquet` version.

---------

Signed-off-by: MithunR <[email protected]>