Skip to content

Commit

Permalink
remote: locally index list of checksums available on cloud remotes (#…
Browse files Browse the repository at this point in the history
…3634)

* remote: locally index list of checksums available on cloud remotes

* fix review issues

- use pytest fixtures
- use sha256 digest
- s/fd/fobj/
- check more specific IO errors

* remote: use .dir checksums to validate index

* command: add --drop-index option for push/pull/status

* tests: --drop-index unit tests

* remote: make index threadsafe

* remote: force re-index after gc

* only save/dump index once per command

* remote: add helper index functions update_all/replace_all

* remote: store index as sqlite3 database

* fix deepsource warnings

* rename --drop-index to --clear-index

* rename index.invalidate to index.clear

* functional tests for remote.index

* fix index not being updated on status -c

* remote: add index.intersection()

* remote: use index when making assumptions about remote .dir contents

* add missing func tests

* do not index standalone files

* cleanup index test fixture

* fix deepsource warnings

* update autocompletion scripts/cli help message for --clear-index

* tests: remove unused mocker

* push: only index successfully pushed dirs

- do not index files on partial push/upload

* remote: revert behavior to trust .dir files on remote

- skip unnecessary index check for dir contents if .dir file exists on
  the remote

* fix missing RemoteIndexNoop functions

* separate index validation from status dir exists query

* bugfix: include indexed checksums for all cache_exists cases

* review fix: cleanup debug messages

* tests: fix DS warning

* remove --clear-index

* remove unused index functions
  • Loading branch information
pmrowla authored Apr 21, 2020
1 parent 63c466d commit b04964f
Show file tree
Hide file tree
Showing 7 changed files with 507 additions and 32 deletions.
65 changes: 53 additions & 12 deletions dvc/remote/base.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
import errno
import hashlib
import itertools
import json
import logging
import tempfile
from urllib.parse import urlparse
from concurrent.futures import ThreadPoolExecutor
from copy import copy
from functools import partial
from functools import partial, wraps
from multiprocessing import cpu_count
from operator import itemgetter

Expand All @@ -23,6 +24,7 @@
from dvc.ignore import DvcIgnore
from dvc.path_info import PathInfo, URLInfo, WindowsPathInfo
from dvc.progress import Tqdm
from dvc.remote.index import RemoteIndex, RemoteIndexNoop
from dvc.remote.slow_link_detection import slow_link_guard
from dvc.state import StateNoop
from dvc.utils import tmp_fname
Expand Down Expand Up @@ -70,11 +72,24 @@ def __init__(self, checksum):
)


def index_locked(f):
@wraps(f)
def wrapper(remote_obj, *args, **kwargs):
remote = kwargs.get("remote")
if remote:
with remote.index:
return f(remote_obj, *args, **kwargs)
return f(remote_obj, *args, **kwargs)

return wrapper


class RemoteBASE(object):
scheme = "base"
path_cls = URLInfo
REQUIRES = {}
JOBS = 4 * cpu_count()
INDEX_CLS = RemoteIndex

PARAM_RELPATH = "relpath"
CHECKSUM_DIR_SUFFIX = ".dir"
Expand Down Expand Up @@ -111,6 +126,15 @@ def __init__(self, repo, config):
self.cache_types = config.get("type") or copy(self.DEFAULT_CACHE_TYPES)
self.cache_type_confirmed = False

url = config.get("url")
if url:
index_name = hashlib.sha256(url.encode("utf-8")).hexdigest()
self.index = self.INDEX_CLS(
self.repo, index_name, dir_suffix=self.CHECKSUM_DIR_SUFFIX
)
else:
self.index = RemoteIndexNoop()

@classmethod
def get_missing_deps(cls):
import importlib
Expand Down Expand Up @@ -734,6 +758,7 @@ def all(self, jobs=None, name=None):
remote_size, remote_checksums, jobs, name
)

