
Error when reading data from Delta Lake table on S3 #2292

Closed
Sanjay-M opened this issue May 22, 2024 · 6 comments · Fixed by #2344

@Sanjay-M
Describe the bug
Error when reading data from a Delta Lake table on S3 with Daft; it fails to generate a physical plan.

To Reproduce
Steps to reproduce the behavior:

  1. Write data to S3 using the PySpark Delta Lake library with the S3A URI scheme
  2. Read more than roughly 5 GB of data from the Delta Lake table using Daft
  3. Use the Delta Lake read API and call collect() or to_pandas() (see the sketch below)
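
A minimal sketch of these steps, assuming a hypothetical bucket path, a Spark session already configured with the Delta Lake and S3A (hadoop-aws) packages, and daft.read_deltalake as the reader name (check the exact name for your Daft version):

from pyspark.sql import SparkSession

import daft

# Assumes instance-profile (IAM role) credentials and the Delta/S3A
# packages are already configured on the Spark session.
spark = SparkSession.builder.getOrCreate()

# Step 1: write a partitioned Delta table with the s3a:// scheme.
# (The issue reproduces with a multi-GB table; one row shown for brevity.)
rows = [("new", "ABC", "X", "Y", 1)]
cols = ["sub_tbl", "system", "device_type", "manufacturer", "value"]
(spark.createDataFrame(rows, cols)
    .write.format("delta")
    .partitionBy("sub_tbl", "system", "device_type", "manufacturer")
    .save("s3a://my-bucket/data/tbl"))  # hypothetical path

# Steps 2-3: read it back with Daft and materialize.
df = daft.read_deltalake("s3://my-bucket/data/tbl")
df.to_pandas()  # panics during physical planning on getdaft 0.2.24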

Expected behavior
Expect it to convert the DataFrame to pandas or materialize it locally.

Information

  • OS: RHEL on AWS EC2
  • IAM role for S3 access
  • EC2 instance and S3 bucket in the same AWS region
  • deltalake==0.17.4
  • pyarrow==16.0.0
  • getdaft==0.2.24
  • Python 3.12.2

Additional context
The Python Delta Lake library can read the data properly.
df.explain(True), df.collect(), and df.to_pandas() all raise the error, but df.limit(1).to_pandas() works.
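
For reference, the failing versus working calls (df being the DataFrame returned by the Delta Lake reader):

df.explain(True)         # panics while building the physical plan
df.collect()             # same panic
df.to_pandas()           # same panic
df.limit(1).to_pandas()  # works
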
Error Log with RUST_BACKTRACE=full

thread 'python' panicked at src/daft-stats/src/partition_spec.rs:45:76:
called `Option::unwrap()` on a `None` value
stack backtrace:
   0:     0x7f23f9809238 - <std::sys_common::backtrace::_print::DisplayBacktrace as core::fmt::Display>::fmt::h49651f1624eaa20e
   1:     0x7f23f7f42fcb - core::fmt::write::hd8c86404c00a946d
   2:     0x7f23f97d2d8e - std::io::Write::write_fmt::hdc635509a53e4168
   3:     0x7f23f980abb9 - std::sys_common::backtrace::print::he40889ab52a8091b
   4:     0x7f23f980a4d9 - std::panicking::default_hook::{{closure}}::h5de145ca1ef3bb0f
   5:     0x7f23f980b5c6 - std::panicking::rust_panic_with_hook::h66b5e9b0d1f9e688
   6:     0x7f23f980af0c - std::panicking::begin_panic_handler::{{closure}}::h66f180c1c92e0484
   7:     0x7f23f980ae99 - std::sys_common::backtrace::__rust_end_short_backtrace::h07ecf66ca8b75fae
   8:     0x7f23f980ae86 - rust_begin_unwind
   9:     0x7f23f752ded5 - core::panicking::panic_fmt::h2a05afbda6f351f3
  10:     0x7f23f752dfb0 - core::panicking::panic::h57a751f983276759
  11:     0x7f23f752e308 - core::option::unwrap_failed::h96cadc01302443bf
  12:     0x7f23f92a4f15 - <daft_stats::partition_spec::PartitionSpec as core::cmp::PartialEq>::eq::hfdd898d564fa9483
  13:     0x7f23f9249172 - <daft_scan::scan_task_iters::MergeByFileSize as core::iter::traits::iterator::Iterator>::next::heb9f3e3ed674b517
  14:     0x7f23f91179ca - core::iter::adapters::try_process::hbb82326581116e24
  15:     0x7f23f910f8e1 - daft_plan::physical_planner::translate::translate_single_logical_node::h039e18c5dda6be1a
  16:     0x7f23f9176ffe - common_treenode::TreeNode::visit::ha1b8b1861af47a06
  17:     0x7f23f9176c27 - daft_plan::builder::_::<impl daft_plan::builder::PyLogicalPlanBuilder>::__pymethod_to_physical_plan_scheduler__::h059a8d4b6e45d116
  18:     0x7f23f8943634 - pyo3::impl_::trampoline::trampoline::h09583cc9ffdc4c04
  19:     0x7f23f9175831 - daft_plan::builder::_::_::__INVENTORY::trampoline::ha8524b5520a94cce
  20:           0x556bfa - method_vectorcall_VARARGS_KEYWORDS
                               at /usr/local/src/conda/python-3.12.2/Objects/descrobject.c:365:14
  21:           0x546cf1 - _PyObject_VectorcallTstate
                               at /usr/local/src/conda/python-3.12.2/Include/internal/pycore_call.h:92:11
  22:           0x546cf1 - PyObject_Vectorcall
                               at /usr/local/src/conda/python-3.12.2/Objects/call.c:325:12
  23:           0x52d15c - _PyEval_EvalFrameDefault
                               at /croot/python-split_1709054613063/work/build-static/Python/bytecodes.c:2706:19
  24:           0x57fbfc - _PyEval_EvalFrame
                               at /usr/local/src/conda/python-3.12.2/Include/internal/pycore_ceval.h:89:16
  25:           0x57fbfc - gen_send_ex2
                               at /usr/local/src/conda/python-3.12.2/Objects/genobject.c:230:14
  26:           0x57fbfc - gen_iternext
                               at /usr/local/src/conda/python-3.12.2/Objects/genobject.c:603:9
  27:           0x57d274 - list_extend
                               at /usr/local/src/conda/python-3.12.2/Objects/listobject.c:944:26
  28:           0x5aa4e3 - list___init___impl
                               at /usr/local/src/conda/python-3.12.2/Objects/listobject.c:2792
  29:           0x5aa4e3 - list_vectorcall
                               at /usr/local/src/conda/python-3.12.2/Objects/listobject.c:2817
  30:           0x546cf1 - _PyObject_VectorcallTstate
                               at /usr/local/src/conda/python-3.12.2/Include/internal/pycore_call.h:92:11
  31:           0x546cf1 - PyObject_Vectorcall
                               at /usr/local/src/conda/python-3.12.2/Objects/call.c:325:12
  32:           0x52d15c - _PyEval_EvalFrameDefault
                               at /croot/python-split_1709054613063/work/build-static/Python/bytecodes.c:2706:19
  33:           0x5f8a2e - PyEval_EvalCode
                               at /usr/local/src/conda/python-3.12.2/Python/ceval.c:578:21
  34:           0x61d897 - run_eval_code_obj
                               at /usr/local/src/conda/python-3.12.2/Python/pythonrun.c:1722
  35:           0x6191a7 - run_mod
                               at /usr/local/src/conda/python-3.12.2/Python/pythonrun.c:1743
  36:           0x5037bc - PyRun_InteractiveOneObjectEx
                               at /usr/local/src/conda/python-3.12.2/Python/pythonrun.c:260
  37:           0x504151 - _PyRun_InteractiveLoopObject
                               at /usr/local/src/conda/python-3.12.2/Python/pythonrun.c:137
  38:           0x46e057 - _PyRun_AnyFileObject
                               at /usr/local/src/conda/python-3.12.2/Python/pythonrun.c:72
  39:           0x5042b0 - PyRun_AnyFileExFlags
                               at /usr/local/src/conda/python-3.12.2/Python/pythonrun.c:104
  40:           0x4682ca - pymain_run_stdin
                               at /usr/local/src/conda/python-3.12.2/Modules/main.c:520
  41:           0x4682ca - pymain_run_python
                               at /usr/local/src/conda/python-3.12.2/Modules/main.c:632
  42:           0x4682ca - Py_RunMain
                               at /usr/local/src/conda/python-3.12.2/Modules/main.c:709
  43:           0x5e4099 - Py_BytesMain
                               at /usr/local/src/conda/python-3.12.2/Modules/main.c:763:12
  44:     0x7f240823feb0 - __libc_start_call_main
  45:     0x7f240823ff60 - __libc_start_main_impl
  46:           0x5e3ece - <unknown>
  47:                0x0 - <unknown>
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/home/gbp-ml/miniconda3/lib/python3.12/site-packages/daft/api_annotations.py", line 26, in _wrap
    return timed_method(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/gbp-ml/miniconda3/lib/python3.12/site-packages/daft/analytics.py", line 189, in tracked_method
    result = method(*args, **kwargs)
             ^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/gbp-ml/miniconda3/lib/python3.12/site-packages/daft/dataframe/dataframe.py", line 1590, in to_pandas
    self.collect()
  File "/home/gbp-ml/miniconda3/lib/python3.12/site-packages/daft/api_annotations.py", line 26, in _wrap
    return timed_method(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/gbp-ml/miniconda3/lib/python3.12/site-packages/daft/analytics.py", line 189, in tracked_method
    result = method(*args, **kwargs)
             ^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/gbp-ml/miniconda3/lib/python3.12/site-packages/daft/dataframe/dataframe.py", line 1466, in collect
    self._materialize_results()
  File "/home/gbp-ml/miniconda3/lib/python3.12/site-packages/daft/dataframe/dataframe.py", line 1448, in _materialize_results
    self._result_cache = context.runner().run(self._builder)
                         ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/gbp-ml/miniconda3/lib/python3.12/site-packages/daft/runners/pyrunner.py", line 135, in run
    results = list(self.run_iter(builder))
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/gbp-ml/miniconda3/lib/python3.12/site-packages/daft/runners/pyrunner.py", line 180, in run_iter
    plan_scheduler = builder.to_physical_plan_scheduler(daft_execution_config)
                     ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/gbp-ml/miniconda3/lib/python3.12/site-packages/daft/logical/builder.py", line 44, in to_physical_plan_scheduler
    return PhysicalPlanScheduler(self._builder.to_physical_plan_scheduler(daft_execution_config))
                                 ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
pyo3_runtime.PanicException: called `Option::unwrap()` on a `None` value
@jaychia
Contributor

jaychia commented May 22, 2024

Thanks @Sanjay-M! We're taking a look at this now :)

@jaychia
Contributor

jaychia commented May 22, 2024

Could you also supply the plan that is printed with df.explain(True)?

@Sanjay-M
Author

@jaychia It can build the logical plan but throws an error while building the physical plan.
I need the IT team to bring up the servers; it will take another 11 hours before I can post the error log.

@jaychia
Copy link
Contributor

jaychia commented May 22, 2024

Got it, thanks! Other information that would be helpful for debugging:

  • Is your Delta table partitioned?
  • How is it partitioned (which columns and types are the partition keys)?

It would also be super helpful if you could share the output of your_delta_table.get_add_actions() using the Python Delta Lake library. I'm particularly interested in the data under the partition_values, min, and max columns!
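
A sketch of how to dump that with the deltalake package, using a hypothetical table URI:

from deltalake import DeltaTable

dt = DeltaTable("s3://my-bucket/data/tbl")  # hypothetical path
# flatten=True expands the nested structs into dotted columns
# (e.g. partition.sub_tbl, min.dt, max.ts).
actions = dt.get_add_actions(flatten=True).to_pandas()
wanted = [c for c in actions.columns
          if c.split(".")[0] in ("partition", "min", "max")]
print(actions[wanted])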

@Sanjay-M
Author

Sanjay-M commented May 23, 2024

Yes, the delta table is partitioned.

partition_values: struct<sub_tbl: string, system: string, device_type: string, manufacturer: string> not null
  child 0, sub_tbl: string
  child 1, system: string
  child 2, device_type: string
  child 3, manufacturer: string
min: struct<sub_tbl: null, system: null, device_type: null, manufacturer: null, dt: timestamp[us, tz=UTC], ts: timestamp[us, tz=UTC]> not null
max: struct<sub_tbl: null, system: null, device_type: null, manufacturer: null, dt: timestamp[us, tz=UTC], ts: timestamp[us, tz=UTC]> not null

I thought the error could be due to the partition column values being null, so I tried replacing them with NA.

dfd = df.where(df["sub_tbl"] == "abc").select("sub_tbl", "system")
dfd.explain(True)

== Physical Plan ==
* Project: col(sub_tbl), col(system)
|   Clustering spec = { Num partitions = 17 }
|
* TabularScan:
|   Num Scan Tasks = 17
|   Estimated Scan Bytes = 27959464
|   Clustering spec = { Num partitions = 17 }

When I tried to_pandas() after replacing the NULL values, I got the error below:

ScanWithTask-Project [Stage:1]:   0%|                                                                                                      | 0/1 [00:00<?, ?it/s]
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/home/gbp-ml/miniconda3/lib/python3.12/site-packages/daft/api_annotations.py", line 26, in _wrap
    return timed_method(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/gbp-ml/miniconda3/lib/python3.12/site-packages/daft/analytics.py", line 189, in tracked_method
    result = method(*args, **kwargs)
             ^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/gbp-ml/miniconda3/lib/python3.12/site-packages/daft/dataframe/dataframe.py", line 1590, in to_pandas
    self.collect()
  File "/home/gbp-ml/miniconda3/lib/python3.12/site-packages/daft/api_annotations.py", line 26, in _wrap
    return timed_method(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/gbp-ml/miniconda3/lib/python3.12/site-packages/daft/analytics.py", line 189, in tracked_method
    result = method(*args, **kwargs)
             ^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/gbp-ml/miniconda3/lib/python3.12/site-packages/daft/dataframe/dataframe.py", line 1466, in collect
    self._materialize_results()
  File "/home/gbp-ml/miniconda3/lib/python3.12/site-packages/daft/dataframe/dataframe.py", line 1448, in _materialize_results
    self._result_cache = context.runner().run(self._builder)
                         ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/gbp-ml/miniconda3/lib/python3.12/site-packages/daft/runners/pyrunner.py", line 135, in run
    results = list(self.run_iter(builder))
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/gbp-ml/miniconda3/lib/python3.12/site-packages/daft/runners/pyrunner.py", line 187, in run_iter
    yield from results_gen
  File "/home/gbp-ml/miniconda3/lib/python3.12/site-packages/daft/runners/pyrunner.py", line 279, in _physical_plan_to_partitions
    materialized_results = done_future.result()
                           ^^^^^^^^^^^^^^^^^^^^
  File "/home/gbp-ml/miniconda3/lib/python3.12/concurrent/futures/_base.py", line 449, in result
    return self.__get_result()
           ^^^^^^^^^^^^^^^^^^^
  File "/home/gbp-ml/miniconda3/lib/python3.12/concurrent/futures/_base.py", line 401, in __get_result
    raise self._exception
  File "/home/gbp-ml/miniconda3/lib/python3.12/concurrent/futures/thread.py", line 58, in run
    result = self.fn(*self.args, **self.kwargs)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/gbp-ml/miniconda3/lib/python3.12/site-packages/daft/runners/pyrunner.py", line 325, in build_partitions
    partitions = instruction.run(partitions)
                 ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/gbp-ml/miniconda3/lib/python3.12/site-packages/daft/execution/execution_step.py", line 438, in run
    return self._project(inputs)
           ^^^^^^^^^^^^^^^^^^^^^
  File "/home/gbp-ml/miniconda3/lib/python3.12/site-packages/daft/execution/execution_step.py", line 442, in _project
    return [input.eval_expression_list(self.projection)]
            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/gbp-ml/miniconda3/lib/python3.12/site-packages/daft/table/micropartition.py", line 169, in eval_expression_list
    return MicroPartition._from_pymicropartition(self._micropartition.eval_expression_list(pyexprs))
                                                 ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
daft.exceptions.DaftCoreException: DaftError::External Parquet file: s3://s3-bucket/data/abc/tbl/zstd/sub_tbl=new/system=ABC/device_type=X/manufacturer=Y/part-00207-78ede249-e608-4e42-a545-72e91bg75166.c000.zstd.parquet metadata listed 1700 rows but only read: 0 

@colin-ho
Contributor

colin-ho commented Jun 6, 2024

Hey @Sanjay-M! Just merged a fix for this; it should be ready in the next release.
