[Datasets] read_csv not filter out files by default #29032

Merged 5 commits on Oct 7, 2022
36 changes: 22 additions & 14 deletions python/ray/data/datasource/csv_datasource.py
@@ -29,7 +29,7 @@ class CSVDatasource(FileBasedDatasource):
def _read_stream(
self, f: "pyarrow.NativeFile", path: str, **reader_args
) -> Iterator[Block]:
-import pyarrow
+import pyarrow as pa
from pyarrow import csv

read_options = reader_args.pop(
@@ -40,19 +40,27 @@ def _read_stream(
if hasattr(parse_options, "invalid_row_handler"):
parse_options.invalid_row_handler = parse_options.invalid_row_handler

-reader = csv.open_csv(
-f, read_options=read_options, parse_options=parse_options, **reader_args
-)
-schema = None
-while True:
-try:
-batch = reader.read_next_batch()
-table = pyarrow.Table.from_batches([batch], schema=schema)
-if schema is None:
-schema = table.schema
-yield table
-except StopIteration:
-return
+try:
+reader = csv.open_csv(
+f, read_options=read_options, parse_options=parse_options, **reader_args
+)
+schema = None
+while True:
+try:
+batch = reader.read_next_batch()
+table = pa.Table.from_batches([batch], schema=schema)
+if schema is None:
+schema = table.schema
+yield table
+except StopIteration:
+return
+except pa.lib.ArrowInvalid as e:
+raise ValueError(
+f"Failed to read CSV file: {path}. "
+"Please check the CSV file has correct format, or filter out non-CSV "
+"file with 'partition_filter' field. See read_csv() documentation for "
+"more details."
+) from e

def _write_block(
self,
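With the change above, PyArrow parse failures are no longer silently skipped: they surface as a ValueError that names the offending file and points at partition_filter. A minimal sketch of what this looks like from the caller's side (the directory layout and file names are made up for illustration; the behavior matches the tests added further down in this PR):

```python
import os
import tempfile

import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq

import ray
from ray.data.datasource import FileExtensionFilter

df = pd.DataFrame({"one": [1, 2, 3], "two": ["a", "b", "c"]})
data_dir = tempfile.mkdtemp()
df.to_csv(os.path.join(data_dir, "part-0.csv"), index=False)
# A stray non-CSV file in the same directory.
pq.write_table(pa.Table.from_pandas(df), os.path.join(data_dir, "stats.parquet"))

try:
    # Previously the .parquet file was filtered out by extension; now it is read
    # as CSV, fails, and the ArrowInvalid is re-raised as a ValueError.
    ray.data.read_csv(data_dir)
except ValueError as err:
    print(err)  # "Failed to read CSV file: .../stats.parquet. Please check ..."

# Opting back into the old extension-based behavior with an explicit filter.
ds = ray.data.read_csv(data_dir, partition_filter=FileExtensionFilter("csv"))
assert ds.to_pandas().equals(df)
```

The error text points users at partition_filter, which is the same escape hatch documented in the read_csv docstring changes below.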
24 changes: 15 additions & 9 deletions python/ray/data/read_api.py
@@ -577,9 +577,7 @@ def read_csv(
ray_remote_args: Dict[str, Any] = None,
arrow_open_stream_args: Optional[Dict[str, Any]] = None,
meta_provider: BaseFileMetadataProvider = DefaultFileMetadataProvider(),
-partition_filter: Optional[
-PathPartitionFilter
-] = CSVDatasource.file_extension_filter(),
Contributor:

Is the same logic applicable for other data types (e.g. json, parquet)?

Contributor Author (@c21), Oct 4, 2022:

> Is the same logic applicable for other data types (e.g. json, parquet)?

We don't have default filtering for Parquet, so Parquet is fine.
We do have a default filter for JSON that drops files without the .json extension. I just tried out Arrow and Spark on my laptop and they don't filter out files when reading JSON, so we can change the behavior for JSON files in another follow-up PR later. To me, the CSV fix is more urgent as there are multiple user reports (but it can also be the case that our read_json is just not as popular).
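As a concrete illustration of the asymmetry described here (a sketch only; the S3 paths are placeholders, and read_json's partition_filter default is assumed to still behave the way read_csv's did before this PR):

```python
import ray

# read_csv after this PR: no default extension filter, so every file under the
# path is read, and non-CSV files raise instead of being skipped.
csv_ds = ray.data.read_csv("s3://bucket/csv_dir")

# read_json, per the comment above, still filters out files without a .json
# extension by default; passing partition_filter=None is assumed to disable it.
json_ds = ray.data.read_json("s3://bucket/json_dir")
json_all = ray.data.read_json("s3://bucket/json_dir", partition_filter=None)
```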

Contributor:

Oh interesting, read_parquet doesn't have default filtering while read_parquet_bulk does (which seems a bit unintuitive to me, since read_parquet_bulk doesn't support directories).

In that case I think it's reasonable to go forward with this for now and follow up with a more holistic/consistent solution.

One question: is the error message when reading a non-CSV file directly actionable to the user, especially for users who previously relied on this default behavior? E.g., when the file is a TSV, or when the file is some random file that should be excluded.
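For the TSV case specifically, the supported path is an explicit delimiter override rather than extension filtering; a minimal sketch mirroring the docstring example kept in this PR:

```python
import ray
from pyarrow import csv

# Tab-separated file read through read_csv; only the delimiter changes, no
# extension-based filtering is involved.
parse_options = csv.ParseOptions(delimiter="\t")
ds = ray.data.read_csv("example://iris.tsv", parse_options=parse_options)
```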

Contributor Author:

> Oh interesting, read_parquet doesn't have default filtering while read_parquet_bulk does (which seems a bit unintuitive to me, since read_parquet_bulk doesn't support directories).

Yeah, this looks unintuitive to me too. I don't think we should have different behavior between these two Parquet APIs.

> One question: is the error message when reading a non-CSV file directly actionable to the user, especially for users who previously relied on this default behavior? E.g., when the file is a TSV, or when the file is some random file that should be excluded.

Yeah, agreed, we should have an actionable error message. That's exactly what I am doing now; I plan to have another PR to tackle the error message when reading non-CSV files by mistake, and it should also apply when reading a malformed .csv file.

Contributor Author:

Added handling to provide a more detailed error message, and added an example error message for reading a non-CSV file to the PR description.

+partition_filter: Optional[PathPartitionFilter] = None,
partitioning: Partitioning = Partitioning("hive"),
**arrow_csv_args,
) -> Dataset[ArrowRow]:
@@ -597,15 +595,13 @@ def read_csv(
>>> ray.data.read_csv( # doctest: +SKIP
... ["s3://bucket/path1", "s3://bucket/path2"])

->>> # Read files that use a different delimiter. The partition_filter=None is needed here
->>> # because by default read_csv only reads .csv files. For more uses of ParseOptions see
+>>> # Read files that use a different delimiter. For more uses of ParseOptions see
>>> # https://arrow.apache.org/docs/python/generated/pyarrow.csv.ParseOptions.html # noqa: #501
>>> from pyarrow import csv
>>> parse_options = csv.ParseOptions(delimiter="\t")
>>> ray.data.read_csv( # doctest: +SKIP
... "example://iris.tsv",
-... parse_options=parse_options,
-... partition_filter=None)
+... parse_options=parse_options)

>>> # Convert a date column with a custom format from a CSV file.
>>> # For more uses of ConvertOptions see
@@ -626,6 +622,15 @@
>>> ds.take(1) # doctest: + SKIP
[{'order_number': 10107, 'quantity': 30, 'year': '2022', 'month': '09'}

+By default, ``read_csv`` reads all files from file paths. If you want to filter
+files by file extensions, set the ``partition_filter`` parameter.
+
+>>> # Read only *.csv files from multiple directories.
+>>> from ray.data.datasource import FileExtensionFilter
+>>> ray.data.read_csv( # doctest: +SKIP
+... ["s3://bucket/path1", "s3://bucket/path2"],
+... partition_filter=FileExtensionFilter("csv"))
+
Args:
paths: A single file/directory path or a list of file/directory paths.
A list of paths can contain both files and directories.
@@ -639,8 +644,9 @@
be able to resolve file metadata more quickly and/or accurately.
partition_filter: Path-based partition filter, if any. Can be used
with a custom callback to read only selected partitions of a dataset.
-By default, this filters out any file paths whose file extension does not
-match "*.csv*".
+By default, this does not filter out any files.
+If wishing to filter out all file paths except those whose file extension
+matches e.g. "*.csv*", a ``FileExtensionFilter("csv")`` can be provided.
partitioning: A :class:`~ray.data.datasource.partitioning.Partitioning` object
that describes how paths are organized. By default, this function parses
`Hive-style partitions <https://athena.guide/articles/hive-style-partitioning/>`_.
60 changes: 53 additions & 7 deletions python/ray/data/tests/test_dataset_csv.py
@@ -23,7 +23,10 @@
PathPartitionEncoder,
PathPartitionFilter,
)
-from ray.data.datasource.file_based_datasource import _unwrap_protocol
+from ray.data.datasource.file_based_datasource import (
+FileExtensionFilter,
+_unwrap_protocol,
+)


def df_to_csv(dataframe, path, **kwargs):
@@ -196,7 +199,12 @@ def test_csv_read(ray_start_regular_shared, fs, data_path, endpoint_url):
storage_options=storage_options,
)

-ds = ray.data.read_csv(path, filesystem=fs, partitioning=None)
+ds = ray.data.read_csv(
+path,
+filesystem=fs,
+partition_filter=FileExtensionFilter("csv"),
+partitioning=None,
+)
assert ds.num_blocks() == 2
df = pd.concat([df1, df2], ignore_index=True)
dsdf = ds.to_pandas()
@@ -642,7 +650,7 @@ def test_csv_read_with_column_type_specified(shutdown_only, tmp_path):

# Incorrect to parse scientific notation in int64 as PyArrow represents
# it as double.
-with pytest.raises(pa.lib.ArrowInvalid):
+with pytest.raises(ValueError):
ray.data.read_csv(
file_path,
convert_options=csv.ConvertOptions(
@@ -661,15 +669,53 @@ def test_csv_read_with_column_type_specified(shutdown_only, tmp_path):
assert ds.to_pandas().equals(expected_df)


-def test_csv_read_filter_no_file(shutdown_only, tmp_path):
+def test_csv_read_filter_non_csv_file(shutdown_only, tmp_path):
df = pd.DataFrame({"one": [1, 2, 3], "two": ["a", "b", "c"]})

+# CSV file with .csv extension.
+path1 = os.path.join(tmp_path, "test2.csv")
+df.to_csv(path1, index=False)
+
+# CSV file without .csv extension.
+path2 = os.path.join(tmp_path, "test3")
+df.to_csv(path2, index=False)
+
+# Directory of CSV files.
+ds = ray.data.read_csv(tmp_path)
+assert ds.to_pandas().equals(pd.concat([df, df], ignore_index=True))
+
+# Non-CSV file in Parquet format.
table = pa.Table.from_pandas(df)
-path = os.path.join(str(tmp_path), "test.parquet")
-pq.write_table(table, path)
+path3 = os.path.join(tmp_path, "test1.parquet")
+pq.write_table(table, path3)

+# Single non-CSV file.
+error_message = "Failed to read CSV file"
+with pytest.raises(ValueError, match=error_message):
+ray.data.read_csv(path3)
+
+# Single non-CSV file with filter.
error_message = "No input files found to read"
with pytest.raises(ValueError, match=error_message):
-ray.data.read_csv(path)
+ray.data.read_csv(path3, partition_filter=FileExtensionFilter("csv"))
+
+# Single CSV file without extension.
+ds = ray.data.read_csv(path2)
+assert ds.to_pandas().equals(df)
+
Contributor:

Can you add a case with two CSV files, one with the .csv extension and the other without, where we can successfully read both into the dataset?

Contributor Author:

@jianoaix - sure, added.

+# Single CSV file without extension with filter.
+error_message = "No input files found to read"
+with pytest.raises(ValueError, match=error_message):
+ray.data.read_csv(path2, partition_filter=FileExtensionFilter("csv"))
+
+# Directory of CSV and non-CSV files.
+error_message = "Failed to read CSV file"
+with pytest.raises(ValueError, match=error_message):
+ray.data.read_csv(tmp_path)
+
+# Directory of CSV and non-CSV files with filter.
+ds = ray.data.read_csv(tmp_path, partition_filter=FileExtensionFilter("csv"))
+assert ds.to_pandas().equals(df)


@pytest.mark.skipif(