Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Restore widgets in batches to not exceed the zmq high water mark message limit in the kernel #2765

Draft
wants to merge 10 commits into
base: main
Choose a base branch
from
2 changes: 2 additions & 0 deletions packages/jupyterlab-manager/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@
"@lumino/widgets": "^1.3.0",
"@types/backbone": "^1.4.1",
"jquery": "^3.1.1",
"p-map": "^3.0.0",
"p-throttle": "^3.1.0",
"semver": "^6.1.1"
},
"devDependencies": {
Expand Down
115 changes: 67 additions & 48 deletions packages/jupyterlab-manager/src/manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,9 @@ import { DocumentRegistry } from '@jupyterlab/docregistry';

import { ISignal, Signal } from '@lumino/signaling';

import pMap from 'p-map';
import pThrottle from 'p-throttle';

import { valid } from 'semver';

import { SemVerCache } from './semvercache';
Expand Down Expand Up @@ -144,55 +147,71 @@ export abstract class LabWidgetManager extends ManagerBase<Widget>
}
const comm_ids = await this._get_comm_info();

// For each comm id that we do not know about, create the comm, and request the state.
const widgets_info = await Promise.all(
Object.keys(comm_ids).map(async comm_id => {
try {
await this.get_model(comm_id);
// If we successfully get the model, do no more.
return;
} catch (e) {
// If we have the widget model not found error, then we can create the
// widget. Otherwise, rethrow the error. We have to check the error
// message text explicitly because the get_model function in this
// class throws a generic error with this specific text.
if (e.message !== 'widget model not found') {
throw e;
}
const comm = await this._create_comm(this.comm_target_name, comm_id);

let msg_id = '';
const info = new PromiseDelegate<Private.ICommUpdateData>();
comm.on_msg((msg: KernelMessage.ICommMsgMsg) => {
if (
(msg.parent_header as any).msg_id === msg_id &&
msg.header.msg_type === 'comm_msg' &&
msg.content.data.method === 'update'
) {
const data = msg.content.data as any;
const buffer_paths = data.buffer_paths || [];
// Make sure the buffers are DataViews
const buffers = (msg.buffers || []).map(b => {
if (b instanceof DataView) {
return b;
} else {
return new DataView(b instanceof ArrayBuffer ? b : b.buffer);
}
});
put_buffers(data.state, buffer_paths, buffers);
info.resolve({ comm, msg });
// For each comm id that we do not know about, create the comm, and
// request the state. We must do this processing in chunks to make sure we
// do not exceed the ZMQ high water mark limiting messages from the
// kernel. See https://github.com/voila-dashboards/voila/issues/534 for
// more details. We also throttle the function to respect the default
// iopub message rate limit in the notebook server.
const widgets_info = await pMap(
Object.keys(comm_ids),
pThrottle(
async (comm_id: string) => {
try {
await this.get_model(comm_id);
// If we successfully get the model, do no more.
return;
} catch (e) {
// If we have the widget model not found error, then we can create the
// widget. Otherwise, rethrow the error. We have to check the error
// message text explicitly because the get_model function in this
// class throws a generic error with this specific text.
if (e.message !== 'widget model not found') {
throw e;
}
});
msg_id = comm.send(
{
method: 'request_state'
},
this.callbacks(undefined)
);

return info.promise;
}
})
const comm = await this._create_comm(
this.comm_target_name,
comm_id
);

let msg_id = '';
const info = new PromiseDelegate<Private.ICommUpdateData>();
comm.on_msg((msg: KernelMessage.ICommMsgMsg) => {
if (
(msg.parent_header as any).msg_id === msg_id &&
msg.header.msg_type === 'comm_msg' &&
msg.content.data.method === 'update'
) {
const data = msg.content.data as any;
const buffer_paths = data.buffer_paths || [];
// Make sure the buffers are DataViews
const buffers = (msg.buffers || []).map(b => {
if (b instanceof DataView) {
return b;
} else {
return new DataView(
b instanceof ArrayBuffer ? b : b.buffer
);
}
});
put_buffers(data.state, buffer_paths, buffers);
info.resolve({ comm, msg });
}
});
msg_id = comm.send(
{
method: 'request_state'
},
this.callbacks(undefined)
);

return info.promise;
}
},
4 /* calls */,
5 /* ms */
),
{ concurrency: 100 }
);

// We put in a synchronization barrier here so that we don't have to
Expand Down
4 changes: 3 additions & 1 deletion widgetsnbextension/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,9 @@
"@jupyterlab/services": "^5.0.0-beta.2",
"@lumino/messaging": "^1.2.1",
"@lumino/widgets": "^1.3.0",
"backbone": "1.2.3"
"backbone": "1.2.3",
"p-map": "^3.0.0",
"p-throttle": "^3.1.0"
},
"devDependencies": {
"css-loader": "^3.4.0",
Expand Down
74 changes: 45 additions & 29 deletions widgetsnbextension/src/manager.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,13 @@
'use strict';

var base = require('@jupyter-widgets/base');
var ManagerBase = require('@jupyter-widgets/base-manager').ManagerBase;
var baseManager = require('@jupyter-widgets/base-manager');
var widgets = require('@jupyter-widgets/controls');
var outputWidgets = require('./widget_output');
var saveState = require('./save_state');
var embedWidgets = require('./embed_widgets');
var pMap = require('p-map');
var pThrottle = require('p-throttle');

var MIME_TYPE = 'application/vnd.jupyter.widget-view+json';

Expand Down Expand Up @@ -74,7 +76,7 @@ function new_comm(
// WidgetManager class
//--------------------------------------------------------------------

export class WidgetManager extends ManagerBase {
export class WidgetManager extends baseManager.ManagerBase {
constructor(comm_manager, notebook) {
super();
// Managers are stored in *reverse* order, so that _managers[0] is the most recent.
Expand Down Expand Up @@ -111,33 +113,44 @@ export class WidgetManager extends ManagerBase {
// for the responses (2).
return Promise.all(comm_promises)
.then(function(comms) {
return Promise.all(
comms.map(function(comm) {
var update_promise = new Promise(function(resolve, reject) {
comm.on_msg(function(msg) {
base.put_buffers(
msg.content.data.state,
msg.content.data.buffer_paths,
msg.buffers
);
// A suspected response was received, check to see if
// it's a state update. If so, resolve.
if (msg.content.data.method === 'update') {
resolve({
comm: comm,
msg: msg
});
}
// We must do this in chunks to make sure we do not exceed the ZMQ
// high water mark limiting messages from the kernel. See
// https://github.com/voila-dashboards/voila/issues/534 for more
// details. We also throttle calls to respect the server's default
// iopub rate limit.
return pMap(
comms,
pThrottle(
function(comm) {
var update_promise = new Promise(function(resolve, reject) {
comm.on_msg(function(msg) {
base.put_buffers(
msg.content.data.state,
msg.content.data.buffer_paths,
msg.buffers
);
// A suspected response was received, check to see if
// it's a state update. If so, resolve.
if (msg.content.data.method === 'update') {
resolve({
comm: comm,
msg: msg
});
}
});
});
});
comm.send(
{
method: 'request_state'
},
that.callbacks()
);
return update_promise;
})
comm.send(
{
method: 'request_state'
},
that.callbacks()
);
return update_promise;
},
4 /* calls*/,
5 /* ms */
),
{ concurrency: 100 }
);
})
.then(function(widgets_info) {
Expand Down Expand Up @@ -411,7 +424,10 @@ export class WidgetManager extends ManagerBase {
* Callback handlers for a specific view
*/
callbacks(view) {
var callbacks = ManagerBase.prototype.callbacks.call(this, view);
var callbacks = baseManager.ManagerBase.prototype.callbacks.call(
this,
view
);
if (view && view.options.iopub_callbacks) {
callbacks.iopub = view.options.iopub_callbacks;
}
Expand Down
5 changes: 5 additions & 0 deletions yarn.lock
Original file line number Diff line number Diff line change
Expand Up @@ -7542,6 +7542,11 @@ p-reduce@^1.0.0:
resolved "https://registry.npmjs.org/p-reduce/-/p-reduce-1.0.0.tgz#18c2b0dd936a4690a529f8231f58a0fdb6a47dfa"
integrity sha1-GMKw3ZNqRpClKfgjH1ig/bakffo=

p-throttle@^3.1.0:
version "3.1.0"
resolved "https://registry.npmjs.org/p-throttle/-/p-throttle-3.1.0.tgz#dee34ce4e77d7cc2dfdc1fea0daedccc64147214"
integrity sha512-rLo81NXBihs3GJQhq89IXa0Egj/sbW1zW8/qnyadOwUhIUrZSUvyGdQ46ISRKELFBkVvmMJ4JUqWki4oAh30Qw==

p-try@^1.0.0:
version "1.0.0"
resolved "https://registry.npmjs.org/p-try/-/p-try-1.0.0.tgz#cbc79cdbaf8fd4228e13f621f2b1a237c1b207b3"
Expand Down