Skip to content

Commit

Permalink
remote: use string paths over PathInfo for performance reasons (#3672)
Browse files Browse the repository at this point in the history
* remote: use checksums instead of paths when filling dir statuses

* remote: prefer using str paths over PathInfo for performance reasons

* only use string paths for RemoteLOCAL

* only optimize calls made from RemoteLOCAL.cache_exists

* fix ds warning

* use os.path in local

* remote: re-add checksum_to_path() to return string paths when applicable

- cloud remotes still default to using PathInfo's

* cache fspath string

* use abspath in checksum_to_path

- if path is not relpath from cwd or abspath, posix lstat() syscall
  runtime doubles (from calculating relpath from cwd)
  • Loading branch information
pmrowla authored Apr 28, 2020
1 parent 6d698ea commit 6d8499e
Show file tree
Hide file tree
Showing 3 changed files with 54 additions and 41 deletions.
10 changes: 7 additions & 3 deletions dvc/remote/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -322,7 +322,7 @@ def is_dir_checksum(cls, checksum):
return checksum.endswith(cls.CHECKSUM_DIR_SUFFIX)

def get_checksum(self, path_info):
assert path_info.scheme == self.scheme
assert isinstance(path_info, str) or path_info.scheme == self.scheme

if not self.exists(path_info):
return None
Expand Down Expand Up @@ -719,6 +719,10 @@ def path_to_checksum(self, path):
def checksum_to_path_info(self, checksum):
return self.path_info / checksum[0:2] / checksum[2:]

# Return path as a string instead of PathInfo for remotes which support
# string paths (see local)
checksum_to_path = checksum_to_path_info

def list_cache_paths(self, prefix=None, progress_callback=None):
raise NotImplementedError

Expand Down Expand Up @@ -796,8 +800,8 @@ def changed_cache_file(self, checksum):
- Remove the file from cache if it doesn't match the actual checksum
"""

cache_info = self.checksum_to_path_info(checksum)
# Prefer string path over PathInfo when possible due to performance
cache_info = self.checksum_to_path(checksum)
if self.is_protected(cache_info):
logger.debug(
"Assuming '%s' is unchanged since it is read-only", cache_info
Expand Down
79 changes: 44 additions & 35 deletions dvc/remote/local.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from concurrent.futures import as_completed, ThreadPoolExecutor
from functools import partial

from funcy import concat
from funcy import cached_property, concat

from shortuuid import uuid

Expand Down Expand Up @@ -67,6 +67,13 @@ def cache_dir(self, value):
def supported(cls, config):
return True

@cached_property
def cache_path(self):
return os.path.abspath(self.cache_dir)

def checksum_to_path(self, checksum):
return os.path.join(self.cache_path, checksum[0:2], checksum[2:])

def list_cache_paths(self, prefix=None, progress_callback=None):
assert self.path_info is not None
if prefix:
Expand All @@ -88,7 +95,7 @@ def get(self, md5):

def exists(self, path_info):
assert is_working_tree(self.repo.tree)
assert path_info.scheme == "local"
assert isinstance(path_info, str) or path_info.scheme == "local"
return self.repo.tree.exists(fspath_py35(path_info))

def makedirs(self, path_info):
Expand Down Expand Up @@ -148,11 +155,15 @@ def get_file_checksum(self, path_info):
return file_md5(path_info)[0]

def remove(self, path_info):
if path_info.scheme != "local":
raise NotImplementedError
if isinstance(path_info, PathInfo):
if path_info.scheme != "local":
raise NotImplementedError
path = path_info.fspath
else:
path = path_info

if self.exists(path_info):
remove(path_info.fspath)
if self.exists(path):
remove(path)

def move(self, from_info, to_info, mode=None):
if from_info.scheme != "local" or to_info.scheme != "local":
Expand Down Expand Up @@ -285,11 +296,11 @@ def _status(
show_checksums=False,
download=False,
):
"""Return a tuple of (dir_status_info, file_status_info, dir_mapping).
"""Return a tuple of (dir_status_info, file_status_info, dir_contents).
dir_status_info contains status for .dir files, file_status_info
contains status for all other files, and dir_mapping is a dict of
{dir_path_info: set(file_path_info...)} which can be used to map
contains status for all other files, and dir_contents is a dict of
{dir_checksum: set(file_checksum, ...)} which can be used to map
a .dir file to its file contents.
"""
logger.debug(
Expand Down Expand Up @@ -324,30 +335,27 @@ def _status(
)
)
return self._make_status(
named_cache, remote, show_checksums, local_exists, remote_exists
named_cache, show_checksums, local_exists, remote_exists
)

def _make_status(
self, named_cache, remote, show_checksums, local_exists, remote_exists
self, named_cache, show_checksums, local_exists, remote_exists
):
def make_names(checksum, names):
return {"name": checksum if show_checksums else " ".join(names)}

dir_status = {}
file_status = {}
dir_paths = {}
dir_contents = {}
for checksum, item in named_cache[self.scheme].items():
if item.children:
dir_status[checksum] = make_names(checksum, item.names)
file_status.update(
{
child_checksum: make_names(child_checksum, child.names)
for child_checksum, child in item.children.items()
}
)
dir_paths[remote.checksum_to_path_info(checksum)] = frozenset(
map(remote.checksum_to_path_info, item.child_keys())
)
dir_contents[checksum] = set()
for child_checksum, child in item.children.items():
file_status[child_checksum] = make_names(
child_checksum, child.names
)
dir_contents[checksum].add(child_checksum)
else:
file_status[checksum] = make_names(checksum, item.names)

Expand All @@ -356,7 +364,7 @@ def make_names(checksum, names):

self._log_missing_caches(dict(dir_status, **file_status))

return dir_status, file_status, dir_paths
return dir_status, file_status, dir_contents

def _indexed_dir_checksums(self, named_cache, remote, dir_md5s):
# Validate our index by verifying all indexed .dir checksums
Expand Down Expand Up @@ -409,13 +417,15 @@ def _get_plans(self, download, remote, status_info, status):
cache = []
path_infos = []
names = []
checksums = []
for md5, info in Tqdm(
status_info.items(), desc="Analysing status", unit="file"
):
if info["status"] == status:
cache.append(self.checksum_to_path_info(md5))
path_infos.append(remote.checksum_to_path_info(md5))
names.append(info["name"])
checksums.append(md5)

if download:
to_infos = cache
Expand All @@ -424,7 +434,7 @@ def _get_plans(self, download, remote, status_info, status):
to_infos = path_infos
from_infos = cache

return from_infos, to_infos, names
return from_infos, to_infos, names, checksums

def _process(
self,
Expand Down Expand Up @@ -457,7 +467,7 @@ def _process(
if jobs is None:
jobs = remote.JOBS

dir_status, file_status, dir_paths = self._status(
dir_status, file_status, dir_contents = self._status(
named_cache,
remote,
jobs=jobs,
Expand All @@ -482,18 +492,20 @@ def _process(
# for uploads, push files first, and any .dir files last

file_futures = {}
for from_info, to_info, name in zip(*file_plans):
file_futures[to_info] = executor.submit(
for from_info, to_info, name, checksum in zip(*file_plans):
file_futures[checksum] = executor.submit(
func, from_info, to_info, name
)
dir_futures = {}
for from_info, to_info, name in zip(*dir_plans):
for from_info, to_info, name, dir_checksum in zip(
*dir_plans
):
wait_futures = {
future
for file_path, future in file_futures.items()
if file_path in dir_paths[to_info]
for file_checksum, future in file_futures.items()
if file_checksum in dir_contents[dir_checksum]
}
dir_futures[to_info] = executor.submit(
dir_futures[dir_checksum] = executor.submit(
self._dir_upload,
func,
wait_futures,
Expand All @@ -516,12 +528,9 @@ def _process(

if not download:
# index successfully pushed dirs
for to_info, future in dir_futures.items():
for dir_checksum, future in dir_futures.items():
if future.result() == 0:
dir_checksum = remote.path_to_checksum(str(to_info))
file_checksums = list(
named_cache.child_keys(self.scheme, dir_checksum)
)
file_checksums = dir_contents[dir_checksum]
logger.debug(
"Indexing pushed dir '{}' with "
"'{}' nested files".format(
Expand Down
6 changes: 3 additions & 3 deletions dvc/state.py
Original file line number Diff line number Diff line change
Expand Up @@ -367,7 +367,7 @@ def save(self, path_info, checksum):
path_info (dict): path_info to save checksum for.
checksum (str): checksum to save.
"""
assert path_info.scheme == "local"
assert isinstance(path_info, str) or path_info.scheme == "local"
assert checksum is not None
assert os.path.exists(fspath_py35(path_info))

Expand Down Expand Up @@ -398,7 +398,7 @@ def get(self, path_info):
str or None: checksum for the specified path info or None if it
doesn't exist in the state database.
"""
assert path_info.scheme == "local"
assert isinstance(path_info, str) or path_info.scheme == "local"
path = fspath_py35(path_info)

if not os.path.exists(path):
Expand All @@ -425,7 +425,7 @@ def save_link(self, path_info):
Args:
path_info (dict): path info to add to the list of links.
"""
assert path_info.scheme == "local"
assert isinstance(path_info, str) or path_info.scheme == "local"

if not os.path.exists(fspath_py35(path_info)):
return
Expand Down

0 comments on commit 6d8499e

Please sign in to comment.