Guard against exceptions when reconstructing raw data in delete_versions
peytondmurray committed Sep 27, 2023
1 parent bd27f87 commit b476b90
Showing 2 changed files with 137 additions and 40 deletions.
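For context, a minimal sketch of how delete_versions is driven from user code (the file and dataset names here are illustrative, not part of this commit):

import numpy as np
import h5py
from versioned_hdf5 import VersionedHDF5File
from versioned_hdf5.replay import delete_versions

# Build a small versioned file, then delete one version. After this commit,
# a failure partway through the raw-data rebuild no longer leaves a stray
# '_tmp_raw_data' dataset behind in the file.
with h5py.File('example.h5', 'w') as f:
    vfile = VersionedHDF5File(f)
    with vfile.stage_version('version1') as g:
        g['values'] = np.ones(10)
    with vfile.stage_version('version2') as g:
        g['values'][5:] = 2
    delete_versions(vfile, 'version1')  # accepts a str or an iterable of names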
122 changes: 83 additions & 39 deletions versioned_hdf5/replay.py
@@ -1,5 +1,5 @@
 from __future__ import annotations
-from typing import List, Iterable, Union, Dict, Any
+from typing import List, Iterable, Union, Dict, Any, Optional
 from h5py import (
     VirtualLayout,
     h5s,
@@ -165,18 +165,37 @@ def _recreate_raw_data(
     name: str,
     versions_to_delete: Iterable[str],
     tmp: bool = False
-) -> Dict[NDIndex, NDIndex]:
-    """
-    Return a new raw data set for a dataset without the chunks from
-    versions_to_delete.
-
-    If no chunks would be left, i.e., the dataset does not appear in any
-    version not in versions_to_delete, None is returned.
-
-    If tmp is True, the new raw dataset is called '_tmp_raw_data' and is
-    placed alongside the existing raw dataset. Otherwise the existing raw
-    dataset is replaced.
-    """
+) -> Optional[Dict[NDIndex, NDIndex]]:
+    """Create a new raw dataset without the chunks from versions_to_delete.
+
+    This function can be memory-hungry, because it loads all the data from all versions
+    of the dataset before writing to a temporary location in the file (_tmp_raw_data).
+
+    If creating _tmp_raw_data fails, _tmp_raw_data will be deleted from the file. If a
+    dataset called _tmp_raw_data exists in the file to begin with, it will be deleted
+    before reconstruction is attempted.
+
+    See https://github.com/deshaw/versioned-hdf5/issues/273 for more information.
+
+    Parameters
+    ----------
+    f : VersionedHDF5File
+        File for which the raw data is to be reconstructed
+    name : str
+        Name of the dataset
+    versions_to_delete : Iterable[str]
+        Versions to omit from the reconstructed raw_data
+    tmp : bool
+        If True, the new raw dataset (called '_tmp_raw_data') is placed alongside the
+        existing raw dataset. Otherwise the existing raw dataset is replaced, and
+        _tmp_raw_data is removed.
+
+    Returns
+    -------
+    Optional[Dict[NDIndex, NDIndex]]
+        A mapping between user-space data chunks and the raw_data chunks.
+
+        If no chunks would be left, i.e., the dataset does not appear in any
+        version not in versions_to_delete, None is returned.
+    """
     chunks_map = defaultdict(dict)

@@ -215,35 +234,60 @@ def _recreate_raw_data(
         if fillvalue not in [0, '', b'', None]:
             raise ValueError("Non-default fillvalue not supported for variable length strings")
         fillvalue = None
-    new_raw_data = f['_version_data'][name].create_dataset(
-        '_tmp_raw_data', shape=new_shape, maxshape=(None,)+chunks[1:],
-        chunks=raw_data.chunks, dtype=dtype,
-        compression=raw_data.compression,
-        compression_opts=raw_data.compression_opts,
-        fillvalue=fillvalue)
-    for key, val in raw_data.attrs.items():
-        new_raw_data.attrs[key] = val
-
-    r = raw_data[:]
-    n = np.full(new_raw_data.shape, _get_np_fillvalue(raw_data), dtype=new_raw_data.dtype)
-    raw_data_chunks_map = {}
-    for new_chunk, chunk in zip(chunks.indices(new_shape), chunks_to_keep):
-        # Shrink new_chunk to the size of chunk, in case chunk isn't a full
-        # chunk in one of the dimensions.
-        # TODO: Implement something in ndindex to do this.
-        new_chunk = Tuple(
-            *[Slice(new_chunk.args[i].start,
-                    new_chunk.args[i].start+len(chunk.args[i]))
-              for i in range(len(new_chunk.args))])
-        raw_data_chunks_map[chunk] = new_chunk
-        n[new_chunk.raw] = r[chunk.raw]
-
-    new_raw_data[:] = n
-    if not tmp:
-        del f['_version_data'][name]['raw_data']
-        f['_version_data'][name].move('_tmp_raw_data', 'raw_data')
-
-    return raw_data_chunks_map
+    # Guard against existing _tmp_raw_data
+    _delete_tmp_raw_data(f, name)
+
+    try:
+        new_raw_data = f['_version_data'][name].create_dataset(
+            '_tmp_raw_data', shape=new_shape, maxshape=(None,)+chunks[1:],
+            chunks=raw_data.chunks, dtype=dtype,
+            compression=raw_data.compression,
+            compression_opts=raw_data.compression_opts,
+            fillvalue=fillvalue)
+        for key, val in raw_data.attrs.items():
+            new_raw_data.attrs[key] = val
+
+        r = raw_data[:]
+        n = np.full(new_raw_data.shape, _get_np_fillvalue(raw_data), dtype=new_raw_data.dtype)
+        raw_data_chunks_map = {}
+        for new_chunk, chunk in zip(chunks.indices(new_shape), chunks_to_keep):
+            # Shrink new_chunk to the size of chunk, in case chunk isn't a full
+            # chunk in one of the dimensions.
+            # TODO: Implement something in ndindex to do this.
+            new_chunk = Tuple(
+                *[Slice(new_chunk.args[i].start,
+                        new_chunk.args[i].start+len(chunk.args[i]))
+                  for i in range(len(new_chunk.args))])
+            raw_data_chunks_map[chunk] = new_chunk
+            n[new_chunk.raw] = r[chunk.raw]
+
+        new_raw_data[:] = n
+        if not tmp:
+            del f['_version_data'][name]['raw_data']
+            f['_version_data'][name].move('_tmp_raw_data', 'raw_data')
+
+        return raw_data_chunks_map
+    except Exception as e:
+        # If there's an issue writing data to the file (e.g. OOM), remove the temporary
+        # raw data
+        _delete_tmp_raw_data(f, name)
+        raise IOError("Exception raised while recreating the raw dataset.") from e
+
+
+def _delete_tmp_raw_data(f: File, name: str):
+    """Delete _tmp_raw_data if it exists in the file.
+
+    Parameters
+    ----------
+    f : File
+        File in which _tmp_raw_data is to be removed
+    name : str
+        Name of the dataset where _tmp_raw_data is to be removed
+    """
+    if '_tmp_raw_data' in f['_version_data'][name]:
+        del f['_version_data'][name]['_tmp_raw_data']
 
 
 def _recreate_hashtable(f, name, raw_data_chunks_map, tmp=False):
     """
@@ -492,7 +536,7 @@ def _walk(g: HLObject, prefix: str = '') -> List[str]:
 
 def delete_versions(
     f: Union[VersionedHDF5File, File],
-    versions_to_delete: Iterable[str]
+    versions_to_delete: Union[str, Iterable[str]]
 ):
     """Completely delete the given versions from a file
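The change above is an instance of a general cleanup-on-failure pattern: remove any stale temporary first, do the expensive work into a temporary, swap it into place, and delete the temporary again if anything goes wrong. A self-contained sketch of the same idea with hypothetical names ('rebuild_with_guard', '_tmp', 'data'), not the library's internals:

import h5py
import numpy as np

def rebuild_with_guard(f: h5py.File, group: str) -> None:
    # Remove any stale temporary left over from an earlier failed rebuild.
    if '_tmp' in f[group]:
        del f[group]['_tmp']
    try:
        # Stand-in for the expensive reconstruction step.
        tmp = f[group].create_dataset('_tmp', shape=(100,), dtype='f8')
        tmp[:] = np.arange(100)
        del f[group]['data']
        f[group].move('_tmp', 'data')
    except Exception as e:
        # On failure (e.g. MemoryError), delete the temporary so the file is
        # left as it was found, then surface the error.
        if '_tmp' in f[group]:
            del f[group]['_tmp']
        raise IOError("Rebuild failed; temporary dataset removed.") from e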
55 changes: 54 additions & 1 deletion versioned_hdf5/tests/test_replay.py
@@ -7,14 +7,16 @@
 import h5py
 import numpy as np
 import pytest
+from unittest import mock
 
 from versioned_hdf5 import VersionedHDF5File
 from versioned_hdf5.hashtable import Hashtable
 from versioned_hdf5.replay import (_recreate_hashtable,
                                    _recreate_raw_data,
                                    _recreate_virtual_dataset,
                                    delete_version, delete_versions,
-                                   modify_metadata)
+                                   modify_metadata,
+                                   _delete_tmp_raw_data)
 
 
 def setup_vfile(file):
Expand Down Expand Up @@ -932,3 +934,54 @@ def trace_prev_version_line_calls(frame, event, arg):
# we end up with 8619 executions on Python 3.10, but the number
# varies between Python versions
assert 8600 <= line_counts <= 8650


def test_delete_versions_failure(vfile):
"""Check that delete_versions gracefully handles an OOM error.
Also check that _tmp_raw_data doesn't exist anywhere in the dataset after the error
occurs.
"""
setup_vfile(vfile)

def raise_error():
raise MemoryError

with mock.patch('numpy.full', side_effect=raise_error) as mock_create_dataset:
with pytest.raises(IOError):
delete_versions(vfile, 'version1')

mock_create_dataset.assert_called()

for name in ['test_data', 'test_data2', 'test_data3']:
assert '_tmp_raw_data' not in vfile.f['_version_data'][name]


def test_delete_versions_remove_extra_tmp_raw_data(vfile):
"""Check that delete_versions deletes _tmp_raw_data if it exists initially."""
with vfile.stage_version('version1') as g:
g['test_data'] = np.ones(10)

with vfile.stage_version('version2') as g:
g['test_data'][5:] = 2

vfile.f['_version_data']['test_data'].create_dataset(
'_tmp_raw_data',
dtype='int'
)

def raise_error():
raise MemoryError

with mock.patch('numpy.full', side_effect=raise_error) as mock_create_dataset:
with mock.patch(
'versioned_hdf5.replay._delete_tmp_raw_data', wraps=_delete_tmp_raw_data
) as mock_delete_tmp_raw_data:
with pytest.raises(IOError):
delete_versions(vfile, 'version1')

assert mock_delete_tmp_raw_data.call_count == 2

mock_create_dataset.assert_called()

assert '_tmp_raw_data' not in vfile.f['_version_data']['test_data']
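The OOM simulation used in both tests relies on unittest.mock's side_effect hook. A minimal self-contained sketch of the technique (the allocate helper is hypothetical, standing in for the np.full call inside _recreate_raw_data):

from unittest import mock

import numpy as np
import pytest

def allocate() -> np.ndarray:
    # Stand-in for the allocation performed by _recreate_raw_data.
    return np.full((10,), 0.0)

def test_allocation_failure_is_surfaced():
    # Patching numpy.full so it raises forces the allocation path to fail
    # without actually exhausting memory.
    with mock.patch('numpy.full', side_effect=MemoryError):
        with pytest.raises(MemoryError):
            allocate()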
