Commit

Optimize _recreate_raw_dataset to use less memory; add versions API (#…
peytondmurray authored Jan 29, 2024
1 parent 5c3b6d5 commit 3832979
Showing 4 changed files with 57 additions and 26 deletions.
14 changes: 14 additions & 0 deletions versioned_hdf5/api.py
@@ -575,3 +575,17 @@ def _diff_data(
         diff[vir1] = (self[v1][name][vir1.raw], None)
 
         return diff
+
+    @property
+    def versions(self) -> List[str]:
+        """Return the names of the version groups in the file.
+
+        This should return the same as calling
+        ``versioned_hdf5.versions.all_versions(self.f, include_first=False)``.
+
+        Returns
+        -------
+        List[str]
+            The names of versions in the file; order is arbitrary
+        """
+        return [v for v in self._versions if '__first_version__' not in v]
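
For context, a minimal usage sketch of the new property; the file name, dataset name, and version names below are hypothetical:

```python
import h5py
import numpy as np

from versioned_hdf5 import VersionedHDF5File

with h5py.File('example.h5', 'w') as f:
    vf = VersionedHDF5File(f)
    with vf.stage_version('v1') as sv:
        sv['data'] = np.arange(10)
    with vf.stage_version('v2') as sv:
        sv['data'][:] = 0
    print(vf.versions)  # e.g. ['v1', 'v2'] -- order is arbitrary
```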
40 changes: 15 additions & 25 deletions versioned_hdf5/replay.py
@@ -1,6 +1,6 @@
 from __future__ import annotations
 import gc
-from typing import List, Iterable, Union, Dict, Any, Optional, Set
+from typing import List, Iterable, Union, Dict, Any, Optional
 from h5py import (
     VirtualLayout,
     h5s,
@@ -21,7 +21,6 @@
 
 from copy import deepcopy
 import posixpath
-from collections import defaultdict
 
 from .versions import all_versions
 from .wrappers import (InMemoryGroup, DatasetWrapper, InMemoryDataset,
@@ -162,15 +161,15 @@ def _get_np_fillvalue(data: Dataset) -> Any:
 
 
 def _recreate_raw_data(
-    f: VersionedHDF5File,
+    f: VersionedHDF5File | File,
     name: str,
     versions_to_delete: Iterable[str],
 ) -> Optional[Dict[NDIndex, NDIndex]]:
     """Create a new raw dataset without the chunks from versions_to_delete.
 
     Parameters
     ----------
-    f : VersionedHDF5File
+    f : VersionedHDF5File | File
         File for which the raw data is to be reconstructed
     name : str
         Name of the dataset
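
The widened annotation lets callers pass either the raw h5py File or its VersionedHDF5File wrapper; the body (next hunk) normalizes with an isinstance check. A self-contained sketch of that pattern, with a hypothetical function name:

```python
from __future__ import annotations

import h5py
from versioned_hdf5 import VersionedHDF5File

def accepts_either(f: VersionedHDF5File | h5py.File) -> VersionedHDF5File:
    # Normalize to the wrapper so the rest of the function only
    # has to deal with a single interface.
    return f if isinstance(f, VersionedHDF5File) else VersionedHDF5File(f)
```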
@@ -185,27 +184,20 @@
         If no chunks would be left, i.e., the dataset does not appear in any
         version not in versions_to_delete, None is returned.
     """
-    chunks_map = defaultdict(dict)
-
-    for version_name in all_versions(f):
-        if (version_name in versions_to_delete
-                or name not in f['_version_data/versions'][version_name]):
-            continue
-
-        dataset = f['_version_data/versions'][version_name][name]
-
-        if dataset.is_virtual:
-            for i in dataset.virtual_sources():
-                chunks_map[version_name].update(
-                    {spaceid_to_slice(i.vspace): spaceid_to_slice(i.src_space)}
-                )
-        else:
-            chunks_map[version_name] = {}
-
-    chunks_to_keep = set().union(*[map.values() for map in
-                                   chunks_map.values()])
+    chunks_to_keep = set()
+
+    if isinstance(f, VersionedHDF5File):
+        vf = f
+    else:
+        vf = VersionedHDF5File(f)
+
+    for version in vf.versions:
+        if version not in versions_to_delete and name in vf[version]:
+            dataset = f['_version_data/versions'][version][name]
+
+            if dataset.is_virtual:
+                for i in dataset.virtual_sources():
+                    chunks_to_keep.add(spaceid_to_slice(i.src_space))
 
     raw_data = f['_version_data'][name]['raw_data']
     chunks = ChunkSize(raw_data.chunks)
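
To see where the memory goes: the old code materialized chunks_map, a per-version dict holding every virtual-slice-to-raw-slice pair across all surviving versions, only to union the values at the end; the new code adds each raw-space slice straight into a single set and never stores the virtual-space keys. A toy sketch of the equivalence, with strings standing in for the real slice objects:

```python
# Toy illustration: strings stand in for the real NDIndex slice objects.
# Old approach: keep a full per-version mapping, then union the values.
chunks_map = {
    'v1': {'vspace_a': 'raw_0', 'vspace_b': 'raw_1'},
    'v2': {'vspace_a': 'raw_0', 'vspace_c': 'raw_2'},
}
chunks_to_keep_old = set().union(*[m.values() for m in chunks_map.values()])

# New approach: accumulate only the raw-space slices, version by version,
# so nothing but the final set is ever held in memory.
chunks_to_keep_new = set()
for version_chunks in chunks_map.values():
    chunks_to_keep_new.update(version_chunks.values())

assert chunks_to_keep_old == chunks_to_keep_new == {'raw_0', 'raw_1', 'raw_2'}
```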
@@ -282,13 +274,11 @@ def _recreate_hashtable(f, name, raw_data_chunks_map, tmp=False):
     If tmp=True, a new hashtable called '_tmp_hash_table' is created.
     Otherwise the hashtable is replaced.
     """
-
     # We could just reconstruct the hashtable with from_raw_data, but that is
     # slow, so instead we recreate it manually from the old hashtable and the
     # raw_data_chunks_map.
-    old_hashtable = Hashtable(f, name)
     new_hash_table = Hashtable(f, name, hash_table_name='_tmp_hash_table')
-    old_inverse = old_hashtable.inverse()
+    old_inverse = Hashtable(f, name).inverse()
 
     for old_chunk, new_chunk in raw_data_chunks_map.items():
         if isinstance(old_chunk, Tuple):
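
Dropping the named old_hashtable binding also trims peak memory: once .inverse() returns, no reference to the intermediate Hashtable remains, so it can be garbage-collected immediately rather than living until the function exits. A hypothetical illustration of that lifetime difference:

```python
# Hypothetical stand-in for the Hashtable class, showing only the
# reference-lifetime behavior.
class DummyTable:
    def __init__(self):
        self.big = bytearray(10**8)  # stands in for a large in-memory table

    def inverse(self):
        return {'hash': 'slice'}  # small derived mapping

# Before: `table` keeps the large object alive until it goes out of scope.
table = DummyTable()
old_inverse = table.inverse()

# After: the temporary is unreferenced the moment .inverse() returns,
# so CPython's reference counting can free it right away.
old_inverse = DummyTable().inverse()
```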
28 changes: 27 additions & 1 deletion versioned_hdf5/tests/test_api.py
@@ -17,9 +17,10 @@
 from .helpers import setup_vfile
 from ..backend import DEFAULT_CHUNK_SIZE, DATA_VERSION
 from ..api import VersionedHDF5File
-from ..versions import TIMESTAMP_FMT
+from ..versions import TIMESTAMP_FMT, all_versions
 from ..wrappers import (InMemoryArrayDataset, InMemoryDataset,
                         InMemorySparseDataset, DatasetWrapper, InMemoryGroup)
+from ..replay import delete_versions
 
 
 def test_stage_version(vfile):
@@ -2403,3 +2404,28 @@ def test_get_diff_same_version(tmp_path):
     diff = vfile.get_diff('test_data', 'v1', 'v1')
 
     assert diff == {}
+
+
+def test_versions_property(vfile):
+    """Test that VersionedHDF5File.versions returns the same as
+    all_versions(vfile).
+    """
+
+    for i in range(100):
+        with vfile.stage_version(f'r{i}') as sv:
+            sv['values'] = np.arange(i, 100)
+    assert set(all_versions(vfile.f)) == set(vfile.versions)
+
+    # keep only every 10th version
+    versions_to_delete = []
+    versions = sorted(
+        [(v, vfile._versions[v].attrs['timestamp']) for v in vfile._versions],
+        key=lambda t: t[1]
+    )
+    for i, v in enumerate(versions):
+        if i % 10 != 0:
+            versions_to_delete.append(v[0])
+
+    # Delete some versions and check for the correct versions again
+    delete_versions(vfile, versions_to_delete)
+    assert set(all_versions(vfile.f)) == set(vfile.versions)
1 change: 1 addition & 0 deletions versioned_hdf5/tests/test_replay.py
@@ -548,6 +548,7 @@ def test_delete_versions(vfile):
     f = vfile.f
 
     delete_versions(f, ['version2', 'version3'])
+
     check_data(vfile, version2=False)
     assert list(vfile) == ['version1']
     assert set(f['_version_data']) == {'group', 'test_data', 'test_data2', 'versions'}
