diff --git a/asdf/tests/test_api.py b/asdf/tests/test_api.py new file mode 100644 index 000000000..093299a5c --- /dev/null +++ b/asdf/tests/test_api.py @@ -0,0 +1,316 @@ +# -*- coding: utf-8 -*- + +import os +import io + +import numpy as np +from numpy.testing import assert_array_equal +from astropy.modeling import models + +import pytest + +import asdf +from asdf import treeutil +from asdf import extension +from asdf import versioning +from asdf.exceptions import AsdfDeprecationWarning +from .helpers import assert_tree_match, assert_roundtrip_tree, display_warnings + + +def test_get_data_from_closed_file(tmpdir): + tmpdir = str(tmpdir) + path = os.path.join(tmpdir, 'test.asdf') + + my_array = np.arange(0, 64).reshape((8, 8)) + + tree = {'my_array': my_array} + ff = asdf.AsdfFile(tree) + ff.write_to(path) + + with asdf.open(path) as ff: + pass + + with pytest.raises(IOError): + assert_array_equal(my_array, ff.tree['my_array']) + + +def test_no_warning_nan_array(tmpdir): + """ + Tests for a regression that was introduced by + https://github.com/spacetelescope/asdf/pull/557 + """ + + tree = dict(array=np.array([1, 2, np.nan])) + + with pytest.warns(None) as w: + assert_roundtrip_tree(tree, tmpdir) + assert len(w) == 0, display_warnings(w) + + +def test_warning_deprecated_open(tmpdir): + + tmpfile = str(tmpdir.join('foo.asdf')) + + tree = dict(foo=42, bar='hello') + with asdf.AsdfFile(tree) as af: + af.write_to(tmpfile) + + with pytest.warns(AsdfDeprecationWarning): + with asdf.AsdfFile.open(tmpfile) as af: + assert_tree_match(tree, af.tree) + + +def test_open_readonly(tmpdir): + + tmpfile = str(tmpdir.join('readonly.asdf')) + + tree = dict(foo=42, bar='hello', baz=np.arange(20)) + with asdf.AsdfFile(tree) as af: + af.write_to(tmpfile, all_array_storage='internal') + + os.chmod(tmpfile, 0o440) + assert os.access(tmpfile, os.W_OK) == False + + with asdf.open(tmpfile) as af: + assert af['baz'].flags.writeable == False + + with pytest.raises(PermissionError): + with asdf.open(tmpfile, mode='rw'): + pass + + +def test_atomic_write(tmpdir, small_tree): + tmpfile = os.path.join(str(tmpdir), 'test.asdf') + + ff = asdf.AsdfFile(small_tree) + ff.write_to(tmpfile) + + with asdf.open(tmpfile, mode='r') as ff: + ff.write_to(tmpfile) + + +def test_overwrite(tmpdir): + # This is intended to reproduce the following issue: + # https://github.com/spacetelescope/asdf/issues/100 + tmpfile = os.path.join(str(tmpdir), 'test.asdf') + aff = models.AffineTransformation2D(matrix=[[1, 2], [3, 4]]) + f = asdf.AsdfFile() + f.tree['model'] = aff + f.write_to(tmpfile) + model = f.tree['model'] + + ff = asdf.AsdfFile() + ff.tree['model'] = model + ff.write_to(tmpfile) + + +def test_default_version(): + # See https://github.com/spacetelescope/asdf/issues/364 + + version_map = versioning.get_version_map(versioning.default_version) + + ff = asdf.AsdfFile() + assert ff.file_format_version == version_map['FILE_FORMAT'] + + +def test_update_exceptions(tmpdir): + tmpdir = str(tmpdir) + path = os.path.join(tmpdir, 'test.asdf') + + my_array = np.random.rand(8, 8) + tree = {'my_array': my_array} + ff = asdf.AsdfFile(tree) + ff.write_to(path) + + with asdf.open(path, mode='r', copy_arrays=True) as ff: + with pytest.raises(IOError): + ff.update() + + ff = asdf.AsdfFile(tree) + buff = io.BytesIO() + ff.write_to(buff) + + buff.seek(0) + with asdf.open(buff, mode='rw') as ff: + ff.update() + + with pytest.raises(ValueError): + asdf.AsdfFile().update() + + +def test_top_level_tree(small_tree): + tree = {'tree': small_tree} + ff = asdf.AsdfFile(tree) + assert_tree_match(ff.tree['tree'], ff['tree']) + + ff2 = asdf.AsdfFile() + ff2['tree'] = small_tree + assert_tree_match(ff2.tree['tree'], ff2['tree']) + + +def test_top_level_keys(small_tree): + tree = {'tree': small_tree} + ff = asdf.AsdfFile(tree) + assert ff.tree.keys() == ff.keys() + + +def test_walk_and_modify_remove_keys(): + tree = { + 'foo': 42, + 'bar': 43 + } + + def func(x): + if x == 42: + return None + return x + + tree2 = treeutil.walk_and_modify(tree, func) + + assert 'foo' not in tree2 + assert 'bar' in tree2 + + +def test_copy(tmpdir): + tmpdir = str(tmpdir) + + my_array = np.random.rand(8, 8) + tree = {'my_array': my_array, 'foo': {'bar': 'baz'}} + ff = asdf.AsdfFile(tree) + ff.write_to(os.path.join(tmpdir, 'test.asdf')) + + with asdf.open(os.path.join(tmpdir, 'test.asdf')) as ff: + ff2 = ff.copy() + ff2.tree['my_array'] *= 2 + ff2.tree['foo']['bar'] = 'boo' + + assert np.all(ff2.tree['my_array'] == + ff.tree['my_array'] * 2) + assert ff.tree['foo']['bar'] == 'baz' + + assert_array_equal(ff2.tree['my_array'], ff2.tree['my_array']) + + +def test_tag_to_schema_resolver_deprecation(): + ff = asdf.AsdfFile() + with pytest.warns(AsdfDeprecationWarning): + ff.tag_to_schema_resolver('foo') + + with pytest.warns(AsdfDeprecationWarning): + extension_list = extension.default_extensions.extension_list + extension_list.tag_to_schema_resolver('foo') + + +def test_access_tree_outside_handler(tmpdir): + tempname = str(tmpdir.join('test.asdf')) + + tree = {'random': np.random.random(10)} + + ff = asdf.AsdfFile(tree) + ff.write_to(str(tempname)) + + with asdf.open(tempname) as newf: + pass + + # Accessing array data outside of handler should fail + with pytest.raises(OSError): + newf.tree['random'][0] + + +def test_context_handler_resolve_and_inline(tmpdir): + # This reproduces the issue reported in + # https://github.com/spacetelescope/asdf/issues/406 + tempname = str(tmpdir.join('test.asdf')) + + tree = {'random': np.random.random(10)} + + ff = asdf.AsdfFile(tree) + ff.write_to(str(tempname)) + + with asdf.open(tempname) as newf: + newf.resolve_and_inline() + + with pytest.raises(OSError): + newf.tree['random'][0] + + +@pytest.mark.skip(reason='Until inline_threshold is added as a write option') +def test_inline_threshold(tmpdir): + + tree = { + 'small': np.ones(10), + 'large': np.ones(100) + } + + with asdf.AsdfFile(tree) as af: + assert len(list(af.blocks.inline_blocks)) == 1 + assert len(list(af.blocks.internal_blocks)) == 1 + + with asdf.AsdfFile(tree, inline_threshold=10) as af: + assert len(list(af.blocks.inline_blocks)) == 1 + assert len(list(af.blocks.internal_blocks)) == 1 + + with asdf.AsdfFile(tree, inline_threshold=5) as af: + assert len(list(af.blocks.inline_blocks)) == 0 + assert len(list(af.blocks.internal_blocks)) == 2 + + with asdf.AsdfFile(tree, inline_threshold=100) as af: + assert len(list(af.blocks.inline_blocks)) == 2 + assert len(list(af.blocks.internal_blocks)) == 0 + + +@pytest.mark.skip(reason='Until inline_threshold is added as a write option') +def test_inline_threshold_masked(tmpdir): + + mask = np.random.randint(0, 1+1, 20) + masked_array = np.ma.masked_array(np.ones(20), mask=mask) + + tree = { + 'masked': masked_array + } + + # Make sure that masked arrays aren't automatically inlined, even if they + # are small enough + with asdf.AsdfFile(tree) as af: + assert len(list(af.blocks.inline_blocks)) == 0 + assert len(list(af.blocks.internal_blocks)) == 2 + + tree = { + 'masked': masked_array, + 'normal': np.random.random(20) + } + + with asdf.AsdfFile(tree) as af: + assert len(list(af.blocks.inline_blocks)) == 1 + assert len(list(af.blocks.internal_blocks)) == 2 + + +@pytest.mark.skip(reason='Until inline_threshold is added as a write option') +def test_inline_threshold_override(tmpdir): + + tmpfile = str(tmpdir.join('inline.asdf')) + + tree = { + 'small': np.ones(10), + 'large': np.ones(100) + } + + with asdf.AsdfFile(tree) as af: + af.set_array_storage(tree['small'], 'internal') + assert len(list(af.blocks.inline_blocks)) == 0 + assert len(list(af.blocks.internal_blocks)) == 2 + + with asdf.AsdfFile(tree) as af: + af.set_array_storage(tree['large'], 'inline') + assert len(list(af.blocks.inline_blocks)) == 2 + assert len(list(af.blocks.internal_blocks)) == 0 + + with asdf.AsdfFile(tree) as af: + af.write_to(tmpfile, all_array_storage='internal') + assert len(list(af.blocks.inline_blocks)) == 0 + assert len(list(af.blocks.internal_blocks)) == 2 + + with asdf.AsdfFile(tree) as af: + af.write_to(tmpfile, all_array_storage='inline') + assert len(list(af.blocks.inline_blocks)) == 2 + assert len(list(af.blocks.internal_blocks)) == 0 diff --git a/asdf/tests/test_low_level.py b/asdf/tests/test_array_blocks.py similarity index 63% rename from asdf/tests/test_low_level.py rename to asdf/tests/test_array_blocks.py index 5fbf72daf..a661cd8b2 100644 --- a/asdf/tests/test_low_level.py +++ b/asdf/tests/test_array_blocks.py @@ -5,219 +5,13 @@ import numpy as np from numpy.testing import assert_array_equal -from astropy.modeling import models import pytest import asdf from asdf import block from asdf import constants -from asdf import extension from asdf import generic_io -from asdf import treeutil -from asdf import versioning -from asdf.exceptions import AsdfDeprecationWarning - -from ..tests.helpers import (assert_tree_match, assert_roundtrip_tree, - display_warnings) - - -def test_no_yaml_end_marker(tmpdir): - content = b"""#ASDF 1.0.0 -%YAML 1.1 -%TAG ! tag:stsci.edu:asdf/ ---- !core/asdf-1.0.0 -foo: bar...baz -baz: 42 - """ - path = os.path.join(str(tmpdir), 'test.asdf') - - buff = io.BytesIO(content) - with pytest.raises(ValueError): - with asdf.open(buff): - pass - - buff.seek(0) - fd = generic_io.InputStream(buff, 'r') - with pytest.raises(ValueError): - with asdf.open(fd): - pass - - with open(path, 'wb') as fd: - fd.write(content) - - with open(path, 'rb') as fd: - with pytest.raises(ValueError): - with asdf.open(fd): - pass - - -def test_no_final_newline(tmpdir): - content = b"""#ASDF 1.0.0 -%YAML 1.1 -%TAG ! tag:stsci.edu:asdf/ ---- !core/asdf-1.0.0 -foo: ...bar... -baz: 42 -...""" - path = os.path.join(str(tmpdir), 'test.asdf') - - buff = io.BytesIO(content) - with asdf.open(buff) as ff: - assert len(ff.tree) == 2 - - buff.seek(0) - fd = generic_io.InputStream(buff, 'r') - with asdf.open(fd) as ff: - assert len(ff.tree) == 2 - - with open(path, 'wb') as fd: - fd.write(content) - - with open(path, 'rb') as fd: - with asdf.open(fd) as ff: - assert len(ff.tree) == 2 - - -def test_no_asdf_header(tmpdir): - content = b"What? This ain't no ASDF file" - - path = os.path.join(str(tmpdir), 'test.asdf') - - buff = io.BytesIO(content) - with pytest.raises(ValueError): - asdf.open(buff) - - with open(path, 'wb') as fd: - fd.write(content) - - with open(path, 'rb') as fd: - with pytest.raises(ValueError): - asdf.open(fd) - - -def test_no_asdf_blocks(tmpdir): - content = b"""#ASDF 1.0.0 -%YAML 1.1 -%TAG ! tag:stsci.edu:asdf/ ---- !core/asdf-1.0.0 -foo: bar -... -XXXXXXXX - """ - - path = os.path.join(str(tmpdir), 'test.asdf') - - buff = io.BytesIO(content) - with asdf.open(buff) as ff: - assert len(ff.blocks) == 0 - - buff.seek(0) - fd = generic_io.InputStream(buff, 'r') - with asdf.open(fd) as ff: - assert len(ff.blocks) == 0 - - with open(path, 'wb') as fd: - fd.write(content) - - with open(path, 'rb') as fd: - with asdf.open(fd) as ff: - assert len(ff.blocks) == 0 - - -def test_invalid_source(small_tree): - buff = io.BytesIO() - - ff = asdf.AsdfFile(small_tree) - # Since we're testing with small arrays, force all arrays to be stored - # in internal blocks rather than letting some of them be automatically put - # inline. - ff.write_to(buff, all_array_storage='internal') - - buff.seek(0) - with asdf.open(buff) as ff2: - ff2.blocks.get_block(0) - - with pytest.raises(ValueError): - ff2.blocks.get_block(2) - - with pytest.raises(IOError): - ff2.blocks.get_block("http://127.0.0.1/") - - with pytest.raises(TypeError): - ff2.blocks.get_block(42.0) - - with pytest.raises(ValueError): - ff2.blocks.get_source(42.0) - - block = ff2.blocks.get_block(0) - assert ff2.blocks.get_source(block) == 0 - - -def test_empty_file(): - buff = io.BytesIO(b"#ASDF 1.0.0\n") - buff.seek(0) - - with asdf.open(buff) as ff: - assert ff.tree == {} - assert len(ff.blocks) == 0 - - buff = io.BytesIO(b"#ASDF 1.0.0\n#ASDF_STANDARD 1.0.0") - buff.seek(0) - - with asdf.open(buff) as ff: - assert ff.tree == {} - assert len(ff.blocks) == 0 - - -def test_not_asdf_file(): - buff = io.BytesIO(b"SIMPLE") - buff.seek(0) - - with pytest.raises(ValueError): - with asdf.open(buff): - pass - - buff = io.BytesIO(b"SIMPLE\n") - buff.seek(0) - - with pytest.raises(ValueError): - with asdf.open(buff): - pass - - -def test_junk_file(): - buff = io.BytesIO(b"#ASDF 1.0.0\nFOO") - buff.seek(0) - - with pytest.raises(ValueError): - with asdf.open(buff): - pass - - -def test_block_mismatch(): - # This is a file with a single small block, followed by something - # that has an invalid block magic number. - - buff = io.BytesIO( - b'#ASDF 1.0.0\n\xd3BLK\x00\x28\0\0\0\0\0\0\0\x01\0\0\0\0\0\0\0\x01\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0FOOBAR') - - buff.seek(0) - with pytest.raises(ValueError): - with asdf.open(buff): - pass - - -def test_block_header_too_small(): - # The block header size must be at least 40 - - buff = io.BytesIO( - b'#ASDF 1.0.0\n\xd3BLK\0\0') - - buff.seek(0) - with pytest.raises(ValueError): - with asdf.open(buff): - pass def test_external_block(tmpdir): @@ -638,48 +432,6 @@ def test_init_from_asdffile(tmpdir): assert_array_equal(ff.tree['my_array'], my_array) -def test_update_exceptions(tmpdir): - tmpdir = str(tmpdir) - path = os.path.join(tmpdir, 'test.asdf') - - my_array = np.random.rand(8, 8) - tree = {'my_array': my_array} - ff = asdf.AsdfFile(tree) - ff.write_to(path) - - with asdf.open(path, mode='r', copy_arrays=True) as ff: - with pytest.raises(IOError): - ff.update() - - ff = asdf.AsdfFile(tree) - buff = io.BytesIO() - ff.write_to(buff) - - buff.seek(0) - with asdf.open(buff, mode='rw') as ff: - ff.update() - - with pytest.raises(ValueError): - asdf.AsdfFile().update() - - -def test_get_data_from_closed_file(tmpdir): - tmpdir = str(tmpdir) - path = os.path.join(tmpdir, 'test.asdf') - - my_array = np.arange(0, 64).reshape((8, 8)) - - tree = {'my_array': my_array} - ff = asdf.AsdfFile(tree) - ff.write_to(path) - - with asdf.open(path) as ff: - pass - - with pytest.raises(IOError): - assert_array_equal(my_array, ff.tree['my_array']) - - def test_seek_until_on_block_boundary(): # Create content where the first block begins on a # file-reading-block boundary. @@ -740,68 +492,6 @@ def test_checksum_update(tmpdir): b'T\xaf~[\x90\x8a\x88^\xc2B\x96D,N\xadL' -def test_atomic_write(tmpdir, small_tree): - tmpfile = os.path.join(str(tmpdir), 'test.asdf') - - ff = asdf.AsdfFile(small_tree) - ff.write_to(tmpfile) - - with asdf.open(tmpfile, mode='r') as ff: - ff.write_to(tmpfile) - - -def test_overwrite(tmpdir): - # This is intended to reproduce the following issue: - # https://github.com/spacetelescope/asdf/issues/100 - tmpfile = os.path.join(str(tmpdir), 'test.asdf') - aff = models.AffineTransformation2D(matrix=[[1, 2], [3, 4]]) - f = asdf.AsdfFile() - f.tree['model'] = aff - f.write_to(tmpfile) - model = f.tree['model'] - - ff = asdf.AsdfFile() - ff.tree['model'] = model - ff.write_to(tmpfile) - - -def test_walk_and_modify_remove_keys(): - tree = { - 'foo': 42, - 'bar': 43 - } - - def func(x): - if x == 42: - return None - return x - - tree2 = treeutil.walk_and_modify(tree, func) - - assert 'foo' not in tree2 - assert 'bar' in tree2 - - -def test_copy(tmpdir): - tmpdir = str(tmpdir) - - my_array = np.random.rand(8, 8) - tree = {'my_array': my_array, 'foo': {'bar': 'baz'}} - ff = asdf.AsdfFile(tree) - ff.write_to(os.path.join(tmpdir, 'test.asdf')) - - with asdf.open(os.path.join(tmpdir, 'test.asdf')) as ff: - ff2 = ff.copy() - ff2.tree['my_array'] *= 2 - ff2.tree['foo']['bar'] = 'boo' - - assert np.all(ff2.tree['my_array'] == - ff.tree['my_array'] * 2) - assert ff.tree['foo']['bar'] == 'baz' - - assert_array_equal(ff2.tree['my_array'], ff2.tree['my_array']) - - def test_deferred_block_loading(small_tree): buff = io.BytesIO() @@ -1101,44 +791,6 @@ def test_open_no_memmap(tmpdir): assert not isinstance(array.block._data, np.memmap) -def test_invalid_version(tmpdir): - content = b"""#ASDF 0.1.0 -%YAML 1.1 -%TAG ! tag:stsci.edu:asdf/ ---- !core/asdf-0.1.0 -foo : bar -...""" - buff = io.BytesIO(content) - with pytest.raises(ValueError): - with asdf.open(buff) as ff: - pass - - -def test_valid_version(tmpdir): - content = b"""#ASDF 1.0.0 -%YAML 1.1 -%TAG ! tag:stsci.edu:asdf/ ---- !core/asdf-1.0.0 -foo : bar -...""" - buff = io.BytesIO(content) - with asdf.open(buff) as ff: - version = ff.file_format_version - - assert version.major == 1 - assert version.minor == 0 - assert version.patch == 0 - - -def test_default_version(): - # See https://github.com/spacetelescope/asdf/issues/364 - - version_map = versioning.get_version_map(versioning.default_version) - - ff = asdf.AsdfFile() - assert ff.file_format_version == version_map['FILE_FORMAT'] - - def test_fd_not_seekable(): data = np.ones(1024) b = block.Block(data=data) @@ -1155,189 +807,3 @@ def test_fd_not_seekable(): # We lost the information about the underlying array type, # but still can compare the bytes. assert b.data.tobytes() == data.tobytes() - - -def test_top_level_tree(small_tree): - tree = {'tree': small_tree} - ff = asdf.AsdfFile(tree) - assert_tree_match(ff.tree['tree'], ff['tree']) - - ff2 = asdf.AsdfFile() - ff2['tree'] = small_tree - assert_tree_match(ff2.tree['tree'], ff2['tree']) - - -def test_top_level_keys(small_tree): - tree = {'tree': small_tree} - ff = asdf.AsdfFile(tree) - assert ff.tree.keys() == ff.keys() - - -def test_tag_to_schema_resolver_deprecation(): - ff = asdf.AsdfFile() - with pytest.warns(AsdfDeprecationWarning): - ff.tag_to_schema_resolver('foo') - - with pytest.warns(AsdfDeprecationWarning): - extension_list = extension.default_extensions.extension_list - extension_list.tag_to_schema_resolver('foo') - - -def test_access_tree_outside_handler(tmpdir): - tempname = str(tmpdir.join('test.asdf')) - - tree = {'random': np.random.random(10)} - - ff = asdf.AsdfFile(tree) - ff.write_to(str(tempname)) - - with asdf.open(tempname) as newf: - pass - - # Accessing array data outside of handler should fail - with pytest.raises(OSError): - newf.tree['random'][0] - - -def test_context_handler_resolve_and_inline(tmpdir): - # This reproduces the issue reported in - # https://github.com/spacetelescope/asdf/issues/406 - tempname = str(tmpdir.join('test.asdf')) - - tree = {'random': np.random.random(10)} - - ff = asdf.AsdfFile(tree) - ff.write_to(str(tempname)) - - with asdf.open(tempname) as newf: - newf.resolve_and_inline() - - with pytest.raises(OSError): - newf.tree['random'][0] - - -@pytest.mark.skip(reason='Until inline_threshold is added as a write option') -def test_inline_threshold(tmpdir): - - tree = { - 'small': np.ones(10), - 'large': np.ones(100) - } - - with asdf.AsdfFile(tree) as af: - assert len(list(af.blocks.inline_blocks)) == 1 - assert len(list(af.blocks.internal_blocks)) == 1 - - with asdf.AsdfFile(tree, inline_threshold=10) as af: - assert len(list(af.blocks.inline_blocks)) == 1 - assert len(list(af.blocks.internal_blocks)) == 1 - - with asdf.AsdfFile(tree, inline_threshold=5) as af: - assert len(list(af.blocks.inline_blocks)) == 0 - assert len(list(af.blocks.internal_blocks)) == 2 - - with asdf.AsdfFile(tree, inline_threshold=100) as af: - assert len(list(af.blocks.inline_blocks)) == 2 - assert len(list(af.blocks.internal_blocks)) == 0 - - -@pytest.mark.skip(reason='Until inline_threshold is added as a write option') -def test_inline_threshold_masked(tmpdir): - - mask = np.random.randint(0, 1+1, 20) - masked_array = np.ma.masked_array(np.ones(20), mask=mask) - - tree = { - 'masked': masked_array - } - - # Make sure that masked arrays aren't automatically inlined, even if they - # are small enough - with asdf.AsdfFile(tree) as af: - assert len(list(af.blocks.inline_blocks)) == 0 - assert len(list(af.blocks.internal_blocks)) == 2 - - tree = { - 'masked': masked_array, - 'normal': np.random.random(20) - } - - with asdf.AsdfFile(tree) as af: - assert len(list(af.blocks.inline_blocks)) == 1 - assert len(list(af.blocks.internal_blocks)) == 2 - - -@pytest.mark.skip(reason='Until inline_threshold is added as a write option') -def test_inline_threshold_override(tmpdir): - - tmpfile = str(tmpdir.join('inline.asdf')) - - tree = { - 'small': np.ones(10), - 'large': np.ones(100) - } - - with asdf.AsdfFile(tree) as af: - af.set_array_storage(tree['small'], 'internal') - assert len(list(af.blocks.inline_blocks)) == 0 - assert len(list(af.blocks.internal_blocks)) == 2 - - with asdf.AsdfFile(tree) as af: - af.set_array_storage(tree['large'], 'inline') - assert len(list(af.blocks.inline_blocks)) == 2 - assert len(list(af.blocks.internal_blocks)) == 0 - - with asdf.AsdfFile(tree) as af: - af.write_to(tmpfile, all_array_storage='internal') - assert len(list(af.blocks.inline_blocks)) == 0 - assert len(list(af.blocks.internal_blocks)) == 2 - - with asdf.AsdfFile(tree) as af: - af.write_to(tmpfile, all_array_storage='inline') - assert len(list(af.blocks.inline_blocks)) == 2 - assert len(list(af.blocks.internal_blocks)) == 0 - - -def test_no_warning_nan_array(tmpdir): - """ - Tests for a regression that was introduced by - https://github.com/spacetelescope/asdf/pull/557 - """ - - tree = dict(array=np.array([1, 2, np.nan])) - - with pytest.warns(None) as w: - assert_roundtrip_tree(tree, tmpdir) - assert len(w) == 0, display_warnings(w) - - -def test_warning_deprecated_open(tmpdir): - - tmpfile = str(tmpdir.join('foo.asdf')) - - tree = dict(foo=42, bar='hello') - with asdf.AsdfFile(tree) as af: - af.write_to(tmpfile) - - with pytest.warns(AsdfDeprecationWarning): - with asdf.AsdfFile.open(tmpfile) as af: - assert_tree_match(tree, af.tree) - - -def test_open_readonly(tmpdir): - - tmpfile = str(tmpdir.join('readonly.asdf')) - - tree = dict(foo=42, bar='hello', baz=np.arange(20)) - with asdf.AsdfFile(tree) as af: - af.write_to(tmpfile, all_array_storage='internal') - - os.chmod(tmpfile, 0o440) - assert os.access(tmpfile, os.W_OK) == False - - with asdf.open(tmpfile) as af: - assert af['baz'].flags.writeable == False - - with pytest.raises(PermissionError): - with asdf.open(tmpfile, mode='rw'): - pass diff --git a/asdf/tests/test_file_format.py b/asdf/tests/test_file_format.py new file mode 100644 index 000000000..1fc0fd579 --- /dev/null +++ b/asdf/tests/test_file_format.py @@ -0,0 +1,236 @@ +# -*- coding: utf-8 -*- + +import os +import io + +import pytest + +import asdf +from asdf import generic_io + + +def test_no_yaml_end_marker(tmpdir): + content = b"""#ASDF 1.0.0 +%YAML 1.1 +%TAG ! tag:stsci.edu:asdf/ +--- !core/asdf-1.0.0 +foo: bar...baz +baz: 42 + """ + path = os.path.join(str(tmpdir), 'test.asdf') + + buff = io.BytesIO(content) + with pytest.raises(ValueError): + with asdf.open(buff): + pass + + buff.seek(0) + fd = generic_io.InputStream(buff, 'r') + with pytest.raises(ValueError): + with asdf.open(fd): + pass + + with open(path, 'wb') as fd: + fd.write(content) + + with open(path, 'rb') as fd: + with pytest.raises(ValueError): + with asdf.open(fd): + pass + + +def test_no_final_newline(tmpdir): + content = b"""#ASDF 1.0.0 +%YAML 1.1 +%TAG ! tag:stsci.edu:asdf/ +--- !core/asdf-1.0.0 +foo: ...bar... +baz: 42 +...""" + path = os.path.join(str(tmpdir), 'test.asdf') + + buff = io.BytesIO(content) + with asdf.open(buff) as ff: + assert len(ff.tree) == 2 + + buff.seek(0) + fd = generic_io.InputStream(buff, 'r') + with asdf.open(fd) as ff: + assert len(ff.tree) == 2 + + with open(path, 'wb') as fd: + fd.write(content) + + with open(path, 'rb') as fd: + with asdf.open(fd) as ff: + assert len(ff.tree) == 2 + + +def test_no_asdf_header(tmpdir): + content = b"What? This ain't no ASDF file" + + path = os.path.join(str(tmpdir), 'test.asdf') + + buff = io.BytesIO(content) + with pytest.raises(ValueError): + asdf.open(buff) + + with open(path, 'wb') as fd: + fd.write(content) + + with open(path, 'rb') as fd: + with pytest.raises(ValueError): + asdf.open(fd) + + +def test_no_asdf_blocks(tmpdir): + content = b"""#ASDF 1.0.0 +%YAML 1.1 +%TAG ! tag:stsci.edu:asdf/ +--- !core/asdf-1.0.0 +foo: bar +... +XXXXXXXX + """ + + path = os.path.join(str(tmpdir), 'test.asdf') + + buff = io.BytesIO(content) + with asdf.open(buff) as ff: + assert len(ff.blocks) == 0 + + buff.seek(0) + fd = generic_io.InputStream(buff, 'r') + with asdf.open(fd) as ff: + assert len(ff.blocks) == 0 + + with open(path, 'wb') as fd: + fd.write(content) + + with open(path, 'rb') as fd: + with asdf.open(fd) as ff: + assert len(ff.blocks) == 0 + + +def test_invalid_source(small_tree): + buff = io.BytesIO() + + ff = asdf.AsdfFile(small_tree) + # Since we're testing with small arrays, force all arrays to be stored + # in internal blocks rather than letting some of them be automatically put + # inline. + ff.write_to(buff, all_array_storage='internal') + + buff.seek(0) + with asdf.open(buff) as ff2: + ff2.blocks.get_block(0) + + with pytest.raises(ValueError): + ff2.blocks.get_block(2) + + with pytest.raises(IOError): + ff2.blocks.get_block("http://127.0.0.1/") + + with pytest.raises(TypeError): + ff2.blocks.get_block(42.0) + + with pytest.raises(ValueError): + ff2.blocks.get_source(42.0) + + block = ff2.blocks.get_block(0) + assert ff2.blocks.get_source(block) == 0 + + +def test_empty_file(): + buff = io.BytesIO(b"#ASDF 1.0.0\n") + buff.seek(0) + + with asdf.open(buff) as ff: + assert ff.tree == {} + assert len(ff.blocks) == 0 + + buff = io.BytesIO(b"#ASDF 1.0.0\n#ASDF_STANDARD 1.0.0") + buff.seek(0) + + with asdf.open(buff) as ff: + assert ff.tree == {} + assert len(ff.blocks) == 0 + + +def test_not_asdf_file(): + buff = io.BytesIO(b"SIMPLE") + buff.seek(0) + + with pytest.raises(ValueError): + with asdf.open(buff): + pass + + buff = io.BytesIO(b"SIMPLE\n") + buff.seek(0) + + with pytest.raises(ValueError): + with asdf.open(buff): + pass + + +def test_junk_file(): + buff = io.BytesIO(b"#ASDF 1.0.0\nFOO") + buff.seek(0) + + with pytest.raises(ValueError): + with asdf.open(buff): + pass + + +def test_block_mismatch(): + # This is a file with a single small block, followed by something + # that has an invalid block magic number. + + buff = io.BytesIO( + b'#ASDF 1.0.0\n\xd3BLK\x00\x28\0\0\0\0\0\0\0\x01\0\0\0\0\0\0\0\x01\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0FOOBAR') + + buff.seek(0) + with pytest.raises(ValueError): + with asdf.open(buff): + pass + + +def test_block_header_too_small(): + # The block header size must be at least 40 + + buff = io.BytesIO( + b'#ASDF 1.0.0\n\xd3BLK\0\0') + + buff.seek(0) + with pytest.raises(ValueError): + with asdf.open(buff): + pass + + +def test_invalid_version(tmpdir): + content = b"""#ASDF 0.1.0 +%YAML 1.1 +%TAG ! tag:stsci.edu:asdf/ +--- !core/asdf-0.1.0 +foo : bar +...""" + buff = io.BytesIO(content) + with pytest.raises(ValueError): + with asdf.open(buff) as ff: + pass + + +def test_valid_version(tmpdir): + content = b"""#ASDF 1.0.0 +%YAML 1.1 +%TAG ! tag:stsci.edu:asdf/ +--- !core/asdf-1.0.0 +foo : bar +...""" + buff = io.BytesIO(content) + with asdf.open(buff) as ff: + version = ff.file_format_version + + assert version.major == 1 + assert version.minor == 0 + assert version.patch == 0