@index_locked
def gc(self, named_cache, jobs=None):
used = self.extract_used_local_checksums(named_cache)

Expand All @@ -754,6 +779,8 @@ def gc(self, named_cache, jobs=None):
self._remove_unpacked_dir(checksum)
self.remove(path_info)
removed = True
if removed:
self.index.clear()
return removed

def is_protected(self, path_info):
Expand Down Expand Up @@ -872,10 +899,18 @@ def cache_exists(self, checksums, jobs=None, name=None):
# cache_exists() (see ssh, local)
assert self.TRAVERSE_PREFIX_LEN >= 2

if len(checksums) == 1 or not self.CAN_TRAVERSE:
return self._cache_object_exists(checksums, jobs, name)
checksums = set(checksums)
indexed_checksums = set(self.index.intersection(checksums))
checksums -= indexed_checksums
logger.debug(
"Matched '{}' indexed checksums".format(len(indexed_checksums))
)
if not checksums:
return indexed_checksums

checksums = frozenset(checksums)
if len(checksums) == 1 or not self.CAN_TRAVERSE:
remote_checksums = self._cache_object_exists(checksums, jobs, name)
return list(indexed_checksums) + remote_checksums

# Max remote size allowed for us to use traverse method
remote_size, remote_checksums = self._estimate_cache_size(
Expand All @@ -898,19 +933,25 @@ def cache_exists(self, checksums, jobs=None, name=None):
len(checksums), traverse_weight
)
)
return list(
checksums & remote_checksums
) + self._cache_object_exists(
checksums - remote_checksums, jobs, name
return (
list(indexed_checksums)
+ list(checksums & remote_checksums)
+ self._cache_object_exists(
checksums - remote_checksums, jobs, name
)
)

logger.debug(
"Querying {} checksums via traverse".format(len(checksums))
"Querying '{}' checksums via traverse".format(len(checksums))
)
remote_checksums = self._cache_checksums_traverse(
remote_size, remote_checksums, jobs, name
remote_checksums = set(
self._cache_checksums_traverse(
remote_size, remote_checksums, jobs, name
)
)
return list(indexed_checksums) + list(
checksums & set(remote_checksums)
)
return list(checksums & set(remote_checksums))

def _checksums_with_limit(
self, limit, prefix=None, progress_callback=None
Expand Down
214 changes: 214 additions & 0 deletions dvc/remote/index.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,214 @@
import logging
import os
import sqlite3
import threading

from funcy import lchunks

from dvc.state import _connect_sqlite

logger = logging.getLogger(__name__)


class RemoteIndexNoop:
"""No-op class for remotes which are not indexed (i.e. local)."""

def __init__(self, *args, **kwargs):
pass

def __enter__(self):
pass

def __exit__(self, typ, value, tbck):
pass

def __iter__(self):
return iter([])

def __contains__(self, checksum):
return False

@staticmethod
def checksums():
return []

@staticmethod
def dir_checksums():
return []

def load(self):
pass

def dump(self):
pass

def clear(self):
pass

def update(self, *args):
pass

@staticmethod
def intersection(*args):
return []


class RemoteIndex:
"""Class for indexing remote checksums in a sqlite3 database.
Args:
repo: repo for this remote index.
name: name for this index. Index db will be loaded from and saved to
``.dvc/tmp/index/{name}.idx``.
dir_suffix: suffix used for naming directory checksums
"""

INDEX_SUFFIX = ".idx"
VERSION = 1
INDEX_TABLE = "remote_index"
INDEX_TABLE_LAYOUT = "checksum TEXT PRIMARY KEY, " "dir INTEGER NOT NULL"

def __init__(self, repo, name, dir_suffix=".dir"):
self.path = os.path.join(
repo.index_dir, "{}{}".format(name, self.INDEX_SUFFIX)
)

self.dir_suffix = dir_suffix
self.database = None
self.cursor = None
self.modified = False
self.lock = threading.Lock()

def __iter__(self):
cmd = "SELECT checksum FROM {}".format(self.INDEX_TABLE)
for (checksum,) in self._execute(cmd):
yield checksum

def __enter__(self):
self.lock.acquire()
self.load()

def __exit__(self, typ, value, tbck):
self.dump()
self.lock.release()

def __contains__(self, checksum):
cmd = "SELECT checksum FROM {} WHERE checksum = (?)".format(
self.INDEX_TABLE
)
self._execute(cmd, (checksum,))
return self.cursor.fetchone() is not None

def checksums(self):
"""Iterate over checksums stored in the index."""
return iter(self)

def dir_checksums(self):
"""Iterate over .dir checksums stored in the index."""
cmd = "SELECT checksum FROM {} WHERE dir = 1".format(self.INDEX_TABLE)
for (checksum,) in self._execute(cmd):
yield checksum

def is_dir_checksum(self, checksum):
return checksum.endswith(self.dir_suffix)

def _execute(self, cmd, parameters=()):
return self.cursor.execute(cmd, parameters)

def _executemany(self, cmd, seq_of_parameters):
return self.cursor.executemany(cmd, seq_of_parameters)

def _prepare_db(self, empty=False):
if not empty:
cmd = "PRAGMA user_version;"
self._execute(cmd)
ret = self.cursor.fetchall()
assert len(ret) == 1
assert len(ret[0]) == 1
assert isinstance(ret[0][0], int)
version = ret[0][0]

if version != self.VERSION:
logger.error(
"Index file version '{}' will be reformatted "
"to the current version '{}'.".format(
version, self.VERSION,
)
)
cmd = "DROP TABLE IF EXISTS {};"
self._execute(cmd.format(self.INDEX_TABLE))

cmd = "CREATE TABLE IF NOT EXISTS {} ({})"
self._execute(cmd.format(self.INDEX_TABLE, self.INDEX_TABLE_LAYOUT))

cmd = "PRAGMA user_version = {};"
self._execute(cmd.format(self.VERSION))

def load(self):
"""(Re)load this index database."""
retries = 1
while True:
assert self.database is None
assert self.cursor is None

empty = not os.path.isfile(self.path)
self.database = _connect_sqlite(self.path, {"nolock": 1})
self.cursor = self.database.cursor()

try:
self._prepare_db(empty=empty)
return
except sqlite3.DatabaseError:
self.cursor.close()
self.database.close()
self.database = None
self.cursor = None
if retries > 0:
os.unlink(self.path)
retries -= 1
else:
raise

def dump(self):
"""Save this index database."""
assert self.database is not None

self.database.commit()
self.cursor.close()
self.database.close()
self.database = None
self.cursor = None

def clear(self):
"""Clear this index (to force re-indexing later).
Changes to the index will not committed until dump() is called.
"""
cmd = "DELETE FROM {}".format(self.INDEX_TABLE)
self._execute(cmd)

def update(self, dir_checksums, file_checksums):
"""Update this index, adding the specified checksums.
Changes to the index will not committed until dump() is called.
"""
cmd = "INSERT OR IGNORE INTO {} (checksum, dir) VALUES (?, ?)".format(
self.INDEX_TABLE
)
self._executemany(
cmd, ((checksum, True) for checksum in dir_checksums)
)
self._executemany(
cmd, ((checksum, False) for checksum in file_checksums)
)

def intersection(self, checksums):
"""Iterate over values from `checksums` which exist in the index."""
# sqlite has a compile time limit of 999, see:
# https://www.sqlite.org/c3ref/c_limit_attached.html#sqlitelimitvariablenumber
for chunk in lchunks(999, checksums):
cmd = "SELECT checksum FROM {} WHERE checksum IN ({})".format(
self.INDEX_TABLE, ",".join("?" for checksum in chunk)
)
for (checksum,) in self._execute(cmd, chunk):
yield checksum
Loading

0 comments on commit b04964f

Please sign in to comment.