
Commit

fix: throttle the rate AND number of request_state msg'es from the fr…
maartenbreddels committed Feb 5, 2020
1 parent 2e2801d commit 050e1f1
Showing 3 changed files with 42 additions and 2 deletions.
1 change: 1 addition & 0 deletions js/package.json
@@ -27,6 +27,7 @@
"css-loader": "^3.0.0",
"file-loader": "^4.0.0",
"npm-run-all": "^4.1.5",
"p-limit": "^2.2.2",
"style-loader": "^0.23.1",
"url-loader": "^1.0.0",
"webpack": "^4.29.3",
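For context, not part of the diff: p-limit exposes a single function that caps how many of the wrapped calls run concurrently; calls beyond the cap are queued. A minimal sketch of its API, assuming p-limit v2 (the version added above) and a made-up loadItem helper:

import pLimit from 'p-limit';

const limit = pLimit(2); // at most 2 wrapped calls run at the same time

// Hypothetical async job, used only for illustration.
const loadItem = async (name) => name.toUpperCase();

async function demo() {
    const names = ['a', 'b', 'c', 'd'];
    // Calls beyond the first 2 wait in p-limit's queue until a slot frees up.
    return Promise.all(names.map((name) => limit(() => loadItem(name))));
}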
12 changes: 10 additions & 2 deletions js/src/manager.js
@@ -16,6 +16,7 @@ import * as controls from '@jupyter-widgets/controls';
import * as PhosphorWidget from '@phosphor/widgets';

import { requireLoader } from './loader';
import { batchRateMap } from './utils';

if (typeof window !== "undefined" && typeof window.define !== "undefined") {
window.define("@jupyter-widgets/base", base);
@@ -117,10 +118,17 @@ export class WidgetManager extends JupyterLabManager {
async _build_models() {
const comm_ids = await this._get_comm_info();
const models = {};
- const widgets_info = await Promise.all(Object.keys(comm_ids).map(async (comm_id) => {
+ /**
+  * For the classical notebook, iopub_msg_rate_limit=1000 (default),
+  * and for zmq we are affected by the default ZMQ_SNDHWM setting of 1000.
+  * See https://github.com/voila-dashboards/voila/issues/534 for a discussion.
+  */
+ const maxMessagesInTransit = 100; // a really safe limit compared to ZMQ_SNDHWM
+ const maxMessagesPerSecond = 500; // let's be on the safe side, in case the kernel sends more messages
+ const widgets_info = await Promise.all(batchRateMap(Object.keys(comm_ids), async (comm_id) => {
const comm = await this._create_comm(this.comm_target_name, comm_id);
return this._update_comm(comm);
- }));
+ }, {room: maxMessagesInTransit, rate: maxMessagesPerSecond}));

await Promise.all(widgets_info.map(async (widget_info) => {
const state = widget_info.msg.content.data.state;
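To spell out the arithmetic behind the values above (illustrative only, not part of the commit): with room = 100 and rate = 500, every comm request is padded to at least 100/500 = 0.2 seconds, so at most 100 requests are in flight and, after the initial burst of 100, at most about 500 request_state messages are produced per second, staying under both the classic notebook's iopub_msg_rate_limit of 1000 and the default ZMQ_SNDHWM of 1000.

// Illustrative only: the bounds implied by the constants chosen above.
const maxMessagesInTransit = 100;
const maxMessagesPerSecond = 500;
const minSecondsPerJob = maxMessagesInTransit / maxMessagesPerSecond; // 0.2 s of padding per comm request
const steadyStateRate = maxMessagesInTransit / minSecondsPerJob;      // 500 msg/s, half the 1000 limits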
31 changes: 31 additions & 0 deletions js/src/utils.js
@@ -0,0 +1,31 @@
import pLimit from 'p-limit';

const delay = (sec) => new Promise((resolve) => setTimeout(resolve, sec*1000));

/**
* Map a function fn onto a list, calling fn at a limit of 'rate' calls per second
* and at most 'room' parallel calls.
* Note that the minimum window over which the rate is respected is room/rate seconds.
*/
export
const batchRateMap = (list, fn, {room, rate}) => {
var limit = pLimit(room);
return list.map(async (value) => {
return new Promise((valueResolve, reject) => {
limit(() => {
// We may not want to start the next job directly; we want to respect the
// throttling/rate, e.g.:
// If we have room for 10 parallel jobs, at a max rate of 100/second, each job
// should take at least 10/100 = 0.1 seconds.
// If we have room for 100 parallel jobs, and a max rate of 10/second, each
// job should take 100/10 = 10 seconds, but there will be an initial burst of 100 calls.
const throttlePromise = delay(room/rate);
// If the job is done, resolve the outer promise immediately; don't wait for the throttlePromise.
// This means that if we have room for 10 parallel jobs
// and only 9 jobs, we never have to wait for the throttlePromise.
const resultPromise = fn(value).then(valueResolve);
return Promise.all([resultPromise, throttlePromise]);
});
});
});
}
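A minimal usage sketch of batchRateMap, not part of the commit; fetchState is a hypothetical async job standing in for the comm round-trip done in manager.js:

import { batchRateMap } from './utils';

// Hypothetical async job, used only for illustration.
const fetchState = async (id) => ({ id });

async function example() {
    const ids = ['a', 'b', 'c', 'd', 'e', 'f'];
    // At most 2 jobs in flight and roughly 4 job starts per second:
    // each job is padded to at least 2/4 = 0.5 s, so 6 jobs take about 1.5 s in total.
    const promises = batchRateMap(ids, fetchState, {room: 2, rate: 4});
    return Promise.all(promises); // resolves to [{id: 'a'}, ..., {id: 'f'}]
}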
