Skip to content

Commit

Permalink
Merge pull request #1256 from ioam/stream_memoize
Browse files Browse the repository at this point in the history
Allow disabling stream memoization
  • Loading branch information
jlstevens authored Apr 9, 2017
2 parents d408876 + 3723aeb commit 8396441
Show file tree
Hide file tree
Showing 4 changed files with 274 additions and 62 deletions.
62 changes: 49 additions & 13 deletions holoviews/core/spaces.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from numbers import Number
from itertools import groupby
from functools import partial
from contextlib import contextmanager

import numpy as np
import param
Expand Down Expand Up @@ -398,10 +399,14 @@ class Callable(param.Parameterized):
allowing their inputs (and in future outputs) to be defined.
This makes it possible to wrap DynamicMaps with streams and
makes it possible to traverse the graph of operations applied
to a DynamicMap. Additionally a Callable will memoize the last
returned value based on the arguments to the function and the
state of all streams on its inputs, to avoid calling the function
unnecessarily.
to a DynamicMap.
Additionally, if the memoize attribute is True, a Callable will
memoize the last returned value based on the arguments to the
function and the state of all streams on its inputs, to avoid
calling the function unnecessarily. Note that because memoization
includes the streams found on the inputs it may be disabled if the
stream requires it and is triggering.
A Callable may also specify a stream_mapping which specifies the
objects that are associated with interactive (i.e linked) streams
Expand All @@ -417,13 +422,19 @@ class Callable(param.Parameterized):
information see the DynamicMap tutorial at holoviews.org.
"""

callable = param.Callable(default=None, doc="""
callable = param.Callable(default=None, constant=True, doc="""
The callable function being wrapped.""")

inputs = param.List(default=[], doc="""
The list of inputs the callable function is wrapping.""")
inputs = param.List(default=[], constant=True, doc="""
The list of inputs the callable function is wrapping. Used
to allow deep access to streams in chained Callables.""")

memoize = param.Boolean(default=True, doc="""
Whether the return value of the callable should be memoized
based on the call arguments and any streams attached to the
inputs.""")

