Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduction of Streams API #832

Merged
merged 35 commits into from
Aug 31, 2016
Merged
Show file tree
Hide file tree
Changes from 31 commits
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
90a4260
Added Stream baseclass to streams.py
jlstevens Aug 25, 2016
95f13ac
Added MouseX, MouseY and MouseXY as concrete stream classes
jlstevens Aug 25, 2016
91cae3e
Added streams list parameter to DynamicMap
jlstevens Aug 25, 2016
b473a83
Setting stream sources to self in DynamicMap if not specified
jlstevens Aug 25, 2016
80ab335
Stream values now passed to callback as keyword arguments
jlstevens Aug 25, 2016
047b5a2
Disabling DynamicMap cache when streams are specified
jlstevens Aug 25, 2016
adec6db
Renamed Stream.callbacks to Stream.subscribers
jlstevens Aug 29, 2016
f4c5558
Added module docstring and Stream class docstring
jlstevens Aug 29, 2016
b061224
Added trigger argument and docstring to update method
jlstevens Aug 29, 2016
e01436c
Implemented trigger method
jlstevens Aug 29, 2016
f0a26bd
Added docstring to Streams trigger method
jlstevens Aug 29, 2016
88b4637
Renamed MouseX, MouseY, MouseXY to PositionX, PositionY, PositionXY
jlstevens Aug 29, 2016
97b0f9b
Added docstrings to Position streams
jlstevens Aug 29, 2016
eff7029
Added docstring to Position stream parameters
jlstevens Aug 29, 2016
5e62cda
Declared the Stream parameters as constant
jlstevens Aug 31, 2016
5c5c05b
The update method configured to set constant parameters
jlstevens Aug 31, 2016
10ba516
Fixed keyword union in trigger classmethod
jlstevens Aug 31, 2016
d726f00
Added support for stream preprocessors
jlstevens Aug 31, 2016
c0b95ed
Update method now correctly restores constant setting of parameters
jlstevens Aug 31, 2016
c5d1d56
Defined the Preprocessor class
jlstevens Aug 31, 2016
91198d7
Implemented the Rename preprocessor
jlstevens Aug 31, 2016
1230a46
Removed inbuilt parameter name mapping support of Streams
jlstevens Aug 31, 2016
5d318a3
Updated docstring
jlstevens Aug 31, 2016
3806f7f
Updated Streams __repr__ method
jlstevens Aug 31, 2016
12ef5c3
Added __repr__ method to Rename preprocessor
jlstevens Aug 31, 2016
252c4dd
Reordered Stream methods and spacing fixes
jlstevens Aug 31, 2016
4ce5f12
Defined the mapping parameter of the Rename preprocessor
jlstevens Aug 31, 2016
8d202b7
Added a Group preprocessor
jlstevens Aug 31, 2016
db3150f
Streams.trigger now warns when clashing keys are detected
jlstevens Aug 31, 2016
3e3964c
DynamicMap._execute_callback now warns when clashing keys are detected
jlstevens Aug 31, 2016
8053ecd
Setting stream sources in DynamicMap after setting the parameters
jlstevens Aug 31, 2016
20e6013
Fixed constructors of Position streams
jlstevens Aug 31, 2016
a1d2b87
Subscribers now triggered in a clearly defined order
jlstevens Aug 31, 2016
d6a7dec
Added 11 unit tests of core stream system
jlstevens Aug 31, 2016
846c9d7
Fixed util import for Python 3
jlstevens Aug 31, 2016
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 26 additions & 2 deletions holoviews/core/spaces.py
Original file line number Diff line number Diff line change
Expand Up @@ -436,6 +436,15 @@ class DynamicMap(HoloMap):
simulation time across the layout).
""")

streams = param.List(default=[], doc="""
List of Stream instances to associate with the DynamicMap. The
set of parameter values across these streams will be supplied as
keyword arguments to the callback when the events are received,
updating the streams.

