FEAT-modin-project#2451: Passing tests
Signed-off-by: William Ma <[email protected]>
williamma12 committed Feb 3, 2021
1 parent 7406629 commit 780fb59
Showing 4 changed files with 105 additions and 42 deletions.
4 changes: 3 additions & 1 deletion modin/engines/base/io/file_dispatcher.py
@@ -162,7 +162,9 @@ def get_file_path(fs_handle) -> List[str]:
else:
return []
else:
return fs_handle.glob(s3_path)
s3_paths = fs_handle.glob(s3_path)
s3_addresses = ["{}{}".format("s3://", path) for path in s3_paths]
return s3_addresses

s3fs = S3FS.S3FileSystem(anon=False)
try:
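
For context on the file_dispatcher.py change above: S3FileSystem.glob returns bucket-relative keys without the "s3://" scheme, so the dispatcher now re-prefixes every match before returning the addresses. A minimal sketch of the idea, assuming a hypothetical bucket and pattern:

    import s3fs

    # glob() yields keys such as "my-bucket/data_1.csv" (no scheme),
    # so "s3://" is prepended before the paths are handed to the readers.
    fs = s3fs.S3FileSystem(anon=False)
    s3_paths = fs.glob("s3://my-bucket/data_*.csv")
    s3_addresses = ["{}{}".format("s3://", path) for path in s3_paths]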
52 changes: 39 additions & 13 deletions modin/engines/base/io/text/csv_glob_dispatcher.py
@@ -191,7 +191,8 @@ def _read(cls, filepath_or_buffer, **kwargs):
is_quoting=is_quoting,
)

print("SPLITS:\n{}".format(splits))
print("num_partitions: {}".format(num_partitions))
print("SPLITS:\n{}".format(len(splits)))
for chunks in splits:
print("CHUNKS:\n{}".format(chunks))
args.update({"chunks": chunks})
@@ -346,17 +347,18 @@ def partitioned_multiple_files(
if num_partitions is None:
num_partitions = NPartitions.get()

file_size = sum(cls.file_size(f) for f in files)
file_sizes = [cls.file_size(f) for f in files]
partition_size = max(
1, num_partitions, (nrows if nrows else file_size) // num_partitions
1, num_partitions, (nrows if nrows else sum(file_sizes)) // num_partitions
)

final_result = []
split_result = []
split_size = 0
for f, fname in zip(files, fnames):
print("PARTITION_SIZE: {}".format(partition_size))
for f, fname, fsize in zip(files, fnames, file_sizes):
if skip_header:
outside_quotes, read_rows = cls._read_rows(
cls._read_rows(
f,
nrows=skip_header,
quotechar=quotechar,
@@ -365,35 +367,49 @@

# Fill up the rest of the partition before partitioning the rest of the file.
if split_size > 0:
remainder_size = partition_size - split_size
start = f.tell()
if nrows:
_, read_rows = cls._read_rows(
print("NROWS: {}".format(nrows))
remainder_size = min(partition_size, nrows) - split_size
_, read_size = cls._read_rows(
f,
nrows=remainder_size,
quotechar=quotechar,
is_quoting=is_quoting,
)
split_size += read_rows
nrows -= read_rows
end = f.tell()
else:
remainder_size = partition_size - split_size
cls.offset(
f,
offset_size=remainder_size,
quotechar=quotechar,
is_quoting=is_quoting,
)
end = f.tell()
split_size += end - start
read_size = end - start

print("REMAINDER SIZE: {}".format(remainder_size))
print("SPLIT SIZE: {}".format(read_size))
split_result.append((fname, start, end))
if split_size < partition_size:
split_size += read_size
if read_size < remainder_size:
continue
else:
if nrows:
nrows -= split_size
print("+++")
final_result.append(split_result)
split_result = []
split_size = 0

if nrows == 0:
break

if f.tell() == fsize:
continue

DEBUG_START = f.tell()
file_splits, rows_read = cls.partitioned_file(
f,
fname,
@@ -403,8 +419,12 @@
quotechar=quotechar,
is_quoting=is_quoting,
)
DEBUG_END = f.tell()
# print("FILE READ: {}".format(rows_read))
print("FILE READ: {}".format(DEBUG_END - DEBUG_START))
print("---")

if rows_read > 0 and skiprows:
if skiprows:
if skiprows > rows_read:
skiprows -= rows_read
continue
@@ -422,11 +442,17 @@
full_last_partition = last_size >= partition_size

if full_last_partition:
print("+++")
final_result.append(file_splits)
else:
final_result.append(file_splits[:-1])
# Don't append anything if the file was too small for one partition.
if len(file_splits) > 1:
print("+++")
final_result.append(file_splits[:-1])
split_result = [file_splits[-1]]
split_size = last_size
if nrows:
nrows += split_size

# Add straggler splits into the final result.
if split_size > 0:
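
The heart of the csv_glob_dispatcher.py change is how partitioned_multiple_files packs byte ranges from consecutive files into partitions of roughly equal size: a partially filled partition is carried across file boundaries, and any straggler splits are appended at the end. A simplified, self-contained sketch of that packing logic (illustrative only, not Modin's implementation; it ignores nrows, skiprows, headers, and quoting):

    def pack_splits(file_sizes, partition_size):
        # file_sizes: list of (fname, size_in_bytes) pairs; returns a list of
        # partitions, each a list of (fname, start, end) byte ranges.
        partitions, current, current_size = [], [], 0
        for fname, size in file_sizes:
            start = 0
            while start < size:
                take = min(partition_size - current_size, size - start)
                current.append((fname, start, start + take))
                current_size += take
                start += take
                if current_size >= partition_size:
                    partitions.append(current)
                    current, current_size = [], 0
        # Add straggler splits into the final result.
        if current:
            partitions.append(current)
        return partitions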
88 changes: 60 additions & 28 deletions modin/experimental/pandas/test/test_io_exp.py
@@ -19,7 +19,8 @@
df_equals,
eval_io,
make_sql_connection,
make_csv_file,
_make_csv_file,
teardown_test_files,
)
from modin.pandas.test.utils import get_unique_filename

@@ -68,33 +69,64 @@ def test_from_sql_defaults(make_sql_connection): # noqa: F811
df_equals(modin_df_from_table, pandas_df)


@pytest.fixture(scope="class")
def TestReadGlobCSVFixture():
filenames = []

base_name = get_unique_filename(extension="")
pytest.glob_path = "{}_*.csv".format(base_name)
pytest.files = ["{}_{}.csv".format(base_name, i) for i in range(11)]
for fname in pytest.files:
_make_csv_file(filenames)(fname, row_size=11, remove_randomness=True)

yield

teardown_test_files(filenames)

@pytest.mark.usefixtures("TestReadGlobCSVFixture")
@pytest.mark.skipif(
Engine.get() != "Ray", reason="Currently only support Ray engine for glob paths."
)
def test_read_multiple_csv(make_csv_file): # noqa: F811
base_name = get_unique_filename(extension="")
glob_path = "{}_*.csv".format(base_name)
files = ["{}_{}.csv".format(base_name, i) for i in range(2)]
for fname in files:
make_csv_file(fname)

pandas_df1 = pandas.concat([pandas.read_csv(fname) for fname in files])
pandas_df2 = pandas.concat([pandas.read_csv(fname) for fname in files[::-1]])
# We have to reset the index because concating mucks with the indices.
pandas_df1 = pandas_df1.reset_index(drop=True)
pandas_df2 = pandas_df2.reset_index(drop=True)
modin_df = pd.read_csv(glob_path)

# Glob does not guarantee ordering so we have to test both.
try:
df_equals(modin_df, pandas_df1)
except AssertionError:
df_equals(modin_df, pandas_df2)


def test_read_csv_s3(self):
eval_io(
fn_name="read_csv",
# read_csv kwargs
filepath_or_buffer="s3://noaa-ghcn-pds/csv/178*.csv",
)
class TestCsvGlob:
def test_read_multiple_small_csv(self): # noqa: F811
pandas_df = pandas.concat([pandas.read_csv(fname) for fname in pytest.files])
modin_df = pd.read_csv(pytest.glob_path)

# Indexes get messed up when concatting so we reset both.
pandas_df = pandas_df.reset_index(drop=True)
modin_df = modin_df.reset_index(drop=True)

# Glob does not guarantee ordering so we have to test both.
df_equals(modin_df, pandas_df)


@pytest.mark.parametrize("nrows", [35, 100])
def test_read_multiple_csv_nrows(self, request, nrows): # noqa: F811
pandas_df = pandas.concat([pandas.read_csv(fname) for fname in pytest.files])
pandas_df = pandas_df.iloc[:nrows, :]

modin_df = pd.read_csv(pytest.glob_path, nrows=nrows)

# Indexes get messed up when concatting so we reset both.
pandas_df = pandas_df.reset_index(drop=True)
modin_df = modin_df.reset_index(drop=True)

# Glob does not guarantee ordering so we have to test both.
df_equals(modin_df, pandas_df)


@pytest.mark.skipif(
Engine.get() != "Ray", reason="Currently only support Ray engine for glob paths."
)
def test_read_multiple_csv_s3():
modin_df = pd.read_csv("S3://noaa-ghcn-pds/csv/178*.csv")

# We have to specify the columns because the column names are not identical. Since we specified the column names, we also have to skip the original column names.
pandas_dfs = [pandas.read_csv("s3://noaa-ghcn-pds/csv/178{}.csv".format(i), names=modin_df.columns, skiprows=[0]) for i in range(10)]
pandas_df = pd.concat(pandas_dfs)

# Indexes get messed up when concatting so we reset both.
pandas_df = pandas_df.reset_index(drop=True)
modin_df = modin_df.reset_index(drop=True)

df_equals(modin_df, pandas_df)
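
These tests exercise the experimental glob support: modin.experimental.pandas.read_csv accepts a glob pattern directly, while the pandas reference result is built by concatenating the matched files; both frames get reset_index(drop=True) because concatenation and glob ordering are not deterministic. A minimal usage sketch with hypothetical local file names:

    import glob
    import pandas
    import modin.experimental.pandas as pd

    files = sorted(glob.glob("data_*.csv"))  # hypothetical files on disk
    pandas_df = pandas.concat(
        pandas.read_csv(fname) for fname in files
    ).reset_index(drop=True)
    modin_df = pd.read_csv("data_*.csv").reset_index(drop=True)
    # Glob ordering is not guaranteed, so the comparison may need to tolerate
    # row-order differences.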
3 changes: 3 additions & 0 deletions modin/pandas/test/test_io.py
@@ -171,6 +171,7 @@ def _csv_file_maker(
encoding=None,
compression="infer",
additional_col_values=None,
remove_randomness=False,
add_blank_lines=False,
add_bad_lines=False,
add_nan_lines=False,
@@ -206,6 +207,8 @@
}
)
df = pandas.DataFrame(data)
if remove_randomness:
df = df[["col1", "col2", "col3", "col4"]]
if add_nan_lines:
for i in range(0, row_size, row_size // (row_size // 10)):
df.loc[i] = pandas.Series()
