Skip to content

Commit

Permalink
Fix issue NVIDIA#43 (empty files creation) and improve reading/writin…
Browse files Browse the repository at this point in the history
…g speed (NVIDIA#57)

This commit fixes issue NVIDIA#43 (empty files created when invoking reshard_jsonl method at nemo_curator.utils.file_utils.py) by double-checking the files size after being generated, and deleting them with size zero.

In addition to that, I have noticed there is no need to parse to JSON object the content of the different lines, which should be already in json format. By removing that extra-parsing, there is a significant speed up in the execution of this method.

Signed-off-by: Miguel Martínez <[email protected]>
Signed-off-by: Vibhu Jawa <[email protected]>
  • Loading branch information
miguelusque authored and VibhuJawa committed May 16, 2024
1 parent b306856 commit 5387ccd
Showing 1 changed file with 17 additions and 6 deletions.
23 changes: 17 additions & 6 deletions nemo_curator/utils/file_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -181,9 +181,8 @@ def parse_str_of_num_bytes(s, return_str=False):
def _save_jsonl(documents, output_path, start_index=0, max_index=10000, prefix=None):
"""Worker function to write out the data to jsonl files"""

def _output_json(document):
myjson = json.dumps(document, ensure_ascii=False)
return myjson.encode("utf-8")
def _encode_text(document):
return document.strip().encode("utf-8")

def _name(start_index, npad, prefix, i):
tag = str(start_index + i).rjust(npad, "0")
Expand All @@ -195,11 +194,22 @@ def _name(start_index, npad, prefix, i):

output_glob_string = os.path.join(output_path, "*.jsonl")

documents.map(_output_json).to_textfiles(
output_files = documents.map(_encode_text).to_textfiles(
output_glob_string,
name_function=name,
)

# Delete empty files generated due to empty partitions in the bag
for output_file in output_files:
try:
if os.path.getsize(output_file) == 0:
os.remove(output_file)
except Exception as exception:
print(
f"An exception occurred when trying to delete {output_file}.\n{exception}",
flush=True,
)


def reshard_jsonl(
input_dir, output_dir, output_file_size="100M", start_index=0, file_prefix=""
Expand All @@ -212,7 +222,8 @@ def reshard_jsonl(
output_dir: The output directory where the resharded jsonl files will be written
output_file_size: Approximate size of output files. Must specify with a string and
with the unit K, M or G for kilo, mega or gigabytes
start_index: Starting index for naming the output files
start_index: Starting index for naming the output files. Note: The indices may not
be continuous if the sharding process would output an empty file in its place
file_prefix: Prefix to use to prepend to output file number
"""

Expand All @@ -222,7 +233,7 @@ def reshard_jsonl(
input_files = list(get_all_files_paths_under(input_dir))

# Read in the dask bag
b = db.read_text(input_files, blocksize=blocksize).map(json.loads)
b = db.read_text(input_files, blocksize=blocksize)

# Prepare the output
output_dir = expand_outdir_and_mkdir(output_dir)
Expand Down

0 comments on commit 5387ccd

Please sign in to comment.