Note that streams may only be used with callable callbacks (i.e
not generators).""" )

cache_size = param.Integer(default=500, doc="""
The number of entries to cache for fast access. This is an LRU
cache where the least recently used item is overwritten once
Expand All @@ -455,6 +464,12 @@ class DynamicMap(HoloMap):

def __init__(self, callback, initial_items=None, **params):
super(DynamicMap, self).__init__(initial_items, callback=callback, **params)

# Set source to self if not already specified
for stream in self.streams:
if stream.source is None:
stream.source = self

self.counter = 0
if self.callback is None:
raise Exception("A suitable callback must be "
Expand Down Expand Up @@ -541,8 +556,15 @@ def _execute_callback(self, *args):
if self.call_mode == 'generator':
retval = next(self.callback)
else:
retval = self.callback(*args)

# Additional validation needed to ensure kwargs don't clash
kwarg_items = [s.value.items() for s in self.streams]
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same as below, no warning for clashing kwargs?

flattened = [el for kws in kwarg_items for el in kws]
klist = [k for k,_ in flattened]
clashes = set([k for k in klist if klist.count(k) > 1])
if clashes:
self.warning('Parameter name clashes for keys: %r' % clashes)

retval = self.callback(*args, **dict(flattened))
if self.call_mode=='key':
return self._style(retval)

Expand Down Expand Up @@ -651,6 +673,8 @@ def __getitem__(self, key):

# Cache lookup
try:
if self.streams:
raise KeyError('Using streams disables DynamicMap cache')
cache = super(DynamicMap,self).__getitem__(key)
# Return selected cache items in a new DynamicMap
if isinstance(cache, DynamicMap) and self.mode=='open':
Expand Down
220 changes: 220 additions & 0 deletions holoviews/streams.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,220 @@
"""
The streams module defines the streams API that allows visualizations to
generate and respond to events, originating either in Python on the
server-side or in Javascript in the Jupyter notebook (client-side).
"""

import param
import uuid
from collections import OrderedDict



class Preprocessor(param.Parameterized):
"""
A Preprocessor is a callable that takes a dictionary as an argument
and returns a dictionary. Where possible, Preprocessors should have
valid reprs that can be evaluated.

Preprocessors are used to set the value of a stream based on the
parameter values. They may be used for debugging purposes or to
remap or repack parameter values before they are passed onto to the
subscribers.
"""

def __call__(self, params):
return params



class Rename(Preprocessor):
"""
A preprocessor used to rename parameter values.
"""

mapping = param.Dict(default={}, doc="""
The mapping from the parameter names to the designated names""")

def __init__(self, **mapping):
super(Rename, self).__init__(mapping=mapping)

def __call__(self, params):
return {self.mapping.get(k,k):v for (k,v) in params.items()}

def __repr__(self):
keywords = ','.join('%s=%r' % (k,v) for (k,v) in sorted(self.mapping.items()))
return 'Rename(%s)' % keywords



class Group(Preprocessor):
"""
A preprocessor that keeps the parameter dictionary together,
supplying it as a value associated with the given key.
"""

def __init__(self, key):
super(Group, self).__init__(key=key)

def __call__(self, params):
return {self.key:params}

def __repr__(self):
return 'Group(%r)' % self.key



class Stream(param.Parameterized):
"""
A Stream is simply a parameterized object with parameters that
change over time in response to update events. Parameters are
updated via the update method.

Streams may have one or more subscribers which are callables passed
the parameter dictionary when the trigger classmethod is called.
"""

# Mapping from uuid to stream instance
registry = OrderedDict()

@classmethod
def trigger(cls, streams):
"""
Given a list of streams, collect all the stream parameters into
a dictionary and pass it to the union set of subscribers.

Passing multiple streams at once to trigger can be useful when a
subscriber may be set multiple times across streams but only
needs to be called once.
"""
# Union of stream values
items = [stream.value.items() for stream in streams]
union = [kv for kvs in items for kv in kvs]
klist = [k for k,_ in union]
clashes = set([k for k in klist if klist.count(k) > 1])
if clashes:
param.main.warning('Parameter name clashes for keys: %r' % clashes)

# Currently building a simple set of subscribers
groups = [stream.subscribers + stream._hidden_subscribers for stream in streams]
subscribers = set(s for subscribers in groups for s in subscribers)
for subscriber in subscribers:
Copy link
Member

@philippjfr philippjfr Aug 31, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, shouldn't some order be maintained, so that the subscribers defined on an individual stream are at least executed in sequence and the _hidden_subscribers are executed after all others?

Something like:

from .core import util

groups = [sub for stream in streams for sub in stream.subscribers]
hidden = [sub for stream in streams for sub in stream._hidden_subscribers]
for subscriber in util.unique_iterator(groups+hidden):
    ...

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, that makes sense mainly because we know that _hidden_subscribers is the sort of thing we want to batch and execute last.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right, currently it is not well defined when the plot redraw would happen.

subscriber(dict(union))


@classmethod
def find(cls, obj):
"""
Return a set of streams from the registry with a given source.
"""
return set(v for v in cls.registry.values() if v.source is obj)


def __init__(self, preprocessors=[], source=None, subscribers=[], **params):
"""
Mapping allows multiple streams with similar event state to be
used by remapping parameter names.

Source is an optional argument specifying the HoloViews
datastructure that the stream receives events from, as supported
by the plotting backend.
"""
self.source = source
self.subscribers = subscribers
self.preprocessors = preprocessors
self._hidden_subscribers = []

self.uuid = uuid.uuid4().hex
super(Stream, self).__init__(**params)
self.registry[self.uuid] = self


@property
def value(self):
remapped = {k:v for k,v in self.get_param_values() if k!= 'name' }
for preprocessor in self.preprocessors:
remapped = preprocessor(remapped)
return remapped


def update(self, trigger=True, **kwargs):
"""
The update method updates the stream parameters in response to
some event.

If trigger is enabled, the trigger classmethod is invoked on
this particular Stream instance.
"""
params = self.params().values()
constants = [p.constant for p in params]
for param in params:
param.constant = False
self.set_param(**kwargs)
for (param, const) in zip(params, constants):
param.constant = const

if trigger:
self.trigger([self])


def __repr__(self):
cls_name = self.__class__.__name__
kwargs = ','.join('%s=%r' % (k,v)
for (k,v) in self.get_param_values() if k != 'name')
if not self.preprocessors:
return '%s(%s)' % (cls_name, kwargs)
else:
return '%s(%r, %s)' % (cls_name, self.preprocessors, kwargs)


def __str__(self):
return repr(self)


class PositionX(Stream):
"""
A position along the x-axis in data coordinates.

With the appropriate plotting backend, this may correspond to the
position of the mouse/trackpad cursor.
"""

x = param.Number(default=0, doc="""
Position along the x-axis in data coordinates""", constant=True)

def __init__(self, mapping=None, **params):
super(PositionX, self).__init__(mapping=mapping, **params)


class PositionY(Stream):
"""
A position along the y-axis in data coordinates.

With the appropriate plotting backend, this may correspond to the
position of the mouse/trackpad cursor.
"""

y = param.Number(default=0, doc="""
Position along the y-axis in data coordinates""", constant=True)

def __init__(self, mapping=None, **params):
super(PositionY, self).__init__(mapping=mapping, **params)


class PositionXY(Stream):
"""
A position along the x- and y-axes in data coordinates.

With the appropriate plotting backend, this may correspond to the
position of the mouse/trackpad cursor.
"""


x = param.Number(default=0, doc="""
Position along the x-axis in data coordinates""", constant=True)

y = param.Number(default=0, doc="""
Position along the y-axis in data coordinates""", constant=True)

def __init__(self, mapping=None, **params):
super(PositionXY, self).__init__(mapping=mapping, **params)