Skip to content

Commit

Permalink
FEAT-#2451: Read multiple csv files simultaneously via glob paths (#2662)
Browse files Browse the repository at this point in the history

Signed-off-by: William Ma <[email protected]>
  • Loading branch information
williamma12 authored Feb 3, 2021
1 parent 9495ff7 commit 1b3e9d9
Show file tree
Hide file tree
Showing 10 changed files with 732 additions and 3 deletions.
46 changes: 46 additions & 0 deletions modin/backends/pandas/parsers.py
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,52 @@ def parse(fname, **kwargs):
]


class PandasCSVGlobParser(PandasCSVParser):
    """Parser that reads byte ranges from several CSV files into one frame."""

    @staticmethod
    def parse(chunks, **kwargs):
        """
        Read the requested byte ranges and combine them into a single frame.

        Parameters
        ----------
        chunks : iterable of (fname, start, end)
            Files to read together with the byte offsets delimiting the
            portion of each file to parse. If ``start`` or ``end`` is None the
            whole file ``fname`` is read with ``pandas.read_csv`` and that
            frame is returned immediately.
        **kwargs : dict
            Keyword arguments forwarded to ``pandas.read_csv``. ``num_splits``
            is consumed here and not forwarded. ``compression`` is used to
            open each file and is not forwarded for byte-range reads, because
            the bytes handed to pandas are already decompressed.

        Returns
        -------
        list or pandas.DataFrame
            For byte-range reads: the result of ``_split_result_for_readers``
            followed by the index (or the row count when ``index_col`` is
            None) and the frame's dtypes. For a whole-file read: the
            ``pandas.DataFrame`` itself.
        """
        warnings.filterwarnings("ignore")
        num_splits = kwargs.pop("num_splits", None)
        index_col = kwargs.get("index_col", None)

        # Resolve the compression once, before the loop. Popping it per
        # iteration would honour an explicit ``compression=`` only for the
        # first chunk and silently fall back to "infer" for all the others.
        compression = kwargs.get("compression", "infer")
        # The in-memory buffer given to pandas is uncompressed, so byte-range
        # reads must not forward ``compression``.
        range_kwargs = {
            key: value for key, value in kwargs.items() if key != "compression"
        }
        # Loop-invariant: the first line of the file is prepended to the chunk
        # only when an explicit encoding was requested.
        prepend_first_line = kwargs.get("encoding", None) is not None

        pandas_dfs = []
        for fname, start, end in chunks:
            if start is None or end is None:
                # This only happens when we are reading with only one worker (Default)
                return pandas.read_csv(fname, **kwargs)

            bio = FileDispatcher.file_open(fname, "rb", compression)
            try:
                header = b"" + bio.readline() if prepend_first_line else b""
                bio.seek(start)
                to_read = header + bio.read(end - start)
            finally:
                # Release the handle even if reading or seeking fails.
                bio.close()
            pandas_dfs.append(pandas.read_csv(BytesIO(to_read), **range_kwargs))

        # Combine read in data.
        if len(pandas_dfs) > 1:
            pandas_df = pandas.concat(pandas_dfs)
        elif len(pandas_dfs) > 0:
            pandas_df = pandas_dfs[0]
        else:
            pandas_df = pandas.DataFrame()

        # Set internal index.
        if index_col is not None:
            index = pandas_df.index
        else:
            # The lengths will become the RangeIndex
            index = len(pandas_df)
        return _split_result_for_readers(1, num_splits, pandas_df) + [
            index,
            pandas_df.dtypes,
        ]


class PandasFWFParser(PandasParser):
@staticmethod
def parse(fname, **kwargs):
Expand Down
4 changes: 4 additions & 0 deletions modin/data_management/factories/dispatcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,10 @@ def read_parquet(cls, **kwargs):
def read_csv(cls, **kwargs):
return cls.__engine._read_csv(**kwargs)

# Glob-path CSV reader entry point: forwards every keyword argument unchanged
# to the ``_read_csv_glob`` method of the class-private ``__engine`` attribute
# and returns whatever that call returns.
@classmethod
def read_csv_glob(cls, **kwargs):
return cls.__engine._read_csv_glob(**kwargs)

# JSON reader entry point: forwards every keyword argument unchanged to the
# ``_read_json`` method of the class-private ``__engine`` attribute and
# returns whatever that call returns.
@classmethod
def read_json(cls, **kwargs):
return cls.__engine._read_json(**kwargs)
Expand Down
4 changes: 4 additions & 0 deletions modin/data_management/factories/factories.py
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,10 @@ def prepare(cls):

cls.io_cls = ExperimentalPandasOnRayIO

# Factory-level hook for glob-path CSV reads: delegates to ``read_csv_glob``
# on the IO class stored in ``cls.io_cls``, passing all keyword arguments
# through unchanged and returning its result.
@classmethod
def _read_csv_glob(cls, **kwargs):
return cls.io_cls.read_csv_glob(**kwargs)


# Factory that combines ExperimentalBaseFactory with PandasOnPythonFactory;
# it adds no members of its own.
class ExperimentalPandasOnPythonFactory(ExperimentalBaseFactory, PandasOnPythonFactory):
pass
Expand Down
2 changes: 2 additions & 0 deletions modin/engines/base/io/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

from modin.engines.base.io.io import BaseIO
from modin.engines.base.io.text.csv_dispatcher import CSVDispatcher
from modin.engines.base.io.text.csv_glob_dispatcher import CSVGlobDispatcher
from modin.engines.base.io.text.fwf_dispatcher import FWFDispatcher
from modin.engines.base.io.text.json_dispatcher import JSONDispatcher
from modin.engines.base.io.text.excel_dispatcher import ExcelDispatcher
Expand All @@ -26,6 +27,7 @@
__all__ = [
"BaseIO",
"CSVDispatcher",
"CSVGlobDispatcher",
"FWFDispatcher",
"JSONDispatcher",
"FileDispatcher",
Expand Down
Loading

0 comments on commit 1b3e9d9

Please sign in to comment.