Skip to content

Commit

Permalink
HDUList support for identify_[tw]*_fits; default writers for HDU #
Browse files Browse the repository at this point in the history
  • Loading branch information
dhomeier committed Aug 18, 2020
1 parent 849e69f commit 560b120
Show file tree
Hide file tree
Showing 3 changed files with 120 additions and 27 deletions.
59 changes: 40 additions & 19 deletions specutils/io/default_loaders/tabular_fits.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import logging
import os
import _io

import numpy as np

Expand All @@ -18,28 +19,41 @@

def identify_tabular_fits(origin, *args, **kwargs):
# args[0] = filename
# check if filename conforms to naming convention for writer
hdu = kwargs.get('hdu', 1)
# Check if filename conforms to naming convention and writer has not been
# asked to write to primary (IMAGE-only) HDU
if origin == 'write':
return args[0].endswith(('.fits', '.fit'))

# check if file can be opened with this reader
with fits.open(args[0]) as hdulist:
# Test if fits has extension of type BinTable and check against
# known keys of already defined specific formats
return (len(hdulist) > 1 and
isinstance(hdulist[1], fits.BinTableHDU) and not
(fits.getheader(args[0]).get('TELESCOP') == 'MULTI' and
fits.getheader(args[0]).get('HLSPACRN') == 'MUSCLES' and
fits.getheader(args[0]).get('PROPOSID') == 13650) and not
(fits.getheader(args[0]).get('TELESCOP') == 'SDSS 2.5-M' and
fits.getheader(args[0]).get('FIBERID') > 0) and not
(fits.getheader(args[0]).get('TELESCOP') == 'HST' and
fits.getheader(args[0]).get('INSTRUME') in ('COS', 'STIS')) and not
fits.getheader(args[0]).get('TELESCOP') == 'JWST')
return (hdu > 0 and args[0].endswith(('.fits', '.fit')) and not
args[0].endswith(('wcs.fits', 'wcs1d.fits', 'wcs.fit')))

if isinstance(args[2], fits.hdu.hdulist.HDUList):
hdulist = args[2]
elif isinstance(args[2], _io.BufferedReader):
hdulist = fits.open(args[2])
else: # if isinstance(args[2], _io.FileIO):
hdulist = fits.open(args[0], **kwargs)

# Test if fits has extension of type BinTable and check against
# known keys of already defined specific formats
is_tab = (len(hdulist) > 1 and
isinstance(hdulist[hdu], fits.BinTableHDU) and not
(hdulist[0].header.get('TELESCOP') == 'MULTI' and
hdulist[0].header.get('HLSPACRN') == 'MUSCLES' and
hdulist[0].header.get('PROPOSID') == 13650) and not
(hdulist[0].header.get('TELESCOP') == 'SDSS 2.5-M' and
hdulist[0].header.get('FIBERID') > 0) and not
(hdulist[0].header.get('TELESCOP') == 'HST' and
hdulist[0].header.get('INSTRUME') in ('COS', 'STIS')) and not
hdulist[0].header.get('TELESCOP') == 'JWST')

if not isinstance(args[2], (fits.hdu.hdulist.HDUList, _io.BufferedReader)):
hdulist.close()

return is_tab


