diff --git a/sourmash/__main__.py b/sourmash/__main__.py index e9e2dfb4db..0e66b6ef1c 100644 --- a/sourmash/__main__.py +++ b/sourmash/__main__.py @@ -8,8 +8,8 @@ from .logging import error, set_quiet from .commands import (categorize, compare, compute, dump, import_csv, - gather, index, sbt_combine, search, - plot, watch, info, storage, migrate, multigather) + gather, index, sbt_combine, search, plot, watch, + info, storage, migrate, multigather, prepare) from .lca import main as lca_main from .sig import main as sig_main @@ -64,7 +64,9 @@ def main(): 'migrate': migrate, 'multigather': multigather, 'sig': sig_main, - 'signature': sig_main} + 'signature': sig_main, + 'prepare': prepare + } parser = argparse.ArgumentParser( description='work with compressed biological sequence representations') parser.add_argument('command', nargs='?') diff --git a/sourmash/commands.py b/sourmash/commands.py index 9c4db3f408..61eabfffc0 100644 --- a/sourmash/commands.py +++ b/sourmash/commands.py @@ -8,6 +8,8 @@ import os.path import sys import random +import shutil +import tempfile import screed from .sourmash_args import SourmashArgumentParser @@ -753,7 +755,7 @@ def index(args): scaleds.add(ss.minhash.scaled) leaf = SigLeaf(ss.md5sum(), ss) - tree.add_node(leaf) + tree.add_node(leaf, update_internal=False) n += 1 if not ss: @@ -1402,3 +1404,47 @@ def migrate(args): notify('saving SBT under "{}".', args.sbt_name) tree.save(args.sbt_name, structure_only=True) + + +def prepare(args): + from .sbt import parse_backend_args + from .sbt_storage import FSStorage + + parser = argparse.ArgumentParser() + parser.add_argument('sbt_name', help='name of SBT to prepare') + parser.add_argument('-x', help='new nodegraph size', default=1e5) + parser.add_argument('-b', "--backend", type=str, + help='Backend to convert to', + default='FSStorage') + args = parser.parse_args(args) + + notify('saving SBT under "{}".', args.sbt_name) + + backend, options = parse_backend_args(args.sbt_name, args.backend) + + with backend(*options) as storage: + with open(args.sbt_name, 'r') as f: + import json + temptree = json.load(f) + + if ((temptree['storage']['backend'] == 'IPFSStorage') and + (backend == FSStorage) and + ('preload' in temptree['storage']['args'])): + # Let's take a shortcut... The preload multihash contains the + # directory in the same structure FSStorage expects. + ipfs_args = temptree['storage']['args'] + multihash = ipfs_args.pop('preload') + + # TODO: in case the IPFS node is not available, need to + # fallback to read-only client + import ipfsapi + client = ipfsapi.connect(**ipfs_args) + + dirpath = os.path.join(storage.location, storage.subdir) + with tempfile.TemporaryDirectory() as temp: + client.get(multihash, filepath=temp) + shutil.rmtree(dirpath) + shutil.move(os.path.join(temp, multihash), dirpath) + + sbt = load_sbt_index(args.sbt_name, print_version_warning=False) + sbt.save(args.sbt_name, storage=storage) diff --git a/sourmash/sbt.py b/sourmash/sbt.py index 760941866d..da44a6608c 100644 --- a/sourmash/sbt.py +++ b/sourmash/sbt.py @@ -43,7 +43,7 @@ def search_transcript(node, seq, threshold): from __future__ import print_function, unicode_literals, division -from collections import namedtuple, defaultdict +from collections import namedtuple try: from collections.abc import Mapping except ImportError: # Python 2... @@ -56,6 +56,7 @@ def search_transcript(node, seq, threshold): from random import randint, random import sys from tempfile import NamedTemporaryFile +import zipfile import khmer @@ -64,7 +65,7 @@ def search_transcript(node, seq, threshold): except AttributeError: load_nodegraph = khmer.Nodegraph.load -from .sbt_storage import FSStorage, TarStorage, IPFSStorage, RedisStorage +from .sbt_storage import FSStorage, TarStorage, IPFSStorage, RedisStorage, ZipStorage from .logging import error, notify, debug @@ -73,6 +74,7 @@ def search_transcript(node, seq, threshold): 'FSStorage': FSStorage, 'IPFSStorage': IPFSStorage, 'RedisStorage': RedisStorage, + 'ZipStorage': ZipStorage, } NodePos = namedtuple("NodePos", ["pos", "node"]) @@ -106,7 +108,7 @@ class SBT(object): """A Sequence Bloom Tree implementation allowing generic internal nodes and leaves. The default node and leaf format is a Bloom Filter (like the original implementation), - but we also provide a MinHash leaf class (in the sourmash.sbtmh.Leaf + but we also provide a MinHash leaf class (in the sourmash.sbtmh.SigLeaf class) Parameters ---------- @@ -114,36 +116,60 @@ class SBT(object): Callable for generating new datastores for internal nodes. d: int Number of children for each internal node. Defaults to 2 (a binary tree) - n_tables: int - number of nodegraph tables to be used. - + storage: Storage, default: None + A Storage is any place where we can save and load data for the nodes. + If set to None, will use a FSStorage. Notes ----- - We use a defaultdict to store the tree structure. Nodes are numbered - specific node they are numbered + We use two dicts to store the tree structure: One for the internal nodes, + and another for the leaves (datasets). """ def __init__(self, factory, d=2, storage=None): self.factory = factory - self.nodes = defaultdict(lambda: None) - self.missing_nodes = set() + self._nodes = {} + self._missing_nodes = set() + self._leaves = {} self.d = d self.next_node = 0 self.storage = storage + self.is_ready = False def new_node_pos(self, node): - while self.nodes.get(self.next_node, None) is not None: - self.next_node += 1 + if not self._nodes: + self.next_node = 1 + return 0 + + if not self._leaves: + self.next_node = 2 + return 1 + + min_leaf = min(self._leaves.keys()) + + next_internal_node = None + if self.next_node <= min_leaf: + for i in range(min_leaf): + if all((i not in self._nodes, + i not in self._leaves, + i not in self._missing_nodes)): + next_internal_node = i + break + + if next_internal_node is None: + self.next_node = max(self._leaves.keys()) + 1 + else: + self.next_node = next_internal_node + return self.next_node - def add_node(self, node): - pos = self.new_node_pos(node) + def add_node(self, leaf, update_internal=True): + pos = self.new_node_pos(leaf) if pos == 0: # empty tree; initialize w/node. n = Node(self.factory, name="internal." + str(pos)) - self.nodes[0] = n - pos = self.new_node_pos(node) + self._nodes[0] = n + pos = self.new_node_pos(leaf) # Cases: # 1) parent is a Leaf (already covered) @@ -158,36 +184,52 @@ def add_node(self, node): # Create a new internal node # node and parent are children of new internal node n = Node(self.factory, name="internal." + str(p.pos)) - self.nodes[p.pos] = n + self._nodes[p.pos] = n c1, c2 = self.children(p.pos)[:2] - self.nodes[c1.pos] = p.node - self.nodes[c2.pos] = node + self._leaves[c1.pos] = p.node + self._leaves[c2.pos] = leaf + del self._leaves[p.pos] - for child in (p.node, node): - child.update(n) + if update_internal: + for child in (p.node, leaf): + child.update(n) + else: + self.is_ready = False elif isinstance(p.node, Node): - self.nodes[pos] = node - node.update(p.node) + self._leaves[pos] = leaf + if update_internal: + leaf.update(p.node) + else: + self.is_ready = False elif p.node is None: n = Node(self.factory, name="internal." + str(p.pos)) - self.nodes[p.pos] = n + self._nodes[p.pos] = n c1 = self.children(p.pos)[0] - self.nodes[c1.pos] = node - node.update(n) - - # update all parents! - p = self.parent(p.pos) - while p: - self._rebuild_node(p.pos) - node.update(self.nodes[p.pos]) + self._leaves[c1.pos] = leaf + if update_internal: + leaf.update(n) + else: + self.is_ready = False + + if update_internal: + # update all parents! p = self.parent(p.pos) + while p: + self._rebuild_node(p.pos) + leaf.update(self._nodes[p.pos]) + p = self.parent(p.pos) + else: + self.is_ready = False def find(self, search_fn, *args, **kwargs): "Search the tree using `search_fn`." # initialize search queue with top node of tree + if not self.is_ready: + self._fill_internal() + matches = [] visited, queue = set(), [0] @@ -195,13 +237,16 @@ def find(self, search_fn, *args, **kwargs): # function. while queue: node_p = queue.pop(0) - node_g = self.nodes.get(node_p, None) # repair while searching. + node_g = self._leaves.get(node_p, None) if node_g is None: - if node_p in self.missing_nodes: + node_g = self._nodes.get(node_p, None) + + if node_g is None: + if node_p in self._missing_nodes: self._rebuild_node(node_p) - node_g = self.nodes[node_p] + node_g = self._nodes[node_p] else: continue @@ -235,22 +280,20 @@ def _rebuild_node(self, pos=0): (the default). """ - node = self.nodes.get(pos, None) + node = self._nodes.get(pos, None) if node is not None: # this node was already build, skip return node = Node(self.factory, name="internal.{}".format(pos)) - self.nodes[pos] = node + self._nodes[pos] = node for c in self.children(pos): - if c.pos in self.missing_nodes or isinstance(c.node, Leaf): - if c.node is None: + if c.pos in self._missing_nodes or isinstance(c.node, Leaf): + cnode = c.node + if cnode is None: self._rebuild_node(c.pos) - c_node = self.nodes[c.pos] - if c_node is not None: - c_node.update(node) - self.missing_nodes.remove(pos) - + cnode = self._nodes[c.pos] + cnode.update(node) def parent(self, pos): """Return the parent of the node at position ``pos``. @@ -271,7 +314,10 @@ def parent(self, pos): if pos == 0: return None p = int(math.floor((pos - 1) / self.d)) - node = self.nodes.get(p, None) + if p in self._leaves: + return NodePos(p, self._leaves[p]) + + node = self._nodes.get(p, None) return NodePos(p, node) def children(self, pos): @@ -309,7 +355,10 @@ def child(self, parent, pos): child node. """ cd = self.d * parent + pos + 1 - node = self.nodes.get(cd, None) + if cd in self._leaves: + return NodePos(cd, self._leaves[cd]) + + node = self._nodes.get(cd, None) return NodePos(cd, node) def save(self, path, storage=None, sparseness=0.0, structure_only=False): @@ -335,7 +384,7 @@ def save(self, path, storage=None, sparseness=0.0, structure_only=False): str full path to the new SBT description """ - version = 4 + version = 5 if path.endswith('.sbt.json'): path = path[:-9] @@ -363,8 +412,12 @@ def save(self, path, storage=None, sparseness=0.0, structure_only=False): 'args': self.factory.init_args() } - structure = {} - total_nodes = len(self.nodes) + if not self.is_ready and structure_only is False: + self._fill_internal_and_save(storage, sparseness) + + nodes = {} + leaves = {} + total_nodes = len(self) for n, (i, node) in enumerate(self): if node is None: continue @@ -394,12 +447,16 @@ def save(self, path, storage=None, sparseness=0.0, structure_only=False): data['filename'] = node.save(data['filename']) - structure[i] = data + if isinstance(node, Node): + nodes[i] = data + else: + leaves[i] = data notify("{} of {} nodes saved".format(n+1, total_nodes), end='\r') notify("\nFinished saving nodes, now saving SBT json file.") - info['nodes'] = structure + info['nodes'] = nodes + info['leaves'] = leaves with open(fn, 'w') as fp: json.dump(info, fp) @@ -424,16 +481,28 @@ def load(cls, location, leaf_loader=None, storage=None, print_version_warning=Tr SBT the SBT tree built from the description. """ - dirname = os.path.dirname(os.path.abspath(location)) - sbt_name = os.path.basename(location) - if sbt_name.endswith('.sbt.json'): - sbt_name = sbt_name[:-9] + tempfile = None + if zipfile.is_zipfile(location): + tempfile = NamedTemporaryFile() + with zipfile.ZipFile(location, 'r') as zf: + tempfile.write(zf.read('tree.sbt.json')) + tempfile.flush() + + dirname = os.path.dirname(tempfile.name) + sbt_name = os.path.basename(tempfile.name) + storage = ZipStorage(location) + else: + dirname = os.path.dirname(os.path.abspath(location)) + sbt_name = os.path.basename(location) + if sbt_name.endswith('.sbt.json'): + sbt_name = sbt_name[:-9] loaders = { 1: cls._load_v1, 2: cls._load_v2, 3: cls._load_v3, 4: cls._load_v4, + 5: cls._load_v5, } # @CTB hack: check to make sure khmer Nodegraph supports the @@ -448,11 +517,14 @@ def load(cls, location, leaf_loader=None, storage=None, print_version_warning=Tr leaf_loader = Leaf.load sbt_fn = os.path.join(dirname, sbt_name) - if not sbt_fn.endswith('.sbt.json'): + if not sbt_fn.endswith('.sbt.json') and tempfile is None: sbt_fn += '.sbt.json' with open(sbt_fn) as fp: jnodes = json.load(fp) + if tempfile is not None: + tempfile.close() + version = 1 if isinstance(jnodes, Mapping): version = jnodes['version'] @@ -469,13 +541,16 @@ def _load_v1(jnodes, leaf_loader, dirname, storage, print_version_warning=True): if jnodes[0] is None: raise ValueError("Empty tree!") - sbt_nodes = defaultdict(lambda: None) + sbt_nodes = {} + sbt_leaves = {} + + max_node = 0 sample_bf = os.path.join(dirname, jnodes[0]['filename']) ksize, tablesize, ntables = khmer.extract_nodegraph_info(sample_bf)[:3] factory = GraphFactory(ksize, tablesize, ntables) - for i, jnode in enumerate(jnodes): + for k, jnode in enumerate(jnodes): if jnode is None: continue @@ -484,13 +559,24 @@ def _load_v1(jnodes, leaf_loader, dirname, storage, print_version_warning=True): if 'internal' in jnode['name']: jnode['factory'] = factory sbt_node = Node.load(jnode, storage) + sbt_nodes[k] = sbt_node else: sbt_node = leaf_loader(jnode, storage) + sbt_leaves[k] = sbt_node - sbt_nodes[i] = sbt_node + max_node = max(max_node, k) tree = SBT(factory) - tree.nodes = sbt_nodes + tree._nodes = sbt_nodes + tree._leaves = sbt_leaves + tree._missing_nodes = {i for i in range(max_node) + if i not in sbt_nodes and i not in sbt_leaves} + + if print_version_warning: + error("WARNING: this is an old index version, please run `sourmash migrate` to update it.") + error("WARNING: proceeding with execution, but it will take longer to finish!") + + tree._fill_min_n_below() return tree @@ -501,7 +587,10 @@ def _load_v2(cls, info, leaf_loader, dirname, storage, print_version_warning=Tru if nodes[0] is None: raise ValueError("Empty tree!") - sbt_nodes = defaultdict(lambda: None) + sbt_nodes = {} + sbt_leaves = {} + + max_node = 0 sample_bf = os.path.join(dirname, nodes[0]['filename']) k, size, ntables = khmer.extract_nodegraph_info(sample_bf)[:3] @@ -516,13 +605,24 @@ def _load_v2(cls, info, leaf_loader, dirname, storage, print_version_warning=Tru if 'internal' in node['name']: node['factory'] = factory sbt_node = Node.load(node, storage) + sbt_nodes[k] = sbt_node else: sbt_node = leaf_loader(node, storage) + sbt_leaves[k] = sbt_node - sbt_nodes[k] = sbt_node + max_node = max(max_node, k) tree = cls(factory, d=info['d']) - tree.nodes = sbt_nodes + tree._nodes = sbt_nodes + tree._leaves = sbt_leaves + tree._missing_nodes = {i for i in range(max_node) + if i not in sbt_nodes and i not in sbt_leaves} + + if print_version_warning: + error("WARNING: this is an old index version, please run `sourmash migrate` to update it.") + error("WARNING: proceeding with execution, but it will take longer to finish!") + + tree._fill_min_n_below() return tree @@ -533,7 +633,8 @@ def _load_v3(cls, info, leaf_loader, dirname, storage, print_version_warning=Tru if not nodes: raise ValueError("Empty tree!") - sbt_nodes = defaultdict(lambda: None) + sbt_nodes = {} + sbt_leaves = {} klass = STORAGES[info['storage']['backend']] if info['storage']['backend'] == "FSStorage": @@ -551,22 +652,23 @@ def _load_v3(cls, info, leaf_loader, dirname, storage, print_version_warning=Tru if 'internal' in node['name']: node['factory'] = factory sbt_node = Node.load(node, storage) + sbt_nodes[k] = sbt_node else: sbt_node = leaf_loader(node, storage) + sbt_leaves[k] = sbt_node - sbt_nodes[k] = sbt_node max_node = max(max_node, k) tree = cls(factory, d=info['d'], storage=storage) - tree.nodes = sbt_nodes - tree.missing_nodes = {i for i in range(max_node) - if i not in sbt_nodes} - # TODO: this might not be true with combine... - tree.next_node = max_node + tree._nodes = sbt_nodes + tree._leaves = sbt_leaves + tree._missing_nodes = {i for i in range(max_node) + if i not in sbt_nodes and i not in sbt_leaves} if print_version_warning: error("WARNING: this is an old index version, please run `sourmash migrate` to update it.") error("WARNING: proceeding with execution, but it will take longer to finish!") + tree._fill_min_n_below() return tree @@ -578,7 +680,8 @@ def _load_v4(cls, info, leaf_loader, dirname, storage, print_version_warning=Tru if not nodes: raise ValueError("Empty tree!") - sbt_nodes = defaultdict(lambda: None) + sbt_nodes = {} + sbt_leaves = {} klass = STORAGES[info['storage']['backend']] if info['storage']['backend'] == "FSStorage": @@ -590,24 +693,68 @@ def _load_v4(cls, info, leaf_loader, dirname, storage, print_version_warning=Tru max_node = 0 for k, node in nodes.items(): - if node is None: - continue - if 'internal' in node['name']: node['factory'] = factory sbt_node = Node.load(node, storage) + sbt_nodes[k] = sbt_node else: sbt_node = leaf_loader(node, storage) + sbt_leaves[k] = sbt_node + + max_node = max(max_node, k) + + tree = cls(factory, d=info['d'], storage=storage) + tree._nodes = sbt_nodes + tree._leaves = sbt_leaves + tree._missing_nodes = {i for i in range(max_node) + if i not in sbt_nodes and i not in sbt_leaves} + + if print_version_warning: + error("WARNING: this is an old index version, please run `sourmash migrate` to update it.") + error("WARNING: proceeding with execution, but it will take longer to finish!") + + return tree + + @classmethod + def _load_v5(cls, info, leaf_loader, dirname, storage, print_version_warning=True): + nodes = {} + if 'nodes' in info: + nodes = {int(k): v for (k, v) in info['nodes'].items()} + + leaves = {int(k): v for (k, v) in info['leaves'].items()} + + if not leaves: + raise ValueError("Empty tree!") + + sbt_nodes = {} + sbt_leaves = {} + + klass = STORAGES[info['storage']['backend']] + if info['storage']['backend'] == "FSStorage": + storage = FSStorage(dirname, info['storage']['args']['path']) + elif storage is None: + storage = klass(**info['storage']['args']) + + factory = GraphFactory(*info['factory']['args']) + + max_node = 0 + for k, node in nodes.items(): + node['factory'] = factory + sbt_node = Node.load(node, storage) sbt_nodes[k] = sbt_node max_node = max(max_node, k) + for k, node in leaves.items(): + sbt_leaf = leaf_loader(node, storage) + sbt_leaves[k] = sbt_leaf + max_node = max(max_node, k) + tree = cls(factory, d=info['d'], storage=storage) - tree.nodes = sbt_nodes - tree.missing_nodes = {i for i in range(max_node) - if i not in sbt_nodes} - # TODO: this might not be true with combine... - tree.next_node = max_node + tree._nodes = sbt_nodes + tree._leaves = sbt_leaves + tree._missing_nodes = {i for i in range(max_node) + if i not in sbt_nodes and i not in sbt_leaves} return tree @@ -637,8 +784,38 @@ def fill_min_n_below(node, *args, **kwargs): self._fill_up(fill_min_n_below) + def _fill_internal_and_save(self, storage, sparseness=0.0): + + def fill_nodegraphs_and_save(node, *args, **kwargs): + children = kwargs['children'] + for child in children: + if child.node is not None: + child.node.update(node) + + if isinstance(node, Node) and random() - sparseness > 0: + child.node.storage = storage + child.node.save(os.path.basename(node.name)) + + child.node.unload() + return True + + self._fill_up(fill_nodegraphs_and_save) + self.is_ready = True + + def _fill_internal(self): + + def fill_nodegraphs(node, *args, **kwargs): + children = kwargs['children'] + for child in children: + if child.node is not None: + child.node.update(node) + return True + + self._fill_up(fill_nodegraphs) + self.is_ready = True + def _fill_up(self, search_fn, *args, **kwargs): - visited, queue = set(), [i[0] for i in reversed(sorted(self._leaves()))] + visited, queue = set(), list(reversed(sorted(self._leaves.keys()))) debug("started filling up") processed = 0 while queue: @@ -652,7 +829,7 @@ def _fill_up(self, search_fn, *args, **kwargs): was_missing = False if parent.node is None: - if parent.pos in self.missing_nodes: + if parent.pos in self._missing_nodes: self._rebuild_node(parent.pos) parent = self.parent(node_p) was_missing = True @@ -677,6 +854,10 @@ def _fill_up(self, search_fn, *args, **kwargs): if processed % 100 == 0: debug("processed {}, in queue {}", processed, len(queue), sep='\r') + def __len__(self): + internal_nodes = set(self._nodes).union(self._missing_nodes) + return len(internal_nodes) + len(self._leaves) + def print_dot(self): print(""" digraph G { @@ -687,7 +868,7 @@ def print_dot(self): edge [arrowsize=0.8]; """) - for i, node in list(self.nodes.items()): + for i, node in self._nodes.items(): if isinstance(node, Node): print('"{}" [shape=box fillcolor=gray style=filled]'.format( node.name)) @@ -700,7 +881,7 @@ def print(self): visited, stack = set(), [0] while stack: node_p = stack.pop() - node_g = self.nodes.get(node_p, None) + node_g = self._nodes.get(node_p, None) if node_p not in visited and node_g is not None: visited.add(node_p) depth = int(math.floor(math.log(node_p + 1, self.d))) @@ -710,7 +891,9 @@ def print(self): if c.pos not in visited) def __iter__(self): - for i, node in self.nodes.items(): + for i, node in self._nodes.items(): + yield (i, node) + for i, node in self._leaves.items(): yield (i, node) def _parents(self, pos=0): @@ -722,53 +905,48 @@ def _parents(self, pos=0): yield p.pos p = self.parent(p.pos) - - def _leaves(self, pos=0): - for i, node in self: - if isinstance(node, Leaf): - if pos in self._parents(i): - yield (i, node) - - def leaves(self): - for c in self.nodes.values(): - if isinstance(c, Leaf): - yield c + def leaves(self, with_pos=False): + for pos, data in self._leaves.items(): + if with_pos: + yield (pos, data) + else: + yield data def combine(self, other): larger, smaller = self, other - if len(other.nodes) > len(self.nodes): + if len(other) > len(self): larger, smaller = other, self n = Node(self.factory, name="internal.0", storage=self.storage) - larger.nodes[0].update(n) - smaller.nodes[0].update(n) - new_nodes = defaultdict(lambda: None) + larger._nodes[0].update(n) + smaller._nodes[0].update(n) + new_nodes = {} new_nodes[0] = n - levels = int(math.ceil(math.log(len(larger.nodes), self.d))) + 1 + new_leaves = {} + + levels = int(math.ceil(math.log(len(larger), self.d))) + 1 current_pos = 1 n_previous = 0 n_next = 1 for level in range(1, levels + 1): for tree in (larger, smaller): for pos in range(n_previous, n_next): - if tree.nodes.get(pos, None) is not None: - new_node = copy(tree.nodes[pos]) - if isinstance(new_node, Node): - # An internal node, we need to update the name - new_node.name = "internal.{}".format(current_pos) + if tree._nodes.get(pos, None) is not None: + new_node = copy(tree._nodes[pos]) + new_node.name = "internal.{}".format(current_pos) new_nodes[current_pos] = new_node + elif tree._leaves.get(pos, None) is not None: + new_node = copy(tree._leaves[pos]) + new_leaves[current_pos] = new_node current_pos += 1 n_previous = n_next n_next = n_previous + int(self.d ** level) current_pos = n_next - # reset next_node, next time we add a node it will find the next - # empty position - self.next_node = 2 - # TODO: do we want to return a new tree, or merge into this one? - self.nodes = new_nodes + self._nodes = new_nodes + self._leaves = new_leaves return self @@ -825,13 +1003,17 @@ def load(info, storage=None): new_node.metadata = info.get('metadata', {}) return new_node + def unload(self): + pass + def update(self, parent): parent.data.update(self.data) - min_n_below = min(parent.metadata.get('min_n_below', sys.maxsize), - self.metadata.get('min_n_below')) - if min_n_below == 0: - min_n_below = 1 - parent.metadata['min_n_below'] = min_n_below + if 'min_n_below' in self.metadata: + min_n_below = min(parent.metadata.get('min_n_below', sys.maxsize), + self.metadata.get('min_n_below')) + if min_n_below == 0: + min_n_below = 1 + parent.metadata['min_n_below'] = min_n_below class Leaf(object): @@ -888,6 +1070,9 @@ def load(cls, info, storage=None): path=info['filename'], storage=storage) + def unload(self): + pass + def filter_distance(filter_a, filter_b, n=1000): """ @@ -920,20 +1105,18 @@ def filter_distance(filter_a, filter_b, n=1000): return distance / (8.0 * len(A) * n) -def convert_cmd(name, backend): - from .sbtmh import SigLeaf - +def parse_backend_args(name, backend): options = backend.split('(') backend = options.pop(0) backend = backend.lower().strip("'") if options: - print(options) - options = options[0].split(')') - options = [options.pop(0)] - #options = {} + print(options) + options = options[0].split(')') + options = [options.pop(0)] + #options = {} else: - options = [] + options = [] if backend.lower() in ('ipfs', 'ipfsstorage'): backend = IPFSStorage @@ -956,6 +1139,14 @@ def convert_cmd(name, backend): else: error('backend not recognized: {}'.format(backend)) + return backend, options + + +def convert_cmd(name, backend_args): + from .sbtmh import SigLeaf + + backend, options = parse_backend_args(name, backend_args) + with backend(*options) as storage: sbt = SBT.load(name, leaf_loader=SigLeaf.load) sbt.save(name, storage=storage) diff --git a/sourmash/sbt_storage.py b/sourmash/sbt_storage.py index 6c3917e5c2..6db05e5c88 100644 --- a/sourmash/sbt_storage.py +++ b/sourmash/sbt_storage.py @@ -4,6 +4,7 @@ from io import BytesIO import os import tarfile +import zipfile class Storage(abc.ABCMeta(str('ABC'), (object,), {'__slots__': ()})): @@ -56,6 +57,41 @@ def load(self, path): return out.getvalue() +class ZipStorage(Storage): + + def __init__(self, path=None): + # TODO: leave it open, or close/open every time? + + if path is None: + # TODO: Open a temporary file? + pass + + self.path = os.path.abspath(path) + + dirname = os.path.dirname(self.path) + if not os.path.exists(dirname): + os.makedirs(dirname) + + if os.path.exists(self.path): + self.zipfile = zipfile.ZipFile(path, 'r') + else: + self.zipfile = zipfile.ZipFile(path, mode='w', + compression=zipfile.ZIP_DEFLATED) + + def save(self, path, content): + self.zipfile.writestr(path, content) + return path + + def load(self, path): + return self.zipfile.read(path) + + def init_args(self): + return {'path': self.path} + + def __exit__(self, type, value, traceback): + self.zipfile.close() + + class TarStorage(Storage): def __init__(self, path=None): @@ -103,10 +139,23 @@ def __init__(self, pin_on_add=True, **kwargs): import ipfshttpclient self.ipfs_args = kwargs self.pin_on_add = pin_on_add - self.api = ipfshttpclient.connect(**self.ipfs_args) + self.read_only = False + + if 'preload' in self.ipfs_args: + del self.ipfs_args['preload'] + + try: + self.api = ipfshttpclient.connect(**self.ipfs_args) + except ipfsapi.exceptions.ConnectionError: + self.api = ipfsapi.connect('ipfs.io', 80) + self.read_only = True def save(self, path, content): # api.add_bytes(b"Mary had a little lamb") + if self.read_only: + raise NotImplementedError('This is a read-only client. ' + 'Start an IPFS node to be able to save ' + 'data.') new_obj = self.api.add_bytes(content) if self.pin_on_add: self.api.pin.add(new_obj) diff --git a/tests/test-data/ipfs_leaves.sbt.json b/tests/test-data/ipfs_leaves.sbt.json new file mode 100644 index 0000000000..24f56c7aea --- /dev/null +++ b/tests/test-data/ipfs_leaves.sbt.json @@ -0,0 +1 @@ +{"d": 2, "version": 5, "storage": {"backend": "IPFSStorage", "args": {}}, "factory": {"class": "GraphFactory", "args": [1, 100000, 4]}, "leaves": {"6": {"filename": "QmaW8C75VMgdrAVF8evR1FukPh3M4qCEPZK4ZLL7yJyMT8", "name": "6d6e87e1154e95b279e5e7db414bc37b", "metadata": "6d6e87e1154e95b279e5e7db414bc37b"}, "7": {"filename": "QmPArMUvrAK7spyH8WBp1gpExfbwG2Z4PMmCQVSaRNH5WE", "name": "60f7e23c24a8d94791cc7a8680c493f9", "metadata": "60f7e23c24a8d94791cc7a8680c493f9"}, "8": {"filename": "QmXd9x4L331soVHPBHMNeDbK73ugH2cXYoYCY8amFB2LZE", "name": "0107d767a345eff67ecdaed2ee5cd7ba", "metadata": "0107d767a345eff67ecdaed2ee5cd7ba"}, "9": {"filename": "QmWkR7Gj126x8u1kEf7z5WkGQzt4ib3yJyRYaGXL5Sk8t2", "name": "f71e78178af9e45e6f1d87a0c53c465c", "metadata": "f71e78178af9e45e6f1d87a0c53c465c"}, "10": {"filename": "QmesxNDT3P7bNhTLTaipFd6Gx6wx5SbbgW2jQfvUBdYYcS", "name": "f0c834bc306651d2b9321fb21d3e8d8f", "metadata": "f0c834bc306651d2b9321fb21d3e8d8f"}, "11": {"filename": "QmSKvxmECayQLwz77KwhsnFS8LevHZboHb3echuNxQYbWo", "name": "4e94e60265e04f0763142e20b52c0da1", "metadata": "4e94e60265e04f0763142e20b52c0da1"}, "12": {"filename": "QmVbeK97GQ6BYrv4wcn42RoiMKZonQLMY9THDgGujfAAo2", "name": "b59473c94ff2889eca5d7165936e64b3", "metadata": "b59473c94ff2889eca5d7165936e64b3"}}} diff --git a/tests/test_ipfs.py b/tests/test_ipfs.py new file mode 100644 index 0000000000..c2b3a9eb5c --- /dev/null +++ b/tests/test_ipfs.py @@ -0,0 +1,119 @@ +from __future__ import print_function, unicode_literals + +import os +import shutil + +import pytest + +from sourmash_lib import signature +from sourmash_lib.sbt import SBT, GraphFactory +from sourmash_lib.sbtmh import SigLeaf, search_minhashes +from sourmash_lib.sbt_storage import IPFSStorage, FSStorage + +from . import sourmash_tst_utils as utils + + +def test_sbt_ipfsstorage(tmpdir): + pytest.importorskip("ipfsapi") + + factory = GraphFactory(31, 1e5, 4) + + tree = SBT(factory) + + for f in utils.SIG_FILES: + sig = next(signature.load_signatures(utils.get_test_data(f))) + leaf = SigLeaf(os.path.basename(f), sig) + tree.add_node(leaf) + to_search = leaf + + print("*" * 60) + print("{}:".format(to_search.metadata)) + old_result = {str(s) for s in tree.find(search_minhashes, to_search.data, 0.1)} + print(*old_result, sep="\n") + + try: + with IPFSStorage() as storage: + tree.save(str(tmpdir.join("tree")), storage=storage) + except NotImplementedError: + pytest.xfail("Using a Read-only client for IPFS") + + with IPFSStorage() as storage: + tree = SBT.load( + str(tmpdir.join("tree")), leaf_loader=SigLeaf.load, storage=storage + ) + + print("*" * 60) + print("{}:".format(to_search.metadata)) + new_result = {str(s) for s in tree.find(search_minhashes, to_search.data, 0.1)} + print(*new_result, sep="\n") + + assert old_result == new_result + + +def test_storage_convert(tmpdir): + testdata = utils.get_test_data("v2.sbt.json") + testsbt = tmpdir.join("v2.sbt.json") + + shutil.copyfile(testdata, str(testsbt)) + shutil.copytree( + os.path.join(os.path.dirname(testdata), ".sbt.v2"), str(tmpdir.join(".sbt.v2")) + ) + + original = SBT.load(str(testsbt), leaf_loader=SigLeaf.load) + + args = ["storage", "convert", "-b", "ipfs", testsbt] + status, out, err = utils.runscript( + "sourmash", args, in_directory=str(tmpdir), fail_ok=True + ) + if not status and "ipfs.exceptions.ConnectionError" in err: + raise pytest.xfail("ipfs probably not running") + + ipfs = SBT.load(str(testsbt), leaf_loader=SigLeaf.load) + + assert len(original) == len(ipfs) + assert all(n1[1].name == n2[1].name for (n1, n2) in zip(original, ipfs)) + + args = [ + "storage", + "convert", + "-b", + """'TarStorage("{}")'""".format(tmpdir.join("v2.sbt.tar.gz")), + str(testsbt), + ] + status, out, err = utils.runscript("sourmash", args, in_directory=str(tmpdir)) + tar = SBT.load(str(testsbt), leaf_loader=SigLeaf.load) + + assert len(original) == len(tar) + assert all(n1[1].name == n2[1].name for (n1, n2) in zip(original, tar)) + + +def test_prepare_index(tmpdir): + pytest.importorskip("ipfsapi") + + try: + with IPFSStorage() as storage: + for f in utils.SIG_FILES: + with open(utils.get_test_data(f), 'rb') as data: + storage.save('', data.read()) + except NotImplementedError: + pytest.xfail("Using a Read-only client for IPFS") + + testdata = utils.get_test_data("ipfs_leaves.sbt.json") + testsbt = tmpdir.join("tree.sbt.json") + + shutil.copyfile(testdata, str(testsbt)) + + args = [ + "prepare", + str(testsbt), + ] + status, out, err = utils.runscript("sourmash", args, in_directory=str(tmpdir)) + prepared_sbt = SBT.load(str(testsbt), leaf_loader=SigLeaf.load) + assert not(isinstance(prepared_sbt.storage, IPFSStorage)) + assert isinstance(prepared_sbt.storage, FSStorage) + + sig = utils.get_test_data(utils.SIG_FILES[0]) + status, out, err = utils.runscript('sourmash', + ['search', sig, str(testsbt)], + in_directory=str(tmpdir)) + assert '3 matches:' in out diff --git a/tests/test_sbt.py b/tests/test_sbt.py index 5b47a092a4..54014a917d 100644 --- a/tests/test_sbt.py +++ b/tests/test_sbt.py @@ -3,13 +3,14 @@ import os import pytest +import shutil from sourmash import signature from sourmash.sbt import SBT, GraphFactory, Leaf, Node from sourmash.sbtmh import (SigLeaf, search_minhashes, - search_minhashes_containment) -from sourmash.sbt_storage import (FSStorage, TarStorage, - RedisStorage, IPFSStorage) + search_minhashes_containment) +from sourmash.sbt_storage import (FSStorage, TarStorage, RedisStorage, + ZipStorage) from . import sourmash_tst_utils as utils @@ -130,42 +131,25 @@ def search_transcript(node, seq, threshold): assert set(try3) == set([ 'd', 'e' ]), try3 -def test_tree_v1_load(): - tree_v1 = SBT.load(utils.get_test_data('v1.sbt.json'), - leaf_loader=SigLeaf.load) - - tree_cur = SBT.load(utils.get_test_data('v3.sbt.json'), +@pytest.mark.parametrize("old_version", + ['v1', 'v2', 'v3', 'v4']) +def test_tree_load_old_versions(old_version): + tree_old = SBT.load(utils.get_test_data(old_version + '.sbt.json'), leaf_loader=SigLeaf.load) - testdata1 = utils.get_test_data(utils.SIG_FILES[0]) - to_search = next(signature.load_signatures(testdata1)) - - results_v1 = {str(s) for s in tree_v1.find(search_minhashes_containment, - to_search, 0.1)} - results_cur = {str(s) for s in tree_cur.find(search_minhashes_containment, - to_search, 0.1)} - - assert results_v1 == results_cur - assert len(results_v1) == 4 - - -def test_tree_v2_load(): - tree_v2 = SBT.load(utils.get_test_data('v2.sbt.json'), - leaf_loader=SigLeaf.load) - - tree_cur = SBT.load(utils.get_test_data('v3.sbt.json'), + tree_cur = SBT.load(utils.get_test_data('v5.sbt.json'), leaf_loader=SigLeaf.load) testdata1 = utils.get_test_data(utils.SIG_FILES[0]) to_search = next(signature.load_signatures(testdata1)) - results_v2 = {str(s) for s in tree_v2.find(search_minhashes_containment, - to_search, 0.1)} + results_old = {str(s) for s in tree_old.find(search_minhashes_containment, + to_search, 0.1)} results_cur = {str(s) for s in tree_cur.find(search_minhashes_containment, to_search, 0.1)} - assert results_v2 == results_cur - assert len(results_v2) == 4 + assert results_old == results_cur + assert len(results_old) == 4 def test_tree_save_load(n_children): @@ -285,7 +269,7 @@ def test_sbt_combine(n_children): # check if adding a new node will use the next empty position next_empty = 0 - for n, d in enumerate(tree_1.nodes): + for n, (d, _) in enumerate(tree_1): if n != d: next_empty = n break @@ -363,9 +347,7 @@ def test_sbt_tarstorage(): assert old_result == new_result -def test_sbt_ipfsstorage(): - ipfshttpclient = pytest.importorskip('ipfshttpclient') - +def test_sbt_zipstorage(): factory = GraphFactory(31, 1e5, 4) with utils.TempDirectory() as location: tree = SBT(factory) @@ -382,13 +364,10 @@ def test_sbt_ipfsstorage(): to_search.data, 0.1)} print(*old_result, sep='\n') - try: - with IPFSStorage() as storage: - tree.save(os.path.join(location, 'tree'), storage=storage) - except ipfshttpclient.exceptions.ConnectionError: - pytest.xfail("ipfs not installed/functioning probably") + with ZipStorage(os.path.join(location, 'tree.zip')) as storage: + tree.save(os.path.join(location, 'tree'), storage=storage) - with IPFSStorage() as storage: + with ZipStorage(os.path.join(location, 'tree.zip')) as storage: tree = SBT.load(os.path.join(location, 'tree'), leaf_loader=SigLeaf.load, storage=storage) @@ -440,6 +419,23 @@ def test_sbt_redisstorage(): assert old_result == new_result +def test_load_zip(tmpdir): + testdata = utils.get_test_data("v5.zip") + testsbt = tmpdir.join("v5.zip") + + shutil.copyfile(testdata, str(testsbt)) + + tree = SBT.load(str(testsbt), leaf_loader=SigLeaf.load) + + to_search = signature.load_one_signature(utils.get_test_data(utils.SIG_FILES[0])) + + print("*" * 60) + print("{}:".format(to_search)) + new_result = {str(s) for s in tree.find(search_minhashes, to_search, 0.1)} + print(*new_result, sep="\n") + assert len(new_result) == 2 + + def test_tree_repair(): tree_repair = SBT.load(utils.get_test_data('leaves.sbt.json'), leaf_loader=SigLeaf.load) @@ -468,7 +464,7 @@ def test_tree_repair_add_node(): leaf = SigLeaf(os.path.basename(f), sig) tree_repair.add_node(leaf) - for pos, node in list(tree_repair.nodes.items()): + for pos, node in tree_repair: # Every parent of a node must be an internal node (and not a leaf), # except for node 0 (the root), whose parent is None. if pos != 0: @@ -499,7 +495,7 @@ def test_save_sparseness(n_children): tree.save(os.path.join(location, 'demo'), sparseness=1.0) tree_loaded = SBT.load(os.path.join(location, 'demo'), leaf_loader=SigLeaf.load) - assert all(not isinstance(n, Node) for n in tree_loaded.nodes.values()) + assert all(not isinstance(n, Node) for _, n in tree_loaded) print('*' * 60) print("{}:".format(to_search.metadata)) @@ -509,7 +505,7 @@ def test_save_sparseness(n_children): assert old_result == new_result - for pos, node in list(tree_loaded.nodes.items()): + for pos, node in tree_loaded: # Every parent of a node must be an internal node (and not a leaf), # except for node 0 (the root), whose parent is None. if pos != 0: diff --git a/tests/test_sourmash.py b/tests/test_sourmash.py index ce3f57e56b..2708652b66 100644 --- a/tests/test_sourmash.py +++ b/tests/test_sourmash.py @@ -1508,7 +1508,7 @@ def test_do_sourmash_sbt_search_check_bug(): assert '1 matches:' in out tree = load_sbt_index(os.path.join(location, 'zzz.sbt.json')) - assert tree.nodes[0].metadata['min_n_below'] == 431 + assert tree._nodes[0].metadata['min_n_below'] == 431 def test_do_sourmash_sbt_search_empty_sig(): @@ -1532,7 +1532,7 @@ def test_do_sourmash_sbt_search_empty_sig(): assert '1 matches:' in out tree = load_sbt_index(os.path.join(location, 'zzz.sbt.json')) - assert tree.nodes[0].metadata['min_n_below'] == 1 + assert tree._nodes[0].metadata['min_n_below'] == 1 def test_do_sourmash_sbt_move_and_search_output(): @@ -3519,44 +3519,6 @@ def test_watch_coverage(): assert 'FOUND: genome-s10.fa.gz, at 1.000' in out -def test_storage_convert(): - import pytest - - with utils.TempDirectory() as location: - testdata = utils.get_test_data('v2.sbt.json') - shutil.copyfile(testdata, os.path.join(location, 'v2.sbt.json')) - shutil.copytree(os.path.join(os.path.dirname(testdata), '.sbt.v2'), - os.path.join(location, '.sbt.v2')) - testsbt = os.path.join(location, 'v2.sbt.json') - - original = SBT.load(testsbt, leaf_loader=SigLeaf.load) - - args = ['storage', 'convert', '-b', 'ipfs', testsbt] - status, out, err = utils.runscript('sourmash', args, - in_directory=location, fail_ok=True) - if not status and "ipfs.exceptions.ConnectionError" in err: - raise pytest.xfail('ipfs probably not running') - - ipfs = SBT.load(testsbt, leaf_loader=SigLeaf.load) - - assert len(original.nodes) == len(ipfs.nodes) - assert all(n1[1].name == n2[1].name - for (n1, n2) in zip(sorted(original.nodes.items()), - sorted(ipfs.nodes.items()))) - - args = ['storage', 'convert', - '-b', """'TarStorage("{}")'""".format( - os.path.join(location, 'v2.sbt.tar.gz')), - testsbt] - status, out, err = utils.runscript('sourmash', args, - in_directory=location) - tar = SBT.load(testsbt, leaf_loader=SigLeaf.load) - - assert len(original.nodes) == len(tar.nodes) - assert all(n1[1].name == n2[1].name - for (n1, n2) in zip(sorted(original.nodes.items()), - sorted(tar.nodes.items()))) - def test_storage_convert_identity(): with utils.TempDirectory() as location: testdata = utils.get_test_data('v2.sbt.json') @@ -3573,10 +3535,9 @@ def test_storage_convert_identity(): identity = SBT.load(testsbt, leaf_loader=SigLeaf.load) - assert len(original.nodes) == len(identity.nodes) + assert len(original) == len(identity) assert all(n1[1].name == n2[1].name - for (n1, n2) in zip(sorted(original.nodes.items()), - sorted(identity.nodes.items()))) + for (n1, n2) in zip(sorted(original), sorted(identity))) def test_storage_convert_fsstorage_newpath(): @@ -3597,10 +3558,9 @@ def test_storage_convert_fsstorage_newpath(): identity = SBT.load(testsbt, leaf_loader=SigLeaf.load) - assert len(original.nodes) == len(identity.nodes) + assert len(original) == len(identity) assert all(n1[1].name == n2[1].name - for (n1, n2) in zip(sorted(original.nodes.items()), - sorted(identity.nodes.items()))) + for (n1, n2) in zip(sorted(original), sorted(identity))) def test_migrate(): @@ -3618,14 +3578,14 @@ def test_migrate(): identity = SBT.load(testsbt, leaf_loader=SigLeaf.load) - assert len(original.nodes) == len(identity.nodes) + assert len(original) == len(identity) assert all(n1[1].name == n2[1].name - for (n1, n2) in zip(sorted(original.nodes.items()), - sorted(identity.nodes.items()))) + for (n1, n2) in zip(sorted(original), + sorted(identity))) assert "this is an old index version" not in err assert all('min_n_below' in node.metadata - for node in identity.nodes.values() + for node in identity if isinstance(node, Node))