Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement decaying D8 flow accumulation #389

Merged
2 changes: 2 additions & 0 deletions HISTORY.rst
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ Release History
called with an invalid resampling algorithm. We now fall back to the
underlying GDAL functions' error messages.
https://github.com/natcap/pygeoprocessing/issues/387
* Implementing decaying flow accumulation for D8 routing.
https://github.com/natcap/pygeoprocessing/issues/386
* Updated to Cython 3.
* Dropped support for Python 3.7.

Expand Down
2 changes: 1 addition & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
# This file records the packages and requirements needed in order for
# pygeoprocessing to work as expected.
GDAL>=3.0.4
numpy>=1.10.1
numpy>=1.10.1,<2.0
Rtree>=0.8.3
scipy>=0.14.1,!=0.19.1
Shapely>=1.6.4
Expand Down
139 changes: 122 additions & 17 deletions src/pygeoprocessing/routing/routing.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,24 @@ cdef struct FlowPixelType:
int last_flow_dir
double value

# This struct is used to track a value as it decays. The intent is to have the
# ``decayed_value`` updated in-place as we walk the flow graph until the value
# is less than the ``min_value``.
cdef struct DecayingValue:
double decayed_value # The value, which will be progressively updated as it decays
double min_value # The minimum value before the Decaying Value should be ignored.

# This struct is used to track an intermediate flow pixel's last calculated
# direction and flow accumulation value so far (just like with FlowPixelType).
# Additionally, we track all of the decaying values from upstream that
# influence the load on this pixel. Used during decaying flow accumulation.
cdef struct WeightedFlowPixelType:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that this replaces ALL instances of FlowPixelType. Should we remove that struct definition?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Interestingly, I'm still seeing FlowPixelType used in flow_accumulation_mfd, so I think it makes sense to leave it here for now.

unsigned int xi # The pixel's x coordinate in pixel space
unsigned int yi # The pixel's y coordinate in pixel space
int last_flow_dir # The last flow direction processed on this pixel
double value # The flow accumulation value at this pixel
queue[DecayingValue] decaying_values # A queue of upstream decaying values that affect the accumulation on this pixel.

