diff --git a/holoviews/core/spaces.py b/holoviews/core/spaces.py index a4f2210f5a..4e3a0439d9 100644 --- a/holoviews/core/spaces.py +++ b/holoviews/core/spaces.py @@ -436,6 +436,15 @@ class DynamicMap(HoloMap): simulation time across the layout). """) + streams = param.List(default=[], doc=""" + List of Stream instances to associate with the DynamicMap. The + set of parameter values across these streams will be supplied as + keyword arguments to the callback when the events are received, + updating the streams. + + Note that streams may only be used with callable callbacks (i.e + not generators).""" ) + cache_size = param.Integer(default=500, doc=""" The number of entries to cache for fast access. This is an LRU cache where the least recently used item is overwritten once @@ -455,6 +464,12 @@ class DynamicMap(HoloMap): def __init__(self, callback, initial_items=None, **params): super(DynamicMap, self).__init__(initial_items, callback=callback, **params) + + # Set source to self if not already specified + for stream in self.streams: + if stream.source is None: + stream.source = self + self.counter = 0 if self.callback is None: raise Exception("A suitable callback must be " @@ -541,8 +556,15 @@ def _execute_callback(self, *args): if self.call_mode == 'generator': retval = next(self.callback) else: - retval = self.callback(*args) - + # Additional validation needed to ensure kwargs don't clash + kwarg_items = [s.value.items() for s in self.streams] + flattened = [el for kws in kwarg_items for el in kws] + klist = [k for k,_ in flattened] + clashes = set([k for k in klist if klist.count(k) > 1]) + if clashes: + self.warning('Parameter name clashes for keys: %r' % clashes) + + retval = self.callback(*args, **dict(flattened)) if self.call_mode=='key': return self._style(retval) @@ -651,6 +673,8 @@ def __getitem__(self, key): # Cache lookup try: + if self.streams: + raise KeyError('Using streams disables DynamicMap cache') cache = super(DynamicMap,self).__getitem__(key) # Return selected cache items in a new DynamicMap if isinstance(cache, DynamicMap) and self.mode=='open': diff --git a/holoviews/streams.py b/holoviews/streams.py new file mode 100644 index 0000000000..6169c30453 --- /dev/null +++ b/holoviews/streams.py @@ -0,0 +1,225 @@ +""" +The streams module defines the streams API that allows visualizations to +generate and respond to events, originating either in Python on the +server-side or in Javascript in the Jupyter notebook (client-side). +""" + +import param +import uuid +from collections import OrderedDict +from .core import util + + +class Preprocessor(param.Parameterized): + """ + A Preprocessor is a callable that takes a dictionary as an argument + and returns a dictionary. Where possible, Preprocessors should have + valid reprs that can be evaluated. + + Preprocessors are used to set the value of a stream based on the + parameter values. They may be used for debugging purposes or to + remap or repack parameter values before they are passed onto to the + subscribers. + """ + + def __call__(self, params): + return params + + + +class Rename(Preprocessor): + """ + A preprocessor used to rename parameter values. + """ + + mapping = param.Dict(default={}, doc=""" + The mapping from the parameter names to the designated names""") + + def __init__(self, **mapping): + super(Rename, self).__init__(mapping=mapping) + + def __call__(self, params): + return {self.mapping.get(k,k):v for (k,v) in params.items()} + + def __repr__(self): + keywords = ','.join('%s=%r' % (k,v) for (k,v) in sorted(self.mapping.items())) + return 'Rename(%s)' % keywords + + + +class Group(Preprocessor): + """ + A preprocessor that keeps the parameter dictionary together, + supplying it as a value associated with the given key. + """ + + def __init__(self, key): + super(Group, self).__init__(key=key) + + def __call__(self, params): + return {self.key:params} + + def __repr__(self): + return 'Group(%r)' % self.key + + + +class Stream(param.Parameterized): + """ + A Stream is simply a parameterized object with parameters that + change over time in response to update events. Parameters are + updated via the update method. + + Streams may have one or more subscribers which are callables passed + the parameter dictionary when the trigger classmethod is called. + """ + + # Mapping from uuid to stream instance + registry = OrderedDict() + + @classmethod + def trigger(cls, streams): + """ + Given a list of streams, collect all the stream parameters into + a dictionary and pass it to the union set of subscribers. + + Passing multiple streams at once to trigger can be useful when a + subscriber may be set multiple times across streams but only + needs to be called once. + """ + # Union of stream values + items = [stream.value.items() for stream in streams] + union = [kv for kvs in items for kv in kvs] + klist = [k for k,_ in union] + clashes = set([k for k in klist if klist.count(k) > 1]) + if clashes: + param.main.warning('Parameter name clashes for keys: %r' % clashes) + + # Currently building a simple set of subscribers + groups = [stream.subscribers for stream in streams] + hidden = [stream._hidden_subscribers for stream in streams] + subscribers = util.unique_iterator([s for subscribers in groups+hidden + for s in subscribers]) + for subscriber in subscribers: + subscriber(dict(union)) + + + @classmethod + def find(cls, obj): + """ + Return a set of streams from the registry with a given source. + """ + return set(v for v in cls.registry.values() if v.source is obj) + + + def __init__(self, preprocessors=[], source=None, subscribers=[], **params): + """ + Mapping allows multiple streams with similar event state to be + used by remapping parameter names. + + Source is an optional argument specifying the HoloViews + datastructure that the stream receives events from, as supported + by the plotting backend. + """ + self.source = source + self.subscribers = subscribers + self.preprocessors = preprocessors + self._hidden_subscribers = [] + + self.uuid = uuid.uuid4().hex + super(Stream, self).__init__(**params) + self.registry[self.uuid] = self + + + @property + def value(self): + remapped = {k:v for k,v in self.get_param_values() if k!= 'name' } + for preprocessor in self.preprocessors: + remapped = preprocessor(remapped) + return remapped + + + def update(self, trigger=True, **kwargs): + """ + The update method updates the stream parameters in response to + some event. + + If trigger is enabled, the trigger classmethod is invoked on + this particular Stream instance. + """ + params = self.params().values() + constants = [p.constant for p in params] + for param in params: + param.constant = False + self.set_param(**kwargs) + for (param, const) in zip(params, constants): + param.constant = const + + if trigger: + self.trigger([self]) + + + def __repr__(self): + cls_name = self.__class__.__name__ + kwargs = ','.join('%s=%r' % (k,v) + for (k,v) in self.get_param_values() if k != 'name') + if not self.preprocessors: + return '%s(%s)' % (cls_name, kwargs) + else: + return '%s(%r, %s)' % (cls_name, self.preprocessors, kwargs) + + + def __str__(self): + return repr(self) + + +class PositionX(Stream): + """ + A position along the x-axis in data coordinates. + + With the appropriate plotting backend, this may correspond to the + position of the mouse/trackpad cursor. + """ + + x = param.Number(default=0, doc=""" + Position along the x-axis in data coordinates""", constant=True) + + def __init__(self, preprocessors=[], source=None, subscribers=[], **params): + super(PositionX, self).__init__(preprocessors=preprocessors, source=source, + subscribers=subscribers, **params) + + +class PositionY(Stream): + """ + A position along the y-axis in data coordinates. + + With the appropriate plotting backend, this may correspond to the + position of the mouse/trackpad cursor. + """ + + y = param.Number(default=0, doc=""" + Position along the y-axis in data coordinates""", constant=True) + + def __init__(self, preprocessors=[], source=None, subscribers=[], **params): + super(PositionY, self).__init__(preprocessors=preprocessors, source=source, + subscribers=subscribers, **params) + + +class PositionXY(Stream): + """ + A position along the x- and y-axes in data coordinates. + + With the appropriate plotting backend, this may correspond to the + position of the mouse/trackpad cursor. + """ + + + x = param.Number(default=0, doc=""" + Position along the x-axis in data coordinates""", constant=True) + + y = param.Number(default=0, doc=""" + Position along the y-axis in data coordinates""", constant=True) + + def __init__(self, preprocessors=[], source=None, subscribers=[], **params): + super(PositionXY, self).__init__(preprocessors=preprocessors, source=source, + subscribers=subscribers, **params) diff --git a/tests/teststreams.py b/tests/teststreams.py new file mode 100644 index 0000000000..f974d68fa4 --- /dev/null +++ b/tests/teststreams.py @@ -0,0 +1,110 @@ +""" +Unit test of the streams system +""" +from holoviews.element.comparison import ComparisonTestCase +from holoviews.streams import Stream, PositionX, PositionY, PositionXY +from holoviews.streams import Rename, Group + + +class TestSubscriber(object): + + def __init__(self): + self.call_count = 0 + self.kwargs = None + + def __call__(self, kwargs): + self.call_count += 1 + self.kwargs = kwargs + + +class TestPositionStreams(ComparisonTestCase): + + def test_positionX_init(self): + PositionX() + + def test_positionXY_init_values(self): + position = PositionXY(x=1, y=3) + self.assertEqual(position.value, dict(x=1, y=3)) + + def test_positionXY_update_values(self): + position = PositionXY() + position.update(x=5, y=10) + self.assertEqual(position.value, dict(x=5, y=10)) + + def test_positionY_const_parameter(self): + position = PositionY() + try: + position.y = 5 + raise Exception('No constant parameter exception') + except TypeError as e: + self.assertEqual(str(e), "Constant parameter 'y' cannot be modified") + + + +class TestSubscribers(ComparisonTestCase): + + def test_exception_subscriber(self): + subscriber = TestSubscriber() + position = PositionXY(subscribers=[subscriber]) + kwargs = dict(x=3, y=4) + position.update(**kwargs) + self.assertEqual(subscriber.kwargs, kwargs) + + def test_subscriber_disabled(self): + subscriber = TestSubscriber() + position = PositionXY(subscribers=[subscriber]) + kwargs = dict(x=3, y=4) + position.update(trigger=False, **kwargs) + self.assertEqual(subscriber.kwargs, None) + + + def test_subscribers(self): + subscriber1 = TestSubscriber() + subscriber2 = TestSubscriber() + position = PositionXY(subscribers=[subscriber1, subscriber2]) + kwargs = dict(x=3, y=4) + position.update(**kwargs) + self.assertEqual(subscriber1.kwargs, kwargs) + self.assertEqual(subscriber2.kwargs, kwargs) + + def test_batch_subscriber(self): + subscriber = TestSubscriber() + + positionX = PositionX(subscribers=[subscriber]) + positionY = PositionY(subscribers=[subscriber]) + + positionX.update(trigger=False, x=5) + positionY.update(trigger=False, y=10) + + Stream.trigger([positionX, positionY]) + self.assertEqual(subscriber.kwargs, dict(x=5, y=10)) + self.assertEqual(subscriber.call_count, 1) + + def test_batch_subscribers(self): + subscriber1 = TestSubscriber() + subscriber2 = TestSubscriber() + + positionX = PositionX(subscribers=[subscriber1, subscriber2]) + positionY = PositionY(subscribers=[subscriber1, subscriber2]) + + positionX.update(trigger=False, x=50) + positionY.update(trigger=False, y=100) + + Stream.trigger([positionX, positionY]) + + self.assertEqual(subscriber1.kwargs, dict(x=50, y=100)) + self.assertEqual(subscriber1.call_count, 1) + + self.assertEqual(subscriber2.kwargs, dict(x=50, y=100)) + self.assertEqual(subscriber2.call_count, 1) + + +class TestPreprocessors(ComparisonTestCase): + + def test_rename_preprocessor(self): + position = PositionXY([Rename(x='x1',y='y1')], x=1, y=3) + self.assertEqual(position.value, dict(x1=1, y1=3)) + + def test_group_preprocessor(self): + position = PositionXY([Group('mygroup')], x=1, y=3) + self.assertEqual(position.value, dict(mygroup={'x':1,'y':3}))