Merge pull request #1430 from ioam/datashader_ndoverlay_opt
Optimized datashader aggregation of NdOverlays
jlstevens authored May 12, 2017
2 parents 2b46a15 + 71201cb commit a99833f
Showing 1 changed file with 79 additions and 1 deletion.
holoviews/operation/datashader.py (80 changes: 79 additions & 1 deletion)
@@ -184,9 +184,87 @@ def get_agg_data(cls, obj, category=None):
        return x, y, Dataset(df, kdims=kdims, vdims=vdims), glyph


    def _aggregate_ndoverlay(self, element, agg_fn):
        """
        Optimized aggregation for NdOverlay objects, which aggregates
        each Element in the overlay individually, avoiding the need to
        concatenate the overlay's items into a single dataframe. sum and
        count aggregates are combined by summing the per-element
        aggregates and applying appropriate masking for NaN values; mean
        aggregation is supported by dividing the summed aggregate by the
        accumulated count. count_cat aggregates are grouped by the
        categorical dimension and a separate aggregate is generated for
        each category.
        """
        # Compute overall bounds
        x, y = element.last.dimensions()[0:2]
        xstart, xend = self.p.x_range if self.p.x_range else element.range(x)
        ystart, yend = self.p.y_range if self.p.y_range else element.range(y)
        agg_params = dict({k: v for k, v in self.p.items() if k in aggregate.params()},
                          x_range=(xstart, xend), y_range=(ystart, yend))

        # Optimize categorical counts by aggregating them individually
        if isinstance(agg_fn, ds.count_cat):
            agg_params.update(dict(dynamic=False, aggregator=ds.count()))
            agg_fn1 = aggregate.instance(**agg_params)
            if element.ndims == 1:
                grouped = element
            else:
                grouped = element.groupby([agg_fn.column], container_type=NdOverlay,
                                          group_type=NdOverlay)
            return grouped.clone({k: agg_fn1(v) for k, v in grouped.items()})

        # Create aggregate instance for sum, count operations, breaking mean
        # into two aggregates
        column = agg_fn.column or 'Count'
        if isinstance(agg_fn, ds.mean):
            agg_fn1 = aggregate.instance(**dict(agg_params, aggregator=ds.sum(column)))
            agg_fn2 = aggregate.instance(**dict(agg_params, aggregator=ds.count()))
        else:
            agg_fn1 = aggregate.instance(**agg_params)
            agg_fn2 = None
        is_sum = isinstance(agg_fn1.aggregator, ds.sum)

        # Accumulate into two aggregates and mask
        agg, agg2, mask = None, None, None
        for v in element:
            # Compute aggregates and mask
            new_agg = agg_fn1.process_element(v, None)
            if is_sum:
                new_mask = np.isnan(new_agg.data[column].values)
                new_agg.data = new_agg.data.fillna(0)
            if agg_fn2:
                new_agg2 = agg_fn2.process_element(v, None)

            if agg is None:
                agg = new_agg
                if is_sum: mask = new_mask
                if agg_fn2: agg2 = new_agg2
            else:
                agg.data += new_agg.data
                if is_sum: mask &= new_mask
                if agg_fn2: agg2.data += new_agg2.data

        # Divide sum by count to compute mean
        if agg2 is not None:
            agg2.data.rename({'Count': agg_fn.column}, inplace=True)
            with np.errstate(divide='ignore', invalid='ignore'):
                agg.data /= agg2.data

        # Fill masked values with NaNs
        if is_sum:
            agg.data[column].values[mask] = np.NaN
        return agg


    def _process(self, element, key=None):
        agg_fn = self.p.aggregator
        category = agg_fn.column if isinstance(agg_fn, ds.count_cat) else None

        if (isinstance(element, NdOverlay) and
            ((isinstance(agg_fn, (ds.count, ds.sum, ds.mean)) and agg_fn.column not in element.kdims) or
             (isinstance(agg_fn, ds.count_cat) and agg_fn.column in element.kdims))):
            return self._aggregate_ndoverlay(element, agg_fn)

        x, y, data, glyph = self.get_agg_data(element, category)

        if x is None or y is None:
@@ -273,7 +351,7 @@ def concatenate(cls, overlay):
        if not isinstance(overlay, NdOverlay):
            raise ValueError('Only NdOverlays can be concatenated')
        xarr = xr.concat([v.data.T for v in overlay.values()],
-                        dim=overlay.kdims[0].name)
+                        pd.Index(overlay.keys(), name=overlay.kdims[0].name))
        params = dict(get_param_values(overlay.last),
                      vdims=overlay.last.vdims,
                      kdims=overlay.kdims+overlay.last.kdims)
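A minimal standalone sketch of the sum/count/mask accumulation scheme described in the _aggregate_ndoverlay docstring, using plain numpy arrays in place of the xarray-backed datashader aggregates. The combine_mean helper and the example grids are illustrative, not part of the commit.

import numpy as np

def combine_mean(sum_grids, count_grids):
    """Combine per-element sum/count grids into a mean grid with NaN masking."""
    total_sum, total_count, mask = None, None, None
    for s, c in zip(sum_grids, count_grids):
        new_mask = np.isnan(s)            # cells this element left empty
        s = np.nan_to_num(s)              # treat empty cells as zero when summing
        if total_sum is None:
            total_sum, total_count, mask = s, c, new_mask
        else:
            total_sum = total_sum + s
            total_count = total_count + c
            mask &= new_mask              # a cell stays masked only if empty in every element
    with np.errstate(divide='ignore', invalid='ignore'):
        mean = total_sum / total_count
    mean[mask] = np.nan                   # restore NaN where no element contributed
    return mean

sums   = [np.array([[1., np.nan], [3., np.nan]]),
          np.array([[2., 4.],     [np.nan, np.nan]])]
counts = [np.array([[1., 0.], [2., 0.]]),
          np.array([[1., 2.], [0., 0.]])]
print(combine_mean(sums, counts))         # [[1.5 2. ] [1.5 nan]]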

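A hypothetical usage sketch of the fast path added in _process (the data and the 'cat' dimension name are illustrative): a plain count/sum/mean aggregator over an NdOverlay, or a count_cat aggregator over the overlay's key dimension, is now dispatched to _aggregate_ndoverlay instead of concatenating the overlay into a single dataframe first.

import numpy as np
import datashader as ds
import holoviews as hv
from holoviews.operation.datashader import aggregate

# An NdOverlay of Points keyed by a 'cat' dimension (illustrative data)
overlay = hv.NdOverlay({i: hv.Points(np.random.randn(10000, 2))
                        for i in range(3)}, kdims=['cat'])

# Plain count over an NdOverlay takes the per-element fast path -> hv.Image
counts = aggregate(overlay, aggregator=ds.count(), dynamic=False)

# count_cat over the overlay's key dimension aggregates each category
# separately -> NdOverlay of Images keyed by 'cat'
per_cat = aggregate(overlay, aggregator=ds.count_cat('cat'), dynamic=False)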