From a206fff32b22760eab327f273331d6832bf8e313 Mon Sep 17 00:00:00 2001 From: Brett Date: Fri, 20 Jan 2023 16:53:05 -0500 Subject: [PATCH 01/34] block storage support for converter add key argument to block_manager.add add data_callback to block add block support to converter Expand support for non-ndarray binary objects reorder args to find_or_create_block and fix usage move all_array_storage/compression into config remove use of get_block_lookup_keys get_block_lookup_keys was used in set_block_storage which created a chickegg for extensions that may or may not use blocks depending on the storage setting. The storage settings were moved to the block manager to start to work around this issue. make data_callback optional for SerializationContext methods add set_block_storage/compression to SerializationContext --- asdf/_tests/test_asdf.py | 4 +- asdf/asdf.py | 238 ++++++++++++++++++++++++++++- asdf/block.py | 17 ++- asdf/extension/_converter.py | 70 +++++++++ asdf/tests/test_block_converter.py | 104 +++++++++++++ 5 files changed, 426 insertions(+), 7 deletions(-) create mode 100644 asdf/tests/test_block_converter.py diff --git a/asdf/_tests/test_asdf.py b/asdf/_tests/test_asdf.py index 1501f3a21..e45b621ca 100644 --- a/asdf/_tests/test_asdf.py +++ b/asdf/_tests/test_asdf.py @@ -196,7 +196,7 @@ def test_open_asdf_extensions(tmp_path): def test_serialization_context(): extension_manager = ExtensionManager([]) - context = SerializationContext("1.4.0", extension_manager, "file://test.asdf") + context = SerializationContext("1.4.0", extension_manager, "file://test.asdf", None) assert context.version == "1.4.0" assert context.extension_manager is extension_manager assert context._extensions_used == set() @@ -215,7 +215,7 @@ def test_serialization_context(): context._mark_extension_used(object()) with pytest.raises(ValueError, match=r"ASDF Standard version .* is not supported by asdf==.*"): - SerializationContext("0.5.4", extension_manager, None) + SerializationContext("0.5.4", extension_manager, None, None) def test_reading_extension_metadata(): diff --git a/asdf/asdf.py b/asdf/asdf.py index e3e1fe2e6..eed791e8f 100644 --- a/asdf/asdf.py +++ b/asdf/asdf.py @@ -748,6 +748,54 @@ def get_array_storage(self, arr): """ return self._blocks[arr].array_storage + def set_block_storage(self, obj, storage): + """ + Set the block storage type to use for the given binary object's data. + + Parameters + ---------- + obj : object + ASDF-serializable object. + + storage : str + Must be one of: + + - ``internal``: The default. The data will be + stored in a binary block in the same ASDF file. + + - ``external``: Store the data in a binary block in a + separate ASDF file. + + - ``inline``: Store the data as YAML inline in the tree. This option + may not be implemented for all object types. + """ + self._blocks._storage_settings[id(obj)] = storage + # # TODO this doesn't have a serialization context because one + # # hasn't been created. It might be cleaner to move the actual block + # # creation to somewhere the serialization is available perhaps by + # # saving the storage setting in the serialization context + # if len(keys) == 0: + + # for key in keys: + + def get_block_storage(self, obj): + """ + Get the block storage type for the given binary object's data. + + Parameters + ---------- + obj : object + ASDF-serializable object. 
+ + Returns + ------- + str or None + """ + return self._blocks._storage_settings.get(id(obj), None) + # if len(keys) == 0: + + # # TODO remove assumption that all keys have same storage + def set_array_compression(self, arr, compression, **compression_kwargs): """ Set the compression to use for the given array data. @@ -795,6 +843,54 @@ def get_array_compression_kwargs(self, arr): """ """ return self._blocks[arr].output_compression_kwargs + def set_block_compression(self, obj, compression, **compression_kwargs): + """ + Set the compression to use for the given object's block data. + + Parameters + ---------- + obj : object + ASDF-serializable object. + + compression : str or None + Must be one of: + + - ``''`` or `None`: no compression + + - ``zlib``: Use zlib compression + + - ``bzp2``: Use bzip2 compression + + - ``lz4``: Use lz4 compression + + - ``input``: Use the same compression as in the file read. + If there is no prior file, acts as None. + + **kwargs + Additional compressor-specific arguments. + + """ + self._blocks._compression_settings[id(obj)] = (compression, compression_kwargs) + # if len(keys) == 0: + + # for key in keys: + + def get_block_compression(self, obj): + """ + Get the compression type and arguments for the given object's block data. + + Parameters + ---------- + obj : object + ASDF-serializable object. + + Returns + ------- + (str, dict) + """ + return self._blocks._compression_settings.get(id(obj), (None, {})) + # if len(keys) == 0: + @classmethod def _parse_header_line(cls, line): """ @@ -1721,7 +1817,7 @@ def _warn_tag_mismatch(self, tag, best_tag): # This function is called from within yamlutil methods to create # a context when one isn't explicitly passed in. def _create_serialization_context(self): - return SerializationContext(self.version_string, self.extension_manager, self.uri) + return SerializationContext(self.version_string, self.extension_manager, self.uri, self._blocks) def _check_and_set_mode(fileobj, asdf_mode): @@ -1890,10 +1986,11 @@ class SerializationContext: Container for parameters of the current (de)serialization. """ - def __init__(self, version, extension_manager, url): + def __init__(self, version, extension_manager, url, block_manager): self._version = validate_version(version) self._extension_manager = extension_manager self._url = url + self._block_manager = block_manager self.__extensions_used = set() @@ -1954,3 +2051,140 @@ def _extensions_used(self): set of asdf.extension.AsdfExtension or asdf.extension.Extension """ return self.__extensions_used + + def load_block(self, block_index): + """ + Parameters + ---------- + block_index : int + + Returns + ------- + block_data : ndarray + """ + return self._block_manager.get_block(block_index).data + + def _find_block(self, lookup_key, data_callback=None): + blk = self._block_manager.find_or_create_block(lookup_key) + if data_callback is not None: + blk._data_callback = data_callback + return blk + + def reserve_block(self, lookup_key, data_callback): + """ + Reserve a block that will at some point in the future contain + data returned when the data_callback is called. This is typically + used inside asdf.extension.Converter.reserve_blocks + + Parameters + ---------- + lookup_key : hashable + Unique key used to reserve and later retrieve the index + of a block. For ndarrays this is typically the id of the base + ndarray. 
+ + data_callback: callable + Callable that when called will return data (ndarray) that will + be written to a block + + Returns + ------- + block : asdf.block.Block + The block which has been reserved for use. + """ + # as this is typically called in Converter.reserve_blocks, so + # prior to serializing the tree (prior to Converter.to_yaml_tree) + # note that this will not be called prior to AsdfFile.validate or + # AsdfFile.fill/remove_defaults + + # find a block (if there is one). If not, add a block + return self._find_block(lookup_key, data_callback) + + def identify_block(self, source, lookup_key): + """ + Associate a block with a lookup key. This is used to associate binary + blocks with their Python objects so that the same blocks can be reused + when the ASDF file is updated. + + Parameters + ---------- + source : str or int + Block source. + + lookup_key : hashable + Unique key used to retrieve a block. For ndarrays this is + typically the id of the base ndarray. + """ + self._block_manager.identify_block(source, lookup_key) + + def find_block_index(self, lookup_key, data_callback=None): + """ + Find the index of a previously allocated or reserved block. + This is typically used inside asdf.extension.Converter.to_yaml_tree + + Parameters + ---------- + lookup_key : hashable + Unique key used to retrieve the index of a block that was + previously allocated or reserved. For ndarrays this is + typically the id of the base ndarray. + + data_callback: callable, optional + Callable that when called will return data (ndarray) that will + be written to a block. + + Returns + ------- + block_index: int + Index of the block where data returned from data_callback + will be written. + """ + # see notes in test_block_converter + # + # we want the index of a block but we can't write the block and shouldn't + # access the data. + # + # this could be called during write_to/update OR fill_defaults OR validate + # + # for write_to/update we need a valid index that at a later point will be used + # to write the data. + # + # if this is called from fill_defaults we need to generate an index that will point + # back to the correct block that was used when the file was read. I'm not sure how we + # would do this. Perhaps temporarily resolve the data callback to get id(data) and + # use block_manager._data_to_block_mapping to look up the block? + # + # on validate we may or may not have blocks, if we don't have a block, make one? 
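+        #
+        # as a hedged sketch, the intended call pattern from a converter's
+        # to_yaml_tree looks like the following (the object and its
+        # ``payload`` attribute are hypothetical, not part of asdf):
+        #
+        #     block_index = ctx.find_block_index(
+        #         id(obj),
+        #         lambda: np.frombuffer(obj.payload, dtype="uint8"),
+        #     )
+        #     return {"block_index": block_index}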
+ # + # for now, assuming we're writing a fresh file with write_to + blk = self._find_block(lookup_key, data_callback) + return self._block_manager.get_source(blk) + + def get_block_storage_settings(self, lookup_key): + """ + TODO + TODO add corresponding set + """ + return self._block_manager._storage_settings.get(lookup_key, None) + + def get_block_compression_settings(self, lookup_key): + """ + TODO + TODO add corresponding set + """ + return self._block_manager._compression_settings.get(lookup_key, None) + + def set_block_storage_settings(self, lookup_key, storage): + """ + TODO + TODO add corresponding set + """ + self._block_manager._storage_settings[lookup_key] = storage + + def set_block_compression_settings(self, lookup_key, compression, compression_kwargs=None): + """ + TODO + TODO add corresponding set + """ + compression_kwargs = compression_kwargs or {} + self._block_manager._compression_settings[lookup_key] = (compression, compression_kwargs) diff --git a/asdf/block.py b/asdf/block.py index dc24a1da5..0fd2b410f 100644 --- a/asdf/block.py +++ b/asdf/block.py @@ -42,6 +42,9 @@ def __init__(self, asdffile, copy_arrays=False, lazy_load=True): self._lazy_load = lazy_load self._internal_blocks_mapped = False + self._storage_settings = {} + self._compression_settings = {} + def __len__(self): """ Return the total number of blocks being managed. @@ -558,8 +561,13 @@ def _find_used_blocks(self, tree, ctx): reserved_blocks = set() for node in treeutil.iter_tree(tree): - # check that this object will not be handled by a converter - if not ctx.extension_manager.handles_type(type(node)): + if ctx.extension_manager.handles_type(type(node)): + converter = ctx.extension_manager.get_converter_for_type(type(node)) + tag = converter.select_tag(node, ctx) + sctx = ctx._create_serialization_context() + for blk in converter.reserve_blocks(node, tag, sctx): + reserved_blocks.add(blk) + else: hook = ctx._type_index.get_hook_for_type("reserve_blocks", type(node), ctx.version_string) if hook is not None: for block in hook(node, ctx): @@ -747,7 +755,6 @@ def find_or_create_block_for_array(self, arr): block = Block(base) self.add(block) self._handle_global_block_settings(block) - return block def find_or_create_block(self, key): @@ -774,6 +781,10 @@ def find_or_create_block(self, key): return block + def identify_block(self, source, key): + block = self.get_block(source) + self._data_to_block_mapping[key] = block + def get_streamed_block(self): """ Get the streamed block, which is always the last one. A diff --git a/asdf/extension/_converter.py b/asdf/extension/_converter.py index 5e0bdec32..f12e4848d 100644 --- a/asdf/extension/_converter.py +++ b/asdf/extension/_converter.py @@ -150,6 +150,60 @@ def from_yaml_tree(self, node, tag, ctx): or a generator that yields such an instance. """ + def reserve_blocks(self, obj, tag, ctx): + """ + Reserve any number of blocks in which data (ndarrays) can be + stored. + + For each block that will be used for this obj, first call + ctx.reserve_block(lookup_key, data_callback) with a hashable + unique lookup_key (for an ndarray, use the id of the base array) + and a data_callback that when called will return a ndarray + to write to the block. This function will return a asdf.block.Block + that should be included in the list returned by this method. + The index of this block can later (in to_yaml_tree) be retrieved + using ctx.find_block_index. + + Parameters + ---------- + obj : object + Instance of a custom type to be serialized. 
Guaranteed to + be an instance of one of the types listed in the `types` + property. + tag : str + The tag identifying the YAML type that ``obj`` should be + converted into. Selected by a call to this converter's + select_tag method. + ctx : asdf.asdf.SerializationContext + The context of the current serialization request. + + Returns + ------ + blocks : list of asdf.block.Block + The blocks that were reserved for obj + """ + return [] + + def get_block_lookup_keys(self, obj, storage=None): + """ + Fetch the list of block lookup keys for this object. For + objects without a binary block, this should return an + empty list. + + Parameters + ---------- + obj : object + Instance of a custom type + + storage : str + Block storage type to use for this object or None + + Returns + ------- + list of hashable + """ + return [] + class ConverterProxy(Converter): """ @@ -278,6 +332,22 @@ def from_yaml_tree(self, node, tag, ctx): """ return self._delegate.from_yaml_tree(node, tag, ctx) + def reserve_blocks(self, obj, tag, ctx): + """ + TODO + """ + if hasattr(self._delegate, "reserve_blocks"): + return self._delegate.reserve_blocks(obj, tag, ctx) + return [] + + def get_block_lookup_keys(self, obj, storage=None): + """ + TODO + """ + if hasattr(self._delegate, "get_block_lookup_keys"): + return self._delegate.get_block_lookup_keys(obj, storage) + return [] + @property def delegate(self): """ diff --git a/asdf/tests/test_block_converter.py b/asdf/tests/test_block_converter.py new file mode 100644 index 000000000..f5a887b58 --- /dev/null +++ b/asdf/tests/test_block_converter.py @@ -0,0 +1,104 @@ +import numpy as np + +import asdf +from asdf.extension import Converter, Extension + + +class BlockData: + def __init__(self, payload): + self.payload = payload + + +class BlockConverter(Converter): + tags = ["asdf://somewhere.org/tags/block_data-1.0.0"] + types = [BlockData] + + def to_yaml_tree(self, obj, tag, ctx): + # this is called during validate and write_to (and fill_defaults) + # + # During validate (and other non-writing times) ctx should have a + # valid way to find blocks for the object. During (and after) reading + # the block will correspond to the read block. + # + # During write_to, ctx should return the block_index for the block + # that was allocate during reserve_blocks. + # + # One uncovered case is when an item that uses blocks is added + # to the tree and validate is called prior to a write. In this case reserve_blocks + # was never called and no block was read (the object is in memory). + # The old code would allocate a block and validate the tree and then + # throw away to block (if a subsequent write wasn't performed). + # If the ctx is aware that this is not a read or a write, it should + # be valid to return any number (perhaps an unused index) as the + # return for this is never written anywhere. An alternative would + # be that the ctx sees no block exists, then calls reserve_blocks on this + # obj to allow it to claim a block. This requires that ctx be aware + # of which obj is passed to to_yaml_tree and which converter is currently + # being used. + + # lookup source for obj + block_index = ctx.find_block_index( + id(obj), + lambda: np.ndarray(len(obj.payload), dtype="uint8", buffer=obj.payload), + ) + return { + "block_index": block_index, + } + + def from_yaml_tree(self, node, tag, ctx): + # this is called during open and fill_defaults (also called during open). + # In all cases, blocks have already been read (or are ready to read) from + # the file. 
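+        #
+        # As a minimal sketch, the read path this supports (BlockData is
+        # this test's own example type):
+        #
+        #     data = ctx.load_block(node["block_index"])
+        #     return BlockData(data.tobytes())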
+ # So I don't see a need for from_blocks... Adding it would make the API + # more symmetrical but would require another tree traversal and would + # require that objects can be made (or preparations to make those objects) + # without whatever is in the block. This function can return anything + # so a special return value would be complicated. One option would + # be to add something like: ctx.requires_block(id_required, extra) + # that could be added to a list of future resolvable objects. After all + # objects have been passed through from_yaml_tree, the ctx could + # then go through and resolve all of the objects. This would require + # some magic where we then need to swap out objects in the tree whereas + # before the return value to this function was used to fill the tree. + # Without a reason to add this complexity (aside from symmetry) I'm + # inclined to leave out 'from_blocks' + # + # One complication here is that some objects (like zarray) will + # not want to load the data right away and instead just have a way + # to load the data when needed (and possibly multiple times). + # It might be better to have ctx provide load_block for times + # when data should be read and zarr can wrap this. + # + # Another complication is memmapping. It should not matter that + # zarr receives a memmap I'm not sure I've fully thought this + # though. + data = ctx.load_block(node["block_index"]) + return BlockData(data.tobytes()) + + def reserve_blocks(self, obj, tag, ctx): # Is there a ctx or tag at this point? + # Reserve a block using a unique key (this will be used in to_yaml_tree + # to find the block index) and a callable that will return the data/bytes + # that will eventually be written to the block. + return [ctx.reserve_block(id(obj), lambda: np.ndarray(len(obj.payload), dtype="uint8", buffer=obj.payload))] + + # def from_blocks(self, obj, tag, ctx): + # # do I even need this? + + +class BlockExtension(Extension): + tags = ["asdf://somewhere.org/tags/block_data-1.0.0"] + converters = [BlockConverter()] + extension_uri = "asdf://somewhere.org/extensions/block_data-1.0.0" + + +def test_block_converter(tmp_path): + with asdf.config_context() as cfg: + cfg.add_extension(BlockExtension()) + + tree = {"b": BlockData(b"abcdefg")} + af = asdf.AsdfFile(tree) + fn = tmp_path / "test.asdf" + af.write_to(fn) + + with asdf.open(fn) as af: + assert af["b"].payload == tree["b"].payload From 65d7aec51cd834d5517bec06720ab3e1f98af803 Mon Sep 17 00:00:00 2001 From: Brett Date: Wed, 22 Mar 2023 12:04:24 -0400 Subject: [PATCH 02/34] cleanup api --- asdf/asdf.py | 27 --------------------------- asdf/block.py | 4 ---- asdf/extension/_converter.py | 28 ---------------------------- asdf/tests/test_block_converter.py | 3 --- 4 files changed, 62 deletions(-) diff --git a/asdf/asdf.py b/asdf/asdf.py index eed791e8f..28b7712cb 100644 --- a/asdf/asdf.py +++ b/asdf/asdf.py @@ -770,13 +770,6 @@ def set_block_storage(self, obj, storage): may not be implemented for all object types. """ self._blocks._storage_settings[id(obj)] = storage - # # TODO this doesn't have a serialization context because one - # # hasn't been created. 
It might be cleaner to move the actual block - # # creation to somewhere the serialization is available perhaps by - # # saving the storage setting in the serialization context - # if len(keys) == 0: - - # for key in keys: def get_block_storage(self, obj): """ @@ -792,9 +785,6 @@ def get_block_storage(self, obj): str or None """ return self._blocks._storage_settings.get(id(obj), None) - # if len(keys) == 0: - - # # TODO remove assumption that all keys have same storage def set_array_compression(self, arr, compression, **compression_kwargs): """ @@ -2100,23 +2090,6 @@ def reserve_block(self, lookup_key, data_callback): # find a block (if there is one). If not, add a block return self._find_block(lookup_key, data_callback) - def identify_block(self, source, lookup_key): - """ - Associate a block with a lookup key. This is used to associate binary - blocks with their Python objects so that the same blocks can be reused - when the ASDF file is updated. - - Parameters - ---------- - source : str or int - Block source. - - lookup_key : hashable - Unique key used to retrieve a block. For ndarrays this is - typically the id of the base ndarray. - """ - self._block_manager.identify_block(source, lookup_key) - def find_block_index(self, lookup_key, data_callback=None): """ Find the index of a previously allocated or reserved block. diff --git a/asdf/block.py b/asdf/block.py index 0fd2b410f..df21948fe 100644 --- a/asdf/block.py +++ b/asdf/block.py @@ -781,10 +781,6 @@ def find_or_create_block(self, key): return block - def identify_block(self, source, key): - block = self.get_block(source) - self._data_to_block_mapping[key] = block - def get_streamed_block(self): """ Get the streamed block, which is always the last one. A diff --git a/asdf/extension/_converter.py b/asdf/extension/_converter.py index f12e4848d..38cc2a5f3 100644 --- a/asdf/extension/_converter.py +++ b/asdf/extension/_converter.py @@ -184,26 +184,6 @@ def reserve_blocks(self, obj, tag, ctx): """ return [] - def get_block_lookup_keys(self, obj, storage=None): - """ - Fetch the list of block lookup keys for this object. For - objects without a binary block, this should return an - empty list. - - Parameters - ---------- - obj : object - Instance of a custom type - - storage : str - Block storage type to use for this object or None - - Returns - ------- - list of hashable - """ - return [] - class ConverterProxy(Converter): """ @@ -340,14 +320,6 @@ def reserve_blocks(self, obj, tag, ctx): return self._delegate.reserve_blocks(obj, tag, ctx) return [] - def get_block_lookup_keys(self, obj, storage=None): - """ - TODO - """ - if hasattr(self._delegate, "get_block_lookup_keys"): - return self._delegate.get_block_lookup_keys(obj, storage) - return [] - @property def delegate(self): """ diff --git a/asdf/tests/test_block_converter.py b/asdf/tests/test_block_converter.py index f5a887b58..5c11243ab 100644 --- a/asdf/tests/test_block_converter.py +++ b/asdf/tests/test_block_converter.py @@ -81,9 +81,6 @@ def reserve_blocks(self, obj, tag, ctx): # Is there a ctx or tag at this point? # that will eventually be written to the block. return [ctx.reserve_block(id(obj), lambda: np.ndarray(len(obj.payload), dtype="uint8", buffer=obj.payload))] - # def from_blocks(self, obj, tag, ctx): - # # do I even need this? 
- class BlockExtension(Extension): tags = ["asdf://somewhere.org/tags/block_data-1.0.0"] From ab5d19f43cc9f6d664d2a239de66ccd86d1c89b3 Mon Sep 17 00:00:00 2001 From: Brett Date: Mon, 27 Mar 2023 14:46:30 -0400 Subject: [PATCH 03/34] add claim_block --- asdf/asdf.py | 20 +++- asdf/tests/test_block_converter.py | 180 +++++++++++++++++++++++++++-- 2 files changed, 184 insertions(+), 16 deletions(-) diff --git a/asdf/asdf.py b/asdf/asdf.py index 28b7712cb..46e816a95 100644 --- a/asdf/asdf.py +++ b/asdf/asdf.py @@ -861,9 +861,6 @@ def set_block_compression(self, obj, compression, **compression_kwargs): """ self._blocks._compression_settings[id(obj)] = (compression, compression_kwargs) - # if len(keys) == 0: - - # for key in keys: def get_block_compression(self, obj): """ @@ -879,7 +876,6 @@ def get_block_compression(self, obj): (str, dict) """ return self._blocks._compression_settings.get(id(obj), (None, {})) - # if len(keys) == 0: @classmethod def _parse_header_line(cls, line): @@ -2042,17 +2038,29 @@ def _extensions_used(self): """ return self.__extensions_used - def load_block(self, block_index): + def load_block(self, block_index, key=None): """ Parameters ---------- block_index : int + key : hashable (optional) + Returns ------- block_data : ndarray """ - return self._block_manager.get_block(block_index).data + data = self._block_manager.get_block(block_index).data + if key is not None: + self.claim_block(block_index, key) + return data + + def claim_block(self, block_index, key): + """ + TODO + """ + blk = self._block_manager.get_block(block_index) + self._block_manager._data_to_block_mapping[key] = blk def _find_block(self, lookup_key, data_callback=None): blk = self._block_manager.find_or_create_block(lookup_key) diff --git a/asdf/tests/test_block_converter.py b/asdf/tests/test_block_converter.py index 5c11243ab..a7033a55a 100644 --- a/asdf/tests/test_block_converter.py +++ b/asdf/tests/test_block_converter.py @@ -1,7 +1,11 @@ +import contextlib + import numpy as np +import pytest import asdf from asdf.extension import Converter, Extension +from asdf.testing import helpers class BlockData: @@ -72,8 +76,29 @@ def from_yaml_tree(self, node, tag, ctx): # Another complication is memmapping. It should not matter that # zarr receives a memmap I'm not sure I've fully thought this # though. - data = ctx.load_block(node["block_index"]) - return BlockData(data.tobytes()) + block_index = node["block_index"] + data = ctx.load_block(block_index) + obj = BlockData(data.tobytes()) + ctx.claim_block(block_index, id(obj)) + + # -- alternatively, if data is not required to make the object -- + # obj = BlockData(b"") + # obj.payload = ctx.load_block(block_index, id(obj)) + + # so I think this might need to 'claim' blocks so subsequent 'validate'/'write_to'/etc use + # the read block (and don't create a new one). This can create a catch 22 when + # the data is used to make the object (like here), since the id(obj) key is not known + # until the object is created. What about something like + # ctx.load_block(index) : loads block data WITHOUT claiming it + # ctx.attach_block(index, key) : load and claim block + # ctx.claim_block(key) : just claim/associate the block + # this is pretty messy and I think could be cleaned up by a new block manager on write + # there, the blocks would always be claimed and the 'read' block manager would be responsible + # for only that... 
Update might still need an association + # so maybe instead use + # ctx.load_block(index, key=None) and ctx.claim_block(index, key) + # ctx._block_manager._data_to_block_mapping[id(obj)] = ctx._block_manager._internal_blocks[0] + return obj def reserve_blocks(self, obj, tag, ctx): # Is there a ctx or tag at this point? # Reserve a block using a unique key (this will be used in to_yaml_tree @@ -88,14 +113,149 @@ class BlockExtension(Extension): extension_uri = "asdf://somewhere.org/extensions/block_data-1.0.0" -def test_block_converter(tmp_path): +@contextlib.contextmanager +def with_extension(ext_class): with asdf.config_context() as cfg: - cfg.add_extension(BlockExtension()) + cfg.add_extension(ext_class()) + yield + + +@with_extension(BlockExtension) +def test_roundtrip_block_data(): + a = BlockData(b"abcdefg") + b = helpers.roundtrip_object(a) + assert a.payload == b.payload + + +@with_extension(BlockExtension) +def test_block_converter_block_allocation(tmp_path): + a = BlockData(b"abcdefg") + + # make a tree without the BlockData instance to avoid + # the initial validate which will trigger block allocation + af = asdf.AsdfFile({"a": None}) + # now assign to the tree item (avoiding validation) + af["a"] = a + # the AsdfFile instance should have no blocks + assert len(af._blocks._internal_blocks) == 0 + # until validate is called + af.validate() + assert len(af._blocks._internal_blocks) == 1 + assert af._blocks._internal_blocks[0].data.tobytes() == a.payload + # a second validate shouldn't result in more blocks + af.validate() + assert len(af._blocks._internal_blocks) == 1 + fn = tmp_path / "test.asdf" + # nor should write_to + af.write_to(fn) + assert len(af._blocks._internal_blocks) == 1 + + # if we read a file + with asdf.open(fn, mode="rw") as af: + fn2 = tmp_path / "test2.asdf" + # there should be 1 block + assert len(af._blocks._internal_blocks) == 1 + # validate should use that block + af.validate() + assert len(af._blocks._internal_blocks) == 1 + # as should write_to + af.write_to(fn2) + assert len(af._blocks._internal_blocks) == 1 + # and update + af.update() + assert len(af._blocks._internal_blocks) == 1 + + +@with_extension(BlockExtension) +def test_invalid_block_data(): + pass + + +class BlockDataCallback: + """An example object that uses the data callback to read block data""" + + def __init__(self, callback): + self.callback = callback + + @property + def data(self): + return self.callback() + + +class BlockDataCallbackConverter(Converter): + tags = ["asdf://somewhere.org/tags/block_data_callback-1.0.0"] + types = [BlockDataCallback] + + def to_yaml_tree(self, obj, tag, ctx): + # lookup source for obj + block_index = ctx.find_block_index(id(obj), obj.callback) + return { + "block_index": block_index, + } + + def from_yaml_tree(self, node, tag, ctx): + block_index = node["block_index"] + obj = BlockDataCallback(lambda: ctx.load_block(block_index)) + ctx.claim_block(block_index, id(obj)) + return obj + + def reserve_blocks(self, obj, tag, ctx): + return [ctx.reserve_block(id(obj), obj.callback)] + + +class BlockDataCallbackExtension(Extension): + tags = ["asdf://somewhere.org/tags/block_data_callback-1.0.0"] + converters = [BlockDataCallbackConverter()] + extension_uri = "asdf://somewhere.org/extensions/block_data_callback-1.0.0" + + +@pytest.mark.xfail(reason="callback use in a converter requires a new block manager on write") +@with_extension(BlockDataCallbackExtension) +def test_block_data_callback_converter(tmp_path): + # use a callback that every time generates a new 
array + # this would cause issues for the old block management as the + # id(arr) would change every time + a = BlockDataCallback(lambda: np.zeros(3, dtype="uint8")) + + b = helpers.roundtrip_object(a) + assert np.all(a.data == b.data) + + # make a tree without the BlockData instance to avoid + # the initial validate which will trigger block allocation + af = asdf.AsdfFile({"a": None}) + # now assign to the tree item (avoiding validation) + af["a"] = a + # the AsdfFile instance should have no blocks + assert len(af._blocks._internal_blocks) == 0 + # until validate is called + af.validate() + assert len(af._blocks._internal_blocks) == 1 + assert np.all(af._blocks._internal_blocks[0].data == a.data) + # a second validate shouldn't result in more blocks + af.validate() + assert len(af._blocks._internal_blocks) == 1 + fn = tmp_path / "test.asdf" + # nor should write_to + af.write_to(fn) + assert len(af._blocks._internal_blocks) == 1 + + # if we read a file + with asdf.open(fn, mode="rw") as af: + fn2 = tmp_path / "test2.asdf" + # there should be 1 block + assert len(af._blocks._internal_blocks) == 1 + # validate should use that block + af.validate() + assert len(af._blocks._internal_blocks) == 1 + # as should write_to + af.write_to(fn2) # TODO this will NOT work without a new block manager on write + assert len(af._blocks._internal_blocks) == 1 + # and update + af.update() + assert len(af._blocks._internal_blocks) == 1 - tree = {"b": BlockData(b"abcdefg")} - af = asdf.AsdfFile(tree) - fn = tmp_path / "test.asdf" - af.write_to(fn) - with asdf.open(fn) as af: - assert af["b"].payload == tree["b"].payload +# TODO tests to add +# - memmap/lazy_load other open options +# - block storage settings +# - error cases when data is not of the correct type (not an ndarray, an invalid ndarray, etc) From 40017009d3158ca9d910ac54fb0c7469bd986b38 Mon Sep 17 00:00:00 2001 From: Brett Date: Fri, 3 Feb 2023 15:21:42 -0500 Subject: [PATCH 04/34] don't limit IntegerType _value_cache per ctx _value_cache is used to reuse arrays (and eventually blocks) when multiple IntegerType instances share the same value (and can reuse the same block). A separate cache was used for each ctx (AsdfFile). However, this cache stores ndarrays which can be shared across files as the relationship between arrays and blocks is managed by separate block managers for each file allowing each file to create distinct blocks. This preserves the main use of the cache which is to return an array with the same id for IntegerTypes of identical value to allow the block manager to reuse blocks. 
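As a sketch of the intended effect (IntegerType is asdf's public type;
the file name below is arbitrary), two instances with equal values now
serialize through one cached array, so each file's block manager can map
both onto a single block:

    import asdf
    from asdf import IntegerType

    value = 2**100
    af = asdf.AsdfFile({"a": IntegerType(value), "b": IntegerType(value)})
    # to_tree returns the same ndarray object for both instances, so the
    # block manager writes a single shared block.
    af.write_to("integers.asdf")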
--- asdf/tags/core/integer.py | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/asdf/tags/core/integer.py b/asdf/tags/core/integer.py index a355d3679..022ff3534 100644 --- a/asdf/tags/core/integer.py +++ b/asdf/tags/core/integer.py @@ -58,14 +58,11 @@ def __init__(self, value, storage_type="internal"): @classmethod def to_tree(cls, node, ctx): - if ctx not in cls._value_cache: - cls._value_cache[ctx] = {} - abs_value = int(np.abs(node._value)) # If the same value has already been stored, reuse the array - if abs_value in cls._value_cache[ctx]: - array = cls._value_cache[ctx][abs_value] + if abs_value in cls._value_cache: + array = cls._value_cache[abs_value] else: # pack integer value into 32-bit words words = [] @@ -76,7 +73,7 @@ def to_tree(cls, node, ctx): array = np.array(words, dtype=np.uint32) if node._storage == "internal": - cls._value_cache[ctx][abs_value] = array + cls._value_cache[abs_value] = array tree = {} ctx.set_array_storage(array, node._storage) From 8745ca3653b0b8fd4bda77bbf3a456803ffe56cb Mon Sep 17 00:00:00 2001 From: Brett Date: Fri, 3 Feb 2023 15:34:32 -0500 Subject: [PATCH 05/34] make a new AsdfFile on write_to to avoid modifying the AsdfFile on write_to, a new AsdfFile is created on write_to --- asdf/_tests/commands/tests/test_exploded.py | 2 +- asdf/_tests/tags/core/tests/test_integer.py | 2 +- asdf/_tests/test_api.py | 1 + asdf/_tests/test_array_blocks.py | 79 +++++++++++---------- asdf/_tests/test_compression.py | 2 +- asdf/asdf.py | 48 ++++++++++++- asdf/block.py | 5 +- 7 files changed, 97 insertions(+), 42 deletions(-) diff --git a/asdf/_tests/commands/tests/test_exploded.py b/asdf/_tests/commands/tests/test_exploded.py index 89552c5a8..ab2162857 100644 --- a/asdf/_tests/commands/tests/test_exploded.py +++ b/asdf/_tests/commands/tests/test_exploded.py @@ -24,7 +24,7 @@ def test_explode_then_implode(tmpdir): # in internal blocks rather than letting some of them be automatically put # inline. 
ff.write_to(path, all_array_storage="internal") - assert len(ff._blocks) == 2 + # calling write_to no longer modifies blocks result = main.main_from_args(["explode", path]) diff --git a/asdf/_tests/tags/core/tests/test_integer.py b/asdf/_tests/tags/core/tests/test_integer.py index 9c29fc92d..080b1f710 100644 --- a/asdf/_tests/tags/core/tests/test_integer.py +++ b/asdf/_tests/tags/core/tests/test_integer.py @@ -67,7 +67,7 @@ def test_integer_storage_duplication(tmpdir): with asdf.AsdfFile(tree) as af: af.write_to(tmpfile) - assert len(af._blocks) == 1 + # we can no longer check af.blocks as a new block manager was made with asdf.open(tmpfile, _force_raw_types=True) as rf: assert rf.tree["integer1"]["words"]["source"] == 0 diff --git a/asdf/_tests/test_api.py b/asdf/_tests/test_api.py index aff6359a2..a20b3e59f 100644 --- a/asdf/_tests/test_api.py +++ b/asdf/_tests/test_api.py @@ -439,6 +439,7 @@ def test_array_inline_threshold_masked_array(array_inline_threshold, inline_bloc with asdf.AsdfFile(tree) as af: af.write_to(file_path) + with asdf.open(file_path) as af: assert len(list(af._blocks.inline_blocks)) == inline_blocks assert len(list(af._blocks.internal_blocks)) == internal_blocks diff --git a/asdf/_tests/test_array_blocks.py b/asdf/_tests/test_array_blocks.py index 10c634d47..77d7e815d 100644 --- a/asdf/_tests/test_array_blocks.py +++ b/asdf/_tests/test_array_blocks.py @@ -3,6 +3,7 @@ import numpy as np import pytest +import yaml from numpy.random import random from numpy.testing import assert_array_equal @@ -655,7 +656,8 @@ def test_invalid_block_index_values(): assert len(ff._blocks) == 1 -def test_invalid_last_block_index(): +@pytest.mark.parametrize("block_index_index", [0, -1]) +def test_invalid_block_index_offset(block_index_index): """ This adds a value in the block index that points to something that isn't a block @@ -670,13 +672,33 @@ def test_invalid_last_block_index(): tree = {"arrays": arrays} ff = asdf.AsdfFile(tree) - ff.write_to(buff, include_block_index=False) - ff._blocks._internal_blocks[-1]._offset -= 4 - ff._blocks.write_block_index(buff, ff) + ff.write_to(buff) + + # now overwrite the block index with the first entry + # incorrectly pointing to a non-block offset + buff.seek(0) + bs = buff.read() + block_index_header_start = bs.index(constants.INDEX_HEADER) + block_index_start = block_index_header_start + len(constants.INDEX_HEADER) + block_index = yaml.load(bs[block_index_start:], yaml.SafeLoader) + block_index[block_index_index] -= 4 + yaml_version = tuple(int(x) for x in ff.version_map["YAML_VERSION"].split(".")) + buff.seek(block_index_start) + yaml.dump( + block_index, + stream=buff, + explicit_start=True, + explicit_end=True, + version=yaml_version, + allow_unicode=True, + encoding="utf-8", + ) buff.seek(0) with asdf.open(buff) as ff: assert len(ff._blocks) == 1 + for i, a in enumerate(arrays): + assert_array_equal(ff["arrays"][i], a) def test_unordered_block_index(): @@ -702,30 +724,6 @@ def test_unordered_block_index(): assert len(ff._blocks) == 1 -def test_invalid_block_index_first_block_value(): - """ - This creates a bogus block index where the offset of the first - block doesn't match what we already know it to be. In this - case, we should reject the whole block index. 
- """ - buff = io.BytesIO() - - arrays = [] - for i in range(10): - arrays.append(np.ones((8, 8)) * i) - - tree = {"arrays": arrays} - - ff = asdf.AsdfFile(tree) - ff.write_to(buff, include_block_index=False) - ff._blocks._internal_blocks[0]._offset -= 4 - ff._blocks.write_block_index(buff, ff) - - buff.seek(0) - with asdf.open(buff) as ff: - assert len(ff._blocks) == 1 - - def test_invalid_block_id(): ff = asdf.AsdfFile() with pytest.raises(ValueError, match=r"Invalid source id .*"): @@ -859,7 +857,7 @@ def test_write_to_update_storage_options(tmp_path, all_array_storage, all_array_ if all_array_compression == "bzp2" and compression_kwargs is not None: compression_kwargs = {"compresslevel": 1} - def assert_result(ff, arr): + def assert_result(ff): if all_array_storage == "external": assert "test0000.asdf" in os.listdir(tmp_path) else: @@ -868,13 +866,20 @@ def assert_result(ff, arr): assert len(ff._blocks._internal_blocks) == 1 else: assert len(ff._blocks._internal_blocks) == 0 - blk = ff._blocks[arr] + blk = ff._blocks.find_or_create_block_for_array(ff["array"]) - target_compression = all_array_compression or None - assert blk._output_compression == target_compression + if all_array_storage == "inline": + # for 'inline' storage, no compression will be used + assert blk.input_compression is None + else: + target_compression = all_array_compression or None + if target_compression == "input": + target_compression = None + assert blk.input_compression == target_compression - target_compression_kwargs = compression_kwargs or {} - assert blk._output_compression_kwargs == target_compression_kwargs + # with a new block manager on write, there is no way to check kwargs + # target_compression_kwargs = compression_kwargs or {} + # assert blk._output_compression_kwargs == target_compression_kwargs arr1 = np.ones((8, 8)) tree = {"array": arr1} @@ -888,7 +893,8 @@ def assert_result(ff, arr): all_array_compression=all_array_compression, compression_kwargs=compression_kwargs, ) - assert_result(ff1, arr1) + with asdf.open(fn) as raf: + assert_result(raf) # then reuse the file to check update with asdf.open(fn, mode="rw") as ff2: @@ -899,7 +905,8 @@ def assert_result(ff, arr): all_array_compression=all_array_compression, compression_kwargs=compression_kwargs, ) - assert_result(ff2, arr2) + with asdf.open(fn) as raf: + assert_result(raf) def test_block_key(): diff --git a/asdf/_tests/test_compression.py b/asdf/_tests/test_compression.py index 2aca89604..9bfac6d32 100644 --- a/asdf/_tests/test_compression.py +++ b/asdf/_tests/test_compression.py @@ -155,9 +155,9 @@ def test_none(tmp_path): with asdf.open(tmpfile1) as afile: assert afile.get_array_compression(afile.tree["science_data"]) is None afile.write_to(tmpfile2, all_array_compression="zlib") - assert afile.get_array_compression(afile.tree["science_data"]) == "zlib" with asdf.open(tmpfile2) as afile: + assert afile.get_array_compression(afile.tree["science_data"]) == "zlib" afile.write_to(tmpfile1, all_array_compression=None) with asdf.open(tmpfile1) as afile: diff --git a/asdf/asdf.py b/asdf/asdf.py index 46e816a95..b14f2de54 100644 --- a/asdf/asdf.py +++ b/asdf/asdf.py @@ -1426,7 +1426,53 @@ def write_to( ``asdf.get_config().array_inline_threshold``. 
""" - + with config_context() as config: + if "all_array_storage" in kwargs: + config.all_array_storage = kwargs["all_array_storage"] + if "all_array_compression" in kwargs: + config.all_array_compression = kwargs["all_array_compression"] + if "compression_kwargs" in kwargs: + config.all_array_compression_kwargs = kwargs["compression_kwargs"] + + naf = AsdfFile( + {}, + uri=self._uri, + extensions=self.extensions, + version=self.version, + ignore_version_mismatch=self._ignore_version_mismatch, + ignore_unrecognized_tag=self._ignore_unrecognized_tag, + ignore_implicit_conversion=self._ignore_implicit_conversion, + ) + naf._tree = self.tree # avoid an extra validate + # copy over block storage and other settings + naf._blocks._storage_settings = copy.deepcopy(self._blocks._storage_settings) + naf._blocks._compression_settings = copy.deepcopy(self._blocks._compression_settings) + for b in self._blocks.blocks: + # we know we are going to use all new blocks + b._used = True + if b in self._blocks._streamed_blocks and b._data is None: + # streamed blocks might not have data + # add a streamed block to naf + blk = naf._blocks.get_streamed_block() + # mark this block as used so it doesn't get removed + blk._used = True + continue + if b._data_callback is None: + key = b.data + blk = naf._blocks[key] + blk._used = True + naf.set_array_storage(key, b.array_storage) + naf.set_array_compression(key, b.output_compression, **b.output_compression_kwargs) + naf._write_to(fd, **kwargs) + for b in self._blocks.blocks: + if hasattr(b, "_used"): + del b._used + + def _write_to( + self, + fd, + **kwargs, + ): pad_blocks = kwargs.pop("pad_blocks", False) include_block_index = kwargs.pop("include_block_index", True) version = kwargs.pop("version", None) diff --git a/asdf/block.py b/asdf/block.py index df21948fe..21ef249d7 100644 --- a/asdf/block.py +++ b/asdf/block.py @@ -83,7 +83,7 @@ def _add(self, block, key=None): if block._data is not None or key is not None: if key is None: - key = id(block._data) + key = id(util.get_array_base(block._data)) self._data_to_block_mapping[key] = block def remove(self, block): @@ -369,7 +369,8 @@ def write_external_blocks(self, uri, pad_blocks=False): blk._array_storage = "internal" asdffile._blocks.add(blk) blk._used = True - asdffile.write_to(subfd, pad_blocks=pad_blocks, all_array_storage="internal") + # skip the new block manager here + asdffile._write_to(subfd, pad_blocks=pad_blocks, all_array_storage="internal") def write_block_index(self, fd, ctx): """ From 8740a74addc37610d948739e650e94e377666f99 Mon Sep 17 00:00:00 2001 From: Brett Date: Tue, 28 Mar 2023 11:37:37 -0400 Subject: [PATCH 06/34] new AsdfFile on write_to --- asdf/_tests/test_array_blocks.py | 27 +++++++++++ .../{tests => _tests}/test_block_converter.py | 24 ++++++---- asdf/asdf.py | 46 ++++++++++++++----- asdf/block.py | 4 +- 4 files changed, 79 insertions(+), 22 deletions(-) rename asdf/{tests => _tests}/test_block_converter.py (93%) diff --git a/asdf/_tests/test_array_blocks.py b/asdf/_tests/test_array_blocks.py index 77d7e815d..c32e7ed1d 100644 --- a/asdf/_tests/test_array_blocks.py +++ b/asdf/_tests/test_array_blocks.py @@ -978,3 +978,30 @@ def __call__(self): mode="r", ) as f: rb.read(f, past_magic=False) + + +def test_remove_blocks(tmp_path): + """Test that writing to a new file""" + fn1 = tmp_path / "test.asdf" + fn2 = tmp_path / "test2.asdf" + + tree = {"a": np.zeros(3), "b": np.ones(1)} + af = asdf.AsdfFile(tree) + af.write_to(fn1) + + with asdf.open(fn1, mode="rw") as af: + assert 
len(af._blocks._internal_blocks) == 2 + af["a"] = None + # issue: https://github.com/asdf-format/asdf/issues/1505 + # prevents us from calling write_to then update + af.write_to(fn2) + # af.update() + + with asdf.open(fn1, mode="rw") as af: + assert len(af._blocks._internal_blocks) == 2 + af["a"] = None + af.update() + + for fn in (fn1, fn2): + with asdf.open(fn) as af: + assert len(af._blocks._internal_blocks) == 1 diff --git a/asdf/tests/test_block_converter.py b/asdf/_tests/test_block_converter.py similarity index 93% rename from asdf/tests/test_block_converter.py rename to asdf/_tests/test_block_converter.py index a7033a55a..5b3a31f09 100644 --- a/asdf/tests/test_block_converter.py +++ b/asdf/_tests/test_block_converter.py @@ -1,7 +1,7 @@ import contextlib import numpy as np -import pytest +from numpy.testing import assert_array_equal import asdf from asdf.extension import Converter, Extension @@ -104,7 +104,8 @@ def reserve_blocks(self, obj, tag, ctx): # Is there a ctx or tag at this point? # Reserve a block using a unique key (this will be used in to_yaml_tree # to find the block index) and a callable that will return the data/bytes # that will eventually be written to the block. - return [ctx.reserve_block(id(obj), lambda: np.ndarray(len(obj.payload), dtype="uint8", buffer=obj.payload))] + return [id(obj)] + # return [ctx.reserve_block(id(obj), lambda: np.ndarray(len(obj.payload), dtype="uint8", buffer=obj.payload))] class BlockExtension(Extension): @@ -187,6 +188,7 @@ class BlockDataCallbackConverter(Converter): types = [BlockDataCallback] def to_yaml_tree(self, obj, tag, ctx): + # this will be called during validate and might overwrite the callback # lookup source for obj block_index = ctx.find_block_index(id(obj), obj.callback) return { @@ -195,12 +197,17 @@ def to_yaml_tree(self, obj, tag, ctx): def from_yaml_tree(self, node, tag, ctx): block_index = node["block_index"] - obj = BlockDataCallback(lambda: ctx.load_block(block_index)) + + def callback(_ctx=ctx, _index=block_index): + return _ctx.load_block(_index) + + obj = BlockDataCallback(callback) ctx.claim_block(block_index, id(obj)) return obj def reserve_blocks(self, obj, tag, ctx): - return [ctx.reserve_block(id(obj), obj.callback)] + return [id(obj)] + # return [ctx.reserve_block(id(obj), obj.callback)] class BlockDataCallbackExtension(Extension): @@ -209,7 +216,6 @@ class BlockDataCallbackExtension(Extension): extension_uri = "asdf://somewhere.org/extensions/block_data_callback-1.0.0" -@pytest.mark.xfail(reason="callback use in a converter requires a new block manager on write") @with_extension(BlockDataCallbackExtension) def test_block_data_callback_converter(tmp_path): # use a callback that every time generates a new array @@ -218,7 +224,7 @@ def test_block_data_callback_converter(tmp_path): a = BlockDataCallback(lambda: np.zeros(3, dtype="uint8")) b = helpers.roundtrip_object(a) - assert np.all(a.data == b.data) + assert_array_equal(a.data, b.data) # make a tree without the BlockData instance to avoid # the initial validate which will trigger block allocation @@ -245,15 +251,17 @@ def test_block_data_callback_converter(tmp_path): # there should be 1 block assert len(af._blocks._internal_blocks) == 1 # validate should use that block - af.validate() + af.validate() # FIXME this validate screws up the block data/callback relationship assert len(af._blocks._internal_blocks) == 1 # as should write_to - af.write_to(fn2) # TODO this will NOT work without a new block manager on write + af.write_to(fn2) assert 
len(af._blocks._internal_blocks) == 1 # and update af.update() assert len(af._blocks._internal_blocks) == 1 + # TODO check that data was preserved + # TODO tests to add # - memmap/lazy_load other open options diff --git a/asdf/asdf.py b/asdf/asdf.py index b14f2de54..b63052856 100644 --- a/asdf/asdf.py +++ b/asdf/asdf.py @@ -1434,6 +1434,8 @@ def write_to( if "compression_kwargs" in kwargs: config.all_array_compression_kwargs = kwargs["compression_kwargs"] + self._blocks.finalize(self) + naf = AsdfFile( {}, uri=self._uri, @@ -1447,26 +1449,44 @@ def write_to( # copy over block storage and other settings naf._blocks._storage_settings = copy.deepcopy(self._blocks._storage_settings) naf._blocks._compression_settings = copy.deepcopy(self._blocks._compression_settings) + block_to_key_mapping = {v: k for k, v in self._blocks._data_to_block_mapping.items()} + # this creates blocks in the new block manager that correspond to blocks + # in the original file for b in self._blocks.blocks: - # we know we are going to use all new blocks - b._used = True if b in self._blocks._streamed_blocks and b._data is None: # streamed blocks might not have data # add a streamed block to naf blk = naf._blocks.get_streamed_block() # mark this block as used so it doesn't get removed blk._used = True - continue - if b._data_callback is None: - key = b.data - blk = naf._blocks[key] + elif b._data is not None or b._fd is not None: # this block has data + arr = b.data + blk = naf._blocks[arr] + blk._used = True + naf.set_array_storage(arr, b.array_storage) + naf.set_array_compression(arr, b.output_compression, **b.output_compression_kwargs) + else: # this block does not have data + key = block_to_key_mapping[b] + blk = naf._blocks.find_or_create_block(key) blk._used = True - naf.set_array_storage(key, b.array_storage) - naf.set_array_compression(key, b.output_compression, **b.output_compression_kwargs) + blk._data_callback = b._data_callback + naf._blocks._storage_settings[key] = self._blocks._storage_settings.get(key, b.array_storage) + naf._blocks._compression_settings[key] = self._blocks._compression_settings.get( + key, + (b.output_compression, b.output_compression_kwargs), + ) + # if b in block_to_key_mapping: # this block is mapped by key, not array data + # key = block_to_key_mapping[b] + # breakpoint() + # else: + # key = b.data + # if b._data_callback is None: + # key = block_to_key_mapping.get(blk, b.data) + # blk = naf._blocks[key] + # blk._used = True + # naf.set_array_storage(key, b.array_storage) + # naf.set_array_compression(key, b.output_compression, **b.output_compression_kwargs) naf._write_to(fd, **kwargs) - for b in self._blocks.blocks: - if hasattr(b, "_used"): - del b._used def _write_to( self, @@ -2109,8 +2129,10 @@ def claim_block(self, block_index, key): self._block_manager._data_to_block_mapping[key] = blk def _find_block(self, lookup_key, data_callback=None): + new_block = lookup_key not in self._block_manager._data_to_block_mapping blk = self._block_manager.find_or_create_block(lookup_key) - if data_callback is not None: + # if we're not creating a block, don't update the data callback + if data_callback is not None and (new_block or (blk._data_callback is None and blk._fd is None)): blk._data_callback = data_callback return blk diff --git a/asdf/block.py b/asdf/block.py index 21ef249d7..0e4b78540 100644 --- a/asdf/block.py +++ b/asdf/block.py @@ -566,8 +566,8 @@ def _find_used_blocks(self, tree, ctx): converter = ctx.extension_manager.get_converter_for_type(type(node)) tag = 
converter.select_tag(node, ctx) sctx = ctx._create_serialization_context() - for blk in converter.reserve_blocks(node, tag, sctx): - reserved_blocks.add(blk) + for key in converter.reserve_blocks(node, tag, sctx): + reserved_blocks.add(self.find_or_create_block(key)) else: hook = ctx._type_index.get_hook_for_type("reserve_blocks", type(node), ctx.version_string) if hook is not None: From 5dd1d48cfd87658e0448f8d13c2462bc261bbf45 Mon Sep 17 00:00:00 2001 From: Brett Date: Tue, 28 Mar 2023 15:17:04 -0400 Subject: [PATCH 07/34] fix array block assignment if write_to called before update fixes issue: https://github.com/asdf-format/asdf/issues/1505 --- asdf/_tests/test_array_blocks.py | 16 ++++++++++++++++ asdf/block.py | 7 +++---- asdf/tags/core/ndarray.py | 2 ++ 3 files changed, 21 insertions(+), 4 deletions(-) diff --git a/asdf/_tests/test_array_blocks.py b/asdf/_tests/test_array_blocks.py index c32e7ed1d..78032b4d4 100644 --- a/asdf/_tests/test_array_blocks.py +++ b/asdf/_tests/test_array_blocks.py @@ -1005,3 +1005,19 @@ def test_remove_blocks(tmp_path): for fn in (fn1, fn2): with asdf.open(fn) as af: assert len(af._blocks._internal_blocks) == 1 + + +def test_write_to_before_update(tmp_path): + # this is a regression test for: https://github.com/asdf-format/asdf/issues/1505 + fn1 = tmp_path / "test1.asdf" + fn2 = tmp_path / "test2.asdf" + + tree = {"a": np.zeros(3), "b": np.ones(3)} + af = asdf.AsdfFile(tree) + + af.write_to(fn1) + + with asdf.open(fn1, mode="rw") as af: + af["a"] = None + af.write_to(fn2) + af.update() diff --git a/asdf/block.py b/asdf/block.py index 0e4b78540..cfecc69da 100644 --- a/asdf/block.py +++ b/asdf/block.py @@ -742,11 +742,10 @@ def find_or_create_block_for_array(self, arr): """ from .tags.core import ndarray - if isinstance(arr, ndarray.NDArrayType) and arr.block is not None: - if arr.block in self.blocks: - return arr.block + if isinstance(arr, ndarray.NDArrayType) and arr.block is not None and arr.block in self.blocks: + return arr.block - arr._block = None + # arr._block = None base = util.get_array_base(arr) block = self._data_to_block_mapping.get(id(base)) diff --git a/asdf/tags/core/ndarray.py b/asdf/tags/core/ndarray.py index 15a263df5..ea60f0b66 100644 --- a/asdf/tags/core/ndarray.py +++ b/asdf/tags/core/ndarray.py @@ -444,6 +444,8 @@ def to_tree(cls, data, ctx): shape = data.shape + # if getattr(data, '_block', None) is not None: + # breakpoint() block = ctx._blocks.find_or_create_block_for_array(data) # Compute the offset relative to the base array and not the From 38cb99ca131b817ad5f1c8b8bdb19abe391bf42b Mon Sep 17 00:00:00 2001 From: Brett Date: Wed, 29 Mar 2023 09:53:08 -0400 Subject: [PATCH 08/34] update with callback working --- asdf/_tests/test_block_converter.py | 55 +++++++++++++++++++++++------ asdf/asdf.py | 24 ++++++++----- asdf/block.py | 18 ++++++++-- 3 files changed, 76 insertions(+), 21 deletions(-) diff --git a/asdf/_tests/test_block_converter.py b/asdf/_tests/test_block_converter.py index 5b3a31f09..11acc4b2d 100644 --- a/asdf/_tests/test_block_converter.py +++ b/asdf/_tests/test_block_converter.py @@ -77,7 +77,7 @@ def from_yaml_tree(self, node, tag, ctx): # zarr receives a memmap I'm not sure I've fully thought this # though. 
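        #
        # A short sketch of the two lookup modes added here (the index
        # comes from the tree; the key is one this converter claimed):
        #
        #     data = ctx.load_block(node["block_index"], by_index=True)
        #     data = ctx.load_block(id(obj))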
block_index = node["block_index"] - data = ctx.load_block(block_index) + data = ctx.load_block(block_index, by_index=True) obj = BlockData(data.tobytes()) ctx.claim_block(block_index, id(obj)) @@ -198,16 +198,21 @@ def to_yaml_tree(self, obj, tag, ctx): def from_yaml_tree(self, node, tag, ctx): block_index = node["block_index"] - def callback(_ctx=ctx, _index=block_index): - return _ctx.load_block(_index) + obj = BlockDataCallback(lambda: None) + # now that we have an object we use it's memory location + # to generate a key + key = id(obj) - obj = BlockDataCallback(callback) - ctx.claim_block(block_index, id(obj)) + def callback(_ctx=ctx, _key=key): + return _ctx.load_block(_key) + + obj.callback = callback + + ctx.claim_block(block_index, key) return obj def reserve_blocks(self, obj, tag, ctx): return [id(obj)] - # return [ctx.reserve_block(id(obj), obj.callback)] class BlockDataCallbackExtension(Extension): @@ -240,13 +245,13 @@ def test_block_data_callback_converter(tmp_path): # a second validate shouldn't result in more blocks af.validate() assert len(af._blocks._internal_blocks) == 1 - fn = tmp_path / "test.asdf" + fn1 = tmp_path / "test.asdf" # nor should write_to - af.write_to(fn) + af.write_to(fn1) assert len(af._blocks._internal_blocks) == 1 # if we read a file - with asdf.open(fn, mode="rw") as af: + with asdf.open(fn1, mode="rw") as af: fn2 = tmp_path / "test2.asdf" # there should be 1 block assert len(af._blocks._internal_blocks) == 1 @@ -260,7 +265,37 @@ def test_block_data_callback_converter(tmp_path): af.update() assert len(af._blocks._internal_blocks) == 1 - # TODO check that data was preserved + # check that data was preserved + for fn in (fn1, fn2): + with asdf.open(fn) as af: + assert_array_equal(af["a"].data, a.data) + + +@with_extension(BlockDataCallbackExtension) +def test_block_with_callback_removal(tmp_path): + fn1 = tmp_path / "test1.asdf" + fn2 = tmp_path / "test2.asdf" + + a = BlockDataCallback(lambda: np.zeros(3, dtype="uint8")) + b = BlockDataCallback(lambda: np.ones(3, dtype="uint8")) + base_af = asdf.AsdfFile({"a": a, "b": b}) + base_af.write_to(fn1) + + for remove_key, check_key in [("a", "b"), ("b", "a")]: + # check that removing one does not interfere with the other + with asdf.open(fn1) as af: + af[remove_key] = None + af.write_to(fn2) + with asdf.open(fn2) as af: + af[check_key] = b.data + # also test update + # first copy fn1 to fn2 + with asdf.open(fn1) as af: + af.write_to(fn2) + with asdf.open(fn2, mode="rw") as af: + af[remove_key] = None + af.update() + af[check_key] = b.data # TODO tests to add diff --git a/asdf/asdf.py b/asdf/asdf.py index b63052856..6da566a1c 100644 --- a/asdf/asdf.py +++ b/asdf/asdf.py @@ -1434,7 +1434,7 @@ def write_to( if "compression_kwargs" in kwargs: config.all_array_compression_kwargs = kwargs["compression_kwargs"] - self._blocks.finalize(self) + used_blocks = self._blocks._find_used_blocks(self.tree, self, remove=False) naf = AsdfFile( {}, @@ -1453,6 +1453,8 @@ def write_to( # this creates blocks in the new block manager that correspond to blocks # in the original file for b in self._blocks.blocks: + if b not in used_blocks: + continue if b in self._blocks._streamed_blocks and b._data is None: # streamed blocks might not have data # add a streamed block to naf @@ -2104,22 +2106,28 @@ def _extensions_used(self): """ return self.__extensions_used - def load_block(self, block_index, key=None): + def load_block(self, index_or_key, by_index=False): """ Parameters ---------- - block_index : int + index_or_key : int or hashable 
- key : hashable (optional) + by_index : bool, optional + if True, treat index_or_key as a block index + if False (default), treat index_or_key as a key Returns ------- block_data : ndarray """ - data = self._block_manager.get_block(block_index).data - if key is not None: - self.claim_block(block_index, key) - return data + if by_index: + # index_or_key is a block index + blk = self._block_manager.get_block(index_or_key) + else: + # index_or_key is a block key + blk = self._block_manager.get_block_by_key(index_or_key) + + return blk.data def claim_block(self, block_index, key): """ diff --git a/asdf/block.py b/asdf/block.py index cfecc69da..832c726f4 100644 --- a/asdf/block.py +++ b/asdf/block.py @@ -558,7 +558,7 @@ def get_external_uri(self, uri, index): parts[2] = path return patched_urllib_parse.urlunparse(parts) - def _find_used_blocks(self, tree, ctx): + def _find_used_blocks(self, tree, ctx, remove=True): reserved_blocks = set() for node in treeutil.iter_tree(tree): @@ -574,9 +574,15 @@ def _find_used_blocks(self, tree, ctx): for block in hook(node, ctx): reserved_blocks.add(block) + if remove: + for block in list(self.blocks): + if getattr(block, "_used", 0) == 0 and block not in reserved_blocks: + self.remove(block) + return None for block in list(self.blocks): - if getattr(block, "_used", 0) == 0 and block not in reserved_blocks: - self.remove(block) + if getattr(block, "_used", 0): + reserved_blocks.add(block) + return reserved_blocks def _handle_global_block_settings(self, block): cfg = get_config() @@ -615,6 +621,12 @@ def finalize(self, ctx): for block in list(self.blocks): self._handle_global_block_settings(block) + def get_block_by_key(self, key): + if key not in self._data_to_block_mapping: + msg = f"Unknown block key {key}" + raise KeyError(msg) + return self._data_to_block_mapping[key] + def get_block(self, source): """ Given a "source identifier", return a block. From ba9cd5ae2d21aac23c122f11172b57996db247d8 Mon Sep 17 00:00:00 2001 From: Brett Date: Thu, 30 Mar 2023 09:37:10 -0400 Subject: [PATCH 09/34] cleanup tests for new AsdfFile on write --- asdf/_tests/commands/tests/test_exploded.py | 3 +- asdf/_tests/tags/core/tests/test_integer.py | 2 +- asdf/_tests/test_array_blocks.py | 36 ++++++----- asdf/_tests/test_block_converter.py | 66 --------------------- asdf/asdf.py | 11 ---- 5 files changed, 25 insertions(+), 93 deletions(-) diff --git a/asdf/_tests/commands/tests/test_exploded.py b/asdf/_tests/commands/tests/test_exploded.py index ab2162857..e431fbb2d 100644 --- a/asdf/_tests/commands/tests/test_exploded.py +++ b/asdf/_tests/commands/tests/test_exploded.py @@ -24,7 +24,8 @@ def test_explode_then_implode(tmpdir): # in internal blocks rather than letting some of them be automatically put # inline. 
ff.write_to(path, all_array_storage="internal") - # calling write_to no longer modifies blocks + with asdf.open(path) as af: + assert len(af._blocks._internal_blocks) == 2 result = main.main_from_args(["explode", path]) diff --git a/asdf/_tests/tags/core/tests/test_integer.py b/asdf/_tests/tags/core/tests/test_integer.py index 080b1f710..3cd690c25 100644 --- a/asdf/_tests/tags/core/tests/test_integer.py +++ b/asdf/_tests/tags/core/tests/test_integer.py @@ -67,9 +67,9 @@ def test_integer_storage_duplication(tmpdir): with asdf.AsdfFile(tree) as af: af.write_to(tmpfile) - # we can no longer check af.blocks as a new block manager was made with asdf.open(tmpfile, _force_raw_types=True) as rf: + assert len(af._blocks) == 1 assert rf.tree["integer1"]["words"]["source"] == 0 assert rf.tree["integer2"]["words"]["source"] == 0 diff --git a/asdf/_tests/test_array_blocks.py b/asdf/_tests/test_array_blocks.py index 78032b4d4..6aac6b0e3 100644 --- a/asdf/_tests/test_array_blocks.py +++ b/asdf/_tests/test_array_blocks.py @@ -858,6 +858,10 @@ def test_write_to_update_storage_options(tmp_path, all_array_storage, all_array_ compression_kwargs = {"compresslevel": 1} def assert_result(ff): + if "array" not in ff: + # this was called from _write_to while making an external block + # so don't check the result + return if all_array_storage == "external": assert "test0000.asdf" in os.listdir(tmp_path) else: @@ -868,24 +872,29 @@ def assert_result(ff): assert len(ff._blocks._internal_blocks) == 0 blk = ff._blocks.find_or_create_block_for_array(ff["array"]) - if all_array_storage == "inline": - # for 'inline' storage, no compression will be used - assert blk.input_compression is None - else: - target_compression = all_array_compression or None - if target_compression == "input": - target_compression = None - assert blk.input_compression == target_compression + target_compression = all_array_compression or None + if target_compression == "input": + target_compression = None + assert blk.output_compression == target_compression # with a new block manager on write, there is no way to check kwargs - # target_compression_kwargs = compression_kwargs or {} - # assert blk._output_compression_kwargs == target_compression_kwargs + target_compression_kwargs = compression_kwargs or {} + assert blk._output_compression_kwargs == target_compression_kwargs arr1 = np.ones((8, 8)) tree = {"array": arr1} fn = tmp_path / "test.asdf" ff1 = asdf.AsdfFile(tree) + + original = asdf.AsdfFile._write_to + + def patched(self, *args, **kwargs): + original(self, *args, **kwargs) + assert_result(self) + + asdf.AsdfFile._write_to = patched + # first check write_to ff1.write_to( fn, @@ -893,8 +902,8 @@ def assert_result(ff): all_array_compression=all_array_compression, compression_kwargs=compression_kwargs, ) - with asdf.open(fn) as raf: - assert_result(raf) + + asdf.AsdfFile._write_to = original # then reuse the file to check update with asdf.open(fn, mode="rw") as ff2: @@ -905,8 +914,7 @@ def assert_result(ff): all_array_compression=all_array_compression, compression_kwargs=compression_kwargs, ) - with asdf.open(fn) as raf: - assert_result(raf) + assert_result(ff2) def test_block_key(): diff --git a/asdf/_tests/test_block_converter.py b/asdf/_tests/test_block_converter.py index 11acc4b2d..8288f17b4 100644 --- a/asdf/_tests/test_block_converter.py +++ b/asdf/_tests/test_block_converter.py @@ -18,28 +18,6 @@ class BlockConverter(Converter): types = [BlockData] def to_yaml_tree(self, obj, tag, ctx): - # this is called during validate and write_to 
(and fill_defaults) - # - # During validate (and other non-writing times) ctx should have a - # valid way to find blocks for the object. During (and after) reading - # the block will correspond to the read block. - # - # During write_to, ctx should return the block_index for the block - # that was allocate during reserve_blocks. - # - # One uncovered case is when an item that uses blocks is added - # to the tree and validate is called prior to a write. In this case reserve_blocks - # was never called and no block was read (the object is in memory). - # The old code would allocate a block and validate the tree and then - # throw away to block (if a subsequent write wasn't performed). - # If the ctx is aware that this is not a read or a write, it should - # be valid to return any number (perhaps an unused index) as the - # return for this is never written anywhere. An alternative would - # be that the ctx sees no block exists, then calls reserve_blocks on this - # obj to allow it to claim a block. This requires that ctx be aware - # of which obj is passed to to_yaml_tree and which converter is currently - # being used. - # lookup source for obj block_index = ctx.find_block_index( id(obj), @@ -50,32 +28,6 @@ def to_yaml_tree(self, obj, tag, ctx): } def from_yaml_tree(self, node, tag, ctx): - # this is called during open and fill_defaults (also called during open). - # In all cases, blocks have already been read (or are ready to read) from - # the file. - # So I don't see a need for from_blocks... Adding it would make the API - # more symmetrical but would require another tree traversal and would - # require that objects can be made (or preparations to make those objects) - # without whatever is in the block. This function can return anything - # so a special return value would be complicated. One option would - # be to add something like: ctx.requires_block(id_required, extra) - # that could be added to a list of future resolvable objects. After all - # objects have been passed through from_yaml_tree, the ctx could - # then go through and resolve all of the objects. This would require - # some magic where we then need to swap out objects in the tree whereas - # before the return value to this function was used to fill the tree. - # Without a reason to add this complexity (aside from symmetry) I'm - # inclined to leave out 'from_blocks' - # - # One complication here is that some objects (like zarray) will - # not want to load the data right away and instead just have a way - # to load the data when needed (and possibly multiple times). - # It might be better to have ctx provide load_block for times - # when data should be read and zarr can wrap this. - # - # Another complication is memmapping. It should not matter that - # zarr receives a memmap I'm not sure I've fully thought this - # though. block_index = node["block_index"] data = ctx.load_block(block_index, by_index=True) obj = BlockData(data.tobytes()) @@ -84,28 +36,10 @@ def from_yaml_tree(self, node, tag, ctx): # -- alternatively, if data is not required to make the object -- # obj = BlockData(b"") # obj.payload = ctx.load_block(block_index, id(obj)) - - # so I think this might need to 'claim' blocks so subsequent 'validate'/'write_to'/etc use - # the read block (and don't create a new one). This can create a catch 22 when - # the data is used to make the object (like here), since the id(obj) key is not known - # until the object is created. 
What about something like - # ctx.load_block(index) : loads block data WITHOUT claiming it - # ctx.attach_block(index, key) : load and claim block - # ctx.claim_block(key) : just claim/associate the block - # this is pretty messy and I think could be cleaned up by a new block manager on write - # there, the blocks would always be claimed and the 'read' block manager would be responsible - # for only that... Update might still need an association - # so maybe instead use - # ctx.load_block(index, key=None) and ctx.claim_block(index, key) - # ctx._block_manager._data_to_block_mapping[id(obj)] = ctx._block_manager._internal_blocks[0] return obj def reserve_blocks(self, obj, tag, ctx): # Is there a ctx or tag at this point? - # Reserve a block using a unique key (this will be used in to_yaml_tree - # to find the block index) and a callable that will return the data/bytes - # that will eventually be written to the block. return [id(obj)] - # return [ctx.reserve_block(id(obj), lambda: np.ndarray(len(obj.payload), dtype="uint8", buffer=obj.payload))] class BlockExtension(Extension): diff --git a/asdf/asdf.py b/asdf/asdf.py index 6da566a1c..8837f07b4 100644 --- a/asdf/asdf.py +++ b/asdf/asdf.py @@ -1477,17 +1477,6 @@ def write_to( key, (b.output_compression, b.output_compression_kwargs), ) - # if b in block_to_key_mapping: # this block is mapped by key, not array data - # key = block_to_key_mapping[b] - # breakpoint() - # else: - # key = b.data - # if b._data_callback is None: - # key = block_to_key_mapping.get(blk, b.data) - # blk = naf._blocks[key] - # blk._used = True - # naf.set_array_storage(key, b.array_storage) - # naf.set_array_compression(key, b.output_compression, **b.output_compression_kwargs) naf._write_to(fd, **kwargs) def _write_to( From 538c1120d30186e07f0b136ba7be522e4efdd8e8 Mon Sep 17 00:00:00 2001 From: Brett Date: Thu, 30 Mar 2023 10:04:59 -0400 Subject: [PATCH 10/34] add docstrings to block converter functions --- asdf/asdf.py | 74 ++++++++++++------------------------ asdf/block.py | 2 - asdf/extension/_converter.py | 34 +++++++++++------ asdf/tags/core/ndarray.py | 2 - 4 files changed, 47 insertions(+), 65 deletions(-) diff --git a/asdf/asdf.py b/asdf/asdf.py index 8837f07b4..4a7c701d3 100644 --- a/asdf/asdf.py +++ b/asdf/asdf.py @@ -2097,6 +2097,9 @@ def _extensions_used(self): def load_block(self, index_or_key, by_index=False): """ + Return data from a block using either the block index + or block key. + Parameters ---------- index_or_key : int or hashable @@ -2108,6 +2111,7 @@ def load_block(self, index_or_key, by_index=False): Returns ------- block_data : ndarray + The ndarray block data (one dimension, uint8 dtype) """ if by_index: # index_or_key is a block index @@ -2120,7 +2124,26 @@ def load_block(self, index_or_key, by_index=False): def claim_block(self, block_index, key): """ - TODO + Associate a unique hashable key with a block. + + This is used during Converter.from_yaml_tree and allows + the AsdfFile to be aware of which blocks belong to the + object handled by the converter and allows load_block + to locate the block using the key instead of the index + (which might change if a file undergoes an AsdfFile.update). + + If the block index is later needed (like during to_yaml_tree) + the key can be used with find_block_index to lookup the + block index. 
+ + Parameters + ---------- + + block_index : int + The index of the block to associate with the key + + key : hashable + A unique hashable key to associate with a block """ blk = self._block_manager.get_block(block_index) self._block_manager._data_to_block_mapping[key] = blk @@ -2133,39 +2156,10 @@ def _find_block(self, lookup_key, data_callback=None): blk._data_callback = data_callback return blk - def reserve_block(self, lookup_key, data_callback): - """ - Reserve a block that will at some point in the future contain - data returned when the data_callback is called. This is typically - used inside asdf.extension.Converter.reserve_blocks - - Parameters - ---------- - lookup_key : hashable - Unique key used to reserve and later retrieve the index - of a block. For ndarrays this is typically the id of the base - ndarray. - - data_callback: callable - Callable that when called will return data (ndarray) that will - be written to a block - - Returns - ------- - block : asdf.block.Block - The block which has been reserved for use. - """ - # as this is typically called in Converter.reserve_blocks, so - # prior to serializing the tree (prior to Converter.to_yaml_tree) - # note that this will not be called prior to AsdfFile.validate or - # AsdfFile.fill/remove_defaults - - # find a block (if there is one). If not, add a block - return self._find_block(lookup_key, data_callback) - def find_block_index(self, lookup_key, data_callback=None): """ Find the index of a previously allocated or reserved block. + This is typically used inside asdf.extension.Converter.to_yaml_tree Parameters @@ -2185,24 +2179,6 @@ def find_block_index(self, lookup_key, data_callback=None): Index of the block where data returned from data_callback will be written. """ - # see notes in test_block_converter - # - # we want the index of a block but we can't write the block and shouldn't - # access the data. - # - # this could be called during write_to/update OR fill_defaults OR validate - # - # for write_to/update we need a valid index that at a later point will be used - # to write the data. - # - # if this is called from fill_defaults we need to generate an index that will point - # back to the correct block that was used when the file was read. I'm not sure how we - # would do this. Perhaps temporarily resolve the data callback to get id(data) and - # use block_manager._data_to_block_mapping to look up the block? - # - # on validate we may or may not have blocks, if we don't have a block, make one? - # - # for now, assuming we're writing a fresh file with write_to blk = self._find_block(lookup_key, data_callback) return self._block_manager.get_source(blk) diff --git a/asdf/block.py b/asdf/block.py index 832c726f4..d91d76168 100644 --- a/asdf/block.py +++ b/asdf/block.py @@ -757,8 +757,6 @@ def find_or_create_block_for_array(self, arr): if isinstance(arr, ndarray.NDArrayType) and arr.block is not None and arr.block in self.blocks: return arr.block - # arr._block = None - base = util.get_array_base(arr) block = self._data_to_block_mapping.get(id(base)) if block is not None: diff --git a/asdf/extension/_converter.py b/asdf/extension/_converter.py index 38cc2a5f3..44371b703 100644 --- a/asdf/extension/_converter.py +++ b/asdf/extension/_converter.py @@ -155,15 +155,6 @@ def reserve_blocks(self, obj, tag, ctx): Reserve any number of blocks in which data (ndarrays) can be stored. 
- For each block that will be used for this obj, first call - ctx.reserve_block(lookup_key, data_callback) with a hashable - unique lookup_key (for an ndarray, use the id of the base array) - and a data_callback that when called will return a ndarray - to write to the block. This function will return a asdf.block.Block - that should be included in the list returned by this method. - The index of this block can later (in to_yaml_tree) be retrieved - using ctx.find_block_index. - Parameters ---------- obj : object @@ -179,8 +170,8 @@ def reserve_blocks(self, obj, tag, ctx): Returns ------ - blocks : list of asdf.block.Block - The blocks that were reserved for obj + keys : list of unique hashable keys + These keys will be used to reserve blocks for later use """ return [] @@ -314,7 +305,26 @@ def from_yaml_tree(self, node, tag, ctx): def reserve_blocks(self, obj, tag, ctx): """ - TODO + Reserve blocks to be used during conversion of this object + + Parameters + ---------- + obj : object + Instance of a custom type to be serialized. Guaranteed to + be an instance of one of the types listed in the `types` + property. + tag : str + The tag identifying the YAML type that ``obj`` should be + converted into. Selected by a call to this converter's + select_tag method. + ctx : asdf.asdf.SerializationContext + The context of the current serialization request. + + Returns + ------ + keys : list of unique hashable keys + These keys will be used to reserve blocks for later use + """ if hasattr(self._delegate, "reserve_blocks"): return self._delegate.reserve_blocks(obj, tag, ctx) diff --git a/asdf/tags/core/ndarray.py b/asdf/tags/core/ndarray.py index ea60f0b66..15a263df5 100644 --- a/asdf/tags/core/ndarray.py +++ b/asdf/tags/core/ndarray.py @@ -444,8 +444,6 @@ def to_tree(cls, data, ctx): shape = data.shape - # if getattr(data, '_block', None) is not None: - # breakpoint() block = ctx._blocks.find_or_create_block_for_array(data) # Compute the offset relative to the base array and not the From 22d2af31f5c38ba87bf799414d613b8a65d6e7f5 Mon Sep 17 00:00:00 2001 From: Brett Date: Thu, 30 Mar 2023 13:04:50 -0400 Subject: [PATCH 11/34] more code cleanup --- asdf/_tests/test_array_blocks.py | 7 +++++-- asdf/_tests/test_block_converter.py | 4 ++-- asdf/asdf.py | 2 +- asdf/block.py | 1 + 4 files changed, 9 insertions(+), 5 deletions(-) diff --git a/asdf/_tests/test_array_blocks.py b/asdf/_tests/test_array_blocks.py index 6aac6b0e3..aee44c228 100644 --- a/asdf/_tests/test_array_blocks.py +++ b/asdf/_tests/test_array_blocks.py @@ -870,14 +870,13 @@ def assert_result(ff): assert len(ff._blocks._internal_blocks) == 1 else: assert len(ff._blocks._internal_blocks) == 0 - blk = ff._blocks.find_or_create_block_for_array(ff["array"]) + blk = ff._blocks[ff["array"]] target_compression = all_array_compression or None if target_compression == "input": target_compression = None assert blk.output_compression == target_compression - # with a new block manager on write, there is no way to check kwargs target_compression_kwargs = compression_kwargs or {} assert blk._output_compression_kwargs == target_compression_kwargs @@ -887,6 +886,10 @@ def assert_result(ff): ff1 = asdf.AsdfFile(tree) + # as a new AsdfFile is used for write_to and we want + # to check blocks here, we patch _write_to to allow us + # to inspect the blocks in the new AsdfFile before + # it falls out of scope original = asdf.AsdfFile._write_to def patched(self, *args, **kwargs): diff --git a/asdf/_tests/test_block_converter.py b/asdf/_tests/test_block_converter.py 
index 8288f17b4..fbd4eaef8 100644
--- a/asdf/_tests/test_block_converter.py
+++ b/asdf/_tests/test_block_converter.py
@@ -190,7 +190,7 @@ def test_block_data_callback_converter(tmp_path):
         # there should be 1 block
         assert len(af._blocks._internal_blocks) == 1
         # validate should use that block
-        af.validate()  # FIXME this validate screws up the block data/callback relationship
+        af.validate()
         assert len(af._blocks._internal_blocks) == 1
         # as should write_to
         af.write_to(fn2)
@@ -234,5 +234,5 @@ def test_block_with_callback_removal(tmp_path):

 # TODO tests to add
 # - memmap/lazy_load other open options
-# - block storage settings
+# - block storage settings: compression, etc
 # - error cases when data is not of the correct type (not an ndarray, an invalid ndarray, etc)
diff --git a/asdf/asdf.py b/asdf/asdf.py
index 4a7c701d3..67c7c19fe 100644
--- a/asdf/asdf.py
+++ b/asdf/asdf.py
@@ -148,7 +148,7 @@ def __init__(
         self._closed = False
         self._external_asdf_by_uri = {}
         self._blocks = block.BlockManager(self, copy_arrays=copy_arrays, lazy_load=lazy_load)
-        self._uri = None
+        self._uri = uri
         if tree is None:
             # Bypassing the tree property here, to avoid validating
             # an empty tree.
diff --git a/asdf/block.py b/asdf/block.py
index d91d76168..4b1e35629 100644
--- a/asdf/block.py
+++ b/asdf/block.py
@@ -788,6 +788,7 @@ def find_or_create_block(self, key):
         block = Block()
         self.add(block, key=key)
         self._handle_global_block_settings(block)
+        self._data_to_block_mapping[key] = block

         return block

From ead0f7c6e91d7d136bc0982e21709db180457649 Mon Sep 17 00:00:00 2001
From: Brett
Date: Thu, 30 Mar 2023 13:08:57 -0400
Subject: [PATCH 12/34] remove non-array block storage settings

I'm not quite happy with these and think a more flexible (supporting
more than storage/compression that matches arrays) and simpler
interface might be possible.
---
 asdf/asdf.py  | 118 --------------------------------------------------
 asdf/block.py |   3 --
 2 files changed, 121 deletions(-)

diff --git a/asdf/asdf.py b/asdf/asdf.py
index 67c7c19fe..01562cfc3 100644
--- a/asdf/asdf.py
+++ b/asdf/asdf.py
@@ -748,44 +748,6 @@ def get_array_storage(self, arr):
         """
         return self._blocks[arr].array_storage

-    def set_block_storage(self, obj, storage):
-        """
-        Set the block storage type to use for the given binary object's data.
-
-        Parameters
-        ----------
-        obj : object
-            ASDF-serializable object.
-
-        storage : str
-            Must be one of:
-
-            - ``internal``: The default. The data will be
-              stored in a binary block in the same ASDF file.
-
-            - ``external``: Store the data in a binary block in a
-              separate ASDF file.
-
-            - ``inline``: Store the data as YAML inline in the tree. This option
-              may not be implemented for all object types.
-        """
-        self._blocks._storage_settings[id(obj)] = storage
-
-    def get_block_storage(self, obj):
-        """
-        Get the block storage type for the given binary object's data.
-
-        Parameters
-        ----------
-        obj : object
-            ASDF-serializable object.
-
-        Returns
-        -------
-        str or None
-        """
-        return self._blocks._storage_settings.get(id(obj), None)
-
     def set_array_compression(self, arr, compression, **compression_kwargs):
         """
         Set the compression to use for the given array data.
@@ -833,50 +795,6 @@ def get_array_compression_kwargs(self, arr):
         """ """
         return self._blocks[arr].output_compression_kwargs

-    def set_block_compression(self, obj, compression, **compression_kwargs):
-        """
-        Set the compression to use for the given object's block data.
-
-        Parameters
-        ----------
-        obj : object
-            ASDF-serializable object.
- - compression : str or None - Must be one of: - - - ``''`` or `None`: no compression - - - ``zlib``: Use zlib compression - - - ``bzp2``: Use bzip2 compression - - - ``lz4``: Use lz4 compression - - - ``input``: Use the same compression as in the file read. - If there is no prior file, acts as None. - - **kwargs - Additional compressor-specific arguments. - - """ - self._blocks._compression_settings[id(obj)] = (compression, compression_kwargs) - - def get_block_compression(self, obj): - """ - Get the compression type and arguments for the given object's block data. - - Parameters - ---------- - obj : object - ASDF-serializable object. - - Returns - ------- - (str, dict) - """ - return self._blocks._compression_settings.get(id(obj), (None, {})) - @classmethod def _parse_header_line(cls, line): """ @@ -1447,8 +1365,6 @@ def write_to( ) naf._tree = self.tree # avoid an extra validate # copy over block storage and other settings - naf._blocks._storage_settings = copy.deepcopy(self._blocks._storage_settings) - naf._blocks._compression_settings = copy.deepcopy(self._blocks._compression_settings) block_to_key_mapping = {v: k for k, v in self._blocks._data_to_block_mapping.items()} # this creates blocks in the new block manager that correspond to blocks # in the original file @@ -1472,11 +1388,6 @@ def write_to( blk = naf._blocks.find_or_create_block(key) blk._used = True blk._data_callback = b._data_callback - naf._blocks._storage_settings[key] = self._blocks._storage_settings.get(key, b.array_storage) - naf._blocks._compression_settings[key] = self._blocks._compression_settings.get( - key, - (b.output_compression, b.output_compression_kwargs), - ) naf._write_to(fd, **kwargs) def _write_to( @@ -2181,32 +2092,3 @@ def find_block_index(self, lookup_key, data_callback=None): """ blk = self._find_block(lookup_key, data_callback) return self._block_manager.get_source(blk) - - def get_block_storage_settings(self, lookup_key): - """ - TODO - TODO add corresponding set - """ - return self._block_manager._storage_settings.get(lookup_key, None) - - def get_block_compression_settings(self, lookup_key): - """ - TODO - TODO add corresponding set - """ - return self._block_manager._compression_settings.get(lookup_key, None) - - def set_block_storage_settings(self, lookup_key, storage): - """ - TODO - TODO add corresponding set - """ - self._block_manager._storage_settings[lookup_key] = storage - - def set_block_compression_settings(self, lookup_key, compression, compression_kwargs=None): - """ - TODO - TODO add corresponding set - """ - compression_kwargs = compression_kwargs or {} - self._block_manager._compression_settings[lookup_key] = (compression, compression_kwargs) diff --git a/asdf/block.py b/asdf/block.py index 4b1e35629..b71f00b64 100644 --- a/asdf/block.py +++ b/asdf/block.py @@ -42,9 +42,6 @@ def __init__(self, asdffile, copy_arrays=False, lazy_load=True): self._lazy_load = lazy_load self._internal_blocks_mapped = False - self._storage_settings = {} - self._compression_settings = {} - def __len__(self): """ Return the total number of blocks being managed. 
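
Patch 12 removes the unreleased per-object block settings, while the per-array
methods on AsdfFile that appear as context in the diff above (get_array_storage,
set_array_compression) remain in place. A minimal sketch of that surviving
interface, assuming only the public asdf 2.x per-array API; the output filename
is illustrative:

    import asdf
    import numpy as np

    arr = np.arange(8)
    af = asdf.AsdfFile({"data": arr})
    # per-array storage and compression settings are unaffected by this patch
    af.set_array_storage(arr, "internal")
    af.set_array_compression(arr, "zlib")
    assert af.get_array_storage(arr) == "internal"
    af.write_to("example.asdf")  # illustrative path
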
From 6cda51bdd5d0e65d1da1f91b0dc58e18f602e3a6 Mon Sep 17 00:00:00 2001 From: Brett Date: Thu, 30 Mar 2023 13:23:26 -0400 Subject: [PATCH 13/34] pass serialization context to select tag --- asdf/block.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/asdf/block.py b/asdf/block.py index b71f00b64..a3f82a772 100644 --- a/asdf/block.py +++ b/asdf/block.py @@ -561,8 +561,8 @@ def _find_used_blocks(self, tree, ctx, remove=True): for node in treeutil.iter_tree(tree): if ctx.extension_manager.handles_type(type(node)): converter = ctx.extension_manager.get_converter_for_type(type(node)) - tag = converter.select_tag(node, ctx) sctx = ctx._create_serialization_context() + tag = converter.select_tag(node, sctx) for key in converter.reserve_blocks(node, tag, sctx): reserved_blocks.add(self.find_or_create_block(key)) else: From ad5c9a4cc64fdbaa8e2288f9fa396b62f48fd92a Mon Sep 17 00:00:00 2001 From: Brett Date: Thu, 30 Mar 2023 15:35:09 -0400 Subject: [PATCH 14/34] rename claim_block to assign_block_key --- asdf/_tests/test_block_converter.py | 11 ++++++----- asdf/asdf.py | 2 +- 2 files changed, 7 insertions(+), 6 deletions(-) diff --git a/asdf/_tests/test_block_converter.py b/asdf/_tests/test_block_converter.py index fbd4eaef8..4263df1f5 100644 --- a/asdf/_tests/test_block_converter.py +++ b/asdf/_tests/test_block_converter.py @@ -31,7 +31,7 @@ def from_yaml_tree(self, node, tag, ctx): block_index = node["block_index"] data = ctx.load_block(block_index, by_index=True) obj = BlockData(data.tobytes()) - ctx.claim_block(block_index, id(obj)) + ctx.assign_block_key(block_index, id(obj)) # -- alternatively, if data is not required to make the object -- # obj = BlockData(b"") @@ -142,7 +142,7 @@ def callback(_ctx=ctx, _key=key): obj.callback = callback - ctx.claim_block(block_index, key) + ctx.assign_block_key(block_index, key) return obj def reserve_blocks(self, obj, tag, ctx): @@ -232,7 +232,8 @@ def test_block_with_callback_removal(tmp_path): af[check_key] = b.data -# TODO tests to add -# - memmap/lazy_load other open options -# - block storage settings: compression, etc # - error cases when data is not of the correct type (not an ndarray, an invalid ndarray, etc) +# - reserve_blocks returns non-hashable type +# - sctx.load_block +# - sctx.assign_block_key +# - sctx.find_block_index diff --git a/asdf/asdf.py b/asdf/asdf.py index 01562cfc3..7ebc1ab11 100644 --- a/asdf/asdf.py +++ b/asdf/asdf.py @@ -2033,7 +2033,7 @@ def load_block(self, index_or_key, by_index=False): return blk.data - def claim_block(self, block_index, key): + def assign_block_key(self, block_index, key): """ Associate a unique hashable key with a block. 
From 63c28fcb7e3be8438e9a92953dfdaf6abc022e9d Mon Sep 17 00:00:00 2001 From: Brett Date: Thu, 30 Mar 2023 17:14:47 -0400 Subject: [PATCH 15/34] add tests for block management through serialization context --- asdf/_tests/test_block_converter.py | 63 ++++++++++++++++++++++------- asdf/asdf.py | 16 ++++---- 2 files changed, 56 insertions(+), 23 deletions(-) diff --git a/asdf/_tests/test_block_converter.py b/asdf/_tests/test_block_converter.py index 4263df1f5..20bab6379 100644 --- a/asdf/_tests/test_block_converter.py +++ b/asdf/_tests/test_block_converter.py @@ -1,6 +1,7 @@ import contextlib import numpy as np +import pytest from numpy.testing import assert_array_equal import asdf @@ -16,6 +17,7 @@ def __init__(self, payload): class BlockConverter(Converter): tags = ["asdf://somewhere.org/tags/block_data-1.0.0"] types = [BlockData] + _return_invalid_keys = False def to_yaml_tree(self, obj, tag, ctx): # lookup source for obj @@ -39,6 +41,10 @@ def from_yaml_tree(self, node, tag, ctx): return obj def reserve_blocks(self, obj, tag, ctx): # Is there a ctx or tag at this point? + if self._return_invalid_keys: + # return something unhashable + self._return_invalid_keys = False + return [[]] return [id(obj)] @@ -73,15 +79,16 @@ def test_block_converter_block_allocation(tmp_path): af["a"] = a # the AsdfFile instance should have no blocks assert len(af._blocks._internal_blocks) == 0 - # until validate is called + # validate will make a block af.validate() assert len(af._blocks._internal_blocks) == 1 - assert af._blocks._internal_blocks[0].data.tobytes() == a.payload + assert np.all(af._blocks._internal_blocks[0].data.tobytes() == a.payload) # a second validate shouldn't result in more blocks af.validate() assert len(af._blocks._internal_blocks) == 1 + # write_to will create blocks here because + # they currently hold storage settings fn = tmp_path / "test.asdf" - # nor should write_to af.write_to(fn) assert len(af._blocks._internal_blocks) == 1 @@ -102,8 +109,13 @@ def test_block_converter_block_allocation(tmp_path): @with_extension(BlockExtension) -def test_invalid_block_data(): - pass +def test_invalid_reserve_block_keys(tmp_path): + a = BlockData(b"abcdefg") + af = asdf.AsdfFile({"a": a}) + fn = tmp_path / "test.asdf" + BlockExtension.converters[0]._return_invalid_keys = True + with pytest.raises(TypeError, match="unhashable type: .*"): + af.write_to(fn) class BlockDataCallback: @@ -137,8 +149,8 @@ def from_yaml_tree(self, node, tag, ctx): # to generate a key key = id(obj) - def callback(_ctx=ctx, _key=key): - return _ctx.load_block(_key) + def callback(): + return ctx.load_block(key) obj.callback = callback @@ -172,15 +184,15 @@ def test_block_data_callback_converter(tmp_path): af["a"] = a # the AsdfFile instance should have no blocks assert len(af._blocks._internal_blocks) == 0 - # until validate is called + # validate will make a block af.validate() assert len(af._blocks._internal_blocks) == 1 assert np.all(af._blocks._internal_blocks[0].data == a.data) # a second validate shouldn't result in more blocks af.validate() assert len(af._blocks._internal_blocks) == 1 + # write_to will use the block fn1 = tmp_path / "test.asdf" - # nor should write_to af.write_to(fn1) assert len(af._blocks._internal_blocks) == 1 @@ -232,8 +244,31 @@ def test_block_with_callback_removal(tmp_path): af[check_key] = b.data -# - error cases when data is not of the correct type (not an ndarray, an invalid ndarray, etc) -# - reserve_blocks returns non-hashable type -# - sctx.load_block -# - sctx.assign_block_key -# - 
sctx.find_block_index +def test_seralization_context_block_access(): + af = asdf.AsdfFile() + sctx = af._create_serialization_context() + + # finding an index for an unknown block should + # create one + key = 42 + arr = np.ones(3, dtype="uint8") + index = sctx.find_block_index(key, lambda: arr) + assert len(af._blocks) == 1 + assert id(arr) == id(sctx.load_block(key)) + assert id(arr) == id(sctx.load_block(index, by_index=True)) + # finding the same block should not create a new one + index = sctx.find_block_index(key, lambda: arr) + assert len(af._blocks) == 1 + + new_key = 26 + sctx.assign_block_key(index, new_key) + assert len(af._blocks) == 1 + # both old and new keys work + assert id(arr) == id(sctx.load_block(key)) + assert id(arr) == id(sctx.load_block(new_key)) + + arr2 = np.zeros(3, dtype="uint8") + # test that providing a new callback won't overwrite + # the first one + index = sctx.find_block_index(key, lambda: arr2) + assert id(arr2) != id(sctx.load_block(key)) diff --git a/asdf/asdf.py b/asdf/asdf.py index 7ebc1ab11..7db10509b 100644 --- a/asdf/asdf.py +++ b/asdf/asdf.py @@ -2059,14 +2059,6 @@ def assign_block_key(self, block_index, key): blk = self._block_manager.get_block(block_index) self._block_manager._data_to_block_mapping[key] = blk - def _find_block(self, lookup_key, data_callback=None): - new_block = lookup_key not in self._block_manager._data_to_block_mapping - blk = self._block_manager.find_or_create_block(lookup_key) - # if we're not creating a block, don't update the data callback - if data_callback is not None and (new_block or (blk._data_callback is None and blk._fd is None)): - blk._data_callback = data_callback - return blk - def find_block_index(self, lookup_key, data_callback=None): """ Find the index of a previously allocated or reserved block. @@ -2083,6 +2075,8 @@ def find_block_index(self, lookup_key, data_callback=None): data_callback: callable, optional Callable that when called will return data (ndarray) that will be written to a block. + At the moment, this is only assigned if a new block + is created to avoid circular references during AsdfFile.update. Returns ------- @@ -2090,5 +2084,9 @@ def find_block_index(self, lookup_key, data_callback=None): Index of the block where data returned from data_callback will be written. 
""" - blk = self._find_block(lookup_key, data_callback) + new_block = lookup_key not in self._block_manager._data_to_block_mapping + blk = self._block_manager.find_or_create_block(lookup_key) + # if we're not creating a block, don't update the data callback + if data_callback is not None and (new_block or (blk._data_callback is None and blk._fd is None)): + blk._data_callback = data_callback return self._block_manager.get_source(blk) From fc9a8f004c76670ed15c12e0cda8d25f41f6a307 Mon Sep 17 00:00:00 2001 From: Brett Date: Fri, 31 Mar 2023 12:10:07 -0400 Subject: [PATCH 16/34] test invalid key with get_block_by_key --- asdf/_tests/test_block_converter.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/asdf/_tests/test_block_converter.py b/asdf/_tests/test_block_converter.py index 20bab6379..136da8a52 100644 --- a/asdf/_tests/test_block_converter.py +++ b/asdf/_tests/test_block_converter.py @@ -134,8 +134,6 @@ class BlockDataCallbackConverter(Converter): types = [BlockDataCallback] def to_yaml_tree(self, obj, tag, ctx): - # this will be called during validate and might overwrite the callback - # lookup source for obj block_index = ctx.find_block_index(id(obj), obj.callback) return { "block_index": block_index, @@ -266,6 +264,9 @@ def test_seralization_context_block_access(): # both old and new keys work assert id(arr) == id(sctx.load_block(key)) assert id(arr) == id(sctx.load_block(new_key)) + # an unknown key should fail + with pytest.raises(KeyError, match="Unknown block key .*"): + sctx.load_block(-1) arr2 = np.zeros(3, dtype="uint8") # test that providing a new callback won't overwrite From 4d8d1e8a52dda0375622854ea2fa086a1b66660a Mon Sep 17 00:00:00 2001 From: Brett Date: Fri, 31 Mar 2023 13:59:31 -0400 Subject: [PATCH 17/34] initial block converter docs --- asdf/extension/_converter.py | 2 +- docs/asdf/extending/converters.rst | 87 ++++++++++++++++++++++++++++++ 2 files changed, 88 insertions(+), 1 deletion(-) diff --git a/asdf/extension/_converter.py b/asdf/extension/_converter.py index 44371b703..3ceb96cf3 100644 --- a/asdf/extension/_converter.py +++ b/asdf/extension/_converter.py @@ -169,7 +169,7 @@ def reserve_blocks(self, obj, tag, ctx): The context of the current serialization request. Returns - ------ + ------- keys : list of unique hashable keys These keys will be used to reserve blocks for later use """ diff --git a/docs/asdf/extending/converters.rst b/docs/asdf/extending/converters.rst index 2535dfa62..8934dc617 100644 --- a/docs/asdf/extending/converters.rst +++ b/docs/asdf/extending/converters.rst @@ -280,6 +280,93 @@ With this modification we can successfully deserialize our ASDF file: assert reconstituted_f1.inverse.inverse is reconstituted_f1 +.. _extending_converter_block_storage: + +Block storage +============= + +As described in the :ref:`extending_converters` Converters can return complex objects that will +be pass to other Converters. If a Converter returns a ndarray, ASDF will recognize this +array and store it in an ASDF block. + +For applications where this isn't possible or more control of the block storage is required +the Converters allow for more flexible block storage through use of the ``SerializationContext`` +provided as an argument to `Converter.to_yaml_tree` `Converter.from_yaml_tree` and `Converter.select_tag`. + +A simple example of a Converter using block storage to store the ``payload`` for +``BlockData`` object instances is as follows: + +.. 
runcode:: + + import asdf + import numpy as np + from asdf.extension import Converter, Extension + + class BlockData: + def __init__(self, payload): + self.payload = payload + + + class BlockConverter(Converter): + tags = ["asdf://somewhere.org/tags/block_data-1.0.0"] + types = [BlockData] + + def to_yaml_tree(self, obj, tag, ctx): + block_index = ctx.find_block_index( + id(obj), + lambda: np.ndarray(len(obj.payload), dtype="uint8", buffer=obj.payload), + ) + return {"block_index": block_index} + + def from_yaml_tree(self, node, tag, ctx): + block_index = node["block_index"] + data = ctx.load_block(block_index, by_index=True) + obj = BlockData(data.tobytes()) + ctx.assign_block_key(block_index, id(obj)) + return obj + + def reserve_blocks(self, obj, tag, ctx): + return [id(obj)] + + class BlockExtension(Extension): + tags = ["asdf://somewhere.org/tags/block_data-1.0.0"] + converters = [BlockConverter()] + extension_uri = "asdf://somewhere.org/extensions/block_data-1.0.0" + + with asdf.config_context() as cfg: + cfg.add_extension(BlockExtension()) + ff = asdf.AsdfFile({"example": BlockData(b"abcdefg")}) + ff.write_to("block_converter_example.asdf") + +.. asdf:: block_converter_example.asdf + +To discuss the above example, it is helpful to first review some details of how ASDF +:ref:`stores block `. Blocks are stored sequentially within a +ASDF file following the YAML tree. Converters can read and write these blocks +based on the index of the block within the file. + +During read (``Converter.from_yaml_tree``) data for a specific block can be read +by providing the ``index`` of the block to ``SerializationContext.load_block`` (and setting the +``by_index`` argument to True). ``Converter.from_yaml_tree`` returns the deserialized custom +object that will be placed in the ``AsdfFile.tree``. ``SerializationContext.assign_block_key`` +should also be called during ``Converter.from_yaml_tree`` to allow ASDF to associate +a unique hashable key with any block used during conversion of this object. This is +important as the ordering of blocks in memory might change during an update in place. +Furthermore, This key can be used to lazily load block data by later calling +``SerializationContext.load_block`` with the assigned key. + +During write, ``Converter.to_yaml_tree`` can prepare data to be stored in a block +by calling ``SerializationContext.find_block_index`` to find the location of an +available block. ``SerializationContext.find_block_index`` should be called with a +hashable key unique to this object (and the same as the key used during reading) +and a callback function that accepts no arguments and returns the ndarray to save +within the block. + +A Converter that uses block storage must also define ``Converter.reserve_blocks``. +``Converter.reserve_blocks`` will be called during memory management to free +resources for unused blocks and allocate. ``Converter.reserve_blocks`` must +return a list of keys associated with the object provided as the first argument. + .. 
_extending_converters_performance:

 Entry point performance considerations

From f8ac711a9b8a5d7d787b5697b74bbd66c56f1582 Mon Sep 17 00:00:00 2001
From: Brett
Date: Fri, 31 Mar 2023 14:52:49 -0400
Subject: [PATCH 18/34] refine docs for block converter

---
 docs/asdf/extending/converters.rst | 65 ++++++++++++++++--------------
 1 file changed, 34 insertions(+), 31 deletions(-)

diff --git a/docs/asdf/extending/converters.rst b/docs/asdf/extending/converters.rst
index 8934dc617..ccd6c434f 100644
--- a/docs/asdf/extending/converters.rst
+++ b/docs/asdf/extending/converters.rst
@@ -285,14 +285,25 @@ Block storage
 =============

-As described in the :ref:`extending_converters` Converters can return complex objects that will
-be pass to other Converters. If a Converter returns a ndarray, ASDF will recognize this
-array and store it in an ASDF block.
+As described above, :ref:`extending_converters` can return complex objects that will
+be passed to other Converters. If a Converter returns a ndarray, ASDF will recognize this
+array and store it in an ASDF block. This is the easiest and preferred means of
+storing data in ASDF blocks.

-For applications where this isn't possible or more control of the block storage is required
-the Converters allow for more flexible block storage through use of the ``SerializationContext``
+For applications that require more flexibility,
+Converters can control block storage through use of the ``SerializationContext``
 provided as an argument to `Converter.to_yaml_tree` `Converter.from_yaml_tree` and `Converter.select_tag`.

+It is helpful to first review some details of how ASDF
+:ref:`stores block `. Blocks are stored sequentially within an
+ASDF file following the YAML tree. During reads and writes, ASDF will need to know
+the index of the block a Converter would like to use to read or write the correct
+block. However, the index used for reading might not be the same index for writing
+if the tree was modified or the file is being written to a new location. To allow
+ASDF to track the relationship between blocks and objects, Converters will need
+to generate unique hashable keys for each block used and associate these keys with
+block indices during read and write (more on this below).
+
 A simple example of a Converter using block storage to store the ``payload`` for
 ``BlockData`` object instances is as follows:

@@ -320,9 +331,10 @@ A simple example of a Converter using block storage to store the ``payload`` for

     def from_yaml_tree(self, node, tag, ctx):
         block_index = node["block_index"]
-        data = ctx.load_block(block_index, by_index=True)
-        obj = BlockData(data.tobytes())
-        ctx.assign_block_key(block_index, id(obj))
+        obj = BlockData(b"")
+        key = id(obj)
+        ctx.assign_block_key(block_index, key)
+        obj.payload = ctx.load_block(key)
         return obj

     def reserve_blocks(self, obj, tag, ctx):
@@ -340,32 +352,23 @@ A simple example of a Converter using block storage to store the ``payload`` for

 .. asdf:: block_converter_example.asdf

-To discuss the above example, it is helpful to first review some details of how ASDF
-:ref:`stores block `. Blocks are stored sequentially within a
-ASDF file following the YAML tree. Converters can read and write these blocks
-based on the index of the block within the file.
-
-During read (``Converter.from_yaml_tree``) data for a specific block can be read
-by providing the ``index`` of the block to ``SerializationContext.load_block`` (and setting the
-``by_index`` argument to True). 
``Converter.from_yaml_tree`` returns the deserialized custom -object that will be placed in the ``AsdfFile.tree``. ``SerializationContext.assign_block_key`` -should also be called during ``Converter.from_yaml_tree`` to allow ASDF to associate -a unique hashable key with any block used during conversion of this object. This is -important as the ordering of blocks in memory might change during an update in place. -Furthermore, This key can be used to lazily load block data by later calling -``SerializationContext.load_block`` with the assigned key. - -During write, ``Converter.to_yaml_tree`` can prepare data to be stored in a block -by calling ``SerializationContext.find_block_index`` to find the location of an -available block. ``SerializationContext.find_block_index`` should be called with a -hashable key unique to this object (and the same as the key used during reading) -and a callback function that accepts no arguments and returns the ndarray to save -within the block. +During read, ``Converter.from_yaml_tree`` will be called. Within this method +the Converter should associate any used blocks with unique hashable keys by calling +``SerializationContext.assign_block_key`` and can load block data using +``SerializationContext.load_block``. + +During write, ``Converter.to_yaml_tree`` will be called. The Converter should +use ``SerializationContext.find_block_index`` to find the location of an +available block by providing a hashable key unique to this object (this should +be the same key used during reading to allow ASDF to associate blocks and objects +during in-place updates). The second argument to ``SerializationContext.find_block_index`` +must be a callable function (returning a ndarray) that ASDF will call when it +is time to write data to the portion of the file corresponding to this block. A Converter that uses block storage must also define ``Converter.reserve_blocks``. ``Converter.reserve_blocks`` will be called during memory management to free -resources for unused blocks and allocate. ``Converter.reserve_blocks`` must -return a list of keys associated with the object provided as the first argument. +resources for unused blocks. ``Converter.reserve_blocks`` must +return a list of keys associated with an object. .. _extending_converters_performance: From 2f5e3873d8f8c1f13f851c0bcc8e8ab99d6fb632 Mon Sep 17 00:00:00 2001 From: Brett Date: Fri, 31 Mar 2023 14:54:14 -0400 Subject: [PATCH 19/34] add changelog --- CHANGES.rst | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGES.rst b/CHANGES.rst index 6bfcb1ed8..006727c8a 100644 --- a/CHANGES.rst +++ b/CHANGES.rst @@ -8,6 +8,7 @@ The ASDF Standard is at v1.6.0 - Add ``all_array_storage``, ``all_array_compression`` and ``all_array_compression_kwargs`` to ``asdf.config.AsdfConfig`` [#1468] - Move built-in tags to converters (except ndarray and integer). 
[#1474] +- Add block storage support to Converter [#1508] 2.15.0 (2023-03-28) ------------------- From 74d1111b9cd2f925bc457f190c1e3b80739cc5fd Mon Sep 17 00:00:00 2001 From: Brett Date: Fri, 31 Mar 2023 15:16:13 -0400 Subject: [PATCH 20/34] remove comment --- asdf/_tests/test_array_blocks.py | 3 --- 1 file changed, 3 deletions(-) diff --git a/asdf/_tests/test_array_blocks.py b/asdf/_tests/test_array_blocks.py index aee44c228..f346597d2 100644 --- a/asdf/_tests/test_array_blocks.py +++ b/asdf/_tests/test_array_blocks.py @@ -1003,10 +1003,7 @@ def test_remove_blocks(tmp_path): with asdf.open(fn1, mode="rw") as af: assert len(af._blocks._internal_blocks) == 2 af["a"] = None - # issue: https://github.com/asdf-format/asdf/issues/1505 - # prevents us from calling write_to then update af.write_to(fn2) - # af.update() with asdf.open(fn1, mode="rw") as af: assert len(af._blocks._internal_blocks) == 2 From fd0e26c49feba54c5363c63e32f1b60a2a3cf235 Mon Sep 17 00:00:00 2001 From: Brett Date: Fri, 31 Mar 2023 19:11:18 -0400 Subject: [PATCH 21/34] convert load_block to callback generating get_block_data_callback --- asdf/_tests/test_block_converter.py | 27 ++++----------------------- asdf/asdf.py | 28 ++++++++++------------------ asdf/block.py | 8 ++++++++ asdf/extension/_converter.py | 2 +- docs/asdf/extending/converters.rst | 9 ++++++--- 5 files changed, 29 insertions(+), 45 deletions(-) diff --git a/asdf/_tests/test_block_converter.py b/asdf/_tests/test_block_converter.py index 136da8a52..0b9b336c7 100644 --- a/asdf/_tests/test_block_converter.py +++ b/asdf/_tests/test_block_converter.py @@ -31,13 +31,9 @@ def to_yaml_tree(self, obj, tag, ctx): def from_yaml_tree(self, node, tag, ctx): block_index = node["block_index"] - data = ctx.load_block(block_index, by_index=True) + data = ctx.get_block_data_callback(block_index)() obj = BlockData(data.tobytes()) ctx.assign_block_key(block_index, id(obj)) - - # -- alternatively, if data is not required to make the object -- - # obj = BlockData(b"") - # obj.payload = ctx.load_block(block_index, id(obj)) return obj def reserve_blocks(self, obj, tag, ctx): # Is there a ctx or tag at this point? 
@@ -142,16 +138,8 @@ def to_yaml_tree(self, obj, tag, ctx): def from_yaml_tree(self, node, tag, ctx): block_index = node["block_index"] - obj = BlockDataCallback(lambda: None) - # now that we have an object we use it's memory location - # to generate a key + obj = BlockDataCallback(ctx.get_block_data_callback(block_index)) key = id(obj) - - def callback(): - return ctx.load_block(key) - - obj.callback = callback - ctx.assign_block_key(block_index, key) return obj @@ -252,8 +240,7 @@ def test_seralization_context_block_access(): arr = np.ones(3, dtype="uint8") index = sctx.find_block_index(key, lambda: arr) assert len(af._blocks) == 1 - assert id(arr) == id(sctx.load_block(key)) - assert id(arr) == id(sctx.load_block(index, by_index=True)) + assert id(arr) == id(sctx.get_block_data_callback(index)()) # finding the same block should not create a new one index = sctx.find_block_index(key, lambda: arr) assert len(af._blocks) == 1 @@ -261,15 +248,9 @@ def test_seralization_context_block_access(): new_key = 26 sctx.assign_block_key(index, new_key) assert len(af._blocks) == 1 - # both old and new keys work - assert id(arr) == id(sctx.load_block(key)) - assert id(arr) == id(sctx.load_block(new_key)) - # an unknown key should fail - with pytest.raises(KeyError, match="Unknown block key .*"): - sctx.load_block(-1) arr2 = np.zeros(3, dtype="uint8") # test that providing a new callback won't overwrite # the first one index = sctx.find_block_index(key, lambda: arr2) - assert id(arr2) != id(sctx.load_block(key)) + assert id(arr2) != id(sctx.get_block_data_callback(index)()) diff --git a/asdf/asdf.py b/asdf/asdf.py index 7db10509b..4550d2609 100644 --- a/asdf/asdf.py +++ b/asdf/asdf.py @@ -2006,32 +2006,24 @@ def _extensions_used(self): """ return self.__extensions_used - def load_block(self, index_or_key, by_index=False): + def get_block_data_callback(self, index): """ - Return data from a block using either the block index - or block key. + Generate a callable that when called will read data + from a block at the provided index Parameters ---------- - index_or_key : int or hashable - - by_index : bool, optional - if True, treat index_or_key as a block index - if False (default), treat index_or_key as a key + index : int + Block index Returns ------- - block_data : ndarray - The ndarray block data (one dimension, uint8 dtype) + callback : callable + A callable that when called (with no arguments) returns + the block data as a one dimensional array of uint8 """ - if by_index: - # index_or_key is a block index - blk = self._block_manager.get_block(index_or_key) - else: - # index_or_key is a block key - blk = self._block_manager.get_block_by_key(index_or_key) - - return blk.data + blk = self._block_manager.get_block(index) + return blk.generate_read_data_callback() def assign_block_key(self, block_index, key): """ diff --git a/asdf/block.py b/asdf/block.py index a3f82a772..33a97212b 100644 --- a/asdf/block.py +++ b/asdf/block.py @@ -1305,6 +1305,14 @@ def data(self): def close(self): self._data = None + def generate_read_data_callback(self): + """Used in SerializationContext.get_block_data_callback""" + + def callback(): + return self.data + + return callback + class UnloadedBlock: """ diff --git a/asdf/extension/_converter.py b/asdf/extension/_converter.py index 3ceb96cf3..319055254 100644 --- a/asdf/extension/_converter.py +++ b/asdf/extension/_converter.py @@ -321,7 +321,7 @@ def reserve_blocks(self, obj, tag, ctx): The context of the current serialization request. 
Returns - ------ + ------- keys : list of unique hashable keys These keys will be used to reserve blocks for later use diff --git a/docs/asdf/extending/converters.rst b/docs/asdf/extending/converters.rst index ccd6c434f..83299a215 100644 --- a/docs/asdf/extending/converters.rst +++ b/docs/asdf/extending/converters.rst @@ -334,7 +334,7 @@ A simple example of a Converter using block storage to store the ``payload`` for obj = BlockData(b"") key = id(obj) ctx.assign_block_key(block_index, key) - obj.payload = ctx.load_block(key) + obj.payload = ctx.get_block_data_callback(block_index)() return obj def reserve_blocks(self, obj, tag, ctx): @@ -354,8 +354,11 @@ A simple example of a Converter using block storage to store the ``payload`` for During read, ``Converter.from_yaml_tree`` will be called. Within this method the Converter should associate any used blocks with unique hashable keys by calling -``SerializationContext.assign_block_key`` and can load block data using -``SerializationContext.load_block``. +``SerializationContext.assign_block_key`` and can generate (and use) a callable +function that will return block data using ``SerializationContext.get_block_data_callback``. +A callback for reading the data is provided to support lazy loading without +keeping a reference to the ``SerializationContext`` (which is meant to be +a short lived and lightweight object). During write, ``Converter.to_yaml_tree`` will be called. The Converter should use ``SerializationContext.find_block_index`` to find the location of an From 264fdf57df67380a6db21a537c236cf27b5929fe Mon Sep 17 00:00:00 2001 From: Brett Date: Fri, 31 Mar 2023 19:55:57 -0400 Subject: [PATCH 22/34] replace use of id(obj) for block key --- asdf/_tests/test_block_converter.py | 27 ++++++++++++++++++++------- docs/asdf/extending/converters.rst | 14 ++++++++++---- 2 files changed, 30 insertions(+), 11 deletions(-) diff --git a/asdf/_tests/test_block_converter.py b/asdf/_tests/test_block_converter.py index 0b9b336c7..49612bd18 100644 --- a/asdf/_tests/test_block_converter.py +++ b/asdf/_tests/test_block_converter.py @@ -9,9 +9,22 @@ from asdf.testing import helpers +class AsdfKey: + _next = 0 + + def __init__(self): + self._key = AsdfKey._next + AsdfKey._next += 1 + + def __hash__(self): + return self._key + + class BlockData: def __init__(self, payload): self.payload = payload + # generate a unique id + self._asdf_key = AsdfKey() class BlockConverter(Converter): @@ -22,7 +35,7 @@ class BlockConverter(Converter): def to_yaml_tree(self, obj, tag, ctx): # lookup source for obj block_index = ctx.find_block_index( - id(obj), + obj._asdf_key, lambda: np.ndarray(len(obj.payload), dtype="uint8", buffer=obj.payload), ) return { @@ -33,7 +46,7 @@ def from_yaml_tree(self, node, tag, ctx): block_index = node["block_index"] data = ctx.get_block_data_callback(block_index)() obj = BlockData(data.tobytes()) - ctx.assign_block_key(block_index, id(obj)) + ctx.assign_block_key(block_index, obj._asdf_key) return obj def reserve_blocks(self, obj, tag, ctx): # Is there a ctx or tag at this point? @@ -41,7 +54,7 @@ def reserve_blocks(self, obj, tag, ctx): # Is there a ctx or tag at this point? 
# return something unhashable self._return_invalid_keys = False return [[]] - return [id(obj)] + return [obj._asdf_key] class BlockExtension(Extension): @@ -119,6 +132,7 @@ class BlockDataCallback: def __init__(self, callback): self.callback = callback + self._asdf_key = AsdfKey() @property def data(self): @@ -130,7 +144,7 @@ class BlockDataCallbackConverter(Converter): types = [BlockDataCallback] def to_yaml_tree(self, obj, tag, ctx): - block_index = ctx.find_block_index(id(obj), obj.callback) + block_index = ctx.find_block_index(obj._asdf_key, obj.callback) return { "block_index": block_index, } @@ -139,12 +153,11 @@ def from_yaml_tree(self, node, tag, ctx): block_index = node["block_index"] obj = BlockDataCallback(ctx.get_block_data_callback(block_index)) - key = id(obj) - ctx.assign_block_key(block_index, key) + ctx.assign_block_key(block_index, obj._asdf_key) return obj def reserve_blocks(self, obj, tag, ctx): - return [id(obj)] + return [obj._asdf_key] class BlockDataCallbackExtension(Extension): diff --git a/docs/asdf/extending/converters.rst b/docs/asdf/extending/converters.rst index 83299a215..074db57b9 100644 --- a/docs/asdf/extending/converters.rst +++ b/docs/asdf/extending/converters.rst @@ -304,6 +304,10 @@ ASDF to track the relationship between blocks and objects, Converters will need to generate unique hashable keys for each block used and associate these keys with block indices during read and write (more on this below). +.. note:: + Use of ``id(obj)`` will not generate a unique key as it returns the memory address + which might be reused after the object is garbage collected. + A simple example of a Converter using block storage to store the ``payload`` for ``BlockData`` object instances is as follows: @@ -314,8 +318,11 @@ A simple example of a Converter using block storage to store the ``payload`` for from asdf.extension import Converter, Extension class BlockData: + _next_key = 0 def __init__(self, payload): self.payload = payload + self._asdf_key = BlockData._next_key + BlockData._next_key += 1 class BlockConverter(Converter): @@ -324,7 +331,7 @@ A simple example of a Converter using block storage to store the ``payload`` for def to_yaml_tree(self, obj, tag, ctx): block_index = ctx.find_block_index( - id(obj), + obj._asdf_key, lambda: np.ndarray(len(obj.payload), dtype="uint8", buffer=obj.payload), ) return {"block_index": block_index} @@ -332,13 +339,12 @@ A simple example of a Converter using block storage to store the ``payload`` for def from_yaml_tree(self, node, tag, ctx): block_index = node["block_index"] obj = BlockData(b"") - key = id(obj) - ctx.assign_block_key(block_index, key) + ctx.assign_block_key(block_index, obj._asdf_key) obj.payload = ctx.get_block_data_callback(block_index)() return obj def reserve_blocks(self, obj, tag, ctx): - return [id(obj)] + return [obj._asdf_key] class BlockExtension(Extension): tags = ["asdf://somewhere.org/tags/block_data-1.0.0"] From ee2187618e681cf57a30b87ecbda15a4b7d975f5 Mon Sep 17 00:00:00 2001 From: Brett Date: Sat, 1 Apr 2023 06:06:59 -0400 Subject: [PATCH 23/34] add asdf.util.BlockKey for generating unique keys --- asdf/_tests/test_block_converter.py | 15 ++------------- asdf/util.py | 17 +++++++++++++++++ docs/asdf/extending/converters.rst | 4 +--- 3 files changed, 20 insertions(+), 16 deletions(-) diff --git a/asdf/_tests/test_block_converter.py b/asdf/_tests/test_block_converter.py index 49612bd18..4e21fa2b8 100644 --- a/asdf/_tests/test_block_converter.py +++ b/asdf/_tests/test_block_converter.py @@ -9,22 +9,11 @@ 
 from asdf.testing import helpers
 
 
-class AsdfKey:
-    _next = 0
-
-    def __init__(self):
-        self._key = AsdfKey._next
-        AsdfKey._next += 1
-
-    def __hash__(self):
-        return self._key
-
-
 class BlockData:
     def __init__(self, payload):
         self.payload = payload
         # generate a unique key
-        self._asdf_key = AsdfKey()
+        self._asdf_key = asdf.util.BlockKey()
 
 
 class BlockConverter(Converter):
@@ -132,7 +121,7 @@ class BlockDataCallback:
 
     def __init__(self, callback):
         self.callback = callback
-        self._asdf_key = AsdfKey()
+        self._asdf_key = asdf.util.BlockKey()
 
     @property
     def data(self):
diff --git a/asdf/util.py b/asdf/util.py
index 285b59a80..245001be2 100644
--- a/asdf/util.py
+++ b/asdf/util.py
@@ -529,3 +529,20 @@ class FileType(enum.Enum):
     ASDF = 1
     FITS = 2
     UNKNOWN = 3
+
+
+class BlockKey:
+    """
+    Helper class that generates a unique hashable value for every instance,
+    useful for associating blocks with objects during serialization and
+    deserialization.
+    """
+
+    _next = 0
+
+    def __init__(self):
+        self._key = BlockKey._next
+        BlockKey._next += 1
+
+    def __hash__(self):
+        return self._key
diff --git a/docs/asdf/extending/converters.rst b/docs/asdf/extending/converters.rst
index 074db57b9..6a2a76603 100644
--- a/docs/asdf/extending/converters.rst
+++ b/docs/asdf/extending/converters.rst
@@ -318,11 +318,9 @@ A simple example of a Converter using block storage to store the ``payload`` for
     from asdf.extension import Converter, Extension
 
     class BlockData:
-        _next_key = 0
         def __init__(self, payload):
             self.payload = payload
-            self._asdf_key = BlockData._next_key
-            BlockData._next_key += 1
+            self._asdf_key = asdf.util.BlockKey()
 
 
     class BlockConverter(Converter):

From a730090725aebc1e813fdb2df4ca93f2e8338018 Mon Sep 17 00:00:00 2001
From: Brett
Date: Mon, 3 Apr 2023 13:30:52 -0400
Subject: [PATCH 24/34] add BlockKey test

---
 asdf/_tests/test_util.py | 11 +++++++++++
 1 file changed, 11 insertions(+)

diff --git a/asdf/_tests/test_util.py b/asdf/_tests/test_util.py
index 112221390..20b216663 100644
--- a/asdf/_tests/test_util.py
+++ b/asdf/_tests/test_util.py
@@ -117,3 +117,14 @@ def test_minversion():
 
     assert util.minversion(yaml, "3.1")
     assert util.minversion("yaml", "3.1")
+
+
+def test_block_key():
+    bk = util.BlockKey()
+    # make sure block key is hashable and can serve as a dictionary key
+    hash(bk)
+    d = {bk: 1}
+    # a new key should produce a different hash than the first
+    bk2 = util.BlockKey()
+    d[bk2] = 2
+    assert len(d) == 2

From ce5da7ed88b7fd34e41cbe1a10d7fd6dfa1f3051 Mon Sep 17 00:00:00 2001
From: Brett
Date: Mon, 17 Apr 2023 10:00:11 -0400
Subject: [PATCH 25/34] add test with external block storage and defined uri

remove unnecessary uri assignment and note about uri
---
 asdf/_tests/test_array_blocks.py | 9 +++++++++
 asdf/asdf.py                     | 3 +--
 2 files changed, 10 insertions(+), 2 deletions(-)

diff --git a/asdf/_tests/test_array_blocks.py b/asdf/_tests/test_array_blocks.py
index f346597d2..b95be394b 100644
--- a/asdf/_tests/test_array_blocks.py
+++ b/asdf/_tests/test_array_blocks.py
@@ -27,6 +27,15 @@ def test_external_block(tmp_path):
     assert "test0000.asdf" in os.listdir(tmp_path)
 
 
+def test_external_block_url():
+    uri = "asdf://foo"
+    my_array = RNG.normal(size=(8, 8))
+    tree = {"my_array": my_array}
+    asdf.get_config().all_array_storage = "external"
+    # this should not raise a ValueError since uri is provided
+    asdf.AsdfFile(tree, uri=uri)
+
+
 def test_external_block_non_url():
     my_array = RNG.normal(size=(8, 8))
     tree = {"my_array": my_array}
diff --git a/asdf/asdf.py b/asdf/asdf.py
index 4550d2609..355f78144 100644
--- a/asdf/asdf.py +++ b/asdf/asdf.py @@ -148,6 +148,7 @@ def __init__( self._closed = False self._external_asdf_by_uri = {} self._blocks = block.BlockManager(self, copy_arrays=copy_arrays, lazy_load=lazy_load) + # set the uri here so validation can generate any required external blocks self._uri = uri if tree is None: # Bypassing the tree property here, to avoid validating @@ -168,8 +169,6 @@ def __init__( else: self.tree = tree self.find_references() - if uri is not None: - self._uri = uri self._comments = [] From a7a6418e35963c9244bb2053be8648b3f6afadff Mon Sep 17 00:00:00 2001 From: Brett Date: Mon, 17 Apr 2023 13:06:45 -0400 Subject: [PATCH 26/34] separate _key_to_block and _data_to_block mappings --- asdf/_tests/test_block_converter.py | 20 +++++++++++++++++++- asdf/asdf.py | 12 +++++++++--- asdf/block.py | 15 ++++++++++----- 3 files changed, 38 insertions(+), 9 deletions(-) diff --git a/asdf/_tests/test_block_converter.py b/asdf/_tests/test_block_converter.py index 4e21fa2b8..2156a43ca 100644 --- a/asdf/_tests/test_block_converter.py +++ b/asdf/_tests/test_block_converter.py @@ -20,6 +20,7 @@ class BlockConverter(Converter): tags = ["asdf://somewhere.org/tags/block_data-1.0.0"] types = [BlockData] _return_invalid_keys = False + _double_assign_block = False def to_yaml_tree(self, obj, tag, ctx): # lookup source for obj @@ -36,6 +37,10 @@ def from_yaml_tree(self, node, tag, ctx): data = ctx.get_block_data_callback(block_index)() obj = BlockData(data.tobytes()) ctx.assign_block_key(block_index, obj._asdf_key) + if self._double_assign_block: + self._double_assign_block = False + key2 = asdf.util.BlockKey() + ctx.assign_block_key(block_index, key2) return obj def reserve_blocks(self, obj, tag, ctx): # Is there a ctx or tag at this point? @@ -116,6 +121,18 @@ def test_invalid_reserve_block_keys(tmp_path): af.write_to(fn) +@with_extension(BlockExtension) +def test_double_assign_block(tmp_path): + a = BlockData(b"abcdefg") + af = asdf.AsdfFile({"a": a}) + fn = tmp_path / "test.asdf" + af.write_to(fn) + BlockExtension.converters[0]._double_assign_block = True + with pytest.raises(ValueError, match="block 0 is already assigned to a key"): + with asdf.open(fn): + pass + + class BlockDataCallback: """An example object that uses the data callback to read block data""" @@ -248,7 +265,8 @@ def test_seralization_context_block_access(): assert len(af._blocks) == 1 new_key = 26 - sctx.assign_block_key(index, new_key) + with pytest.raises(ValueError, match="block 0 is already assigned to a key"): + sctx.assign_block_key(index, new_key) assert len(af._blocks) == 1 arr2 = np.zeros(3, dtype="uint8") diff --git a/asdf/asdf.py b/asdf/asdf.py index 355f78144..215c5d616 100644 --- a/asdf/asdf.py +++ b/asdf/asdf.py @@ -1364,7 +1364,7 @@ def write_to( ) naf._tree = self.tree # avoid an extra validate # copy over block storage and other settings - block_to_key_mapping = {v: k for k, v in self._blocks._data_to_block_mapping.items()} + block_to_key_mapping = {v: k for k, v in self._blocks._key_to_block_mapping.items()} # this creates blocks in the new block manager that correspond to blocks # in the original file for b in self._blocks.blocks: @@ -2048,7 +2048,13 @@ def assign_block_key(self, block_index, key): A unique hashable key to associate with a block """ blk = self._block_manager.get_block(block_index) - self._block_manager._data_to_block_mapping[key] = blk + if self._block_manager._key_to_block_mapping.get(key, blk) is not blk: + msg = f"key {key} is already assigned to a block" + raise ValueError(msg) + 
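[Illustration] The checks around this point enforce a one-to-one invariant between keys and blocks: a key may claim at most one block, and a block may be claimed by at most one key. A standalone sketch of that invariant (``assign`` and the plain dict are hypothetical stand-ins for the block manager internals, not asdf API):

    def assign(key_to_block, key, block):
        # a key may claim at most one block ...
        if key_to_block.get(key, block) is not block:
            raise ValueError(f"key {key} is already assigned to a block")
        # ... and a block may be claimed by at most one key
        if block in key_to_block.values():
            raise ValueError(f"block {block} is already assigned to a key")
        key_to_block[key] = block


    mapping = {}
    assign(mapping, "key1", "blockA")
    try:
        assign(mapping, "key2", "blockA")  # a second key claiming the same block
    except ValueError as err:
        print(err)  # block blockA is already assigned to a key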
if blk in self._block_manager._key_to_block_mapping.values(): + msg = f"block {block_index} is already assigned to a key" + raise ValueError(msg) + self._block_manager._key_to_block_mapping[key] = blk def find_block_index(self, lookup_key, data_callback=None): """ @@ -2075,7 +2081,7 @@ def find_block_index(self, lookup_key, data_callback=None): Index of the block where data returned from data_callback will be written. """ - new_block = lookup_key not in self._block_manager._data_to_block_mapping + new_block = lookup_key not in self._block_manager._key_to_block_mapping blk = self._block_manager.find_or_create_block(lookup_key) # if we're not creating a block, don't update the data callback if data_callback is not None and (new_block or (blk._data_callback is None and blk._fd is None)): diff --git a/asdf/block.py b/asdf/block.py index 33a97212b..9acb7ea4a 100644 --- a/asdf/block.py +++ b/asdf/block.py @@ -37,6 +37,7 @@ def __init__(self, asdffile, copy_arrays=False, lazy_load=True): } self._data_to_block_mapping = {} + self._key_to_block_mapping = {} self._validate_checksums = False self._memmap = not copy_arrays self._lazy_load = lazy_load @@ -81,7 +82,9 @@ def _add(self, block, key=None): if block._data is not None or key is not None: if key is None: key = id(util.get_array_base(block._data)) - self._data_to_block_mapping[key] = block + self._data_to_block_mapping[key] = block + else: + self._key_to_block_mapping[key] = block def remove(self, block): """ @@ -94,6 +97,9 @@ def remove(self, block): for key, blk in list(self._data_to_block_mapping.items()): if blk is block: del self._data_to_block_mapping[key] + for key, blk in list(self._key_to_block_mapping.items()): + if blk is block: + del self._key_to_block_mapping[key] else: msg = f"Unknown array storage type {block.array_storage}" raise ValueError(msg) @@ -619,10 +625,10 @@ def finalize(self, ctx): self._handle_global_block_settings(block) def get_block_by_key(self, key): - if key not in self._data_to_block_mapping: + if key not in self._key_to_block_mapping: msg = f"Unknown block key {key}" raise KeyError(msg) - return self._data_to_block_mapping[key] + return self._key_to_block_mapping[key] def get_block(self, source): """ @@ -778,14 +784,13 @@ def find_or_create_block(self, key): ------- block : Block """ - block = self._data_to_block_mapping.get(key) + block = self._key_to_block_mapping.get(key) if block is not None: return block block = Block() self.add(block, key=key) self._handle_global_block_settings(block) - self._data_to_block_mapping[key] = block return block From 33706275b45e8382353a988e2cee242687a47390 Mon Sep 17 00:00:00 2001 From: Brett Date: Mon, 17 Apr 2023 13:29:48 -0400 Subject: [PATCH 27/34] don't modify history and asdf_library on write_to --- asdf/_tests/test_api.py | 16 ++++++++++++++++ asdf/asdf.py | 9 +++++++++ 2 files changed, 25 insertions(+) diff --git a/asdf/_tests/test_api.py b/asdf/_tests/test_api.py index a20b3e59f..24ceec8f6 100644 --- a/asdf/_tests/test_api.py +++ b/asdf/_tests/test_api.py @@ -1,3 +1,4 @@ +import copy import getpass import io import os @@ -548,3 +549,18 @@ def test_asdf_standard_version_tag_selection(): content = buff.read() assert b"!core/asdf-1.0.0" not in content assert b"!core/asdf-1.1.0" in content + + +def test_write_to_no_tree_modification(tmp_path): + fn = tmp_path / "test.asdf" + fn2 = tmp_path / "test2.asdf" + tree = {"foo": None} + af = asdf.AsdfFile(tree.copy()) + af.write_to(fn) + assert tree == af.tree + with asdf.open(fn) as af: + 
af["history"]["extensions"][0]["software"]["version"] = "0.0.0.dev+abcdefg" + af["asdf_library"]["author"] = "foo" + tree = copy.deepcopy(af.tree) + af.write_to(fn2) + assert af.tree == tree diff --git a/asdf/asdf.py b/asdf/asdf.py index 215c5d616..b9ea935d4 100644 --- a/asdf/asdf.py +++ b/asdf/asdf.py @@ -1363,6 +1363,11 @@ def write_to( ignore_implicit_conversion=self._ignore_implicit_conversion, ) naf._tree = self.tree # avoid an extra validate + pre_write_tree = {} + modified_keys = ["history", "asdf_library"] + for k in modified_keys: + if k in self.tree: + pre_write_tree[k] = copy.deepcopy(self.tree[k]) # copy over block storage and other settings block_to_key_mapping = {v: k for k, v in self._blocks._key_to_block_mapping.items()} # this creates blocks in the new block manager that correspond to blocks @@ -1388,6 +1393,10 @@ def write_to( blk._used = True blk._data_callback = b._data_callback naf._write_to(fd, **kwargs) + for k in modified_keys: + if k in self._tree: + del self._tree[k] + self._tree.update(pre_write_tree) def _write_to( self, From ddffc0f8decbeade0dd90034d0301ac5785957f5 Mon Sep 17 00:00:00 2001 From: Brett Date: Mon, 17 Apr 2023 13:39:45 -0400 Subject: [PATCH 28/34] add note about repeatability of data_callback --- docs/asdf/extending/converters.rst | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/docs/asdf/extending/converters.rst b/docs/asdf/extending/converters.rst index 6a2a76603..60198f2fc 100644 --- a/docs/asdf/extending/converters.rst +++ b/docs/asdf/extending/converters.rst @@ -371,6 +371,10 @@ be the same key used during reading to allow ASDF to associate blocks and object during in-place updates). The second argument to ``SerializationContext.find_block_index`` must be a callable function (returning a ndarray) that ASDF will call when it is time to write data to the portion of the file corresponding to this block. +Note that it's possible this callback will be called multiple times during a +write and ASDF will not cache the result. If the data is coming from a non-repeatable +source (such as a non-seekable stream of bytes) the data should be cached prior +to providing it to ASDF to allow ASDF to call the callback multiple times. A Converter that uses block storage must also define ``Converter.reserve_blocks``. 
``Converter.reserve_blocks`` will be called during memory management to free From 23f9a903c300bb961d57d01aa7c2e633efaeab22 Mon Sep 17 00:00:00 2001 From: Brett Date: Mon, 17 Apr 2023 13:42:44 -0400 Subject: [PATCH 29/34] remove unnecessary util.get_array_base --- asdf/block.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/asdf/block.py b/asdf/block.py index 9acb7ea4a..df8977f8c 100644 --- a/asdf/block.py +++ b/asdf/block.py @@ -81,7 +81,7 @@ def _add(self, block, key=None): if block._data is not None or key is not None: if key is None: - key = id(util.get_array_base(block._data)) + key = id(block._data) self._data_to_block_mapping[key] = block else: self._key_to_block_mapping[key] = block From a2be1e815c703bd89009950adefd665aaa00e493 Mon Sep 17 00:00:00 2001 From: Brett Date: Mon, 17 Apr 2023 14:20:08 -0400 Subject: [PATCH 30/34] switch 0 to False --- asdf/block.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/asdf/block.py b/asdf/block.py index df8977f8c..c4fd7e5f3 100644 --- a/asdf/block.py +++ b/asdf/block.py @@ -579,11 +579,11 @@ def _find_used_blocks(self, tree, ctx, remove=True): if remove: for block in list(self.blocks): - if getattr(block, "_used", 0) == 0 and block not in reserved_blocks: + if not getattr(block, "_used", False) and block not in reserved_blocks: self.remove(block) return None for block in list(self.blocks): - if getattr(block, "_used", 0): + if getattr(block, "_used", False): reserved_blocks.add(block) return reserved_blocks From c072bf6c0a1b7e7cd2eb41e2f7828a5b37eb6514 Mon Sep 17 00:00:00 2001 From: Brett Date: Mon, 17 Apr 2023 15:12:30 -0400 Subject: [PATCH 31/34] implement __eq__ for BlockKey --- asdf/_tests/test_util.py | 3 +++ asdf/util.py | 3 +++ 2 files changed, 6 insertions(+) diff --git a/asdf/_tests/test_util.py b/asdf/_tests/test_util.py index 20b216663..17789e145 100644 --- a/asdf/_tests/test_util.py +++ b/asdf/_tests/test_util.py @@ -1,3 +1,4 @@ +import copy import io import pytest @@ -128,3 +129,5 @@ def test_block_key(): bk2 = util.BlockKey() d[bk2] = 2 assert len(d) == 2 + # check that equality and copying a key works + assert copy.copy(bk) == bk diff --git a/asdf/util.py b/asdf/util.py index 245001be2..61b971ef8 100644 --- a/asdf/util.py +++ b/asdf/util.py @@ -546,3 +546,6 @@ def __init__(self): def __hash__(self): return self._key + + def __eq__(self, other): + return self._key == other._key From 68458cd5127b8edd5b8567334e3ae02a0015fedf Mon Sep 17 00:00:00 2001 From: Brett Date: Mon, 24 Apr 2023 08:51:14 -0400 Subject: [PATCH 32/34] return NotImplemented for __eq__ with non-BlockKey --- asdf/_tests/test_util.py | 1 + asdf/util.py | 2 ++ 2 files changed, 3 insertions(+) diff --git a/asdf/_tests/test_util.py b/asdf/_tests/test_util.py index 17789e145..944631941 100644 --- a/asdf/_tests/test_util.py +++ b/asdf/_tests/test_util.py @@ -131,3 +131,4 @@ def test_block_key(): assert len(d) == 2 # check that equality and copying a key works assert copy.copy(bk) == bk + assert bk != hash(bk) diff --git a/asdf/util.py b/asdf/util.py index 61b971ef8..d812d1c0f 100644 --- a/asdf/util.py +++ b/asdf/util.py @@ -548,4 +548,6 @@ def __hash__(self): return self._key def __eq__(self, other): + if not isinstance(other, BlockKey): + return NotImplemented return self._key == other._key From abd6494b9ad6339aa5b13e1a1138a974b87cefd5 Mon Sep 17 00:00:00 2001 From: Brett Date: Mon, 24 Apr 2023 08:54:35 -0400 Subject: [PATCH 33/34] remove ctx arg from reserve_blocks --- asdf/_tests/test_block_converter.py | 4 ++-- asdf/block.py | 
2 +- asdf/extension/_converter.py | 10 +++------- docs/asdf/extending/converters.rst | 2 +- 4 files changed, 7 insertions(+), 11 deletions(-) diff --git a/asdf/_tests/test_block_converter.py b/asdf/_tests/test_block_converter.py index 2156a43ca..4a58926b0 100644 --- a/asdf/_tests/test_block_converter.py +++ b/asdf/_tests/test_block_converter.py @@ -43,7 +43,7 @@ def from_yaml_tree(self, node, tag, ctx): ctx.assign_block_key(block_index, key2) return obj - def reserve_blocks(self, obj, tag, ctx): # Is there a ctx or tag at this point? + def reserve_blocks(self, obj, tag): if self._return_invalid_keys: # return something unhashable self._return_invalid_keys = False @@ -162,7 +162,7 @@ def from_yaml_tree(self, node, tag, ctx): ctx.assign_block_key(block_index, obj._asdf_key) return obj - def reserve_blocks(self, obj, tag, ctx): + def reserve_blocks(self, obj, tag): return [obj._asdf_key] diff --git a/asdf/block.py b/asdf/block.py index c4fd7e5f3..7f0873fa2 100644 --- a/asdf/block.py +++ b/asdf/block.py @@ -569,7 +569,7 @@ def _find_used_blocks(self, tree, ctx, remove=True): converter = ctx.extension_manager.get_converter_for_type(type(node)) sctx = ctx._create_serialization_context() tag = converter.select_tag(node, sctx) - for key in converter.reserve_blocks(node, tag, sctx): + for key in converter.reserve_blocks(node, tag): reserved_blocks.add(self.find_or_create_block(key)) else: hook = ctx._type_index.get_hook_for_type("reserve_blocks", type(node), ctx.version_string) diff --git a/asdf/extension/_converter.py b/asdf/extension/_converter.py index 319055254..f3b41bb83 100644 --- a/asdf/extension/_converter.py +++ b/asdf/extension/_converter.py @@ -150,7 +150,7 @@ def from_yaml_tree(self, node, tag, ctx): or a generator that yields such an instance. """ - def reserve_blocks(self, obj, tag, ctx): + def reserve_blocks(self, obj, tag): """ Reserve any number of blocks in which data (ndarrays) can be stored. @@ -165,8 +165,6 @@ def reserve_blocks(self, obj, tag, ctx): The tag identifying the YAML type that ``obj`` should be converted into. Selected by a call to this converter's select_tag method. - ctx : asdf.asdf.SerializationContext - The context of the current serialization request. Returns ------- @@ -303,7 +301,7 @@ def from_yaml_tree(self, node, tag, ctx): """ return self._delegate.from_yaml_tree(node, tag, ctx) - def reserve_blocks(self, obj, tag, ctx): + def reserve_blocks(self, obj, tag): """ Reserve blocks to be used during conversion of this object @@ -317,8 +315,6 @@ def reserve_blocks(self, obj, tag, ctx): The tag identifying the YAML type that ``obj`` should be converted into. Selected by a call to this converter's select_tag method. - ctx : asdf.asdf.SerializationContext - The context of the current serialization request. 
Returns ------- @@ -327,7 +323,7 @@ def reserve_blocks(self, obj, tag, ctx): """ if hasattr(self._delegate, "reserve_blocks"): - return self._delegate.reserve_blocks(obj, tag, ctx) + return self._delegate.reserve_blocks(obj, tag) return [] @property diff --git a/docs/asdf/extending/converters.rst b/docs/asdf/extending/converters.rst index 60198f2fc..f2f952fa6 100644 --- a/docs/asdf/extending/converters.rst +++ b/docs/asdf/extending/converters.rst @@ -341,7 +341,7 @@ A simple example of a Converter using block storage to store the ``payload`` for obj.payload = ctx.get_block_data_callback(block_index)() return obj - def reserve_blocks(self, obj, tag, ctx): + def reserve_blocks(self, obj, tag): return [obj._asdf_key] class BlockExtension(Extension): From 2c9b8e1c0c00f62cace5dcef01fa0bf86b570b1a Mon Sep 17 00:00:00 2001 From: Brett Date: Mon, 24 Apr 2023 08:58:31 -0400 Subject: [PATCH 34/34] shallow copy tree before write_to and deep copy key/values that will be modified --- asdf/asdf.py | 12 +++++------- 1 file changed, 5 insertions(+), 7 deletions(-) diff --git a/asdf/asdf.py b/asdf/asdf.py index b9ea935d4..6e7ae6391 100644 --- a/asdf/asdf.py +++ b/asdf/asdf.py @@ -1362,12 +1362,14 @@ def write_to( ignore_unrecognized_tag=self._ignore_unrecognized_tag, ignore_implicit_conversion=self._ignore_implicit_conversion, ) - naf._tree = self.tree # avoid an extra validate - pre_write_tree = {} + naf._tree = copy.copy(self.tree) # avoid an extra validate + + # deep copy keys that will be modified during write modified_keys = ["history", "asdf_library"] for k in modified_keys: if k in self.tree: - pre_write_tree[k] = copy.deepcopy(self.tree[k]) + naf._tree[k] = copy.deepcopy(self.tree[k]) + # copy over block storage and other settings block_to_key_mapping = {v: k for k, v in self._blocks._key_to_block_mapping.items()} # this creates blocks in the new block manager that correspond to blocks @@ -1393,10 +1395,6 @@ def write_to( blk._used = True blk._data_callback = b._data_callback naf._write_to(fd, **kwargs) - for k in modified_keys: - if k in self._tree: - del self._tree[k] - self._tree.update(pre_write_tree) def _write_to( self,
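[Illustration] The copy strategy in this final patch, shallow copy plus selective deep copies, can be shown with a standalone sketch (hypothetical tree contents; not the asdf implementation itself):

    import copy

    tree = {
        "asdf_library": {"author": "example"},
        "history": {"extensions": []},
        "data": list(range(1000)),  # stand-in for a large payload
    }

    working = copy.copy(tree)  # shallow copy: values are shared, not duplicated
    for key in ("history", "asdf_library"):  # only the keys the writer mutates
        if key in tree:
            working[key] = copy.deepcopy(tree[key])

    working["history"]["extensions"].append("new-extension-entry")

    assert tree["history"]["extensions"] == []  # the caller's tree is untouched
    assert working["data"] is tree["data"]  # the large payload was never copied

Mutating only deep-copied keys leaves the caller's tree unmodified while avoiding a full deep copy of potentially large payloads.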