Skip to content

Commit

Permalink
fix copy_files() (#4472)
Browse files Browse the repository at this point in the history
  • Loading branch information
brimoor authored Jun 10, 2024
1 parent 656a264 commit 2e49b92
Showing 1 changed file with 37 additions and 37 deletions.
74 changes: 37 additions & 37 deletions fiftyone/core/storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,7 @@ def open_files(paths, mode="r", skip_failures=False, progress=None):
a list of open file-like objects
"""
tasks = [(p, mode, skip_failures) for p in paths]
return _run(_do_open_file, tasks, progress=progress)
return _run(_do_open_file, tasks, return_results=True, progress=progress)


def read_file(path, binary=False):
Expand Down Expand Up @@ -239,7 +239,7 @@ def read_files(paths, binary=False, skip_failures=False, progress=None):
a list of file contents
"""
tasks = [(p, binary, skip_failures) for p in paths]
return _run(_do_read_file, tasks, progress=progress)
return _run(_do_read_file, tasks, return_results=True, progress=progress)


def write_file(str_or_bytes, path):
Expand Down Expand Up @@ -793,7 +793,7 @@ def move_files(inpaths, outpaths, skip_failures=False, progress=None):
progress callback function to invoke instead
"""
tasks = [(i, o, skip_failures) for i, o in zip(inpaths, outpaths)]
_run(_do_move_file, tasks, progress=progress)
_run(_do_move_file, tasks, return_results=False, progress=progress)


def move_dir(
Expand Down Expand Up @@ -848,7 +848,7 @@ def delete_files(paths, skip_failures=False, progress=None):
progress callback function to invoke instead
"""
tasks = [(p, skip_failures) for p in paths]
_run(_do_delete_file, tasks, progress=progress)
_run(_do_delete_file, tasks, return_results=False, progress=progress)


def delete_dir(dirpath):
Expand All @@ -861,67 +861,65 @@ def delete_dir(dirpath):
etau.delete_dir(dirpath)


def run(fcn, tasks, num_workers=None, progress=None):
def run(fcn, tasks, return_results=True, num_workers=None, progress=None):
"""Applies the given function to each element of the given tasks.
Args:
fcn: a function that accepts a single argument
tasks: an iterable of function arguments
return_results (True): whether to return the function results
num_workers (None): a suggested number of threads to use
progress (None): whether to render a progress bar (True/False), use the
default value ``fiftyone.config.show_progress_bars`` (None), or a
progress callback function to invoke instead
Returns:
the list of function outputs
"""
num_workers = fou.recommend_thread_pool_workers(num_workers)

try:
num_tasks = len(tasks)
except:
num_tasks = None

kwargs = dict(total=num_tasks, iters_str="files", progress=progress)

if num_workers <= 1:
with fou.ProgressBar(**kwargs) as pb:
results = [fcn(task) for task in pb(tasks)]
else:
with multiprocessing.dummy.Pool(processes=num_workers) as pool:
with fou.ProgressBar(**kwargs) as pb:
results = list(pb(pool.imap(fcn, tasks)))

return results
the list of function outputs, or None if ``return_results == False``
"""
return _run(
fcn,
tasks,
return_results=return_results,
num_workers=num_workers,
progress=progress,
)


def _copy_files(inpaths, outpaths, skip_failures, progress):
tasks = [(i, o, skip_failures) for i, o in zip(inpaths, outpaths)]
_run(_do_copy_file, tasks, progress=progress)
_run(_do_copy_file, tasks, return_results=False, progress=progress)


def _run(fcn, tasks, return_results=True, num_workers=None, progress=None):
try:
num_tasks = len(tasks)
except:
num_tasks = None

def _run(fcn, tasks, num_workers=None, progress=None):
num_tasks = len(tasks)
if num_tasks == 0:
return []
return [] if return_results else None

num_workers = fou.recommend_thread_pool_workers(num_workers)

kwargs = dict(total=num_tasks, iters_str="files", progress=progress)

results = []
if num_workers <= 1:
with fou.ProgressBar(**kwargs) as pb:
for task in pb(tasks):
result = fcn(task)
results.append(result)
if return_results:
results = [fcn(task) for task in pb(tasks)]
else:
for task in pb(tasks):
fcn(task)
else:
with multiprocessing.dummy.Pool(processes=num_workers) as pool:
with fou.ProgressBar(**kwargs) as pb:
for result in pb(pool.imap_unordered(fcn, tasks)):
results.append(result)
if return_results:
results = list(pb(pool.imap(fcn, tasks)))
else:
for _ in pb(pool.imap_unordered(fcn, tasks)):
pass

return results
if return_results:
return results


def _do_copy_file(arg):
Expand Down Expand Up @@ -1003,6 +1001,8 @@ def _copy_file(inpath, outpath, cleanup=False):
etau.ensure_basedir(outpath)
if cleanup:
shutil.move(inpath, outpath)
else:
shutil.copy(inpath, outpath)


def _delete_file(filepath):
Expand Down

0 comments on commit 2e49b92

Please sign in to comment.