
Very slow read from a super massive table in Azure Data Lake Gen2 #1569

Closed
WaterKnight1998 opened this issue Jul 28, 2023 · 13 comments
Labels
bug Something isn't working

Comments

@WaterKnight1998

First of all, thanks for creating this awesome library and eliminating the need to use Spark for everything related to Delta Lake.

I have been trying to read a table from Azure Data Lake Gen2 and it is extremely slow.

The table has lots of versions, but when instantiating DeltaTable I specify the latest one:

dt=DeltaTable("adl://container1/table1/", version=33, storage_options=storage_options)

The code above is reasonably fast (about 6 s), but when I run to_pyarrow_table or to_pyarrow_dataset to use the data later in DuckDB, it takes extremely long. Is it possible to increase the number of threads it uses for reading data?

Any other possible solution?

Thanks in advance!

WaterKnight1998 added the enhancement (New feature or request) label Jul 28, 2023
@roeap
Collaborator

roeap commented Aug 18, 2023

We have some ideas to improve read performance using our internal file system implementations (which are in use here).

If you have predicates that can be applied to your data, you may be able to avoid reading some of it into memory by using to_pyarrow_dataset and then the scanner(...) method. I think DuckDB can also do predicate pushdown for pyarrow datasets, but I am not sure about it. In case that does not happen, or happens only partially, applying some predicates to the scanner should give you a performance boost.
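
As a minimal sketch of that approach (the column names and filter value below are hypothetical, and storage_options is assumed to hold your ADLS credentials):

import pyarrow.dataset as ds
from deltalake import DeltaTable

dt = DeltaTable("adl://container1/table1/", version=33, storage_options=storage_options)
dataset = dt.to_pyarrow_dataset()

# Scan only the rows/columns you need instead of materializing the whole table.
scanner = dataset.scanner(
    columns=["id", "value"],                        # hypothetical projection
    filter=ds.field("event_date") == "2023-07-28",  # hypothetical predicate
)
table = scanner.to_table()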

@Yacobolo

We have also experienced really slow reads with ADLS Gen2; a 70 MB file took 12 minutes.

@mjducut-oe

mjducut-oe commented Aug 22, 2023

Hi there!

We found a faster approach for this one if your goal is only to read the current data of the Delta table.
Steps are:

  1. Create a DeltaTable instance:
dt = DeltaTable("adl://container1/table1/", storage_options=storage_options)
  2. Grab the paths of the parquet files used by the latest version of the Delta table:
files = dt.file_uris()
  3. Create a pyarrow dataset from these files and read it into a pyarrow table. You also need to pass the filesystem (fs) used; see the sketch below for one way to build it:
import pyarrow.parquet as pq
import pyarrow as pa

pq_data = pq.ParquetDataset(path_or_paths=files, filesystem=fs)
table = pq_data.read()

Special thanks to @Yacobolo for this one!
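
For reference, here is a minimal sketch of how fs can be built for ADLS Gen2, using fsspec/adlfs wrapped in a pyarrow filesystem (the same pattern shown later in this thread); the keys expected in storage_options depend on your adlfs credentials:

import fsspec
import pyarrow.fs as pafs

# storage_options carries the ADLS credentials (e.g. account_name, client_id, client_secret, tenant_id)
fs_abfs = fsspec.filesystem("abfs", **storage_options)
fs = pafs.PyFileSystem(pafs.FSSpecHandler(fs_abfs))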

@ion-elgreco
Collaborator

@mjducutoae If you want to keep the Delta schema, you will need to pass it as well:

pq_data = pq.ParquetDataset(path_or_paths=files, schema=dt.schema().to_pyarrow(), filesystem=fs)

@wjones127
Collaborator

I think this is likely due to some GIL-locking in the filesystem implementation.

If you pass a PyArrow filesystem down instead, does it perform as expected?

dt = DeltaTable("adl://container1/table1/")
dt.to_pyarrow_dataset(filesystem=fs)

@ion-elgreco
Collaborator

ion-elgreco commented Sep 5, 2023

> I think this is likely due to some GIL-locking in the filesystem implementation.
>
> If you pass a PyArrow filesystem down instead, does it perform as expected?
>
> dt = DeltaTable("adl://container1/table1/")
> dt.to_pyarrow_dataset(filesystem=fs)

I don't see any difference; the execution time is the same with or without fs.

import os
import fsspec
import pyarrow.fs as pafs

fs_abfs = fsspec.filesystem("abfs", **storage_options)
handler = pafs.FSSpecHandler(fs_abfs)
fs = pafs.PyFileSystem(handler)
sub_fs = pafs.SubTreeFileSystem(os.path.join(root_dir, file), fs)
df = dt.to_pyarrow_dataset(filesystem=sub_fs)  # dt is the DeltaTable created earlier

@wjones127
Collaborator

@ion-elgreco Okay. Did you get faster performance using PyArrow datasets directly on the parquet files in Azure? #1569 (comment)

I suspect the fsspec implementation also holds the GIL, given it's implemented in Python. So comparing against that might not be relevant. But if you get better performance by reading the parquet files with PyArrow directly, then that would point to something else.

@ion-elgreco
Collaborator

> @ion-elgreco Okay. Did you get faster performance using PyArrow datasets directly on the parquet files in Azure? #1569 (comment)
>
> I suspect the fsspec implementation also holds the GIL, given it's implemented in Python. So comparing against that might not be relevant. But if you get better performance by reading the parquet files with PyArrow directly, then that would point to something else.

Yes, reading the files directly with pyarrow dataset is the fastest.

wjones127 added a commit that referenced this issue Sep 6, 2023
# Description

I'm not sure yet this is the root cause of the performance issues we are
seeing in #1569, but this seems like an obvious change that might help.

Any method that takes `&mut self` implicitly holds the GIL, so we'll
need to allow threads explicitly in those methods.

@ion-elgreco
Collaborator

@wjones127 You can disregard my previous results; I had a network issue which slowed down the reads. I retried it today, and passing a pyarrow filesystem made it just as fast as using pyarrow.dataset.dataset directly.

import fsspec
import polars as pl
import pyarrow as pa
import pyarrow.dataset  # so that pa.dataset is available
from deltalake import DeltaTable


def read_delta_fast_lazy(
    file: str,
    storage_options: dict | None = None,
    version: int | None = None,
) -> pl.LazyFrame:
    if storage_options is None:
        fs = fsspec.filesystem("file")
    else:
        fs = fsspec.filesystem("abfs", **storage_options)
    dt = DeltaTable(file, version=version, storage_options=storage_options)
    files = dt.file_uris()

    pq_data = pa.dataset.dataset(
        source=files,
        schema=dt.schema().to_pyarrow(),
        format="parquet",
        filesystem=fs,
    )
    return pl.scan_pyarrow_dataset(pq_data)

This is also just as fast:

dt = DeltaTable(os.path.join(root_dir, file), storage_options=storage_options)
fs_abfs = fsspec.filesystem("abfs", **storage_options)
handler = pafs.FSSpecHandler(fs_abfs)
fs = pafs.PyFileSystem(handler)
sub_fs = pafs.SubTreeFileSystem(os.path.join(root_dir, file), fs)
df = dt.to_pyarrow_dataset(filesystem=sub_fs)
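
For completeness, calling the helper above might look like this (the table URL and credential values are hypothetical):

storage_options = {
    "account_name": "account1",   # hypothetical ADLS Gen2 account
    "client_id": "...",
    "client_secret": "...",
    "tenant_id": "...",
}
lf = read_delta_fast_lazy(
    "abfs://container1/table1",
    storage_options=storage_options,
)
df = lf.collect()  # materialize the polars LazyFrame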

@wjones127
Collaborator

Thanks for confirming, @ion-elgreco. I'll look into this more soon.

@ion-elgreco
Collaborator

ion-elgreco commented Sep 20, 2023

@wjones127 I've done some performance checks with v0.10.1 and v0.10.2, and with my own function using pyarrow directly. Seems that it's working as expected now :)

import fsspec
import polars as pl
import pyarrow as pa
import pyarrow.dataset  # so that pa.dataset is available
from deltalake import DeltaTable


def read_delta_fast_lazy(
    file: str,
    storage_options: dict | None = None,
    version: int | None = None,
) -> pl.LazyFrame:
    """Fast path to read Delta tables lazily, until fixed upstream: https://github.com/delta-io/delta-rs/issues/1569

    Args:
        file (str): file url (abfs://...)
        storage_options (dict): storage_options for fsspec; must contain client_id, client_secret, tenant_id
        version (int | None, optional): Version number of table. Defaults to None.

    Returns:
        pl.LazyFrame: dataframe
    """
    if storage_options is None:
        fs = fsspec.filesystem("file")
    else:
        storage_options["account_name"] = file.split("@")[1].split(".")[0]
        fs = fsspec.filesystem("abfs", **storage_options)
    dt = DeltaTable(file, version=version, storage_options=storage_options)
    files = dt.file_uris()

    pq_data = pa.dataset.dataset(
        source=files,
        schema=dt.schema().to_pyarrow(),
        partitioning="hive",
        format="parquet",
        filesystem=fs,
    )
    return pl.scan_pyarrow_dataset(pq_data)

Results (pyarrow dataset):

%%timeit
df = read_delta_fast(os.path.join(root_dir, "table"), storage_options=storage_options)
31.6 s ± 1.72 s per loop (mean ± std. dev. of 7 runs, 1 loop each)

Results (v0.10.1):

%%timeit
df2 = pl.read_delta(os.path.join(root_dir, "data"), storage_options=storage_options)
49.3 s ± 1.5 s per loop (mean ± std. dev. of 7 runs, 1 loop each)

Results (v0.10.2):

%%timeit
df2 = pl.read_delta(os.path.join(root_dir, "data"), storage_options=storage_options)
32 s ± 1.84 s per loop (mean ± std. dev. of 7 runs, 1 loop each)

rtyler added the bug (Something isn't working) label and removed the enhancement (New feature or request) label Sep 20, 2023
@ion-elgreco
Collaborator

@rtyler @wjones127 apache/arrow#36765: enabling pre-buffer in the pyarrow parquet scan options boosts the performance quite a lot.
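
For a manually built dataset, pre-buffering can be enabled through the parquet fragment scan options; a minimal sketch, reusing files, dt, and fs from the snippets above:

import pyarrow.dataset as ds

# pre_buffer coalesces byte ranges into fewer, larger reads, which helps a lot on
# high-latency object stores such as ADLS Gen2.
parquet_format = ds.ParquetFileFormat(
    default_fragment_scan_options=ds.ParquetFragmentScanOptions(pre_buffer=True)
)
pq_data = ds.dataset(
    source=files,
    schema=dt.schema().to_pyarrow(),
    format=parquet_format,
    filesystem=fs,
)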

@ion-elgreco
Collaborator

@wjones127 @rtyler shall we close this issue now that pre-buffer is enabled in pyarrow?
