Fix read_csv for Pandas 0.22 (#15)
* Fix read_csv for Pandas 0.23

Update API to Pandas 0.22 and add fixes

More bugfixes

Add comment

Don't require pathlib installed

* Formatting

* rsplit takes no kwargs in py2

* Fix bug with file read ahead buffer

* Speed up test_from_csv_chunksize
pschafhalter authored and simon-mo committed Jul 6, 2018
1 parent 1446df9 commit 3e8ea0d
Showing 3 changed files with 221 additions and 85 deletions.
2 changes: 1 addition & 1 deletion modin/pandas/dataframe.py
@@ -325,7 +325,7 @@ def __repr__(self):
return repr(self._repr_helper_())
# The split here is so that we don't repr pandas row lengths.
result = self._repr_helper_()
final_result = repr(result).rsplit("\n\n", maxsplit=1)[0] + \
final_result = repr(result).rsplit("\n\n", 1)[0] + \
"\n\n[{0} rows x {1} columns]".format(len(self.index),
len(self.columns))
return final_result
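
A minimal sketch of the Python 2 behavior the rsplit change above works around (illustrative, not part of this commit): Python 2's str.rsplit only accepts maxsplit positionally.

```python
text = "col_a col_b\n1 2\n\n[5 rows x 2 columns]"

# Portable on Python 2 and 3: pass maxsplit positionally.
print(text.rsplit("\n\n", 1)[0])

# Python 3 only; on Python 2 this raises
# "TypeError: rsplit() takes no keyword arguments".
# print(text.rsplit("\n\n", maxsplit=1)[0])
```
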
274 changes: 190 additions & 84 deletions modin/pandas/io.py
@@ -7,6 +7,7 @@

from io import BytesIO
import os
import py
from pyarrow.parquet import ParquetFile
import pyarrow.parquet as pq
import re
@@ -63,51 +64,127 @@ def _read_parquet_column(path, column, kwargs={}):


# CSV
def _compute_offset(fn, npartitions, ignore_first_line=False):
"""
Calculate the correct byte offsets for a CSV file.
Return a list of (start, end) tuples where end == \n or EOF.
"""
total_bytes = os.path.getsize(fn)
bio = open(fn, 'rb')
if ignore_first_line:
start = len(bio.readline())
chunksize = (total_bytes - start) // npartitions
else:
start = 0
chunksize = total_bytes // npartitions
if chunksize == 0:
chunksize = 1

offsets = []
while start < total_bytes:
bio.seek(chunksize, 1) # Move forward {chunksize} bytes
extend_line = bio.readline() # Move after the next \n
total_offset = chunksize + len(extend_line)
# The position of the \n we just crossed.
new_line_cursor = start + total_offset - 1
offsets.append((start, new_line_cursor))
start = new_line_cursor + 1

bio.close()
return offsets


def _get_firstline(file_path):
bio = open(file_path, 'rb')
first = bio.readline()
bio.close()
return first
def _skip_header(f, kwargs={}):
lines_read = 0
comment = kwargs["comment"]
skiprows = kwargs["skiprows"]
encoding = kwargs["encoding"]
header = kwargs["header"]
names = kwargs["names"]

if header is None:
return lines_read

if header == "infer":
if names is not None:
return lines_read
else:
header = 0

# Skip lines before the header
if isinstance(skiprows, int):
lines_read += skiprows
for _ in range(skiprows):
f.readline()
skiprows = None

header_lines = header + 1 if isinstance(header, int) else max(header) + 1

header_lines_skipped = 0
# Python 2 files use a read-ahead buffer which breaks our use of tell()
for line in iter(f.readline, ""):
lines_read += 1
skip = False
if not skip and comment is not None:
if encoding is not None:
skip |= line.decode(encoding)[0] == comment
else:
skip |= line.decode()[0] == comment
if not skip and callable(skiprows):
skip |= skiprows(lines_read)
elif not skip and hasattr(skiprows, "__contains__"):
skip |= lines_read in skiprows

if not skip:
header_lines_skipped += 1
if header_lines_skipped == header_lines:
return lines_read

return lines_read
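
The iter(f.readline, ...) loop above avoids Python 2's file read-ahead buffer: iterating a file object directly pre-reads a large block, so f.tell() no longer matches the bytes consumed line by line. A minimal sketch of the safe pattern (hypothetical file name, assuming a binary-mode file):

```python
with open("data.csv", "rb") as f:
    # readline keeps the file position in sync with the lines consumed,
    # so tell() reports the byte offset just past the last line read.
    for line in iter(f.readline, b""):
        if line.startswith(b"#"):
            continue  # e.g. skip comment lines before the header
        header_end = f.tell()
        break
```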


def _infer_column(first_line, kwargs={}):
return pandas.read_csv(BytesIO(first_line), **kwargs).columns


def _read_csv_from_file(filepath, npartitions, kwargs={}):
"""Constructs a DataFrame from a CSV file.

Args:
filepath (str): path to the CSV file.
npartitions (int): number of partitions for the DataFrame.
kwargs (dict): args excluding filepath provided to read_csv.

Returns:
DataFrame or Series constructed from CSV file.
"""
empty_pd_df = pandas.read_csv(
filepath, **dict(kwargs, nrows=0, skipfooter=0, skip_footer=0))
names = empty_pd_df.columns

skipfooter = kwargs["skipfooter"]
skip_footer = kwargs["skip_footer"]

partition_kwargs = dict(
kwargs, header=None, names=names, skipfooter=0, skip_footer=0)
with open(filepath, "rb") as f:
# Get the BOM if necessary
prefix = b""
if kwargs["encoding"] is not None:
prefix = f.readline()
partition_kwargs["skiprows"] = 1
f.seek(0, os.SEEK_SET) # Return to beginning of file

prefix_id = ray.put(prefix)
partition_kwargs_id = ray.put(partition_kwargs)

# Skip the header since we already have the header information
_skip_header(f, kwargs)

# Launch tasks to read partitions
partition_ids = []
index_ids = []
total_bytes = os.path.getsize(filepath)
chunk_size = max(1, (total_bytes - f.tell()) // npartitions)
while f.tell() < total_bytes:
start = f.tell()
f.seek(chunk_size, os.SEEK_CUR)
f.readline() # Read a whole number of lines

if f.tell() >= total_bytes:
kwargs["skipfooter"] = skipfooter
kwargs["skip_footer"] = skip_footer

partition_id, index_id = _read_csv_with_offset._submit(
args=(filepath, start, f.tell(), partition_kwargs_id,
prefix_id),
num_return_vals=2)
partition_ids.append(partition_id)
index_ids.append(index_id)

# Construct index
index_id = get_index.remote([empty_pd_df.index.name], *index_ids) \
if kwargs["index_col"] is not None else None

df = DataFrame(row_partitions=partition_ids, columns=names, index=index_id)

skipfooter = kwargs["skipfooter"] or kwargs["skip_footer"]
if skipfooter:
df = df.drop(df.index[-skipfooter:])
if kwargs["squeeze"] and len(df.columns) == 1:
return df[df.columns[0]]

return df
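
_read_csv_from_file above finds partition boundaries by seeking roughly chunk_size bytes forward and then reading to the next newline, so every partition starts and ends on a whole line. A standalone sketch of that idea (function and variable names here are illustrative, not from this commit):

```python
import os

def newline_aligned_offsets(path, npartitions, data_start=0):
    """Return (start, end) byte ranges that each end at a newline or EOF."""
    total_bytes = os.path.getsize(path)
    chunk_size = max(1, (total_bytes - data_start) // npartitions)
    offsets = []
    with open(path, "rb") as f:
        f.seek(data_start)
        while f.tell() < total_bytes:
            start = f.tell()
            f.seek(chunk_size, os.SEEK_CUR)  # jump ahead roughly one chunk
            f.readline()  # then advance to the end of the current line
            offsets.append((start, min(f.tell(), total_bytes)))
    return offsets
```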


@ray.remote
def _read_csv_with_offset(fn, start, end, kwargs={}, header=b''):
kwargs["quoting"] = int(kwargs["quoting"]) # See issue #2078

bio = open(fn, 'rb')
bio.seek(start)
to_read = header + bio.read(end - start)
@@ -119,9 +196,26 @@ def _read_csv_with_offset(fn, start, end, kwargs={}, header=b''):
return pandas_df, index


def _read_csv_from_pandas(filepath_or_buffer, kwargs):
pd_obj = pandas.read_csv(filepath_or_buffer, **kwargs)

if isinstance(pd_obj, pandas.DataFrame):
return from_pandas(pd_obj, get_npartitions())
elif isinstance(pd_obj, pandas.io.parsers.TextFileReader):
# Overwriting the read method should return a ray DataFrame for calls
# to __next__ and get_chunk
pd_read = pd_obj.read
pd_obj.read = lambda *args, **kwargs: \
from_pandas(pd_read(*args, **kwargs), get_npartitions())

return pd_obj
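
Patching read works for chunked readers because pandas' TextFileReader routes both __next__ and get_chunk through read, so a single wrapper converts every chunk. Hypothetical usage (file name is illustrative):

```python
import modin.pandas as pd

# Each chunk the reader yields is already a Ray-backed DataFrame.
reader = pd.read_csv("big.csv", chunksize=10000)
for chunk in reader:
    print(type(chunk), len(chunk.index))
```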


@ray.remote
def get_index(*partition_indices):
return partition_indices[0].append(partition_indices[1:])
def get_index(index_name, *partition_indices):
index = partition_indices[0].append(partition_indices[1:])
index.names = index_name
return index


def read_csv(filepath_or_buffer,
@@ -179,7 +273,6 @@ def read_csv(filepath_or_buffer,
memory_map=False,
float_precision=None):
"""Read csv file from local disk.
Args:
filepath:
The filepath of the csv file.
@@ -243,53 +336,66 @@ def read_csv(filepath_or_buffer,
'float_precision': float_precision,
}

# Default to Pandas read_csv for non-serializable objects
if not isinstance(filepath_or_buffer, str) or \
_infer_compression(filepath_or_buffer, compression) is not None:
warnings.warn("Defaulting to Pandas implementation",
PendingDeprecationWarning)

pandas_obj = pandas.read_csv(filepath_or_buffer, **kwargs)
if isinstance(pandas_obj, pandas.DataFrame):
return from_pandas(pandas_obj, get_npartitions())

return pandas_obj

filepath = filepath_or_buffer

# TODO: handle case where header is a list of lines
first_line = _get_firstline(filepath)
columns = _infer_column(first_line, kwargs=kwargs)
if header is None or (header == "infer" and names is not None):
first_line = b""
ignore_first_line = False
else:
ignore_first_line = True

offsets = _compute_offset(
filepath, get_npartitions(), ignore_first_line=ignore_first_line)

# Serialize objects to speed up later use in remote tasks
first_line_id = ray.put(first_line)
kwargs_id = ray.put(kwargs)

df_obj_ids = []
index_obj_ids = []
for start, end in offsets:
if start != 0:
df, index = _read_csv_with_offset._submit(
args=(filepath, start, end, kwargs_id, first_line_id),
num_return_vals=2)
else:
df, index = _read_csv_with_offset._submit(
args=(filepath, start, end, kwargs_id), num_return_vals=2)
df_obj_ids.append(df)
index_obj_ids.append(index)

index = get_index.remote(*index_obj_ids) if index_col is not None else None

return DataFrame(row_partitions=df_obj_ids, columns=columns, index=index)

if isinstance(filepath_or_buffer, str):
if not os.path.exists(filepath_or_buffer):
warnings.warn(("File not found on disk. "
"Defaulting to Pandas implementation."),
PendingDeprecationWarning)

return _read_csv_from_pandas(filepath_or_buffer, kwargs)
elif not isinstance(filepath_or_buffer, py.path.local):
read_from_pandas = True

# Pandas read_csv supports pathlib.Path
try:
import pathlib
if isinstance(filepath_or_buffer, pathlib.Path):
read_from_pandas = False
except ImportError:
pass

if read_from_pandas:
warnings.warn(("Reading from buffer. "
"Defaulting to Pandas implementation."),
PendingDeprecationWarning)

return _read_csv_from_pandas(filepath_or_buffer, kwargs)

if _infer_compression(filepath_or_buffer, compression) is not None:
warnings.warn(("Compression detected. "
"Defaulting to Pandas implementation."),
PendingDeprecationWarning)

return _read_csv_from_pandas(filepath_or_buffer, kwargs)

if as_recarray:
warnings.warn("Defaulting to Pandas implementation.",
PendingDeprecationWarning)

return _read_csv_from_pandas(filepath_or_buffer, kwargs)

if chunksize is not None:
warnings.warn(("Reading chunks from a file. "
"Defaulting to Pandas implementation."),
PendingDeprecationWarning)

return _read_csv_from_pandas(filepath_or_buffer, kwargs)

if skiprows is not None and not isinstance(skiprows, int):
warnings.warn(("Defaulting to Pandas implementation. To speed up "
"read_csv through the Pandas on Ray implementation, "
"comment the rows to skip instead."))

return _read_csv_from_pandas(filepath_or_buffer, kwargs)

# TODO: replace this by reading lines from file.
if nrows is not None:
warnings.warn("Defaulting to Pandas implementation.",
PendingDeprecationWarning)

return _read_csv_from_pandas(filepath_or_buffer, kwargs)

return _read_csv_from_file(filepath_or_buffer, get_npartitions(), kwargs)
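
Taken together, the new checks mean only plain, uncompressed CSV paths on disk go through the parallel reader; buffers, missing files, compressed files, as_recarray, chunked reads, non-integer skiprows, and nrows all warn and fall back to plain Pandas. A hypothetical illustration of which path each call takes (file names are made up):

```python
import modin.pandas as pd

df = pd.read_csv("data.csv")        # parallel: partitioned by byte offsets
df_gz = pd.read_csv("data.csv.gz")  # warns, falls back: compression detected
reader = pd.read_csv("data.csv", chunksize=100)  # warns, falls back: chunked read
```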


def read_json(path_or_buf=None,
30 changes: 30 additions & 0 deletions modin/pandas/test/test_io.py
@@ -309,6 +309,36 @@ def test_from_csv():
teardown_csv_file()


def test_from_csv_chunksize():
setup_csv_file(SMALL_ROW_SIZE)

# Tests __next__ and correctness of reader as an iterator
# Use larger chunksize to read through file quicker
rdf_reader = pd.read_csv(TEST_CSV_FILENAME, chunksize=500)
pd_reader = pandas.read_csv(TEST_CSV_FILENAME, chunksize=500)

for ray_df, pd_df in zip(rdf_reader, pd_reader):
assert ray_df_equals_pandas(ray_df, pd_df)

# Tests that get_chunk works correctly
rdf_reader = pd.read_csv(TEST_CSV_FILENAME, chunksize=1)
pd_reader = pandas.read_csv(TEST_CSV_FILENAME, chunksize=1)

ray_df = rdf_reader.get_chunk(1)
pd_df = pd_reader.get_chunk(1)

assert ray_df_equals_pandas(ray_df, pd_df)

# Tests that read works correctly
rdf_reader = pd.read_csv(TEST_CSV_FILENAME, chunksize=1)
pd_reader = pandas.read_csv(TEST_CSV_FILENAME, chunksize=1)

ray_df = rdf_reader.read()
pd_df = pd_reader.read()

assert ray_df_equals_pandas(ray_df, pd_df)


def test_from_json():
setup_json_file(SMALL_ROW_SIZE)

