From b476b90486b0f9bd6fee5fb7a206469af0c71869 Mon Sep 17 00:00:00 2001
From: pdmurray
Date: Fri, 22 Sep 2023 15:14:08 -0700
Subject: [PATCH] Guard against exceptions when reconstructing raw data in
 delete_versions

---
 versioned_hdf5/replay.py            | 122 +++++++++++++++++++---------
 versioned_hdf5/tests/test_replay.py |  55 ++++++++++++-
 2 files changed, 137 insertions(+), 40 deletions(-)

diff --git a/versioned_hdf5/replay.py b/versioned_hdf5/replay.py
index ac4f3e75..86752f64 100644
--- a/versioned_hdf5/replay.py
+++ b/versioned_hdf5/replay.py
@@ -1,5 +1,5 @@
 from __future__ import annotations
-from typing import List, Iterable, Union, Dict, Any
+from typing import List, Iterable, Union, Dict, Any, Optional
 from h5py import (
     VirtualLayout,
     h5s,
@@ -165,18 +165,37 @@ def _recreate_raw_data(
     name: str,
     versions_to_delete: Iterable[str],
     tmp: bool = False
-) -> Dict[NDIndex, NDIndex]:
-    """
-    Return a new raw data set for a dataset without the chunks from
-    versions_to_delete.
+) -> Optional[Dict[NDIndex, NDIndex]]:
+    """Create a new raw dataset without the chunks from versions_to_delete.
+
+    This function can be memory-hungry: it loads the data from all versions
+    of the dataset before writing to a temporary location in the file
+    (_tmp_raw_data). If creating _tmp_raw_data fails, _tmp_raw_data is
+    deleted from the file. If a dataset called _tmp_raw_data exists in the
+    file to begin with, it is deleted before reconstruction is attempted.
+
+    See https://github.com/deshaw/versioned-hdf5/issues/273 for more information.
 
-    If no chunks would be left, i.e., the dataset does not appear in any
-    version not in versions_to_delete, None is returned.
+    Parameters
+    ----------
+    f : File
+        File for which the raw data is to be reconstructed
+    name : str
+        Name of the dataset
+    versions_to_delete : Iterable[str]
+        Versions to omit from the reconstructed raw_data
+    tmp : bool
+        If True, the new raw dataset (called '_tmp_raw_data') is placed
+        alongside the existing raw dataset. Otherwise the existing raw
+        dataset is replaced, and _tmp_raw_data is removed.
 
-    If tmp is True, the new raw dataset is called '_tmp_raw_data' and is
-    placed alongside the existing raw dataset. Otherwise the existing raw
-    dataset is replaced.
+    Returns
+    -------
+    Optional[Dict[NDIndex, NDIndex]]
+        A mapping between user-space data chunks and the raw_data chunks.
+        If no chunks would be left, i.e., the dataset does not appear in any
+        version not in versions_to_delete, None is returned.
     """
 
     chunks_map = defaultdict(dict)
@@ -215,35 +234,60 @@ def _recreate_raw_data(
         if fillvalue not in [0, '', b'', None]:
             raise ValueError("Non-default fillvalue not supported for variable length strings")
         fillvalue = None
-    new_raw_data = f['_version_data'][name].create_dataset(
-        '_tmp_raw_data', shape=new_shape, maxshape=(None,)+chunks[1:],
-        chunks=raw_data.chunks, dtype=dtype,
-        compression=raw_data.compression,
-        compression_opts=raw_data.compression_opts,
-        fillvalue=fillvalue)
-    for key, val in raw_data.attrs.items():
-        new_raw_data.attrs[key] = val
-
-    r = raw_data[:]
-    n = np.full(new_raw_data.shape, _get_np_fillvalue(raw_data), dtype=new_raw_data.dtype)
-    raw_data_chunks_map = {}
-    for new_chunk, chunk in zip(chunks.indices(new_shape), chunks_to_keep):
-        # Shrink new_chunk to the size of chunk, in case chunk isn't a full
-        # chunk in one of the dimensions.
-        # TODO: Implement something in ndindex to do this.
-        new_chunk = Tuple(
-            *[Slice(new_chunk.args[i].start,
-                    new_chunk.args[i].start+len(chunk.args[i]))
-              for i in range(len(new_chunk.args))])
-        raw_data_chunks_map[chunk] = new_chunk
-        n[new_chunk.raw] = r[chunk.raw]
-
-    new_raw_data[:] = n
-    if not tmp:
-        del f['_version_data'][name]['raw_data']
-        f['_version_data'][name].move('_tmp_raw_data', 'raw_data')
-    return raw_data_chunks_map
+    # Guard against a pre-existing _tmp_raw_data left over from an earlier failure
+    _delete_tmp_raw_data(f, name)
+
+    try:
+        new_raw_data = f['_version_data'][name].create_dataset(
+            '_tmp_raw_data', shape=new_shape, maxshape=(None,)+chunks[1:],
+            chunks=raw_data.chunks, dtype=dtype,
+            compression=raw_data.compression,
+            compression_opts=raw_data.compression_opts,
+            fillvalue=fillvalue)
+        for key, val in raw_data.attrs.items():
+            new_raw_data.attrs[key] = val
+
+        r = raw_data[:]
+        n = np.full(new_raw_data.shape, _get_np_fillvalue(raw_data), dtype=new_raw_data.dtype)
+        raw_data_chunks_map = {}
+        for new_chunk, chunk in zip(chunks.indices(new_shape), chunks_to_keep):
+            # Shrink new_chunk to the size of chunk, in case chunk isn't a full
+            # chunk in one of the dimensions.
+            # TODO: Implement something in ndindex to do this.
+            new_chunk = Tuple(
+                *[Slice(new_chunk.args[i].start,
+                        new_chunk.args[i].start+len(chunk.args[i]))
+                  for i in range(len(new_chunk.args))])
+            raw_data_chunks_map[chunk] = new_chunk
+            n[new_chunk.raw] = r[chunk.raw]
+
+        new_raw_data[:] = n
+        if not tmp:
+            del f['_version_data'][name]['raw_data']
+            f['_version_data'][name].move('_tmp_raw_data', 'raw_data')
+
+        return raw_data_chunks_map
+    except Exception as e:
+        # If there's an issue writing data to the file (e.g. OOM), remove the
+        # temporary raw data before re-raising
+        _delete_tmp_raw_data(f, name)
+        raise IOError("Exception raised while recreating the raw dataset.") from e
+
+
+def _delete_tmp_raw_data(f: File, name: str):
+    """Delete _tmp_raw_data for the named dataset if it exists in the file.
+
+    Parameters
+    ----------
+    f : File
+        File from which _tmp_raw_data is to be removed
+    name : str
+        Name of the dataset whose _tmp_raw_data is to be removed
+    """
+    if '_tmp_raw_data' in f['_version_data'][name]:
+        del f['_version_data'][name]['_tmp_raw_data']
+
 
 def _recreate_hashtable(f, name, raw_data_chunks_map, tmp=False):
     """
@@ -492,7 +536,7 @@ def _walk(g: HLObject, prefix: str = '') -> List[str]:
 
 def delete_versions(
         f: Union[VersionedHDF5File, File],
-        versions_to_delete: Iterable[str]
+        versions_to_delete: Union[str, Iterable[str]]
 ):
     """Completely delete the given versions from a file
diff --git a/versioned_hdf5/tests/test_replay.py b/versioned_hdf5/tests/test_replay.py
index 81726dc9..2bd69c4a 100644
--- a/versioned_hdf5/tests/test_replay.py
+++ b/versioned_hdf5/tests/test_replay.py
@@ -7,6 +7,7 @@
 import h5py
 import numpy as np
 import pytest
+from unittest import mock
 
 from versioned_hdf5 import VersionedHDF5File
 from versioned_hdf5.hashtable import Hashtable
@@ -14,7 +15,8 @@
     _recreate_raw_data,
     _recreate_virtual_dataset,
     delete_version,
     delete_versions,
-    modify_metadata)
+    modify_metadata,
+    _delete_tmp_raw_data)
 
 
 def setup_vfile(file):
@@ -932,3 +934,54 @@ def trace_prev_version_line_calls(frame, event, arg):
     # we end up with 8619 executions on Python 3.10, but the number
     # varies between Python versions
     assert 8600 <= line_counts <= 8650
+
+
+def test_delete_versions_failure(vfile):
+    """Check that delete_versions gracefully handles an OOM error.
+
+    Also check that _tmp_raw_data doesn't exist anywhere in the file after
+    the error occurs.
+ """ + setup_vfile(vfile) + + def raise_error(): + raise MemoryError + + with mock.patch('numpy.full', side_effect=raise_error) as mock_create_dataset: + with pytest.raises(IOError): + delete_versions(vfile, 'version1') + + mock_create_dataset.assert_called() + + for name in ['test_data', 'test_data2', 'test_data3']: + assert '_tmp_raw_data' not in vfile.f['_version_data'][name] + + +def test_delete_versions_remove_extra_tmp_raw_data(vfile): + """Check that delete_versions deletes _tmp_raw_data if it exists initially.""" + with vfile.stage_version('version1') as g: + g['test_data'] = np.ones(10) + + with vfile.stage_version('version2') as g: + g['test_data'][5:] = 2 + + vfile.f['_version_data']['test_data'].create_dataset( + '_tmp_raw_data', + dtype='int' + ) + + def raise_error(): + raise MemoryError + + with mock.patch('numpy.full', side_effect=raise_error) as mock_create_dataset: + with mock.patch( + 'versioned_hdf5.replay._delete_tmp_raw_data', wraps=_delete_tmp_raw_data + ) as mock_delete_tmp_raw_data: + with pytest.raises(IOError): + delete_versions(vfile, 'version1') + + assert mock_delete_tmp_raw_data.call_count == 2 + + mock_create_dataset.assert_called() + + assert '_tmp_raw_data' not in vfile.f['_version_data']['test_data']