stream_mapping = param.Dict(default={}, doc="""
stream_mapping = param.Dict(default={}, constant=True, doc="""
Defines how streams should be mapped to objects returned by
the Callable, e.g. when it returns a Layout.""")

Expand All @@ -433,14 +444,20 @@ def __init__(self, callable, **params):

def __call__(self, *args, **kwargs):
inputs = [i for i in self.inputs if isinstance(i, DynamicMap)]
streams = [s for i in inputs for s in get_nested_streams(i)]
streams = []
for stream in [s for i in inputs for s in get_nested_streams(i)]:
if stream not in streams: streams.append(stream)

memoize = self.memoize and not any(s.transient and s._triggering for s in streams)
values = tuple(tuple(sorted(s.contents.items())) for s in streams)
key = args + tuple(sorted(kwargs.items())) + values

hashed_key = util.deephash(key)
ret = self._memoized.get(hashed_key, None)
if hashed_key and ret is None:
ret = self.callable(*args, **kwargs)
if memoize and hashed_key in self._memoized:
return self._memoized[hashed_key]

ret = self.callable(*args, **kwargs)
if hashed_key is not None:
self._memoized = {hashed_key : ret}
return ret

Expand All @@ -459,6 +476,24 @@ def get_nested_streams(dmap):
return list(set(layer_streams))


@contextmanager
def dynamicmap_memoization(callable_obj, streams):
"""
Determine whether the Callable should have memoization enabled
based on the supplied streams (typically by a
DynamicMap). Memoization is disabled if any of the streams require
it it and are currently in a triggered state.
"""
memoization_state = bool(callable_obj.memoize)
callable_obj.memoize &= not any(s.transient and s._triggering for s in streams)
try:
yield
except:
raise
finally:
callable_obj.memoize = memoization_state


class DynamicMap(HoloMap):
"""
A DynamicMap is a type of HoloMap where the elements are dynamically
Expand Down Expand Up @@ -589,7 +624,8 @@ def _execute_callback(self, *args):
kwarg_items = [s.contents.items() for s in self.streams]
flattened = [(k,v) for kws in kwarg_items for (k,v) in kws
if k not in kdims]
retval = self.callback(*args, **dict(flattened))
with dynamicmap_memoization(self.callback, self.streams):
retval = self.callback(*args, **dict(flattened))
return self._style(retval)


Expand Down
155 changes: 111 additions & 44 deletions holoviews/streams.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,20 +9,78 @@
from collections import defaultdict
from .core import util

from contextlib import contextmanager

class Stream(param.Parameterized):

@contextmanager
def triggering_streams(streams):
"""
A Stream is simply a parameterized object with parameters that
change over time in response to update events. Parameters are
updated via the update method.
Temporarily declares the streams as being in a triggered state.
Needed by DynamicMap to determine whether to memoize on a Callable,
i.e. if a stream has memoization disabled and is in triggered state
Callable should disable lookup in the memoization cache. This is
done by the dynamicmap_memoization context manager.
"""
for stream in streams:
stream._triggering = True
try:
yield
except:
raise
finally:
for stream in streams:
stream._triggering = False

Streams may have one or more subscribers which are callables passed
the parameter dictionary when the trigger classmethod is called.

Depending on the plotting backend certain streams may interactively
subscribe to events and changes by the plotting backend. For this
purpose use the LinkedStream baseclass, which enables the linked
option by default.
@contextmanager
def disable_constant(parameterized):
"""
Temporarily set parameters on Parameterized object to
constant=False.
"""
params = parameterized.params().values()
constants = [p.constant for p in params]
for param in params:
param.constant = False
try:
yield
except:
raise
finally:
for (param, const) in zip(params, constants):
param.constant = const


class Stream(param.Parameterized):
"""
A Stream is simply a parameterized object with parameters that
change over time in response to update events and may trigger
downstream events on its subscribers. The Stream parameters can be
updated using the update method, which will optionally trigger the
stream. This will notify the subscribers which may be supplied as
a list of callables or added later using the add_subscriber
method. The subscribers will be passed a dictionary mapping of the
parameters of the stream, which are available on the instance as
the ``contents``.
Depending on the plotting backend certain streams may
interactively subscribe to events and changes by the plotting
backend. For this purpose use the LinkedStream baseclass, which
enables the linked option by default. A source for the linking may
be supplied to the constructor in the form of another viewable
object specifying which part of a plot the data should come from.
The transient option allows treating stream events as discrete
updates, resetting the parameters to their default after the
stream has been triggered. A downstream callback can therefore
determine whether a stream is active by checking whether the
stream values match the default (usually None).
The Stream class is meant for subclassing and subclasses should
generally add one or more parameters but may also override the
transform and reset method to preprocess parameters before they
are passed to subscribers and reset them using custom logic
respectively.
"""

# Mapping from a source id to a list of streams
Expand Down Expand Up @@ -54,14 +112,19 @@ def trigger(cls, streams):
groups = [stream.subscribers for stream in streams]
subscribers = util.unique_iterator([s for subscribers in groups
for s in subscribers])
for subscriber in subscribers:
subscriber(**dict(union))

with triggering_streams(streams):
for subscriber in subscribers:
subscriber(**dict(union))

for stream in streams:
stream.deactivate()
with disable_constant(stream):
if stream.transient:
stream.reset()


def __init__(self, rename={}, source=None, subscribers=[], linked=False, **params):
def __init__(self, rename={}, source=None, subscribers=[], linked=False,
transient=False, **params):
"""
The rename argument allows multiple streams with similar event
state to be used by remapping parameter names.
Expand All @@ -80,6 +143,10 @@ def __init__(self, rename={}, source=None, subscribers=[], linked=False, **param

self.linked = linked
self._rename = self._validate_rename(rename)
self.transient = transient

# Whether this stream is currently triggering its subscribers
self._triggering = False

# The metadata may provide information about the currently
# active event, i.e. the source of the stream values may
Expand All @@ -96,12 +163,24 @@ def subscribers(self):
" Property returning the subscriber list"
return self._subscribers


def clear(self):
"""
Clear all subscribers registered to this stream.
"""
self._subscribers = []


def reset(self):
"""
Resets stream parameters to their defaults.
"""
with disable_constant(self):
for k, p in self.params().items():
if k != 'name':
setattr(self, k, p.default)


def add_subscriber(self, subscriber):
"""
Register a callable subscriber to this stream which will be
Expand All @@ -112,6 +191,7 @@ def add_subscriber(self, subscriber):
raise TypeError('Subscriber must be a callable.')
self._subscribers.append(subscriber)


def _validate_rename(self, mapping):
param_names = [k for k in self.params().keys() if k != 'name']
for k,v in mapping.items():
Expand All @@ -122,6 +202,7 @@ def _validate_rename(self, mapping):
'stream parameter of the same name' % v)
return mapping


