
GH-37857: [Python][Dataset] Expose file size to python dataset #37868

Merged: 13 commits into apache:main on Dec 5, 2023

Conversation

@eeroel (Contributor) commented Sep 26, 2023

Rationale for this change

Allow passing known file sizes to make_fragment, to avoid potential network requests.

What changes are included in this PR?

Are these changes tested?

Yes, tests with S3 verify that the file size gets used.

Are there any user-facing changes?

Yes, new function arguments.
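
For illustration, a minimal sketch of the intended usage of the new file_size argument (the bucket, path, region, and size below are placeholder values, not taken from the PR):

import pyarrow.dataset as ds
import pyarrow.fs as fs

# The size would typically come from external metadata, e.g. a Delta Lake
# transaction log or an Iceberg manifest, so the filesystem does not need
# to issue a HEAD/LIST request to discover it.
s3 = fs.S3FileSystem(region="us-east-1")
file_format = ds.ParquetFileFormat()
fragment = file_format.make_fragment(
    "my-bucket/data/part-0.parquet",  # placeholder path
    filesystem=s3,
    file_size=4237841,                # known size in bytes (placeholder)
)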

@github-actions (bot)

Thanks for opening a pull request!

If this is not a minor PR, could you open an issue for this pull request on GitHub? https://github.com/apache/arrow/issues/new/choose

Opening GitHub issues ahead of time contributes to the Openness of the Apache Arrow project.

Then could you also rename the pull request title in the following format?

GH-${GITHUB_ISSUE_ID}: [${COMPONENT}] ${SUMMARY}

or

MINOR: [${COMPONENT}] ${SUMMARY}

In the case of PARQUET issues on JIRA the title also supports:

PARQUET-${JIRA_ISSUE_ID}: [${COMPONENT}] ${SUMMARY}


@mapleFU (Member) commented Sep 26, 2023

Would you mind changing GS to GH?

@mapleFU (Member) commented Sep 26, 2023

So this size is expected to avoid a HEAD or LIST request to the object store? It seems that this might be a bit dangerous if the size doesn't match.

@eeroel changed the title from "GS-37857: [Python][Dataset] Expose file size to python dataset" to "GH-37857: [Python][Dataset] Expose file size to python dataset" on Sep 26, 2023
@github-actions (bot)

⚠️ GitHub issue #37857 has been automatically assigned in GitHub to PR creator.

@eeroel (Contributor, Author) commented Sep 26, 2023

So this size is expected to avoid a HEAD or LIST request to the object store? It seems that this might be a bit dangerous if the size doesn't match.

Yes, that's the idea. In Delta Lake, and I believe in Iceberg as well, the objects are not changed after they are written, so the size that those formats store in their metadata should always be correct.

@mapleFU (Member) commented Sep 26, 2023

That's a good idea, but I guess when we create an InputStream in the object store, the stream internals will also issue a HEAD request (https://github.com/apache/arrow/blob/main/cpp/src/arrow/filesystem/s3fs.cc#L1232); I wonder whether these would all be duplicates...

@eeroel (Contributor, Author) commented Sep 26, 2023

That's a good idea, but I guess when we create an InputStream in the object store, the stream internals will also issue a HEAD request (https://github.com/apache/arrow/blob/main/cpp/src/arrow/filesystem/s3fs.cc#L1232); I wonder whether these would all be duplicates...

When the source has a file size it will be passed here: https://github.com/apache/arrow/blob/ebc23687cb0376e24a0f002fe710db5ad891c674/cpp/src/arrow/filesystem/s3fs.cc#L2546, so the function will exit early (return Status::OK();).

But that request you linked is also problematic with regard to threading. I wonder if it would be difficult to refactor it to use SubmitIO?

@mapleFU (Member) commented Sep 26, 2023

Can we use this part of the interface: 4f8504f?

By the way, -1 is a bit misleading here...

@eeroel (Contributor, Author) commented Sep 26, 2023

Can we use this part of the interface: 4f8504f?

Which part do you mean? In the current PR, FileSource is constructed from FileInfo, and FileInfo is constructed with the size: https://github.com/apache/arrow/pull/37868/files#diff-82a2d9a8424dfb436344c0802dc36275cd0fb87e2e9eb6399745fba43799f7e5R63

By the way, -1 is a bit misleading here...

Yes, agreed, it's not nice. I couldn't get the extension to compile when defaulting to size=None; I got errors about a Python object that "cannot be passed as a varargs parameter". I'm not familiar with Cython, so I don't really know what that's about...
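
For illustration only (this is not the PR's actual Cython code): the sentinel pattern under discussion maps an optional Python value to -1 before it crosses into the extension, so the C++ side never sees None. The helper name below is hypothetical.

_NO_SIZE = -1  # sentinel meaning "size unknown", same idea as the kNoSize constant mentioned later

def _coerce_file_size(file_size):
    # Hypothetical helper: accept None or a positive integer from Python
    # and return the plain int the extension layer expects.
    if file_size is None:
        return _NO_SIZE
    size = int(file_size)
    if size <= 0:
        raise ValueError("file_size must be a positive number of bytes")
    return size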

@mapleFU (Member) commented Sep 26, 2023

OK, I mean can we use the one with CustomOpen, but this change also LGTM. You can fix the CI first, and I'll ask the committers for review.

@eeroel force-pushed the feat/dataset_file_sizes branch 3 times, most recently from 9dedbb9 to 89bb95a on September 26, 2023 06:37
@eeroel (Contributor, Author) commented Sep 26, 2023

OK, I mean can we use the one with CustomOpen, but this change also LGTM. You can fix the CI first, and I'll ask the committers for review.

Hmm, I wonder why this breaks 🤔 It works on my machine... https://ci.appveyor.com/project/ApacheSoftwareFoundation/arrow/builds/48125445#L3684

@eeroel force-pushed the feat/dataset_file_sizes branch 3 times, most recently from f961559 to 24791e0 on September 26, 2023 09:50
@eeroel (Contributor, Author) commented Sep 26, 2023

I noticed that the code was always passing the size to FileSource even if it was the default -1... Surprisingly, the tests passed, but it probably worked due to some internal validation in FileSource/FileInfo. Fixed now. But if someone has an idea of how to handle the optional int value better, I'm happy to change it.

@westonpace (Member) commented

But that request you linked is also problematic with regard to threading.

Can you expand on this?

By the way, -1 is a bit misleading here...

There is a kNoSize constant in filesystem.h that could be used in a few places, but I don't know how much clearer that makes things. We are on C++17 now, so we could use std::optional (https://en.cppreference.com/w/cpp/utility/optional), but that would be a considerably more significant refactor. I think -1 / kNoSize is probably OK for now.

@westonpace (Member) left a comment


Let's get CI passing and then this change seems OK to me. @jorisvandenbossche any thoughts on the Python API changes (though they are pretty minor, just adding a new optional parameter to make_fragment)?

Might be interesting to @wjones127 as well.

@eeroel (Contributor, Author) commented Sep 27, 2023

But that request you linked is also problematic with regard to threading.

Can you expand on this?

I was running some experiments looking at the debug logs, and it seems that these HEAD requests always get executed on the main (?) thread. From the code it also seems that way; it's not async. So when a dataset consists of multiple fragments, the file reads effectively start in sequence, and the latency from each HEAD adds up.

EDIT: I believe it's originating from here

ARROW_ASSIGN_OR_RAISE(auto input, source.Open());
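
A rough back-of-the-envelope illustration of why sequential HEAD requests hurt (the per-request latency here is an assumed figure, not a measurement from this thread):

# If each HEAD request costs ~50 ms of round-trip latency and they run
# one after another on a single thread, the size lookups alone add seconds
# of startup time before any data is read.
n_fragments = 100
head_latency_s = 0.05  # assumed per-request latency
print(f"~{n_fragments * head_latency_s:.1f} s spent on HEAD requests")  # ~5.0 s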

@mapleFU (Member) commented Nov 30, 2023

Ping @pitrou

@mapleFU requested a review from pitrou on December 1, 2023 09:47
@pitrou merged commit 2ea7f79 into apache:main on Dec 5, 2023
12 of 14 checks passed
@pitrou (Member) commented Dec 5, 2023

Thank you for contributing, @eeroel!

@eeroel (Contributor, Author) commented Dec 5, 2023

Thanks everyone for the help, and @mapleFU in particular for pushing this forward!


After merging your PR, Conbench analyzed the 6 benchmarking runs that have been run so far on merge-commit 2ea7f79.

There were no benchmark performance regressions. 🎉

The full Conbench report has more details. It also includes information about 3 possible false positives for unstable benchmarks that are known to sometimes produce them.

@ldacey commented Jan 28, 2024

Should this fail with bad file_size values? I tried changing the file_size to -10000 and it still succeeded for me. I am not sure if GCS uses this information, though (I am using fsspec for the filesystem, which uses the gcsfs library). I think I am using it wrong, because I cannot get it to fail in general, regardless of whether the size I input is the real size of the file.

I didn't see a fragment.size attribute or anything to check, other than fragment.metadata.serialized_size, which is different.

fragment = file_format.make_fragment(path, filesystem=dataset.filesystem, partition_expression=expression, file_size=-10000)
print(fragment)

<pyarrow.dataset.ParquetFileFragment path=bucket/discard/year=2023/part-0.parquet partition=[year=2023]>
<pyarrow.dataset.ParquetFileFragment path=bucket/discard/year=2024/part-0.parquet partition=[year=2024]>

# fresh dataset written with version 15.0
<pyarrow._parquet.FileMetaData object at 0x7f725816ecf0>
  created_by: parquet-cpp-arrow version 15.0.0
  num_columns: 56
  num_rows: 21420
  num_row_groups: 5
  format_version: 2.6
  serialized_size: 34164

@eeroel (Contributor, Author) commented Jan 28, 2024

Should this fail with bad file_size values? I tried changing the file_size to -10000 and it still succeeded for me. I am not sure if GCS uses this information, though (I am using fsspec for the filesystem, which uses the gcsfs library). I think I am using it wrong, because I cannot get it to fail in general, regardless of whether the size I input is the real size of the file.

Did you also try to create a dataset with those fragments and read it? There's no validation when the fragments are constructed, but it should fail when the Parquet reader tries to start reading the file, here:

int64_t GetFooterReadSize() {

It would make sense to handle zero and negative sizes on the Python side, though...

Regarding fsspec, the file size information will only get used by Arrow's internal filesystem implementations, and I believe it's currently only used for S3.
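
As a sketch of that distinction (the wrapping shown is the standard fsspec-to-Arrow bridge; whether a given filesystem actually consumes file_size is the point made above, not something this code verifies):

import gcsfs
import pyarrow.fs as fs

# An fsspec filesystem gets wrapped in a PyFileSystem, which matches the
# pyarrow._fs.PyFileSystem shown in the output later in this thread; the
# file_size hint is not used on that path. The native Arrow S3FileSystem
# is the case that currently uses it.
wrapped = fs.PyFileSystem(fs.FSSpecHandler(gcsfs.GCSFileSystem()))
native = fs.S3FileSystem(region="us-east-1")  # placeholder region
print(type(wrapped).__name__, type(native).__name__)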

@mapleFU (Member) commented Jan 28, 2024

Should this fail with bad file_size values? I tried changing the file_size to -10000 and it still succeeded for me.

I guess if the file size is invalid, parsing the Parquet footer might fail when the size is bad. In other formats the behavior might be undefined.

@ldacey commented Jan 28, 2024

Here are some more details:

import pyarrow as pa
import pyarrow.dataset as ds
from icecream import ic

# fs is a gcsfs.core.GCSFileSystem and gcs_path points at an existing
# partitioned Parquet dataset (see the ic output below).
ic(pa.__version__)
ic(fs)
dataset = ds.dataset(gcs_path, filesystem=fs)
ic(dataset.filesystem)

table = dataset.to_table()
ic(table.num_rows)

file_format = ds.ParquetFileFormat()
paths = dataset.files

original_fragments = [frag for frag in dataset.get_fragments()]
original_dataset = ds.FileSystemDataset(
    original_fragments, format=file_format, schema=table.schema, filesystem=dataset.filesystem
)
assert dataset.to_table().equals(original_dataset.to_table())
ic(dataset.to_table().equals(original_dataset.to_table()))

wrong_fragments = [] 
for path in dataset.files:
    fake_size = 55555555555
    actual_size = dataset.filesystem.get_file_info(path).size
    ic(actual_size, fake_size)
    fragment = file_format.make_fragment(path, filesystem=dataset.filesystem, file_size=fake_size)
    wrong_fragments.append(fragment)
    
test = ds.FileSystemDataset(
    wrong_fragments, format=file_format, schema=table.schema, filesystem=dataset.filesystem
)

assert dataset.to_table().equals(test.to_table())
ic(dataset.to_table().equals(test.to_table()))

___

ic| pa.__version__: '15.0.0'
ic| fs: <gcsfs.core.GCSFileSystem object at 0x7f13ea37dd90>
ic| dataset.filesystem: <pyarrow._fs.PyFileSystem object at 0x7f13b5b2d130>
ic| table.num_rows: 23491
ic| dataset.to_table().equals(original_dataset.to_table()): True
ic| actual_size: 4237841, fake_size: 55555555555
ic| dataset.to_table().equals(test.to_table()): True

I was using dataset.filesystem, which is a pyarrow filesystem (generated from the gcsfs filesystem). In that example, I made the file size some random large number, and you can see it does not match the real size, yet the table is constructed and still matches the original table. I thought that would fail, unless I am doing something wrong. I have pyarrow 15 installed, though.

@eeroel (Contributor, Author) commented Jan 28, 2024

I was using dataset.filesystem, which is a pyarrow filesystem (generated from the gcsfs filesystem). In that example, I made the file size some random large number, and you can see it does not match the real size, yet the table is constructed and still matches the original table. I thought that would fail, unless I am doing something wrong. I have pyarrow 15 installed, though.

Do you also get this result if you set fake_size to a negative number? With a too-large size this is expected, because the size is not actually used in this case (it's only used with the Arrow S3 filesystem implementation at the moment).

@ldacey commented Jan 28, 2024

Yes, I changed fake_size to -9999 and reran it, and it still worked. But since I am not using S3 (I only have access to GCS and ADLS Gen2), perhaps it is just ignored entirely.

ic| pa.__version__: '15.0.0'
ic| fs: <gcsfs.core.GCSFileSystem object at 0x7f55e8b88a10>
ic| dataset.filesystem: <pyarrow._fs.PyFileSystem object at 0x7f562f099230>
ic| table.num_rows: 23491
ic| dataset.to_table().equals(original_dataset.to_table()): True
ic| actual_size: 4237841, fake_size: -9999
ic| dataset.to_table().equals(test.to_table()): True

My original plan was to take a look at deltalake (the delta-rs library), which already uses make_fragment(). Since the transaction log (get_add_actions()) has the actual file sizes, we could pass these to make_fragment() for some potential efficiency gains, correct?

        if not filesystem or pyarrow.__version__ >= "15.0":
            file_sizes = self.get_add_actions().to_pydict()
            file_sizes = {
                x: y for x, y in zip(file_sizes["path"], file_sizes["size_bytes"])
            }

        format = ParquetFileFormat(
            read_options=parquet_read_options,
            default_fragment_scan_options=ParquetFragmentScanOptions(pre_buffer=True),
        )

        fragments = []
        for file, part_expression in self._table.dataset_partitions(
            self.schema().to_pyarrow(), partitions
        ):
            if pyarrow.__version__ >= "15.0":
                fragment = format.make_fragment(
                    file,
                    filesystem=filesystem,
                    partition_expression=part_expression,
                    file_size=file_sizes[file],
                )
            else:
                fragment = format.make_fragment(
                    file,
                    filesystem=filesystem,
                    partition_expression=part_expression,
                )
            fragments.append(fragment)

@eeroel (Contributor, Author) commented Jan 28, 2024

Yes, I changed fake_size to -9999 and reran it, and it still worked. But since I am not using S3 (I only have access to GCS and ADLS Gen2), perhaps it is just ignored entirely.

OK, thanks for confirming. I think it's good to check this; I didn't add a test case for negative values in this PR. It could be a bug, or it's possible there's some validation somewhere along the chain that silently ignores the value.

My original plan was to take a look at deltalake (the delta-rs library), which already uses make_fragment(). Since the transaction log (get_add_actions()) has the actual file sizes, we could pass these to make_fragment() for some potential efficiency gains, correct?

Yep, that was the motivation for this PR! I actually implemented this in deltalake for the filesystem implementation, so if you can use that, then the optimization should already be applied: https://github.com/delta-io/delta-rs/blob/1b6c830aae4553d2a079a2bf42b024863fcbbb40/python/deltalake/table.py#L1035

To use this with the Arrow GCS implementation, I think the OpenFile method should be updated to check the size from FileInfo.

dgreiss pushed a commit to dgreiss/arrow that referenced this pull request Feb 19, 2024
GH-37857: [Python][Dataset] Expose file size to python dataset (apache#37868)

### Rationale for this change

Allow passing known file sizes to `make_fragment`, to avoid potential network requests.

### What changes are included in this PR?

### Are these changes tested?

Yes, tests with S3 verify that the file size gets used.

### Are there any user-facing changes?

Yes, new function arguments.

* Closes: apache#37857

Lead-authored-by: Eero Lihavainen <[email protected]>
Co-authored-by: Benjamin Kietzman <[email protected]>
Co-authored-by: Eero Lihavainen <[email protected]>
Signed-off-by: Antoine Pitrou <[email protected]>

Successfully merging this pull request may close these issues.

[Python] Allow passing file sizes to FileSystemDataset from Python