diff --git a/HISTORY.rst b/HISTORY.rst index 8c3f4afe..f84f4dea 100644 --- a/HISTORY.rst +++ b/HISTORY.rst @@ -10,6 +10,8 @@ Release History called with an invalid resampling algorithm. We now fall back to the underlying GDAL functions' error messages. https://github.com/natcap/pygeoprocessing/issues/387 +* Implementing decaying flow accumulation for D8 routing. + https://github.com/natcap/pygeoprocessing/issues/386 * Updated to Cython 3. * Dropped support for Python 3.7. diff --git a/requirements.txt b/requirements.txt index 3084426e..b1e346e3 100644 --- a/requirements.txt +++ b/requirements.txt @@ -3,7 +3,7 @@ # This file records the packages and requirements needed in order for # pygeoprocessing to work as expected. GDAL>=3.0.4 -numpy>=1.10.1 +numpy>=1.10.1,<2.0 Rtree>=0.8.3 scipy>=0.14.1,!=0.19.1 Shapely>=1.6.4 diff --git a/src/pygeoprocessing/routing/routing.pyx b/src/pygeoprocessing/routing/routing.pyx index f38970a2..c18e3437 100644 --- a/src/pygeoprocessing/routing/routing.pyx +++ b/src/pygeoprocessing/routing/routing.pyx @@ -138,6 +138,24 @@ cdef struct FlowPixelType: int last_flow_dir double value +# This struct is used to track a value as it decays. The intent is to have the +# ``decayed_value`` updated in-place as we walk the flow graph until the value +# is less than the ``min_value``. +cdef struct DecayingValue: + double decayed_value # The value, which will be progressively updated as it decays + double min_value # The minimum value before the Decaying Value should be ignored. + +# This struct is used to track an intermediate flow pixel's last calculated +# direction and flow accumulation value so far (just like with FlowPixelType). +# Additionally, we track all of the decaying values from upstream that +# influence the load on this pixel. Used during decaying flow accumulation. +cdef struct WeightedFlowPixelType: + unsigned int xi # The pixel's x coordinate in pixel space + unsigned int yi # The pixel's y coordinate in pixel space + int last_flow_dir # The last flow direction processed on this pixel + double value # The flow accumulation value at this pixel + queue[DecayingValue] decaying_values # A queue of upstream decaying values that affect the accumulation on this pixel. + # this struct is used in distance_to_channel_mfd to add up each pixel's # weighted distances and flow weights cdef struct MFDFlowPixelType: @@ -1442,7 +1460,8 @@ def flow_dir_d8( def flow_accumulation_d8( flow_dir_raster_path_band, target_flow_accum_raster_path, - weight_raster_path_band=None, + weight_raster_path_band=None, custom_decay_factor=None, + double min_decay_proportion=0.001, raster_driver_creation_tuple=DEFAULT_GTIFF_CREATION_TUPLE_OPTIONS): """D8 flow accumulation. @@ -1468,6 +1487,18 @@ def flow_accumulation_d8( weight. If ``None``, 1 is the default flow accumulation weight. This raster must be the same dimensions as ``flow_dir_mfd_raster_path_band``. + custom_decay_factor=None (float or tuple): a custom decay factor, either + represented as a float (where the same decay factor is applied to + all valid pixels) or a path/band tuple for a raster where pixel + values represent spatially-explicit decay values. As the on-pixel + load passes through a pixel, the decay factor is applied, so you + could think of this layer as representing the proportion of a + pollutant that is absorbed by the landscape as it passes through + along the flowpath. + min_decay_proportion=0.001 (float): A value representing the minimum + decayed value that should continue to be tracked along the flow + path when using a custom decay factor. If the upstream decayed + contribution falls below this value, it is not included. raster_driver_creation_tuple (tuple): a tuple containing a GDAL driver name string as the first element and a GDAL creation options tuple/list as the second. Defaults to a GTiff driver tuple @@ -1505,8 +1536,8 @@ def flow_accumulation_d8( # `search_stack` is used to walk upstream to calculate flow accumulation # values - cdef stack[FlowPixelType] search_stack - cdef FlowPixelType flow_pixel + cdef stack[WeightedFlowPixelType] search_stack + cdef WeightedFlowPixelType flow_pixel # properties of the parallel rasters cdef unsigned int raster_x_size, raster_y_size @@ -1552,6 +1583,29 @@ def flow_accumulation_d8( if raw_weight_nodata is not None: weight_nodata = raw_weight_nodata + cdef short do_decayed_accumulation = False + cdef short use_const_decay_factor = False + cdef float decay_factor = 1.0 + cdef double decay_factor_nodata + if custom_decay_factor is not None: + do_decayed_accumulation = True + if isinstance(custom_decay_factor, (int, float)): + decay_factor = custom_decay_factor + use_const_decay_factor = True + else: # assume a path/band tuple + if not _is_raster_path_band_formatted(custom_decay_factor): + raise ValueError( + "%s is supposed to be a raster band tuple but it's not." % ( + custom_decay_factor)) + decay_factor_managed_raster = _ManagedRaster( + custom_decay_factor[0], custom_decay_factor[1], 0) + _tmp_decay_factor_nodata = pygeoprocessing.get_raster_info( + custom_decay_factor[0])['nodata'][0] + if _tmp_decay_factor_nodata is None: + decay_factor_nodata = float('nan') + else: + decay_factor_nodata = _tmp_decay_factor_nodata + flow_dir_raster_info = pygeoprocessing.get_raster_info( flow_dir_raster_path_band[0]) raster_x_size, raster_y_size = flow_dir_raster_info['raster_size'] @@ -1564,6 +1618,8 @@ def flow_accumulation_d8( flow_dir_nodata = tmp_flow_dir_nodata # this outer loop searches for a pixel that is locally undrained + cdef queue[DecayingValue] decay_from_upstream + cdef double upstream_transport_factor = 1.0 # proportion of load transported to downstream pixel for offset_dict in pygeoprocessing.iterblocks( flow_dir_raster_path_band, offset_only=True, largest_block=0): win_xsize = offset_dict['win_xsize'] @@ -1593,7 +1649,9 @@ def flow_accumulation_d8( xi_n = -1 yi_n = -1 - # search block for to set flow direction + # Search the block for root pixels. + # A root pixel is a flow direction pixel that is nodata, which means it + # may be a drain just off the edge. for yi in range(1, win_ysize+1): for xi in range(1, win_xsize+1): flow_dir = flow_dir_buffer_array[yi, xi] @@ -1615,33 +1673,45 @@ def flow_accumulation_d8( else: weight_val = 1.0 search_stack.push( - FlowPixelType(xi_root, yi_root, 0, weight_val)) + WeightedFlowPixelType(xi_root, yi_root, 0, weight_val, + queue[DecayingValue]())) + + # Drain the queue of upstream neighbors since we're starting + # from a new root pixel. + while not decay_from_upstream.empty(): + decay_from_upstream.pop() while not search_stack.empty(): flow_pixel = search_stack.top() search_stack.pop() - preempted = 0 + # Load any decaying values from upstream into the current flow pixel. + while not decay_from_upstream.empty(): + upstream_decaying_value = decay_from_upstream.front() + decay_from_upstream.pop() + if upstream_decaying_value.decayed_value > upstream_decaying_value.min_value: + flow_pixel.decaying_values.push(upstream_decaying_value) + + upstream_pixels_remain = 0 for i_n in range(flow_pixel.last_flow_dir, 8): xi_n = flow_pixel.xi+D8_XOFFSET[i_n] yi_n = flow_pixel.yi+D8_YOFFSET[i_n] if (xi_n < 0 or xi_n >= raster_x_size or yi_n < 0 or yi_n >= raster_y_size): - # no upstream here + # neighbor not upstream: off edges of the raster continue upstream_flow_dir = flow_dir_managed_raster.get( xi_n, yi_n) if upstream_flow_dir == flow_dir_nodata or ( - upstream_flow_dir != - D8_REVERSE_DIRECTION[i_n]): - # no upstream here + upstream_flow_dir != D8_REVERSE_DIRECTION[i_n]): + # neighbor not upstream: is nodata or doesn't flow in continue + upstream_flow_accum = ( flow_accum_managed_raster.get(xi_n, yi_n)) if _is_close(upstream_flow_accum, flow_accum_nodata, 1e-8, 1e-5): - # process upstream before this one - flow_pixel.last_flow_dir = i_n - search_stack.push(flow_pixel) + # Flow accumulation pixel is nodata until it and everything + # upstream of it has been computed. if weight_raster is not None: weight_val = weight_raster.get( xi_n, yi_n) @@ -1649,17 +1719,52 @@ def flow_accumulation_d8( weight_val = 0.0 else: weight_val = 1.0 + if do_decayed_accumulation: + flow_pixel.decaying_values.push( + DecayingValue(weight_val, + weight_val * min_decay_proportion)) + + # process upstream pixel before this neighbor + flow_pixel.last_flow_dir = i_n + search_stack.push(flow_pixel) search_stack.push( - FlowPixelType(xi_n, yi_n, 0, weight_val)) - preempted = 1 + WeightedFlowPixelType(xi_n, yi_n, 0, weight_val, + queue[DecayingValue]())) + upstream_pixels_remain = 1 break - flow_pixel.value += upstream_flow_accum - if not preempted: + + if not do_decayed_accumulation: + flow_pixel.value += upstream_flow_accum + + if not upstream_pixels_remain: + # flow_pixel.value already has the on-pixel load + # from upstream, so we just need to add it from the + # decaying values queue + if do_decayed_accumulation: + if use_const_decay_factor: + upstream_transport_factor = decay_factor + else: + upstream_transport_factor = ( + decay_factor_managed_raster.get(flow_pixel.xi, flow_pixel.yi)) + # If nodata, assume nothing will be transported from this pixel. + if _is_close(upstream_transport_factor, decay_factor_nodata, 1e-8, 1e-5): + upstream_transport_factor = 0.0 + + while not flow_pixel.decaying_values.empty(): + decayed_value = flow_pixel.decaying_values.front() + flow_pixel.decaying_values.pop() + decayed_value.decayed_value *= upstream_transport_factor + flow_pixel.value += decayed_value.decayed_value + decay_from_upstream.push(decayed_value) + flow_accum_managed_raster.set( flow_pixel.xi, flow_pixel.yi, flow_pixel.value) flow_accum_managed_raster.close() flow_dir_managed_raster.close() + if do_decayed_accumulation: + if not use_const_decay_factor: + decay_factor_managed_raster.close() if weight_raster is not None: weight_raster.close() LOGGER.info('Flow accumulation D8 %.1f%% complete', 100.0) diff --git a/tests/test_routing.py b/tests/test_routing.py index 9bf4e849..49ca3897 100644 --- a/tests/test_routing.py +++ b/tests/test_routing.py @@ -1378,3 +1378,83 @@ def test_extract_streams_d8(self): numpy.testing.assert_array_equal( pygeoprocessing.raster_to_numpy_array(target_streams_path), expected_array) + + def test_flow_accum_d8_with_decay(self): + """PGP.routing: test d8 flow accumulation with decay.""" + # this was generated from a pre-calculated plateau drain dem + flow_dir_array = numpy.array([ + [2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 0], + [4, 2, 2, 2, 2, 2, 2, 2, 2, 2, 0], + [4, 4, 2, 2, 2, 2, 2, 2, 2, 0, 0], + [4, 4, 4, 2, 2, 2, 2, 2, 0, 0, 0], + [4, 4, 4, 4, 2, 2, 2, 0, 0, 0, 0], + [4, 4, 4, 4, 4, 2, 0, 0, 0, 0, 0], + [4, 4, 4, 4, 4, 6, 0, 0, 0, 0, 0], + [4, 4, 4, 4, 6, 6, 6, 0, 0, 0, 0], + [4, 4, 4, 6, 6, 6, 6, 6, 0, 0, 0], + [4, 4, 6, 6, 6, 6, 6, 6, 6, 0, 0], + [4, 6, 6, 6, 6, 6, 6, 6, 6, 6, 0]], dtype=numpy.uint8) + + flow_dir_path = os.path.join(self.workspace_dir, 'flow_dir.tif') + _array_to_raster(flow_dir_array, None, flow_dir_path) + + target_flow_accum_path = os.path.join( + self.workspace_dir, 'flow_accum.tif') + + # Test with scalar decay factor and also with a raster of scalar values + const_decay_factor = 0.5 + decay_factor_path = os.path.join( + self.workspace_dir, 'decay_factor.tif') + decay_array = numpy.full(flow_dir_array.shape, const_decay_factor, + dtype=numpy.float32) + _array_to_raster(decay_array, None, decay_factor_path) + + for decay_factor in (const_decay_factor, (decay_factor_path, 1)): + pygeoprocessing.routing.flow_accumulation_d8( + (flow_dir_path, 1), target_flow_accum_path, + custom_decay_factor=decay_factor) + + flow_accum_array = pygeoprocessing.raster_to_numpy_array( + target_flow_accum_path) + self.assertEqual(flow_accum_array.dtype, numpy.float64) + + # This array is a regression result saved by hand, but + # because this flow accumulation doesn't have any joining flow + # paths we can calculate weighted flow accumulation with the + # closed form of the summation: + # decayed_accum = 2 - decay_factor ** (flow_accum - 1) + expected_result = 2 - const_decay_factor ** (numpy.array( + [[1, 2, 3, 4, 5, 6, 5, 4, 3, 2, 1], + [1, 1, 2, 3, 4, 5, 4, 3, 2, 1, 1], + [2, 1, 1, 2, 3, 4, 3, 2, 1, 1, 2], + [3, 2, 1, 1, 2, 3, 2, 1, 1, 2, 3], + [4, 3, 2, 1, 1, 2, 1, 1, 2, 3, 4], + [5, 4, 3, 2, 1, 1, 1, 2, 3, 4, 5], + [5, 4, 3, 2, 1, 1, 1, 2, 3, 4, 5], + [4, 3, 2, 1, 1, 2, 1, 1, 2, 3, 4], + [3, 2, 1, 1, 2, 3, 2, 1, 1, 2, 3], + [2, 1, 1, 2, 3, 4, 3, 2, 1, 1, 2], + [1, 1, 2, 3, 4, 5, 4, 3, 2, 1, 1]]) - 1) + + numpy.testing.assert_almost_equal( + flow_accum_array, expected_result) + + def test_flow_accum_with_decay_merging_flow(self): + """PGP.routing: test d8 flow accum with decay and merged flowpath.""" + flow_dir_path = os.path.join(self.workspace_dir, 'flow_dir.tif') + _array_to_raster( + numpy.array([ + [255, 0, 0, 0, 0], + [255, 0, 0, 0, 2]], dtype=numpy.uint8), 255, flow_dir_path) + + flow_accum_path = os.path.join(self.workspace_dir, 'flow_accum.tif') + pygeoprocessing.routing.flow_accumulation_d8( + (flow_dir_path, 1), flow_accum_path, custom_decay_factor=0.5) + + fnodata = -1.23789789e29 # copied from routing.pyx + expected_array = numpy.array([ + [fnodata, 1, 1.5, 1.75, 2.8125], + [fnodata, 1, 1.5, 1.75, 1.875]], dtype=numpy.float64) + numpy.testing.assert_allclose( + pygeoprocessing.raster_to_numpy_array(flow_accum_path), + expected_array)