Skip to content

Commit

Permalink
Avoid tornado import (#5321)
Browse files Browse the repository at this point in the history
* Avoid tornado import

* Switch entirely to asyncio based processing

* Fix flakes

* Simplify

* Fix flakes

* Fix tests
  • Loading branch information
philippjfr authored May 26, 2022
1 parent 59c20c7 commit d9d998a
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 111 deletions.
132 changes: 29 additions & 103 deletions holoviews/plotting/bokeh/callbacks.py
Original file line number Diff line number Diff line change
@@ -1,20 +1,18 @@
from __future__ import absolute_import, division, unicode_literals

import asyncio
import time

from collections import defaultdict

import numpy as np
import panel as pn

from bokeh.models import (
CustomJS, FactorRange, DatetimeAxis, Range1d, DataRange1d,
PolyDrawTool, BoxEditTool, PolyEditTool, FreehandDrawTool,
PointDrawTool
)
from panel.io.state import state
from panel.io.model import hold
from tornado import gen

from ...core import OrderedDict
from ...core.options import CallbackError
Expand All @@ -29,12 +27,7 @@
BoxEdit, PointDraw, PolyDraw, PolyEdit, CDSStream, FreehandDraw,
CurveEdit, SelectionXY, Lasso, SelectMode
)
from .util import LooseVersion, bokeh_version, convert_timestamp

if bokeh_version >= LooseVersion('2.3.0'):
CUSTOM_TOOLTIP = 'description'
else:
CUSTOM_TOOLTIP = 'custom_tooltip'
from .util import convert_timestamp


class Callback(object):
Expand Down Expand Up @@ -90,11 +83,6 @@ class Callback(object):
received within the `throttle_timeout` duration.
"""

# Throttling configuration
adaptive_window = 3
throttle_timeout = 100
throttling_scheme = 'adaptive'

# Attributes to sync
attributes = {}

Expand Down Expand Up @@ -123,8 +111,6 @@ def __init__(self, plot, streams, source, **params):
self.reset()
self._active = False
self._prev_msg = None
self._last_event = time.time()
self._history = []

def _transform(self, msg):
for transform in self._transforms:
Expand Down Expand Up @@ -278,22 +264,7 @@ def _set_busy(self, busy):
with edit_readonly(state):
state.busy = busy

def _schedule_callback(self, cb, timeout=None, offset=True):
if timeout is None:
if self._history and self.throttling_scheme == 'adaptive':
timeout = int(np.array(self._history).mean()*1000)
else:
timeout = self.throttle_timeout
if self.throttling_scheme != 'debounce' and offset:
# Subtract the time taken since event started
diff = time.time()-self._last_event
timeout = max(timeout-(diff*1000), 50)
if not pn.state.curdoc:
cb()
else:
pn.state.curdoc.add_timeout_callback(cb, int(timeout))

def on_change(self, attr, old, new):
async def on_change(self, attr, old, new):
"""
Process change events adding timeout to process multiple concerted
value change at once rather than firing off multiple plot updates.
Expand All @@ -302,13 +273,9 @@ def on_change(self, attr, old, new):
if not self._active and self.plot.document:
self._active = True
self._set_busy(True)
if self.plot.renderer.mode == 'server':
self._schedule_callback(self.process_on_change, offset=False)
else:
with hold(self.plot.document):
self.process_on_change()
asyncio.create_task(self.process_on_change())

