diff --git a/CHANGES.rst b/CHANGES.rst
index 6bfcb1ed8..006727c8a 100644
--- a/CHANGES.rst
+++ b/CHANGES.rst
@@ -8,6 +8,7 @@ The ASDF Standard is at v1.6.0
 - Add ``all_array_storage``, ``all_array_compression`` and
   ``all_array_compression_kwargs`` to ``asdf.config.AsdfConfig`` [#1468]
 - Move built-in tags to converters (except ndarray and integer). [#1474]
+- Add block storage support to Converter [#1508]
 
 2.15.0 (2023-03-28)
 -------------------
diff --git a/asdf/_tests/commands/tests/test_exploded.py b/asdf/_tests/commands/tests/test_exploded.py
index 89552c5a8..e431fbb2d 100644
--- a/asdf/_tests/commands/tests/test_exploded.py
+++ b/asdf/_tests/commands/tests/test_exploded.py
@@ -24,7 +24,8 @@ def test_explode_then_implode(tmpdir):
     # in internal blocks rather than letting some of them be automatically put
     # inline.
     ff.write_to(path, all_array_storage="internal")
-    assert len(ff._blocks) == 2
+    with asdf.open(path) as af:
+        assert len(af._blocks._internal_blocks) == 2
 
     result = main.main_from_args(["explode", path])
diff --git a/asdf/_tests/tags/core/tests/test_integer.py b/asdf/_tests/tags/core/tests/test_integer.py
index 9c29fc92d..3cd690c25 100644
--- a/asdf/_tests/tags/core/tests/test_integer.py
+++ b/asdf/_tests/tags/core/tests/test_integer.py
@@ -67,9 +67,9 @@ def test_integer_storage_duplication(tmpdir):
     with asdf.AsdfFile(tree) as af:
         af.write_to(tmpfile)
-        assert len(af._blocks) == 1
 
         with asdf.open(tmpfile, _force_raw_types=True) as rf:
+            assert len(af._blocks) == 1
             assert rf.tree["integer1"]["words"]["source"] == 0
             assert rf.tree["integer2"]["words"]["source"] == 0
diff --git a/asdf/_tests/test_api.py b/asdf/_tests/test_api.py
index aff6359a2..24ceec8f6 100644
--- a/asdf/_tests/test_api.py
+++ b/asdf/_tests/test_api.py
@@ -1,3 +1,4 @@
+import copy
 import getpass
 import io
 import os
@@ -439,6 +440,7 @@ def test_array_inline_threshold_masked_array(array_inline_threshold, inline_bloc
     with asdf.AsdfFile(tree) as af:
         af.write_to(file_path)
 
+    with asdf.open(file_path) as af:
         assert len(list(af._blocks.inline_blocks)) == inline_blocks
         assert len(list(af._blocks.internal_blocks)) == internal_blocks
@@ -547,3 +549,18 @@ def test_asdf_standard_version_tag_selection():
     content = buff.read()
     assert b"!core/asdf-1.0.0" not in content
     assert b"!core/asdf-1.1.0" in content
+
+
+def test_write_to_no_tree_modification(tmp_path):
+    fn = tmp_path / "test.asdf"
+    fn2 = tmp_path / "test2.asdf"
+    tree = {"foo": None}
+    af = asdf.AsdfFile(tree.copy())
+    af.write_to(fn)
+    assert tree == af.tree
+    with asdf.open(fn) as af:
+        af["history"]["extensions"][0]["software"]["version"] = "0.0.0.dev+abcdefg"
+        af["asdf_library"]["author"] = "foo"
+        tree = copy.deepcopy(af.tree)
+        af.write_to(fn2)
+        assert af.tree == tree
diff --git a/asdf/_tests/test_array_blocks.py b/asdf/_tests/test_array_blocks.py
index 10c634d47..b95be394b 100644
--- a/asdf/_tests/test_array_blocks.py
+++ b/asdf/_tests/test_array_blocks.py
@@ -3,6 +3,7 @@
 import numpy as np
 import pytest
+import yaml
 from numpy.random import random
 from numpy.testing import assert_array_equal
@@ -26,6 +27,15 @@ def test_external_block(tmp_path):
     assert "test0000.asdf" in os.listdir(tmp_path)
 
 
+def test_external_block_url():
+    uri = "asdf://foo"
+    my_array = RNG.normal(size=(8, 8))
+    tree = {"my_array": my_array}
+    asdf.get_config().all_array_storage = "external"
+    # this should not raise a ValueError since uri is provided
+    asdf.AsdfFile(tree, uri=uri)
+
+
 def test_external_block_non_url():
     my_array = RNG.normal(size=(8, 8))
     tree = {"my_array": my_array}
@@ -655,7 +665,8 @@ def test_invalid_block_index_values():
         assert len(ff._blocks) == 1
 
 
-def test_invalid_last_block_index():
+@pytest.mark.parametrize("block_index_index", [0, -1])
+def test_invalid_block_index_offset(block_index_index):
     """
     This adds a value in the block index that points to something
     that isn't a block
@@ -670,13 +681,33 @@
     tree = {"arrays": arrays}
 
     ff = asdf.AsdfFile(tree)
-    ff.write_to(buff, include_block_index=False)
-    ff._blocks._internal_blocks[-1]._offset -= 4
-    ff._blocks.write_block_index(buff, ff)
+    ff.write_to(buff)
+
+    # now overwrite the block index with the first entry
+    # incorrectly pointing to a non-block offset
+    buff.seek(0)
+    bs = buff.read()
+    block_index_header_start = bs.index(constants.INDEX_HEADER)
+    block_index_start = block_index_header_start + len(constants.INDEX_HEADER)
+    block_index = yaml.load(bs[block_index_start:], yaml.SafeLoader)
+    block_index[block_index_index] -= 4
+    yaml_version = tuple(int(x) for x in ff.version_map["YAML_VERSION"].split("."))
+    buff.seek(block_index_start)
+    yaml.dump(
+        block_index,
+        stream=buff,
+        explicit_start=True,
+        explicit_end=True,
+        version=yaml_version,
+        allow_unicode=True,
+        encoding="utf-8",
+    )
 
     buff.seek(0)
     with asdf.open(buff) as ff:
         assert len(ff._blocks) == 1
+        for i, a in enumerate(arrays):
+            assert_array_equal(ff["arrays"][i], a)
 
 
 def test_unordered_block_index():
@@ -702,30 +733,6 @@
         assert len(ff._blocks) == 1
 
 
-def test_invalid_block_index_first_block_value():
-    """
-    This creates a bogus block index where the offset of the first
-    block doesn't match what we already know it to be. In this
-    case, we should reject the whole block index.
-    """
-    buff = io.BytesIO()
-
-    arrays = []
-    for i in range(10):
-        arrays.append(np.ones((8, 8)) * i)
-
-    tree = {"arrays": arrays}
-
-    ff = asdf.AsdfFile(tree)
-    ff.write_to(buff, include_block_index=False)
-    ff._blocks._internal_blocks[0]._offset -= 4
-    ff._blocks.write_block_index(buff, ff)
-
-    buff.seek(0)
-    with asdf.open(buff) as ff:
-        assert len(ff._blocks) == 1
-
-
 def test_invalid_block_id():
     ff = asdf.AsdfFile()
     with pytest.raises(ValueError, match=r"Invalid source id .*"):
@@ -859,7 +866,11 @@ def test_write_to_update_storage_options(tmp_path, all_array_storage, all_array_
     if all_array_compression == "bzp2" and compression_kwargs is not None:
         compression_kwargs = {"compresslevel": 1}
 
-    def assert_result(ff, arr):
+    def assert_result(ff):
+        if "array" not in ff:
+            # this was called from _write_to while making an external block
+            # so don't check the result
+            return
         if all_array_storage == "external":
             assert "test0000.asdf" in os.listdir(tmp_path)
         else:
@@ -868,10 +879,12 @@ def assert_result(ff, arr):
             assert len(ff._blocks._internal_blocks) == 1
         else:
             assert len(ff._blocks._internal_blocks) == 0
-        blk = ff._blocks[arr]
+        blk = ff._blocks[ff["array"]]
 
         target_compression = all_array_compression or None
-        assert blk._output_compression == target_compression
+        if target_compression == "input":
+            target_compression = None
+        assert blk.output_compression == target_compression
 
         target_compression_kwargs = compression_kwargs or {}
         assert blk._output_compression_kwargs == target_compression_kwargs
@@ -881,6 +894,19 @@ def assert_result(ff, arr):
 
     fn = tmp_path / "test.asdf"
     ff1 = asdf.AsdfFile(tree)
+
+    # as a new AsdfFile is used for write_to and we want
+    # to check blocks here, we patch _write_to to allow us
+    # to inspect the blocks in the new AsdfFile before
+    # it falls out of scope
+    original = asdf.AsdfFile._write_to
+
+    def patched(self, *args, **kwargs):
+        original(self, *args, **kwargs)
+        assert_result(self)
+
+    asdf.AsdfFile._write_to = patched
+
     # first check write_to
     ff1.write_to(
         fn,
@@ -888,7 +914,8 @@
         all_array_compression=all_array_compression,
        compression_kwargs=compression_kwargs,
     )
-    assert_result(ff1, arr1)
+
+    asdf.AsdfFile._write_to = original
 
     # then reuse the file to check update
     with asdf.open(fn, mode="rw") as ff2:
@@ -899,7 +926,7 @@
             all_array_compression=all_array_compression,
             compression_kwargs=compression_kwargs,
         )
-        assert_result(ff2, arr2)
+        assert_result(ff2)
 
 
 def test_block_key():
@@ -971,3 +998,43 @@ def __call__(self):
             mode="r",
         ) as f:
             rb.read(f, past_magic=False)
+
+
+def test_remove_blocks(tmp_path):
+    """Test that removing an array from the tree also removes its block on write_to and update"""
+    fn1 = tmp_path / "test.asdf"
+    fn2 = tmp_path / "test2.asdf"
+
+    tree = {"a": np.zeros(3), "b": np.ones(1)}
+    af = asdf.AsdfFile(tree)
+    af.write_to(fn1)
+
+    with asdf.open(fn1, mode="rw") as af:
+        assert len(af._blocks._internal_blocks) == 2
+        af["a"] = None
+        af.write_to(fn2)
+
+    with asdf.open(fn1, mode="rw") as af:
+        assert len(af._blocks._internal_blocks) == 2
+        af["a"] = None
+        af.update()
+
+    for fn in (fn1, fn2):
+        with asdf.open(fn) as af:
+            assert len(af._blocks._internal_blocks) == 1
+
+
+def test_write_to_before_update(tmp_path):
+    # this is a regression test for: https://github.com/asdf-format/asdf/issues/1505
+    fn1 = tmp_path / "test1.asdf"
+    fn2 = tmp_path / "test2.asdf"
+
+    tree = {"a": np.zeros(3), "b": np.ones(3)}
+    af = asdf.AsdfFile(tree)
+
+    af.write_to(fn1)
+
+    with asdf.open(fn1, mode="rw") as af:
+        af["a"] = None
+        af.write_to(fn2)
+        af.update()
diff --git a/asdf/_tests/test_asdf.py b/asdf/_tests/test_asdf.py
index 1501f3a21..e45b621ca 100644
--- a/asdf/_tests/test_asdf.py
+++ b/asdf/_tests/test_asdf.py
@@ -196,7 +196,7 @@ def test_open_asdf_extensions(tmp_path):
 
 def test_serialization_context():
     extension_manager = ExtensionManager([])
-    context = SerializationContext("1.4.0", extension_manager, "file://test.asdf")
+    context = SerializationContext("1.4.0", extension_manager, "file://test.asdf", None)
 
     assert context.version == "1.4.0"
     assert context.extension_manager is extension_manager
     assert context._extensions_used == set()
@@ -215,7 +215,7 @@
         context._mark_extension_used(object())
 
     with pytest.raises(ValueError, match=r"ASDF Standard version .* is not supported by asdf==.*"):
-        SerializationContext("0.5.4", extension_manager, None)
+        SerializationContext("0.5.4", extension_manager, None, None)
 
 
 def test_reading_extension_metadata():
diff --git a/asdf/_tests/test_block_converter.py b/asdf/_tests/test_block_converter.py
new file mode 100644
index 000000000..4a58926b0
--- /dev/null
+++ b/asdf/_tests/test_block_converter.py
@@ -0,0 +1,276 @@
+import contextlib
+
+import numpy as np
+import pytest
+from numpy.testing import assert_array_equal
+
+import asdf
+from asdf.extension import Converter, Extension
+from asdf.testing import helpers
+
+
+class BlockData:
+    def __init__(self, payload):
+        self.payload = payload
+        # generate a unique id
+        self._asdf_key = asdf.util.BlockKey()
+
+
+class BlockConverter(Converter):
+    tags = ["asdf://somewhere.org/tags/block_data-1.0.0"]
+    types = [BlockData]
+    _return_invalid_keys = False
+    _double_assign_block = False
+
+    def to_yaml_tree(self, obj, tag, ctx):
+        # lookup source for obj
+        block_index = ctx.find_block_index(
+            obj._asdf_key,
+            lambda: np.ndarray(len(obj.payload), dtype="uint8", buffer=obj.payload),
+        )
+        return {
+            "block_index": block_index,
+        }
+
+    def from_yaml_tree(self, node, tag, ctx):
+        block_index = node["block_index"]
+        data = ctx.get_block_data_callback(block_index)()
+        obj = BlockData(data.tobytes())
+        ctx.assign_block_key(block_index, obj._asdf_key)
+        if self._double_assign_block:
+            self._double_assign_block = False
+            key2 = asdf.util.BlockKey()
+            ctx.assign_block_key(block_index, key2)
+        return obj
+
+    def reserve_blocks(self, obj, tag):
+        if self._return_invalid_keys:
+            # return something unhashable
+            self._return_invalid_keys = False
+            return [[]]
+        return [obj._asdf_key]
+
+
+class BlockExtension(Extension):
+    tags = ["asdf://somewhere.org/tags/block_data-1.0.0"]
+    converters = [BlockConverter()]
+    extension_uri = "asdf://somewhere.org/extensions/block_data-1.0.0"
+
+
+@contextlib.contextmanager
+def with_extension(ext_class):
+    with asdf.config_context() as cfg:
+        cfg.add_extension(ext_class())
+        yield
+
+
+@with_extension(BlockExtension)
+def test_roundtrip_block_data():
+    a = BlockData(b"abcdefg")
+    b = helpers.roundtrip_object(a)
+    assert a.payload == b.payload
+
+
+@with_extension(BlockExtension)
+def test_block_converter_block_allocation(tmp_path):
+    a = BlockData(b"abcdefg")
+
+    # make a tree without the BlockData instance to avoid
+    # the initial validate which will trigger block allocation
+    af = asdf.AsdfFile({"a": None})
+    # now assign to the tree item (avoiding validation)
+    af["a"] = a
+    # the AsdfFile instance should have no blocks
+    assert len(af._blocks._internal_blocks) == 0
+    # validate will make a block
+    af.validate()
+    assert len(af._blocks._internal_blocks) == 1
+    assert np.all(af._blocks._internal_blocks[0].data.tobytes() == a.payload)
+    # a second validate shouldn't result in more blocks
+    af.validate()
+    assert len(af._blocks._internal_blocks) == 1
+    # write_to will create blocks here because
+    # they currently hold storage settings
+    fn = tmp_path / "test.asdf"
+    af.write_to(fn)
+    assert len(af._blocks._internal_blocks) == 1
+
+    # if we read a file
+    with asdf.open(fn, mode="rw") as af:
+        fn2 = tmp_path / "test2.asdf"
+        # there should be 1 block
+        assert len(af._blocks._internal_blocks) == 1
+        # validate should use that block
+        af.validate()
+        assert len(af._blocks._internal_blocks) == 1
+        # as should write_to
+        af.write_to(fn2)
+        assert len(af._blocks._internal_blocks) == 1
+        # and update
+        af.update()
+        assert len(af._blocks._internal_blocks) == 1
+
+
+@with_extension(BlockExtension)
+def test_invalid_reserve_block_keys(tmp_path):
+    a = BlockData(b"abcdefg")
+    af = asdf.AsdfFile({"a": a})
+    fn = tmp_path / "test.asdf"
+    BlockExtension.converters[0]._return_invalid_keys = True
+    with pytest.raises(TypeError, match="unhashable type: .*"):
+        af.write_to(fn)
+
+
+@with_extension(BlockExtension)
+def test_double_assign_block(tmp_path):
+    a = BlockData(b"abcdefg")
+    af = asdf.AsdfFile({"a": a})
+    fn = tmp_path / "test.asdf"
+    af.write_to(fn)
+    BlockExtension.converters[0]._double_assign_block = True
+    with pytest.raises(ValueError, match="block 0 is already assigned to a key"):
+        with asdf.open(fn):
+            pass
+
+
+class BlockDataCallback:
+    """An example object that uses the data callback to read block data"""
+
+    def __init__(self, callback):
+        self.callback = callback
+        self._asdf_key = asdf.util.BlockKey()
+
+    @property
+    def data(self):
+        return self.callback()
+
+
+class BlockDataCallbackConverter(Converter):
+    tags = ["asdf://somewhere.org/tags/block_data_callback-1.0.0"]
+    types = [BlockDataCallback]
+
+    def to_yaml_tree(self, obj, tag, ctx):
+        block_index = ctx.find_block_index(obj._asdf_key, obj.callback)
+        return {
+            "block_index": block_index,
+        }
+
+    def from_yaml_tree(self, node, tag, ctx):
+        block_index = node["block_index"]
+
+        obj = BlockDataCallback(ctx.get_block_data_callback(block_index))
+        ctx.assign_block_key(block_index, obj._asdf_key)
+        return obj
+
+    def reserve_blocks(self, obj, tag):
+        return [obj._asdf_key]
+
+
+class BlockDataCallbackExtension(Extension):
+    tags = ["asdf://somewhere.org/tags/block_data_callback-1.0.0"]
+    converters = [BlockDataCallbackConverter()]
+    extension_uri = "asdf://somewhere.org/extensions/block_data_callback-1.0.0"
+
+
+@with_extension(BlockDataCallbackExtension)
+def test_block_data_callback_converter(tmp_path):
+    # use a callback that every time generates a new array
+    # this would cause issues for the old block management as the
+    # id(arr) would change every time
+    a = BlockDataCallback(lambda: np.zeros(3, dtype="uint8"))
+
+    b = helpers.roundtrip_object(a)
+    assert_array_equal(a.data, b.data)
+
+    # make a tree without the BlockData instance to avoid
+    # the initial validate which will trigger block allocation
+    af = asdf.AsdfFile({"a": None})
+    # now assign to the tree item (avoiding validation)
+    af["a"] = a
+    # the AsdfFile instance should have no blocks
+    assert len(af._blocks._internal_blocks) == 0
+    # validate will make a block
+    af.validate()
+    assert len(af._blocks._internal_blocks) == 1
+    assert np.all(af._blocks._internal_blocks[0].data == a.data)
+    # a second validate shouldn't result in more blocks
+    af.validate()
+    assert len(af._blocks._internal_blocks) == 1
+    # write_to will use the block
+    fn1 = tmp_path / "test.asdf"
+    af.write_to(fn1)
+    assert len(af._blocks._internal_blocks) == 1
+
+    # if we read a file
+    with asdf.open(fn1, mode="rw") as af:
+        fn2 = tmp_path / "test2.asdf"
+        # there should be 1 block
+        assert len(af._blocks._internal_blocks) == 1
+        # validate should use that block
+        af.validate()
+        assert len(af._blocks._internal_blocks) == 1
+        # as should write_to
+        af.write_to(fn2)
+        assert len(af._blocks._internal_blocks) == 1
+        # and update
+        af.update()
+        assert len(af._blocks._internal_blocks) == 1
+
+    # check that data was preserved
+    for fn in (fn1, fn2):
+        with asdf.open(fn) as af:
+            assert_array_equal(af["a"].data, a.data)
+
+
+@with_extension(BlockDataCallbackExtension)
+def test_block_with_callback_removal(tmp_path):
+    fn1 = tmp_path / "test1.asdf"
+    fn2 = tmp_path / "test2.asdf"
+
+    a = BlockDataCallback(lambda: np.zeros(3, dtype="uint8"))
+    b = BlockDataCallback(lambda: np.ones(3, dtype="uint8"))
+    base_af = asdf.AsdfFile({"a": a, "b": b})
+    base_af.write_to(fn1)
+
+    for remove_key, check_key in [("a", "b"), ("b", "a")]:
+        # check that removing one does not interfere with the other
+        with asdf.open(fn1) as af:
+            af[remove_key] = None
+            af.write_to(fn2)
+        with asdf.open(fn2) as af:
+            af[check_key] = b.data
+        # also test update
+        # first copy fn1 to fn2
+        with asdf.open(fn1) as af:
+            af.write_to(fn2)
+        with asdf.open(fn2, mode="rw") as af:
+            af[remove_key] = None
+            af.update()
+            af[check_key] = b.data
+
+
+def test_serialization_context_block_access():
+    af = asdf.AsdfFile()
+    sctx = af._create_serialization_context()
+
+    # finding an index for an unknown block should
+    # create one
+    key = 42
+    arr = np.ones(3, dtype="uint8")
+    index = sctx.find_block_index(key, lambda: arr)
+    assert len(af._blocks) == 1
+    assert id(arr) == id(sctx.get_block_data_callback(index)())
+    # finding the same block should not create a new one
+    index = sctx.find_block_index(key, lambda: arr)
+    assert len(af._blocks) == 1
+
+    new_key = 26
+    with pytest.raises(ValueError, match="block 0 is already assigned to a key"):
+        sctx.assign_block_key(index, new_key)
+    assert len(af._blocks) == 1
+
+    arr2 = np.zeros(3, dtype="uint8")
+    # test that providing a new callback won't overwrite
+    # the first one
+    index = sctx.find_block_index(key, lambda: arr2)
+    assert id(arr2) != id(sctx.get_block_data_callback(index)())
diff --git a/asdf/_tests/test_compression.py b/asdf/_tests/test_compression.py
index 2aca89604..9bfac6d32 100644
--- a/asdf/_tests/test_compression.py
+++ b/asdf/_tests/test_compression.py
@@ -155,9 +155,9 @@ def test_none(tmp_path):
     with asdf.open(tmpfile1) as afile:
         assert afile.get_array_compression(afile.tree["science_data"]) is None
         afile.write_to(tmpfile2, all_array_compression="zlib")
-        assert afile.get_array_compression(afile.tree["science_data"]) == "zlib"
 
     with asdf.open(tmpfile2) as afile:
+        assert afile.get_array_compression(afile.tree["science_data"]) == "zlib"
         afile.write_to(tmpfile1, all_array_compression=None)
 
     with asdf.open(tmpfile1) as afile:
diff --git a/asdf/_tests/test_util.py b/asdf/_tests/test_util.py
index 112221390..944631941 100644
--- a/asdf/_tests/test_util.py
+++ b/asdf/_tests/test_util.py
@@ -1,3 +1,4 @@
+import copy
 import io
 
 import pytest
@@ -117,3 +118,17 @@ def test_minversion():
     assert util.minversion(yaml, "3.1")
     assert util.minversion("yaml", "3.1")
+
+
+def test_block_key():
+    bk = util.BlockKey()
+    # make sure block key is hashable and can serve as a dictionary key
+    hash(bk)
+    d = {bk: 1}
+    # a new key should produce a different hash than the first
+    bk2 = util.BlockKey()
+    d[bk2] = 2
+    assert len(d) == 2
+    # check that equality and copying a key works
+    assert copy.copy(bk) == bk
+    assert bk != hash(bk)
diff --git a/asdf/asdf.py b/asdf/asdf.py
index e3e1fe2e6..6e7ae6391 100644
--- a/asdf/asdf.py
+++ b/asdf/asdf.py
@@ -148,7 +148,8 @@ def __init__(
         self._closed = False
         self._external_asdf_by_uri = {}
         self._blocks = block.BlockManager(self, copy_arrays=copy_arrays, lazy_load=lazy_load)
-        self._uri = None
+        # set the uri here so validation can generate any required external blocks
+        self._uri = uri
         if tree is None:
             # Bypassing the tree property here, to avoid validating
             # an empty tree.
@@ -168,8 +169,6 @@
         else:
             self.tree = tree
             self.find_references()
-        if uri is not None:
-            self._uri = uri
 
         self._comments = []
@@ -1344,7 +1343,64 @@ def write_to(
             ``asdf.get_config().array_inline_threshold``.
""" - + with config_context() as config: + if "all_array_storage" in kwargs: + config.all_array_storage = kwargs["all_array_storage"] + if "all_array_compression" in kwargs: + config.all_array_compression = kwargs["all_array_compression"] + if "compression_kwargs" in kwargs: + config.all_array_compression_kwargs = kwargs["compression_kwargs"] + + used_blocks = self._blocks._find_used_blocks(self.tree, self, remove=False) + + naf = AsdfFile( + {}, + uri=self._uri, + extensions=self.extensions, + version=self.version, + ignore_version_mismatch=self._ignore_version_mismatch, + ignore_unrecognized_tag=self._ignore_unrecognized_tag, + ignore_implicit_conversion=self._ignore_implicit_conversion, + ) + naf._tree = copy.copy(self.tree) # avoid an extra validate + + # deep copy keys that will be modified during write + modified_keys = ["history", "asdf_library"] + for k in modified_keys: + if k in self.tree: + naf._tree[k] = copy.deepcopy(self.tree[k]) + + # copy over block storage and other settings + block_to_key_mapping = {v: k for k, v in self._blocks._key_to_block_mapping.items()} + # this creates blocks in the new block manager that correspond to blocks + # in the original file + for b in self._blocks.blocks: + if b not in used_blocks: + continue + if b in self._blocks._streamed_blocks and b._data is None: + # streamed blocks might not have data + # add a streamed block to naf + blk = naf._blocks.get_streamed_block() + # mark this block as used so it doesn't get removed + blk._used = True + elif b._data is not None or b._fd is not None: # this block has data + arr = b.data + blk = naf._blocks[arr] + blk._used = True + naf.set_array_storage(arr, b.array_storage) + naf.set_array_compression(arr, b.output_compression, **b.output_compression_kwargs) + else: # this block does not have data + key = block_to_key_mapping[b] + blk = naf._blocks.find_or_create_block(key) + blk._used = True + blk._data_callback = b._data_callback + naf._write_to(fd, **kwargs) + + def _write_to( + self, + fd, + **kwargs, + ): pad_blocks = kwargs.pop("pad_blocks", False) include_block_index = kwargs.pop("include_block_index", True) version = kwargs.pop("version", None) @@ -1721,7 +1777,7 @@ def _warn_tag_mismatch(self, tag, best_tag): # This function is called from within yamlutil methods to create # a context when one isn't explicitly passed in. def _create_serialization_context(self): - return SerializationContext(self.version_string, self.extension_manager, self.uri) + return SerializationContext(self.version_string, self.extension_manager, self.uri, self._blocks) def _check_and_set_mode(fileobj, asdf_mode): @@ -1890,10 +1946,11 @@ class SerializationContext: Container for parameters of the current (de)serialization. 
""" - def __init__(self, version, extension_manager, url): + def __init__(self, version, extension_manager, url, block_manager): self._version = validate_version(version) self._extension_manager = extension_manager self._url = url + self._block_manager = block_manager self.__extensions_used = set() @@ -1954,3 +2011,86 @@ def _extensions_used(self): set of asdf.extension.AsdfExtension or asdf.extension.Extension """ return self.__extensions_used + + def get_block_data_callback(self, index): + """ + Generate a callable that when called will read data + from a block at the provided index + + Parameters + ---------- + index : int + Block index + + Returns + ------- + callback : callable + A callable that when called (with no arguments) returns + the block data as a one dimensional array of uint8 + """ + blk = self._block_manager.get_block(index) + return blk.generate_read_data_callback() + + def assign_block_key(self, block_index, key): + """ + Associate a unique hashable key with a block. + + This is used during Converter.from_yaml_tree and allows + the AsdfFile to be aware of which blocks belong to the + object handled by the converter and allows load_block + to locate the block using the key instead of the index + (which might change if a file undergoes an AsdfFile.update). + + If the block index is later needed (like during to_yaml_tree) + the key can be used with find_block_index to lookup the + block index. + + Parameters + ---------- + + block_index : int + The index of the block to associate with the key + + key : hashable + A unique hashable key to associate with a block + """ + blk = self._block_manager.get_block(block_index) + if self._block_manager._key_to_block_mapping.get(key, blk) is not blk: + msg = f"key {key} is already assigned to a block" + raise ValueError(msg) + if blk in self._block_manager._key_to_block_mapping.values(): + msg = f"block {block_index} is already assigned to a key" + raise ValueError(msg) + self._block_manager._key_to_block_mapping[key] = blk + + def find_block_index(self, lookup_key, data_callback=None): + """ + Find the index of a previously allocated or reserved block. + + This is typically used inside asdf.extension.Converter.to_yaml_tree + + Parameters + ---------- + lookup_key : hashable + Unique key used to retrieve the index of a block that was + previously allocated or reserved. For ndarrays this is + typically the id of the base ndarray. + + data_callback: callable, optional + Callable that when called will return data (ndarray) that will + be written to a block. + At the moment, this is only assigned if a new block + is created to avoid circular references during AsdfFile.update. + + Returns + ------- + block_index: int + Index of the block where data returned from data_callback + will be written. 
+ """ + new_block = lookup_key not in self._block_manager._key_to_block_mapping + blk = self._block_manager.find_or_create_block(lookup_key) + # if we're not creating a block, don't update the data callback + if data_callback is not None and (new_block or (blk._data_callback is None and blk._fd is None)): + blk._data_callback = data_callback + return self._block_manager.get_source(blk) diff --git a/asdf/block.py b/asdf/block.py index dc24a1da5..7f0873fa2 100644 --- a/asdf/block.py +++ b/asdf/block.py @@ -37,6 +37,7 @@ def __init__(self, asdffile, copy_arrays=False, lazy_load=True): } self._data_to_block_mapping = {} + self._key_to_block_mapping = {} self._validate_checksums = False self._memmap = not copy_arrays self._lazy_load = lazy_load @@ -81,7 +82,9 @@ def _add(self, block, key=None): if block._data is not None or key is not None: if key is None: key = id(block._data) - self._data_to_block_mapping[key] = block + self._data_to_block_mapping[key] = block + else: + self._key_to_block_mapping[key] = block def remove(self, block): """ @@ -94,6 +97,9 @@ def remove(self, block): for key, blk in list(self._data_to_block_mapping.items()): if blk is block: del self._data_to_block_mapping[key] + for key, blk in list(self._key_to_block_mapping.items()): + if blk is block: + del self._key_to_block_mapping[key] else: msg = f"Unknown array storage type {block.array_storage}" raise ValueError(msg) @@ -366,7 +372,8 @@ def write_external_blocks(self, uri, pad_blocks=False): blk._array_storage = "internal" asdffile._blocks.add(blk) blk._used = True - asdffile.write_to(subfd, pad_blocks=pad_blocks, all_array_storage="internal") + # skip the new block manager here + asdffile._write_to(subfd, pad_blocks=pad_blocks, all_array_storage="internal") def write_block_index(self, fd, ctx): """ @@ -554,20 +561,31 @@ def get_external_uri(self, uri, index): parts[2] = path return patched_urllib_parse.urlunparse(parts) - def _find_used_blocks(self, tree, ctx): + def _find_used_blocks(self, tree, ctx, remove=True): reserved_blocks = set() for node in treeutil.iter_tree(tree): - # check that this object will not be handled by a converter - if not ctx.extension_manager.handles_type(type(node)): + if ctx.extension_manager.handles_type(type(node)): + converter = ctx.extension_manager.get_converter_for_type(type(node)) + sctx = ctx._create_serialization_context() + tag = converter.select_tag(node, sctx) + for key in converter.reserve_blocks(node, tag): + reserved_blocks.add(self.find_or_create_block(key)) + else: hook = ctx._type_index.get_hook_for_type("reserve_blocks", type(node), ctx.version_string) if hook is not None: for block in hook(node, ctx): reserved_blocks.add(block) + if remove: + for block in list(self.blocks): + if not getattr(block, "_used", False) and block not in reserved_blocks: + self.remove(block) + return None for block in list(self.blocks): - if getattr(block, "_used", 0) == 0 and block not in reserved_blocks: - self.remove(block) + if getattr(block, "_used", False): + reserved_blocks.add(block) + return reserved_blocks def _handle_global_block_settings(self, block): cfg = get_config() @@ -606,6 +624,12 @@ def finalize(self, ctx): for block in list(self.blocks): self._handle_global_block_settings(block) + def get_block_by_key(self, key): + if key not in self._key_to_block_mapping: + msg = f"Unknown block key {key}" + raise KeyError(msg) + return self._key_to_block_mapping[key] + def get_block(self, source): """ Given a "source identifier", return a block. 
@@ -733,11 +757,8 @@ def find_or_create_block_for_array(self, arr):
         """
         from .tags.core import ndarray
 
-        if isinstance(arr, ndarray.NDArrayType) and arr.block is not None:
-            if arr.block in self.blocks:
-                return arr.block
-
-            arr._block = None
+        if isinstance(arr, ndarray.NDArrayType) and arr.block is not None and arr.block in self.blocks:
+            return arr.block
 
         base = util.get_array_base(arr)
         block = self._data_to_block_mapping.get(id(base))
@@ -747,7 +768,6 @@
             block = Block(base)
             self.add(block)
             self._handle_global_block_settings(block)
-
         return block
 
     def find_or_create_block(self, key):
@@ -764,7 +784,7 @@
        -------
         block : Block
         """
-        block = self._data_to_block_mapping.get(key)
+        block = self._key_to_block_mapping.get(key)
         if block is not None:
             return block
@@ -1290,6 +1310,14 @@ def data(self):
 
     def close(self):
         self._data = None
+
+    def generate_read_data_callback(self):
+        """Used in SerializationContext.get_block_data_callback"""
+
+        def callback():
+            return self.data
+
+        return callback
 
 
 class UnloadedBlock:
     """
diff --git a/asdf/extension/_converter.py b/asdf/extension/_converter.py
index 5e0bdec32..f3b41bb83 100644
--- a/asdf/extension/_converter.py
+++ b/asdf/extension/_converter.py
@@ -150,6 +150,29 @@ def from_yaml_tree(self, node, tag, ctx):
             or a generator that yields such an instance.
         """
 
+    def reserve_blocks(self, obj, tag):
+        """
+        Reserve any number of blocks in which data (ndarrays) can be
+        stored.
+
+        Parameters
+        ----------
+        obj : object
+            Instance of a custom type to be serialized. Guaranteed to
+            be an instance of one of the types listed in the `types`
+            property.
+        tag : str
+            The tag identifying the YAML type that ``obj`` should be
+            converted into. Selected by a call to this converter's
+            select_tag method.
+
+        Returns
+        -------
+        keys : list of unique hashable keys
+            These keys will be used to reserve blocks for later use
+        """
+        return []
+
 
 class ConverterProxy(Converter):
     """
@@ -278,6 +301,31 @@ def from_yaml_tree(self, node, tag, ctx):
         """
         return self._delegate.from_yaml_tree(node, tag, ctx)
 
+    def reserve_blocks(self, obj, tag):
+        """
+        Reserve blocks to be used during conversion of this object.
+
+        Parameters
+        ----------
+        obj : object
+            Instance of a custom type to be serialized. Guaranteed to
+            be an instance of one of the types listed in the `types`
+            property.
+        tag : str
+            The tag identifying the YAML type that ``obj`` should be
+            converted into. Selected by a call to this converter's
+            select_tag method.
+
+        Returns
+        -------
+        keys : list of unique hashable keys
+            These keys will be used to reserve blocks for later use
+
+        """
+        if hasattr(self._delegate, "reserve_blocks"):
+            return self._delegate.reserve_blocks(obj, tag)
+        return []
+
     @property
     def delegate(self):
         """
diff --git a/asdf/tags/core/integer.py b/asdf/tags/core/integer.py
index a355d3679..022ff3534 100644
--- a/asdf/tags/core/integer.py
+++ b/asdf/tags/core/integer.py
@@ -58,14 +58,11 @@ def __init__(self, value, storage_type="internal"):
 
     @classmethod
     def to_tree(cls, node, ctx):
-        if ctx not in cls._value_cache:
-            cls._value_cache[ctx] = {}
-
         abs_value = int(np.abs(node._value))
 
         # If the same value has already been stored, reuse the array
-        if abs_value in cls._value_cache[ctx]:
-            array = cls._value_cache[ctx][abs_value]
+        if abs_value in cls._value_cache:
+            array = cls._value_cache[abs_value]
         else:
             # pack integer value into 32-bit words
             words = []
@@ -76,7 +73,7 @@
             array = np.array(words, dtype=np.uint32)
 
             if node._storage == "internal":
-                cls._value_cache[ctx][abs_value] = array
+                cls._value_cache[abs_value] = array
 
         tree = {}
         ctx.set_array_storage(array, node._storage)
diff --git a/asdf/util.py b/asdf/util.py
index 285b59a80..d812d1c0f 100644
--- a/asdf/util.py
+++ b/asdf/util.py
@@ -529,3 +529,25 @@ class FileType(enum.Enum):
     ASDF = 1
     FITS = 2
     UNKNOWN = 3
+
+
+class BlockKey:
+    """
+    Helper class that generates a unique hashable value for every instance,
+    useful for associating blocks with objects during serialization and
+    deserialization
+    """
+
+    _next = 0
+
+    def __init__(self):
+        self._key = BlockKey._next
+        BlockKey._next += 1
+
+    def __hash__(self):
+        return self._key
+
+    def __eq__(self, other):
+        if not isinstance(other, BlockKey):
+            return NotImplemented
+        return self._key == other._key
diff --git a/docs/asdf/extending/converters.rst b/docs/asdf/extending/converters.rst
index 2535dfa62..f2f952fa6 100644
--- a/docs/asdf/extending/converters.rst
+++ b/docs/asdf/extending/converters.rst
@@ -280,6 +280,107 @@ With this modification we can successfully deserialize our ASDF file:
 
     assert reconstituted_f1.inverse.inverse is reconstituted_f1
 
+.. _extending_converter_block_storage:
+
+Block storage
+=============
+
+As described above, :ref:`extending_converters` can return complex objects that will
+be passed to other Converters. If a Converter returns an ndarray, ASDF will recognize this
+array and store it in an ASDF block. This is the easiest and preferred means of
+storing data in ASDF blocks.
+
+For applications that require more flexibility, Converters can control block
+storage through use of the ``SerializationContext`` provided as an argument to
+`Converter.to_yaml_tree`, `Converter.from_yaml_tree` and `Converter.select_tag`.
+
+It is helpful to first review some details of how ASDF stores blocks. Blocks
+are stored sequentially within an ASDF file, following the YAML tree. During
+reads and writes, ASDF needs to know the index of the block a Converter would
+like to read or write. However, the index used for reading might not be the
+same index for writing if the tree was modified or the file is being written
+to a new location. To allow ASDF to track the relationship between blocks and
+objects, Converters need to generate a unique hashable key for each block used
+and associate these keys with block indices during read and write (more on
+this below).
+
+.. note::
+   Use of ``id(obj)`` will not generate a unique key as it returns the memory address
+   which might be reused after the object is garbage collected.
+
+A simple example of a Converter using block storage to store the ``payload`` for
+``BlockData`` object instances is as follows:
+
+.. runcode::
+
+    import asdf
+    import numpy as np
+    from asdf.extension import Converter, Extension
+
+    class BlockData:
+        def __init__(self, payload):
+            self.payload = payload
+            self._asdf_key = asdf.util.BlockKey()
+
+
+    class BlockConverter(Converter):
+        tags = ["asdf://somewhere.org/tags/block_data-1.0.0"]
+        types = [BlockData]
+
+        def to_yaml_tree(self, obj, tag, ctx):
+            block_index = ctx.find_block_index(
+                obj._asdf_key,
+                lambda: np.ndarray(len(obj.payload), dtype="uint8", buffer=obj.payload),
+            )
+            return {"block_index": block_index}
+
+        def from_yaml_tree(self, node, tag, ctx):
+            block_index = node["block_index"]
+            obj = BlockData(b"")
+            ctx.assign_block_key(block_index, obj._asdf_key)
+            obj.payload = ctx.get_block_data_callback(block_index)()
+            return obj
+
+        def reserve_blocks(self, obj, tag):
+            return [obj._asdf_key]
+
+    class BlockExtension(Extension):
+        tags = ["asdf://somewhere.org/tags/block_data-1.0.0"]
+        converters = [BlockConverter()]
+        extension_uri = "asdf://somewhere.org/extensions/block_data-1.0.0"
+
+    with asdf.config_context() as cfg:
+        cfg.add_extension(BlockExtension())
+        ff = asdf.AsdfFile({"example": BlockData(b"abcdefg")})
+        ff.write_to("block_converter_example.asdf")
+
+.. asdf:: block_converter_example.asdf
+
+During read, ``Converter.from_yaml_tree`` will be called. Within this method
+the Converter should associate any used blocks with unique hashable keys by calling
+``SerializationContext.assign_block_key``, and it can generate (and use) a callable
+function that will return block data using ``SerializationContext.get_block_data_callback``.
+A callback for reading the data is provided to support lazy loading without
+keeping a reference to the ``SerializationContext`` (which is meant to be
+a short-lived and lightweight object).
+
+During write, ``Converter.to_yaml_tree`` will be called. The Converter should
+use ``SerializationContext.find_block_index`` to find the location of an
+available block by providing a hashable key unique to this object (this should
+be the same key used during reading to allow ASDF to associate blocks and objects
+during in-place updates). The second argument to ``SerializationContext.find_block_index``
+must be a callable function (returning an ndarray) that ASDF will call when it
+is time to write data to the portion of the file corresponding to this block.
+Note that this callback might be called multiple times during a write and ASDF
+will not cache the result. If the data is coming from a non-repeatable source
+(such as a non-seekable stream of bytes), the data should be cached prior to
+providing it to ASDF so that ASDF can safely call the callback multiple times.
+
+A Converter that uses block storage must also define ``Converter.reserve_blocks``,
+which is called during memory management to free resources for unused blocks.
+``Converter.reserve_blocks`` must return a list of keys associated with an object.
+
 .. _extending_converters_performance:
 
 Entry point performance considerations
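A minimal, self-contained sketch that exercises the Converter block-storage API
introduced in this diff (``find_block_index``, ``get_block_data_callback``,
``assign_block_key`` and ``Converter.reserve_blocks``). The ``StreamRecord``
class and the ``example.org`` tag/extension URIs are hypothetical names invented
for this illustration. It also follows the caching advice from the documentation
above: the payload is read from a non-seekable stream once and cached, because
ASDF may call the write callback more than once and does not cache the result::

    import io

    import numpy as np

    import asdf
    from asdf.extension import Converter, Extension


    class StreamRecord:
        """Payload cached as a one-dimensional uint8 array."""

        def __init__(self, data):
            self.data = data
            # unique hashable key tying this object to its block
            self._asdf_key = asdf.util.BlockKey()

        @classmethod
        def from_stream(cls, stream):
            # cache the payload up front so the block data callback
            # can safely be invoked multiple times during one write
            return cls(np.frombuffer(stream.read(), dtype="uint8"))


    class StreamRecordConverter(Converter):
        tags = ["asdf://example.org/tags/stream_record-1.0.0"]
        types = [StreamRecord]

        def to_yaml_tree(self, obj, tag, ctx):
            # look up (or create) the block for this object; the callback
            # returns the cached array, so repeated calls are repeatable
            block_index = ctx.find_block_index(obj._asdf_key, lambda: obj.data)
            return {"block_index": block_index}

        def from_yaml_tree(self, node, tag, ctx):
            # read the block data and associate the new object with the block
            obj = StreamRecord(ctx.get_block_data_callback(node["block_index"])())
            ctx.assign_block_key(node["block_index"], obj._asdf_key)
            return obj

        def reserve_blocks(self, obj, tag):
            # one key per block owned by this object
            return [obj._asdf_key]


    class StreamRecordExtension(Extension):
        tags = ["asdf://example.org/tags/stream_record-1.0.0"]
        converters = [StreamRecordConverter()]
        extension_uri = "asdf://example.org/extensions/stream_record-1.0.0"


    with asdf.config_context() as cfg:
        cfg.add_extension(StreamRecordExtension())
        record = StreamRecord.from_stream(io.BytesIO(b"payload bytes"))
        af = asdf.AsdfFile({"record": record})
        af.write_to("stream_record.asdf")
        with asdf.open("stream_record.asdf") as af2:
            assert af2["record"].data.tobytes() == b"payload bytes"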