@data_loader("tabular-fits", identifier=identify_tabular_fits,
dtype=Spectrum1D, extensions=['fits'], priority=6)
dtype=Spectrum1D, extensions=['fits', 'fit'], priority=6)
def tabular_fits_loader(file_obj, column_mapping=None, hdu=1, **kwargs):
"""
Load spectrum from a FITS file.
Expand Down Expand Up @@ -92,7 +106,7 @@ def tabular_fits_loader(file_obj, column_mapping=None, hdu=1, **kwargs):


@custom_writer("tabular-fits")
def tabular_fits_writer(spectrum, file_name, update_header=False, **kwargs):
def tabular_fits_writer(spectrum, file_name, hdu=1, update_header=False, **kwargs):
"""
Write spectrum to BINTABLE extension of a FITS file.
Expand All @@ -101,6 +115,8 @@ def tabular_fits_writer(spectrum, file_name, update_header=False, **kwargs):
spectrum: Spectrum1D
file_name: str
The path to the FITS file
hdu: int
Header Data Unit in FITS file to write to (currently only extension HDU 1)
update_header: bool
Update FITS header with all compatible entries in `spectrum.meta`
wunit : str or `~astropy.units.Unit`
Expand All @@ -112,6 +128,9 @@ def tabular_fits_writer(spectrum, file_name, update_header=False, **kwargs):
ftype : str or `~numpy.dtype`
Floating point type for storing flux array
"""
if hdu < 1:
raise ValueError(f'FITS does not support BINTABLE extension in HDU {hdu}.')

header = spectrum.meta.get('header', fits.header.Header()).copy()

if update_header:
Expand Down Expand Up @@ -163,4 +182,6 @@ def tabular_fits_writer(spectrum, file_name, update_header=False, **kwargs):

tab = Table(columns, names=colnames, meta=header)

# Todo: support writing to other HDUs than the default (1st)
# and an 'update' mode so different HDUs can be written to separately
tab.write(file_name, format="fits", **kwargs)
26 changes: 19 additions & 7 deletions specutils/io/default_loaders/wcs_fits.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,21 +23,26 @@ def identify_wcs1d_fits(origin, *args, **kwargs):
specified (default primary) HDU. This is used for Astropy I/O Registry.
On writing check if filename conforms to naming convention for this format.
"""
whdu = kwargs.get('hdu', 1)
# Default FITS format is BINTABLE in 1st extension HDU, unless IMAGE is
# indicated via naming pattern or (explicitly) selecting primary HDU.
if origin == 'write':
return (args[0].endswith(('wcs.fits', 'wcs1d.fits', 'wcs.fit')) and not
return ((args[0].endswith(('wcs.fits', 'wcs1d.fits', 'wcs.fit')) or
(args[0].endswith(('.fits', '.fit')) and whdu == 0)) and not
hasattr(args[2], 'uncertainty'))

hdu = kwargs.get('hdu', 0)
if isinstance(args[2], fits.hdu.hdulist.HDUList):
hdulist = args[2]
elif isinstance(args[2], _io.BufferedReader):
hdulist = fits.open(args[2])
else: # if isinstance(args[2], _io.FileIO):
hdulist = fits.open(args[0], **kwargs)
hdu = kwargs.get('hdu', 0)

# Check if number of axes is one and dimension of WCS is one
is_wcs = (hdulist[hdu].header['NAXIS'] == 1 and
hdulist[hdu].header.get('WCSDIM', 1) == 1 and not
is_wcs = (hdulist[hdu].header.get('WCSDIM', 1) == 1 and
(hdulist[hdu].header['NAXIS'] == 1 or
hdulist[hdu].header.get('WCSAXES', 0) == 1 )and not
hdulist[hdu].header.get('MSTITLE', 'n').startswith('2dF-SDSS LRG') and not
# Check in CTYPE1 key for linear solution (single spectral axis)
hdulist[hdu].header.get('CTYPE1', 'w').upper().startswith('MULTISPE'))
Expand All @@ -48,7 +53,7 @@ def identify_wcs1d_fits(origin, *args, **kwargs):


@data_loader("wcs1d-fits", identifier=identify_wcs1d_fits,
dtype=Spectrum1D, extensions=['fits'], priority=5)
dtype=Spectrum1D, extensions=['fits', 'fit'], priority=5)
def wcs1d_fits_loader(file_obj, spectral_axis_unit=None, flux_unit=None,
hdu=0, **kwargs):
"""
Expand All @@ -64,7 +69,7 @@ def wcs1d_fits_loader(file_obj, spectral_axis_unit=None, flux_unit=None,
or HDUList (as resulting from astropy.io.fits.open()).
spectral_axis_unit: str or `~astropy.Unit`, optional
Units of the spectral axis. If not given (or None), the unit will be
inferred from the CUNIT in the WCS. Note that if this is provided it
inferred from the CUNIT in the WCS. Note that if this is provided it
will *override* any units the CUNIT provides.
flux_unit: str or `~astropy.Unit`, optional
Units of the flux for this spectrum. If not given (or None), the unit
Expand Down Expand Up @@ -115,7 +120,7 @@ def wcs1d_fits_loader(file_obj, spectral_axis_unit=None, flux_unit=None,


@custom_writer("wcs1d-fits")
def wcs1d_fits_writer(spectrum, file_name, update_header=False, **kwargs):
def wcs1d_fits_writer(spectrum, file_name, hdu=0, update_header=False, **kwargs):
"""
Write spectrum with spectral axis defined by its WCS to (primary)
IMAGE_HDU of a FITS file.
Expand All @@ -125,6 +130,8 @@ def wcs1d_fits_writer(spectrum, file_name, update_header=False, **kwargs):
spectrum: Spectrum1D
file_name: str
The path to the FITS file
hdu: int
Header Data Unit in FITS file to write to (base 0; default primary HDU)
update_header: bool
Update FITS header with all compatible entries in `spectrum.meta`
unit : str or `~astropy.units.Unit`
Expand Down Expand Up @@ -173,6 +180,11 @@ def wcs1d_fits_writer(spectrum, file_name, update_header=False, **kwargs):
comment = f'[{funit.to_string()}] {funit.physical_type}'
header.insert('CRPIX1', card=('BUNIT', f'{funit}', comment))

# If hdu > 0 selected, prepend empty HDUs
# Todo: implement `update` mode to write to existing files
while len(hdulist) < hdu + 1:
hdulist.insert(0, fits.ImageHDU())

hdulist.writeto(file_name, **kwargs)


Expand Down
62 changes: 61 additions & 1 deletion specutils/tests/test_loaders.py
Original file line number Diff line number Diff line change
Expand Up @@ -454,6 +454,36 @@ def test_tabular_fits_header(tmpdir):
hdulist.close()


def test_tabular_fits_autowrite(tmpdir):
"""Test writing of Spectrum1D with automatic selection of BINTABLE format."""
disp = np.linspace(1, 1.2, 21) * u.AA
flux = np.random.normal(0., 1.0e-14, disp.shape[0]) * u.W / (u.m**2 * u.AA)
hdr = fits.header.Header({'TELESCOP': 'Leviathan', 'APERTURE': 1.8,
'OBSERVER': 'Parsons', 'NAXIS': 1, 'NAXIS1': 8})

spectrum = Spectrum1D(flux=flux, spectral_axis=disp, meta={'header': hdr})
tmpfile = str(tmpdir.join('_tst.fits'))
spectrum.write(tmpfile)

# Read it in and check against the original
with fits.open(tmpfile) as hdulist:
assert hdulist[0].header['NAXIS'] == 0
assert hdulist[1].header['NAXIS'] == 2
assert hdulist[1].header['NAXIS2'] == disp.shape[0]

# Trigger exception for illegal HDU (primary HDU only accepts IMAGE_HDU)
with pytest.raises(ValueError, match=r'FITS does not support BINTABLE'):
spectrum.write(tmpfile, format='tabular-fits', overwrite=True, hdu=0)

# Test automatic selection of wcs1d format, which will fail without suitable wcs
with pytest.raises(ValueError, match=r'Only Spectrum1D objects with valid WCS'):
spectrum.write(tmpfile, overwrite=True, hdu=0)

tmpfile = str(tmpdir.join('_wcs.fits'))
with pytest.raises(ValueError, match=r'Only Spectrum1D objects with valid WCS'):
spectrum.write(tmpfile, overwrite=True)


@pytest.mark.parametrize("spectral_axis",
['wavelength', 'frequency', 'energy', 'wavenumber'])
def test_wcs1d_fits_writer(tmpdir, spectral_axis):
Expand All @@ -472,7 +502,7 @@ def test_wcs1d_fits_writer(tmpdir, spectral_axis):

spectrum = Spectrum1D(flux=flux, wcs=WCS(hdr))
tmpfile = str(tmpdir.join('_tst.fits'))
spectrum.write(tmpfile, format='wcs1d-fits')
spectrum.write(tmpfile, hdu=0)

# Read it in and check against the original
spec = Spectrum1D.read(tmpfile)
Expand All @@ -490,6 +520,36 @@ def test_wcs1d_fits_writer(tmpdir, spectral_axis):
assert quantity_allclose(spec.flux, spectrum.flux)


@pytest.mark.parametrize("hdu", range(3))
def test_wcs1d_fits_hdus(tmpdir, hdu):
"""Test writing of Spectrum1D in WCS1D format to different IMAGE_HDUs."""
# Header dictionary for constructing WCS
hdr = {'CTYPE1': 'wavelength', 'CUNIT1': 'um',
'CRPIX1': 1, 'CRVAL1': 1, 'CDELT1': 0.01}
# Create a small data set
flu = u.W / (u.m**2 * u.nm)
flux = np.arange(1, 11)**2 * 1.e-14 * flu

spectrum = Spectrum1D(flux=flux, wcs=WCS(hdr))
tmpfile = str(tmpdir.join('_tst.fits'))
spectrum.write(tmpfile, hdu=hdu, format='wcs1d-fits')

# Read it in and check against the original
with fits.open(tmpfile) as hdulist:
assert hdulist[hdu].is_image
assert hdulist[hdu].header['NAXIS'] == 1
assert hdulist[hdu].header['NAXIS1'] == flux.shape[0]
assert u.Unit(hdulist[hdu].header['CUNIT1']) == u.Unit(hdr['CUNIT1'])
assert quantity_allclose(hdulist[hdu].data * flu, flux)

# Test again with automatic format selection by filename pattern
tmpfile = str(tmpdir.join('_wcs.fits'))
spectrum.write(tmpfile, hdu=hdu)
with fits.open(tmpfile) as hdulist:
assert hdulist[hdu].is_image
assert quantity_allclose(hdulist[hdu].data * flu, flux)


@pytest.mark.parametrize("spectral_axis",
['wavelength', 'frequency', 'energy', 'wavenumber'])
def test_wcs1d_fits_multid(tmpdir, spectral_axis):
Expand Down

0 comments on commit 560b120

Please sign in to comment.