Skip to content

Commit

Permalink
FEAT-modin-project#2451: Document partition packing logic.
Browse files Browse the repository at this point in the history
Signed-off-by: William Ma <[email protected]>
  • Loading branch information
williamma12 committed Feb 3, 2021
1 parent 0cc367e commit bebe6f2
Showing 1 changed file with 24 additions and 32 deletions.
56 changes: 24 additions & 32 deletions modin/engines/base/io/text/csv_glob_dispatcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -191,10 +191,7 @@ def _read(cls, filepath_or_buffer, **kwargs):
is_quoting=is_quoting,
)

print("num_partitions: {}".format(num_partitions))
print("SPLITS:\n{}".format(len(splits)))
for chunks in splits:
print("CHUNKS:\n{}".format(chunks))
args.update({"chunks": chunks})
partition_id = cls.deploy(cls.parse, num_splits + 2, args)
partition_ids.append(partition_id[:-2])
Expand Down Expand Up @@ -353,10 +350,10 @@ def partitioned_multiple_files(
)

final_result = []
split_result = []
split_size = 0
print("PARTITION_SIZE: {}".format(partition_size))
partial_partition = []
partial_partition_size = 0
for f, fname, fsize in zip(files, fnames, file_sizes):
# We skip the headers of every file before trying to read from them.
if skip_header:
cls._read_rows(
f,
Expand All @@ -366,11 +363,10 @@ def partitioned_multiple_files(
)

# Fill up the rest of the partition before partitioning the rest of the file.
if split_size > 0:
if partial_partition_size > 0:
start = f.tell()
if nrows:
print("NROWS: {}".format(nrows))
remainder_size = min(partition_size, nrows) - split_size
remainder_size = min(partition_size, nrows) - partial_partition_size
_, read_size = cls._read_rows(
f,
nrows=remainder_size,
Expand All @@ -379,7 +375,7 @@ def partitioned_multiple_files(
)
end = f.tell()
else:
remainder_size = partition_size - split_size
remainder_size = partition_size - partial_partition_size
cls.offset(
f,
offset_size=remainder_size,
Expand All @@ -389,27 +385,26 @@ def partitioned_multiple_files(
end = f.tell()
read_size = end - start

print("REMAINDER SIZE: {}".format(remainder_size))
print("SPLIT SIZE: {}".format(read_size))
split_result.append((fname, start, end))
split_size += read_size
partial_partition.append((fname, start, end))
partial_partition_size += read_size
if read_size < remainder_size:
# The file that we were reading was too small to fill the carried-over partition (partial_partition).
continue
else:
if nrows:
nrows -= split_size
print("+++")
final_result.append(split_result)
split_result = []
split_size = 0
nrows -= partial_partition_size
final_result.append(partial_partition)
partial_partition = []
partial_partition_size = 0

if nrows == 0:
# We stop reading here having completed, if necessary, the partial partition.
break

if f.tell() == fsize:
# Don't bother reading an empty file.
continue

DEBUG_START = f.tell()
file_splits, rows_read = cls.partitioned_file(
f,
fname,
Expand All @@ -419,13 +414,11 @@ def partitioned_multiple_files(
quotechar=quotechar,
is_quoting=is_quoting,
)
DEBUG_END = f.tell()
# print("FILE READ: {}".format(rows_read))
print("FILE READ: {}".format(DEBUG_END - DEBUG_START))
print("---")

if skiprows:
# Update bookkeeping on skipped rows.
if skiprows > rows_read:
# Wanted to skip more rows than what was available in the file.
skiprows -= rows_read
continue
else:
Expand All @@ -442,20 +435,19 @@ def partitioned_multiple_files(
full_last_partition = last_size >= partition_size

if full_last_partition:
print("+++")
final_result.append(file_splits)
else:
# Don't append anything if the file was too small for one partition.
if len(file_splits) > 1:
print("+++")
# Don't append anything if the file was too small for one partition.
final_result.append(file_splits[:-1])
split_result = [file_splits[-1]]
split_size = last_size
partial_partition = [file_splits[-1]]
partial_partition_size = last_size
if nrows:
nrows += split_size
# We add the size of the carried-over partition because we need it to calculate how much to read for the partial partition.
nrows += partial_partition_size

# Add straggler splits into the final result.
if split_size > 0:
final_result.append(split_result)
if partial_partition_size > 0:
final_result.append(partial_partition)

return final_result

0 comments on commit bebe6f2

Please sign in to comment.