def on_event(self, event):
async def on_event(self, event):
"""
Process bokeh UIEvents adding timeout to process multiple concerted
value change at once rather than firing off multiple plot updates.
Expand All @@ -317,45 +284,18 @@ def on_event(self, event):
if not self._active and self.plot.document:
self._active = True
self._set_busy(True)
if self.plot.renderer.mode == 'server':
self._schedule_callback(self.process_on_event, offset=False)
else:
with hold(self.plot.document):
self.process_on_event()

def throttled(self):
now = time.time()
timeout = self.throttle_timeout/1000.
if self.throttling_scheme in ('throttle', 'adaptive'):
diff = (now-self._last_event)
if self._history and self.throttling_scheme == 'adaptive':
timeout = np.array(self._history).mean()
if diff < timeout:
return int((timeout-diff)*1000)
else:
prev_event = self._queue[-1][-1]
diff = (now-prev_event)
if diff < timeout:
return self.throttle_timeout
self._last_event = time.time()
return False

@gen.coroutine
def process_on_event_coroutine(self):
self.process_on_event()

def process_on_event(self):
asyncio.create_task(self.process_on_event())

async def process_on_event(self, timeout=None):
"""
Trigger callback change event and triggering corresponding streams.
"""
await asyncio.sleep(0.01)
if not self._queue:
self._active = False
self._set_busy(False)
return
throttled = self.throttled()
if throttled and pn.state.curdoc:
self._schedule_callback(self.process_on_event, throttled)
return

# Get unique event types in the queue
events = list(OrderedDict([(event.event_name, event)
for event, dt in self._queue]).values())
Expand All @@ -370,27 +310,15 @@ def process_on_event(self):
model_obj = self.plot_handles.get(self.models[0])
msg[attr] = self.resolve_attr_spec(path, event, model_obj)
self.on_msg(msg)
w = self.adaptive_window-1
diff = time.time()-self._last_event
self._history = self._history[-w:] + [diff]
if self.plot.renderer.mode == 'server':
self._schedule_callback(self.process_on_event)
else:
self._active = False
asyncio.create_task(self.process_on_event())

@gen.coroutine
def process_on_change_coroutine(self):
self.process_on_change()

def process_on_change(self):
async def process_on_change(self):
# Give on_change time to process new events
await asyncio.sleep(0.01)
if not self._queue:
self._active = False
self._set_busy(False)
return
throttled = self.throttled()
if throttled and pn.state.curdoc:
self._schedule_callback(self.process_on_change, throttled)
return
self._queue = []

msg = {}
Expand All @@ -414,29 +342,28 @@ def process_on_change(self):

if not equal or any(s.transient for s in self.streams):
self.on_msg(msg)
w = self.adaptive_window-1
diff = time.time()-self._last_event
self._history = self._history[-w:] + [diff]
self._prev_msg = msg

if self.plot.renderer.mode == 'server':
self._schedule_callback(self.process_on_change)
else:
self._active = False
asyncio.create_task(self.process_on_change())

def set_callback(self, handle):
"""
Set up on_change events for bokeh server interactions.
"""
if self.on_events:
event_handler = lambda event: (
asyncio.create_task(self.on_event(event))
)
for event in self.on_events:
handle.on_event(event, self.on_event)
handle.on_event(event, event_handler)
if self.on_changes:
change_handler = lambda attr, old, new: (
asyncio.create_task(self.on_change(attr, old, new))
)
for change in self.on_changes:
if change in ['patching', 'streaming']:
# Patch and stream events do not need handling on server
continue
handle.on_change(change, self.on_change)
handle.on_change(change, change_handler)

def initialize(self, plot_id=None):
handles = self._init_plot_handles()
Expand Down Expand Up @@ -468,7 +395,6 @@ def initialize(self, plot_id=None):
self._callbacks[cb_hash] = self



class PointerXYCallback(Callback):
"""
Returns the mouse x/y-position on mousemove event.
Expand Down Expand Up @@ -1051,7 +977,7 @@ def initialize(self, plot_id=None):
if stream.num_objects:
kwargs['num_objects'] = stream.num_objects
if stream.tooltip:
kwargs[CUSTOM_TOOLTIP] = stream.tooltip
kwargs['description'] = stream.tooltip
if stream.styles:
self._create_style_callback(cds, glyph)
if stream.empty_value is not None:
Expand Down Expand Up @@ -1083,7 +1009,7 @@ def initialize(self, plot_id=None):
renderers = [renderer]
kwargs = {}
if stream.tooltip:
kwargs[CUSTOM_TOOLTIP] = stream.tooltip
kwargs['description'] = stream.tooltip
point_tool = PointDrawTool(
add=False, drag=True, renderers=renderers, **kwargs
)
Expand Down Expand Up @@ -1130,7 +1056,7 @@ def initialize(self, plot_id=None):
if stream.styles:
self._create_style_callback(cds, glyph)
if stream.tooltip:
kwargs[CUSTOM_TOOLTIP] = stream.tooltip
kwargs['description'] = stream.tooltip
if stream.empty_value is not None:
kwargs['empty_value'] = stream.empty_value
poly_tool = PolyDrawTool(
Expand Down Expand Up @@ -1178,7 +1104,7 @@ def initialize(self, plot_id=None):
self._create_style_callback(cds, glyph)
kwargs = {}
if stream.tooltip:
kwargs[CUSTOM_TOOLTIP] = stream.tooltip
kwargs['description'] = stream.tooltip
if stream.empty_value is not None:
kwargs['empty_value'] = stream.empty_value
poly_tool = FreehandDrawTool(
Expand Down Expand Up @@ -1233,7 +1159,7 @@ def initialize(self, plot_id=None):
if stream.num_objects:
kwargs['num_objects'] = stream.num_objects
if stream.tooltip:
kwargs[CUSTOM_TOOLTIP] = stream.tooltip
kwargs['description'] = stream.tooltip

renderer = self.plot.handles['glyph_renderer']
if isinstance(self.plot, PathPlot):
Expand Down Expand Up @@ -1279,7 +1205,7 @@ def initialize(self, plot_id=None):
stream = self.streams[0]
kwargs = {}
if stream.tooltip:
kwargs[CUSTOM_TOOLTIP] = stream.tooltip
kwargs['description'] = stream.tooltip
if vertex_tool is None:
vertex_style = dict({'size': 10}, **stream.vertex_style)
r1 = plot.state.scatter([], [], **vertex_style)
Expand Down
12 changes: 4 additions & 8 deletions holoviews/tests/plotting/bokeh/test_callbacks.py
Original file line number Diff line number Diff line change
Expand Up @@ -192,8 +192,7 @@ def test_point_draw_callback_initialized_server(self):
points = Points([(0, 1)])
PointDraw(source=points)
plot = bokeh_server_renderer.get_plot(points)
self.assertEqual(plot.handles['source']._callbacks,
{'data': [plot.callbacks[0].on_change]})
assert 'data' in plot.handles['source']._callbacks

def test_point_draw_callback_with_vdims_initialization(self):
points = Points([(0, 1, 'A')], vdims=['A'])
Expand Down Expand Up @@ -227,8 +226,7 @@ def test_poly_draw_callback_initialized_server(self):
polys = Polygons([[(0, 0), (2, 2), (4, 0)]])
PolyDraw(source=polys)
plot = bokeh_server_renderer.get_plot(polys)
self.assertEqual(plot.handles['source']._callbacks,
{'data': [plot.callbacks[0].on_change]})
assert 'data' in plot.handles['source']._callbacks

def test_poly_draw_callback_with_vdims(self):
polys = Polygons([{'x': [0, 2, 4], 'y': [0, 2, 0], 'A': 1}], vdims=['A'])
Expand Down Expand Up @@ -290,8 +288,7 @@ def test_box_edit_callback_initialized_server(self):
boxes = Polygons([Box(0, 0, 1)])
BoxEdit(source=boxes)
plot = bokeh_server_renderer.get_plot(boxes)
self.assertEqual(plot.handles['cds']._callbacks,
{'data': [plot.callbacks[0].on_change]})
assert 'data' in plot.handles['cds']._callbacks

def test_poly_edit_callback(self):
polys = Polygons([[(0, 0), (2, 2), (4, 0)]])
Expand All @@ -308,8 +305,7 @@ def test_poly_edit_callback_initialized_server(self):
polys = Polygons([[(0, 0), (2, 2), (4, 0)]])
PolyEdit(source=polys)
plot = bokeh_server_renderer.get_plot(polys)
self.assertEqual(plot.handles['source']._callbacks,
{'data': [plot.callbacks[0].on_change]})
assert 'data' in plot.handles['source']._callbacks

def test_poly_edit_shared_callback(self):
polys = Polygons([[(0, 0), (2, 2), (4, 0)]])
Expand Down

0 comments on commit d9d998a

Please sign in to comment.