# this struct is used in distance_to_channel_mfd to add up each pixel's
# weighted distances and flow weights
cdef struct MFDFlowPixelType:
Expand Down Expand Up @@ -1442,7 +1460,8 @@ def flow_dir_d8(

def flow_accumulation_d8(
flow_dir_raster_path_band, target_flow_accum_raster_path,
weight_raster_path_band=None,
weight_raster_path_band=None, custom_decay_factor=None,
double min_decay_proportion=0.001,
raster_driver_creation_tuple=DEFAULT_GTIFF_CREATION_TUPLE_OPTIONS):
"""D8 flow accumulation.

Expand All @@ -1468,6 +1487,18 @@ def flow_accumulation_d8(
weight. If ``None``, 1 is the default flow accumulation weight.
This raster must be the same dimensions as
``flow_dir_mfd_raster_path_band``.
custom_decay_factor=None (float or tuple): a custom decay factor, either
represented as a float (where the same decay factor is applied to
all valid pixels) or a path/band tuple for a raster where pixel
values represent spatially-explicit decay values. As the on-pixel
load passes through a pixel, the decay factor is applied, so you
could think of this layer as representing the proportion of a
pollutant that is absorbed by the landscape as it passes through
along the flowpath.
min_decay_proportion=0.001 (float): A value representing the minimum
decayed value that should continue to be tracked along the flow
path when using a custom decay factor. If the upstream decayed
contribution falls below this value, it is not included.
raster_driver_creation_tuple (tuple): a tuple containing a GDAL driver
name string as the first element and a GDAL creation options
tuple/list as the second. Defaults to a GTiff driver tuple
Expand Down Expand Up @@ -1505,8 +1536,8 @@ def flow_accumulation_d8(

# `search_stack` is used to walk upstream to calculate flow accumulation
# values
cdef stack[FlowPixelType] search_stack
cdef FlowPixelType flow_pixel
cdef stack[WeightedFlowPixelType] search_stack
cdef WeightedFlowPixelType flow_pixel

# properties of the parallel rasters
cdef unsigned int raster_x_size, raster_y_size
Expand Down Expand Up @@ -1552,6 +1583,29 @@ def flow_accumulation_d8(
if raw_weight_nodata is not None:
weight_nodata = raw_weight_nodata

cdef short do_decayed_accumulation = False
cdef short use_const_decay_factor = False
cdef float decay_factor = 1.0
cdef double decay_factor_nodata
if custom_decay_factor is not None:
do_decayed_accumulation = True
if isinstance(custom_decay_factor, (int, float)):
decay_factor = custom_decay_factor
use_const_decay_factor = True
else: # assume a path/band tuple
if not _is_raster_path_band_formatted(custom_decay_factor):
raise ValueError(
"%s is supposed to be a raster band tuple but it's not." % (
custom_decay_factor))
decay_factor_managed_raster = _ManagedRaster(
custom_decay_factor[0], custom_decay_factor[1], 0)
_tmp_decay_factor_nodata = pygeoprocessing.get_raster_info(
custom_decay_factor[0])['nodata'][0]
if _tmp_decay_factor_nodata is None:
decay_factor_nodata = float('nan')
else:
decay_factor_nodata = _tmp_decay_factor_nodata

flow_dir_raster_info = pygeoprocessing.get_raster_info(
flow_dir_raster_path_band[0])
raster_x_size, raster_y_size = flow_dir_raster_info['raster_size']
Expand All @@ -1564,6 +1618,8 @@ def flow_accumulation_d8(
flow_dir_nodata = tmp_flow_dir_nodata

# this outer loop searches for a pixel that is locally undrained
cdef queue[DecayingValue] decay_from_upstream
cdef double upstream_transport_factor = 1.0 # proportion of load transported to downstream pixel
for offset_dict in pygeoprocessing.iterblocks(
flow_dir_raster_path_band, offset_only=True, largest_block=0):
win_xsize = offset_dict['win_xsize']
Expand Down Expand Up @@ -1593,7 +1649,9 @@ def flow_accumulation_d8(
xi_n = -1
yi_n = -1

# search block for to set flow direction
# Search the block for root pixels.
# A root pixel is a flow direction pixel that is nodata, which means it
# may be a drain just off the edge.
Comment on lines +1652 to +1654
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you!

for yi in range(1, win_ysize+1):
for xi in range(1, win_xsize+1):
flow_dir = flow_dir_buffer_array[yi, xi]
Expand All @@ -1615,51 +1673,98 @@ def flow_accumulation_d8(
else:
weight_val = 1.0
search_stack.push(
FlowPixelType(xi_root, yi_root, 0, weight_val))
WeightedFlowPixelType(xi_root, yi_root, 0, weight_val,
queue[DecayingValue]()))

# Drain the queue of upstream neighbors since we're starting
# from a new root pixel.
while not decay_from_upstream.empty():
decay_from_upstream.pop()

while not search_stack.empty():
flow_pixel = search_stack.top()
search_stack.pop()

preempted = 0
# Load any decaying values from upstream into the current flow pixel.
while not decay_from_upstream.empty():
upstream_decaying_value = decay_from_upstream.front()
decay_from_upstream.pop()
if upstream_decaying_value.decayed_value > upstream_decaying_value.min_value:
flow_pixel.decaying_values.push(upstream_decaying_value)

upstream_pixels_remain = 0
for i_n in range(flow_pixel.last_flow_dir, 8):
xi_n = flow_pixel.xi+D8_XOFFSET[i_n]
yi_n = flow_pixel.yi+D8_YOFFSET[i_n]
if (xi_n < 0 or xi_n >= raster_x_size or
yi_n < 0 or yi_n >= raster_y_size):
# no upstream here
# neighbor not upstream: off edges of the raster
continue
upstream_flow_dir = <int>flow_dir_managed_raster.get(
xi_n, yi_n)
if upstream_flow_dir == flow_dir_nodata or (
upstream_flow_dir !=
D8_REVERSE_DIRECTION[i_n]):
# no upstream here
upstream_flow_dir != D8_REVERSE_DIRECTION[i_n]):
# neighbor not upstream: is nodata or doesn't flow in
continue

upstream_flow_accum = <double>(
flow_accum_managed_raster.get(xi_n, yi_n))
if _is_close(upstream_flow_accum, flow_accum_nodata, 1e-8, 1e-5):
# process upstream before this one
flow_pixel.last_flow_dir = i_n
search_stack.push(flow_pixel)
# Flow accumulation pixel is nodata until it and everything
# upstream of it has been computed.
if weight_raster is not None:
weight_val = <double>weight_raster.get(
xi_n, yi_n)
if _is_close(weight_val, weight_nodata, 1e-8, 1e-5):
weight_val = 0.0
else:
weight_val = 1.0
if do_decayed_accumulation:
flow_pixel.decaying_values.push(
DecayingValue(weight_val,
weight_val * min_decay_proportion))

# process upstream pixel before this neighbor
flow_pixel.last_flow_dir = i_n
search_stack.push(flow_pixel)
search_stack.push(
FlowPixelType(xi_n, yi_n, 0, weight_val))
preempted = 1
WeightedFlowPixelType(xi_n, yi_n, 0, weight_val,
queue[DecayingValue]()))
upstream_pixels_remain = 1
break
flow_pixel.value += upstream_flow_accum
if not preempted:

if not do_decayed_accumulation:
flow_pixel.value += upstream_flow_accum

if not upstream_pixels_remain:
# flow_pixel.value already has the on-pixel load
# from upstream, so we just need to add it from the
# decaying values queue
if do_decayed_accumulation:
if use_const_decay_factor:
upstream_transport_factor = decay_factor
else:
upstream_transport_factor = (
decay_factor_managed_raster.get(flow_pixel.xi, flow_pixel.yi))
# If nodata, assume nothing will be transported from this pixel.
if _is_close(upstream_transport_factor, decay_factor_nodata, 1e-8, 1e-5):
upstream_transport_factor = 0.0

while not flow_pixel.decaying_values.empty():
decayed_value = flow_pixel.decaying_values.front()
flow_pixel.decaying_values.pop()
decayed_value.decayed_value *= upstream_transport_factor
flow_pixel.value += decayed_value.decayed_value
decay_from_upstream.push(decayed_value)

flow_accum_managed_raster.set(
flow_pixel.xi, flow_pixel.yi,
flow_pixel.value)
flow_accum_managed_raster.close()
flow_dir_managed_raster.close()
if do_decayed_accumulation:
if not use_const_decay_factor:
decay_factor_managed_raster.close()
if weight_raster is not None:
weight_raster.close()
LOGGER.info('Flow accumulation D8 %.1f%% complete', 100.0)
Expand Down
80 changes: 80 additions & 0 deletions tests/test_routing.py
Original file line number Diff line number Diff line change
Expand Up @@ -1378,3 +1378,83 @@ def test_extract_streams_d8(self):
numpy.testing.assert_array_equal(
pygeoprocessing.raster_to_numpy_array(target_streams_path),
expected_array)

def test_flow_accum_d8_with_decay(self):
"""PGP.routing: test d8 flow accumulation with decay."""
# this was generated from a pre-calculated plateau drain dem
flow_dir_array = numpy.array([
[2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 0],
[4, 2, 2, 2, 2, 2, 2, 2, 2, 2, 0],
[4, 4, 2, 2, 2, 2, 2, 2, 2, 0, 0],
[4, 4, 4, 2, 2, 2, 2, 2, 0, 0, 0],
[4, 4, 4, 4, 2, 2, 2, 0, 0, 0, 0],
[4, 4, 4, 4, 4, 2, 0, 0, 0, 0, 0],
[4, 4, 4, 4, 4, 6, 0, 0, 0, 0, 0],
[4, 4, 4, 4, 6, 6, 6, 0, 0, 0, 0],
[4, 4, 4, 6, 6, 6, 6, 6, 0, 0, 0],
[4, 4, 6, 6, 6, 6, 6, 6, 6, 0, 0],
[4, 6, 6, 6, 6, 6, 6, 6, 6, 6, 0]], dtype=numpy.uint8)

flow_dir_path = os.path.join(self.workspace_dir, 'flow_dir.tif')
_array_to_raster(flow_dir_array, None, flow_dir_path)

target_flow_accum_path = os.path.join(
self.workspace_dir, 'flow_accum.tif')

# Test with scalar decay factor and also with a raster of scalar values
const_decay_factor = 0.5
decay_factor_path = os.path.join(
self.workspace_dir, 'decay_factor.tif')
decay_array = numpy.full(flow_dir_array.shape, const_decay_factor,
dtype=numpy.float32)
_array_to_raster(decay_array, None, decay_factor_path)

for decay_factor in (const_decay_factor, (decay_factor_path, 1)):
pygeoprocessing.routing.flow_accumulation_d8(
(flow_dir_path, 1), target_flow_accum_path,
custom_decay_factor=decay_factor)

flow_accum_array = pygeoprocessing.raster_to_numpy_array(
target_flow_accum_path)
self.assertEqual(flow_accum_array.dtype, numpy.float64)

# This array is a regression result saved by hand, but
# because this flow accumulation doesn't have any joining flow
# paths we can calculate weighted flow accumulation with the
# closed form of the summation:
# decayed_accum = 2 - decay_factor ** (flow_accum - 1)
expected_result = 2 - const_decay_factor ** (numpy.array(
[[1, 2, 3, 4, 5, 6, 5, 4, 3, 2, 1],
[1, 1, 2, 3, 4, 5, 4, 3, 2, 1, 1],
[2, 1, 1, 2, 3, 4, 3, 2, 1, 1, 2],
[3, 2, 1, 1, 2, 3, 2, 1, 1, 2, 3],
[4, 3, 2, 1, 1, 2, 1, 1, 2, 3, 4],
[5, 4, 3, 2, 1, 1, 1, 2, 3, 4, 5],
[5, 4, 3, 2, 1, 1, 1, 2, 3, 4, 5],
[4, 3, 2, 1, 1, 2, 1, 1, 2, 3, 4],
[3, 2, 1, 1, 2, 3, 2, 1, 1, 2, 3],
[2, 1, 1, 2, 3, 4, 3, 2, 1, 1, 2],
[1, 1, 2, 3, 4, 5, 4, 3, 2, 1, 1]]) - 1)

numpy.testing.assert_almost_equal(
flow_accum_array, expected_result)

def test_flow_accum_with_decay_merging_flow(self):
"""PGP.routing: test d8 flow accum with decay and merged flowpath."""
flow_dir_path = os.path.join(self.workspace_dir, 'flow_dir.tif')
_array_to_raster(
numpy.array([
[255, 0, 0, 0, 0],
[255, 0, 0, 0, 2]], dtype=numpy.uint8), 255, flow_dir_path)

flow_accum_path = os.path.join(self.workspace_dir, 'flow_accum.tif')
pygeoprocessing.routing.flow_accumulation_d8(
(flow_dir_path, 1), flow_accum_path, custom_decay_factor=0.5)

fnodata = -1.23789789e29 # copied from routing.pyx
expected_array = numpy.array([
[fnodata, 1, 1.5, 1.75, 2.8125],
[fnodata, 1, 1.5, 1.75, 1.875]], dtype=numpy.float64)
numpy.testing.assert_allclose(
pygeoprocessing.raster_to_numpy_array(flow_accum_path),
expected_array)
Loading