[WIP] Enh. DataChunkIterator, support: 1) h5 dataset, 2) skipping of blocks, 3) arbit. iter dim #132

Merged · 17 commits · Aug 29, 2019

Changes from all commits
2 changes: 1 addition & 1 deletion src/hdmf/backends/hdf5/h5tools.py
@@ -849,7 +849,7 @@ def __chunked_iter_fill__(cls, parent, name, data, options=None):
         :param data: The data to be written.
         :type data: DataChunkIterator
         :param options: Dict with options for creating a dataset. available options are 'dtype' and 'io_settings'
-        :type data: dict
+        :type options: dict
 
         """
         io_settings = {}
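As context for the corrected docstring: `options` bundles the dataset-creation settings. A minimal sketch of such a dict, assuming typical HDF5 settings (the specific `io_settings` values are illustrative assumptions, not taken from this diff):

```python
import numpy as np

# Hypothetical options dict for __chunked_iter_fill__ -- 'dtype' and
# 'io_settings' are the keys named in the docstring; the values are assumptions.
options = {
    'dtype': np.dtype('float64'),
    'io_settings': {'compression': 'gzip'},
}
```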
157 changes: 118 additions & 39 deletions src/hdmf/data_utils.py
@@ -7,6 +7,7 @@
 from operator import itemgetter
 
 import numpy as np
+from warnings import warn
 from six import with_metaclass, text_type, binary_type
 
 from .container import Data, DataRegion
@@ -110,41 +111,61 @@ class DataChunkIterator(AbstractDataChunkIterator):
     Custom iterator class used to iterate over chunks of data.
 
     This default implementation of AbstractDataChunkIterator accepts any iterable and assumes that we iterate over
-    the first dimension of the data array. DataChunkIterator supports buffered read,
+    a single dimension of the data array (default: the first dimension). DataChunkIterator supports buffered read,
     i.e., multiple values from the input iterator can be combined to a single chunk. This is
     useful for buffered I/O operations, e.g., to improve performance by accumulating data
     in memory and writing larger blocks at once.
     """
-    @docval({'name': 'data', 'type': None, 'doc': 'The data object used for iteration', 'default': None},
+
+    __docval_init = (
+        {'name': 'data', 'type': None, 'doc': 'The data object used for iteration', 'default': None},
         {'name': 'maxshape', 'type': tuple,
         'doc': 'The maximum shape of the full data array. Use None to indicate unlimited dimensions',
         'default': None},
        {'name': 'dtype', 'type': np.dtype, 'doc': 'The Numpy data type for the array', 'default': None},
        {'name': 'buffer_size', 'type': int, 'doc': 'Number of values to be buffered in a chunk', 'default': 1},
+        {'name': 'iter_axis', 'type': int, 'doc': 'The dimension to iterate over', 'default': 0}
+    )
+
+    @docval(*__docval_init)
     def __init__(self, **kwargs):
         """Initialize the DataChunkIterator"""
         # Get the user parameters
-        self.data, self.__maxshape, self.__dtype, self.buffer_size = getargs('data',
-                                                                             'maxshape',
-                                                                             'dtype',
-                                                                             'buffer_size',
-                                                                             kwargs)
+        self.data, self.__maxshape, self.__dtype, self.buffer_size, self.iter_axis = getargs('data',
+                                                                                             'maxshape',
+                                                                                             'dtype',
+                                                                                             'buffer_size',
+                                                                                             'iter_axis',
+                                                                                             kwargs)
         self.chunk_index = 0
         # Create an iterator for the data if possible
-        self.__data_iter = iter(self.data) if isinstance(self.data, Iterable) else None
+        if isinstance(self.data, Iterable):
+            if self.iter_axis != 0 and isinstance(self.data, (list, tuple)):
+                warn('Iterating over an axis other than the first dimension of list or tuple data '
+                     'involves converting the data object to a numpy ndarray, which may incur a computational '
+                     'cost.')
+                self.data = np.asarray(self.data)
+            if isinstance(self.data, np.ndarray):
+                # iterate over the given axis by adding a new view on data (iter only works on the first dim)
+                self.__data_iter = iter(np.moveaxis(self.data, self.iter_axis, 0))
+            else:
+                self.__data_iter = iter(self.data)
+        else:
+            self.__data_iter = None
         self.__next_chunk = DataChunk(None, None)
+        self.__next_chunk_start = 0
         self.__first_chunk_shape = None
         # Determine the shape of the data if possible
         if self.__maxshape is None:
-            # If the self.data object identifies it shape then use it
+            # If the self.data object identifies its shape, then use it
            if hasattr(self.data, "shape"):
                self.__maxshape = self.data.shape
                # Avoid the special case of scalar values by making them into a 1D numpy array
                if len(self.__maxshape) == 0:
                    self.data = np.asarray([self.data, ])
                    self.__maxshape = self.data.shape
                    self.__data_iter = iter(self.data)
-            # Try to get an accurate idea of __maxshape for other Python datastructures if possible.
+            # Try to get an accurate idea of __maxshape for other Python data structures if possible.
            # Don't just call get_shape for a generator as that would potentially trigger loading of all the data
            elif isinstance(self.data, list) or isinstance(self.data, tuple):
                self.__maxshape = get_data_shape(self.data, strict_no_data_load=True)
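A minimal usage sketch of the new `iter_axis` option, assuming the behavior implemented above (the shapes and selections in the comments follow from the code, they are not captured output):

```python
import numpy as np
from hdmf.data_utils import DataChunkIterator

data = np.arange(12).reshape(3, 4)
# Iterate over the second dimension; each step takes one slice along
# axis 1 via the np.moveaxis view created in __init__ above.
dci = DataChunkIterator(data=data, iter_axis=1, buffer_size=1)
for chunk in dci:
    # Expected per the logic above: chunk.data.shape == (3, 1) and
    # selections (slice(None, None, None), slice(0, 1)), then slice(1, 2), ...
    print(chunk.selection, chunk.data.shape)
```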
@@ -155,27 +176,18 @@ def __init__(self, **kwargs):
 
         # If we still don't know the shape then try to determine the shape from the first chunk
         if self.__maxshape is None and self.__next_chunk.data is not None:
-            data_shape = get_data_shape(self.__next_chunk.data)
-            self.__maxshape = list(data_shape)
-            try:
-                self.__maxshape[0] = len(self.data)  # We use self.data here because self.__data_iter does not allow len
-            except TypeError:
-                self.__maxshape[0] = None
-            self.__maxshape = tuple(self.__maxshape)
+            self._set_maxshape_from_next_chunk()
 
         # Determine the type of the data if possible
         if self.__next_chunk.data is not None:
             self.__dtype = self.__next_chunk.data.dtype
+            self.__first_chunk_shape = get_data_shape(self.__next_chunk.data)
 
+        if self.__dtype is None:
+            raise Exception('Data type could not be determined. Please specify dtype in DataChunkIterator init.')
 
     @classmethod
-    @docval({'name': 'data', 'type': None, 'doc': 'The data object used for iteration', 'default': None},
-            {'name': 'maxshape', 'type': tuple,
-             'doc': 'The maximum shape of the full data array. Use None to indicate unlimited dimensions',
-             'default': None},
-            {'name': 'dtype', 'type': np.dtype, 'doc': 'The Numpy data type for the array', 'default': None},
-            {'name': 'buffer_size', 'type': int, 'doc': 'Number of values to be buffered in a chunk', 'default': 1},
-            )
+    @docval(*__docval_init)
     def from_iterable(cls, **kwargs):
         return cls(**kwargs)
 
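Sharing a single `__docval_init` spec keeps the `__init__` and `from_iterable` signatures in sync, so both now accept `iter_axis`. A small sketch, assuming a generator source (no dtype is needed up front because the first chunk is read during construction):

```python
import numpy as np
from hdmf.data_utils import DataChunkIterator

def gen():
    # open-ended source: the length along the iteration axis is unknown
    for i in range(5):
        yield np.array([i, 10 * i])

# from_iterable simply forwards its (shared) docval arguments to __init__
dci = DataChunkIterator.from_iterable(data=gen(), buffer_size=2)
```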
@@ -189,33 +201,100 @@ def _read_next_chunk(self):
 
         :returns: self.__next_chunk, i.e., the DataChunk object describing the next chunk
         """
-        if self.__data_iter is not None:
-            curr_next_chunk = []
-            for i in range(self.buffer_size):
+        from h5py import Dataset as H5Dataset
+        if isinstance(self.data, H5Dataset):
+            start_index = self.chunk_index * self.buffer_size
+            stop_index = start_index + self.buffer_size
+            iter_data_bounds = self.data.shape[self.iter_axis]
+            if start_index >= iter_data_bounds:
+                self.__next_chunk = DataChunk(None, None)
+            else:
+                if stop_index > iter_data_bounds:
+                    stop_index = iter_data_bounds
+
+                selection = [slice(None)] * len(self.__maxshape)
+                selection[self.iter_axis] = slice(start_index, stop_index)
+                selection = tuple(selection)
+                self.__next_chunk.data = self.data[selection]
+                self.__next_chunk.selection = selection
+        elif self.__data_iter is not None:
+            # the pieces in the buffer - first dimension consists of individual calls to next
+            iter_pieces = []
+            # offset of where data begins - shift the selection of where to place this chunk by this much
+            curr_chunk_offset = 0
+            read_next_empty = False
+            while len(iter_pieces) < self.buffer_size:
                 try:
-                    curr_next_chunk.append(next(self.__data_iter))
+                    dat = next(self.__data_iter)
+                    if dat is None and len(iter_pieces) == 0:
+                        # Skip forward in our chunk until we find data
+                        curr_chunk_offset += 1
+                    elif dat is None and len(iter_pieces) > 0:
+                        # Stop iteration if we hit empty data while constructing our block
+                        # Buffer may not be full.
+                        read_next_empty = True
+                        break
+                    else:
+                        # Add pieces of data to our buffer
+                        iter_pieces.append(np.asarray(dat))
                 except StopIteration:
-                    pass
-            next_chunk_size = len(curr_next_chunk)
-            if next_chunk_size == 0:
-                self.__next_chunk = DataChunk(None, None)
+                    break
+
+            if len(iter_pieces) == 0:
+                self.__next_chunk = DataChunk(None, None)  # signal end of iteration
             else:
-                self.__next_chunk.data = np.asarray(curr_next_chunk)
-                if self.__next_chunk.selection is None:
-                    self.__next_chunk.selection = slice(0, next_chunk_size)
-                else:
-                    self.__next_chunk.selection = slice(self.__next_chunk.selection.stop,
-                                                        self.__next_chunk.selection.stop+next_chunk_size)
+                # concatenate all the pieces into the chunk along the iteration axis
+                piece_shape = list(get_data_shape(iter_pieces[0]))
+                piece_shape.insert(self.iter_axis, 1)  # insert the missing axis
+                next_chunk_shape = piece_shape.copy()
+                next_chunk_shape[self.iter_axis] *= len(iter_pieces)
+                next_chunk_size = next_chunk_shape[self.iter_axis]
+
+                # use the piece dtype because the actual dtype may not have been determined yet
+                # NOTE: this could be problematic if a generator returns e.g. floats first and ints later
+                self.__next_chunk.data = np.empty(next_chunk_shape, dtype=iter_pieces[0].dtype)
+                self.__next_chunk.data = np.stack(iter_pieces, axis=self.iter_axis)
+
+                if self.__maxshape is None:
+                    self._set_maxshape_from_next_chunk()
+
+                selection = [slice(None)] * len(self.__maxshape)
+                selection[self.iter_axis] = slice(self.__next_chunk_start + curr_chunk_offset,
+                                                  self.__next_chunk_start + curr_chunk_offset + next_chunk_size)
+                self.__next_chunk.selection = tuple(selection)
+
+                # next chunk should start at self.__next_chunk.selection[self.iter_axis].stop
+                # but if this chunk stopped because of reading empty data, then this should be adjusted by 1
+                self.__next_chunk_start = self.__next_chunk.selection[self.iter_axis].stop
+                if read_next_empty:
+                    self.__next_chunk_start += 1
         else:
             self.__next_chunk = DataChunk(None, None)
 
         self.chunk_index += 1
         return self.__next_chunk
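To make the skip-on-`None` behavior concrete, a small sketch assuming the semantics implemented above (the selections in the comments are what this logic should produce, not captured output):

```python
import numpy as np
from hdmf.data_utils import DataChunkIterator

def sparse_gen():
    yield np.array([1.0, 2.0])
    yield None  # a gap: nothing is written at index 1 along the iteration axis
    yield np.array([3.0, 4.0])

dci = DataChunkIterator(data=sparse_gen(), buffer_size=1)
for chunk in dci:
    # Expected selections: (slice(0, 1), slice(None, None, None)) for the first
    # chunk, then (slice(2, 3), slice(None, None, None)) -- index 1 is skipped.
    print(chunk.selection)
```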

+    def _set_maxshape_from_next_chunk(self):
+        data_shape = get_data_shape(self.__next_chunk.data)
+        self.__maxshape = list(data_shape)
+        try:
+            # Size of self.__next_chunk.data along self.iter_axis is not accurate for maxshape because it is just a
+            # chunk. So try to set maxshape along the dimension self.iter_axis based on the shape of self.data if
+            # possible. Otherwise, use None to represent an unlimited size
+            if hasattr(self.data, '__len__') and self.iter_axis == 0:
+                # special case of 1-D array
+                self.__maxshape[0] = len(self.data)
+            else:
+                self.__maxshape[self.iter_axis] = self.data.shape[self.iter_axis]
+        except AttributeError:  # from self.data.shape
+            self.__maxshape[self.iter_axis] = None
+        self.__maxshape = tuple(self.__maxshape)
+
     def __next__(self):
         r"""Return the next data chunk or raise a StopIteration exception if all chunks have been retrieved.
 
         HINT: numpy.s\_ provides a convenient way to generate index tuples using standard array slicing. This
-        is often useful to define the DataChunkk.selection of the current chunk
+        is often useful to define the DataChunk.selection of the current chunk
 
         :returns: DataChunk object with the data and selection of the current chunk
         :rtype: DataChunk
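Finally, a hedged sketch of the new `h5py.Dataset` branch, which slices the dataset directly instead of going through `iter()` (the file and dataset names here are made up for illustration):

```python
import h5py
import numpy as np
from hdmf.data_utils import DataChunkIterator

# Hypothetical file/dataset names, for illustration only.
with h5py.File('example.h5', 'w') as f:
    dset = f.create_dataset('data', data=np.arange(20).reshape(10, 2))
    # Each chunk is read as dset[selection] along iter_axis; with buffer_size=4
    # the selections should cover slice(0, 4), slice(4, 8), slice(8, 10) on axis 0.
    dci = DataChunkIterator(data=dset, buffer_size=4)
    for chunk in dci:
        print(chunk.selection, chunk.data.shape)
```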