def rename(self, **mapping):
"""
The rename method allows stream parameters to be allocated to
Expand All @@ -134,19 +215,11 @@ def rename(self, **mapping):
source=self._source,
linked=self.linked, **params)


def deactivate(self):
"""
Allows defining an action after the stream has been triggered,
e.g. resetting parameters on streams with transient events.
"""
pass


@property
def source(self):
return self._source


@source.setter
def source(self, source):
if self._source:
Expand All @@ -165,6 +238,7 @@ def transform(self):
"""
return {}


@property
def contents(self):
filtered = {k:v for k,v in self.get_param_values() if k!= 'name' }
Expand All @@ -176,19 +250,8 @@ def _set_stream_parameters(self, **kwargs):
Sets the stream parameters which are expected to be declared
constant.
"""
params = self.params().values()
constants = [p.constant for p in params]
for param in params:
param.constant = False
try:
with disable_constant(self) as constant:
self.set_param(**kwargs)
except Exception as e:
for (param, const) in zip(params, constants):
param.constant = const
raise

for (param, const) in zip(params, constants):
param.constant = const


def update(self, trigger=True, **kwargs):
Expand All @@ -208,6 +271,7 @@ def update(self, trigger=True, **kwargs):
if trigger:
self.trigger([self])


def __repr__(self):
cls_name = self.__class__.__name__
kwargs = ','.join('%s=%r' % (k,v)
Expand All @@ -218,7 +282,6 @@ def __repr__(self):
return '%s(%r, %s)' % (cls_name, self._rename, kwargs)



def __str__(self):
return repr(self)

Expand All @@ -242,8 +305,9 @@ class PositionX(LinkedStream):
position of the mouse/trackpad cursor.
"""

x = param.ClassSelector(class_=(Number, util.basestring), default=0, doc="""
Position along the x-axis in data coordinates""", constant=True)
x = param.ClassSelector(class_=(Number, util.basestring), default=None,
constant=True, doc="""
Position along the x-axis in data coordinates""")


class PositionY(LinkedStream):
Expand All @@ -254,8 +318,9 @@ class PositionY(LinkedStream):
position of the mouse/trackpad cursor.
"""

y = param.ClassSelector(class_=(Number, util.basestring), default=0, doc="""
Position along the y-axis in data coordinates""", constant=True)
y = param.ClassSelector(class_=(Number, util.basestring), default=None,
constant=True, doc="""
Position along the y-axis in data coordinates""")


class PositionXY(LinkedStream):
Expand All @@ -266,11 +331,13 @@ class PositionXY(LinkedStream):
position of the mouse/trackpad cursor.
"""

x = param.ClassSelector(class_=(Number, util.basestring), default=0, doc="""
Position along the x-axis in data coordinates""", constant=True)
x = param.ClassSelector(class_=(Number, util.basestring), default=None,
constant=True, doc="""
Position along the x-axis in data coordinates""")

y = param.ClassSelector(class_=(Number, util.basestring), default=0, doc="""
Position along the y-axis in data coordinates""", constant=True)
y = param.ClassSelector(class_=(Number, util.basestring), default=None,
constant=True, doc="""
Position along the y-axis in data coordinates""")


class Tap(PositionXY):
Expand Down
Loading

0 comments on commit 8396441

Please sign in to comment.