From a507eae228b4a989972f3bf767392295245cfd7c Mon Sep 17 00:00:00 2001 From: Jason Grout Date: Thu, 30 Jan 2020 03:57:14 -0800 Subject: [PATCH 01/10] Restore widgets in batches to not exceed the zmq high water mark message limit in the kernel. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Current ZMQ by default limits the kernel’s iopub message send queue to at most 1000 messages (and the real limit can be much lower, see ZMQ_SNDHWM at http://api.zeromq.org/4-3:zmq-setsockopt). We now request comm state in batches to avoid this limit. See https://github.com/voila-dashboards/voila/issues/534 for more details, including where this was causing real problems. --- packages/base-manager/src/utils.ts | 19 +++++++ packages/jupyterlab-manager/src/manager.ts | 16 ++++-- widgetsnbextension/src/manager.js | 61 +++++++++++----------- 3 files changed, 61 insertions(+), 35 deletions(-) diff --git a/packages/base-manager/src/utils.ts b/packages/base-manager/src/utils.ts index 0e0c7f59ba..c49f3fc369 100644 --- a/packages/base-manager/src/utils.ts +++ b/packages/base-manager/src/utils.ts @@ -298,3 +298,22 @@ export function bufferToBase64(buffer: ArrayBuffer): string { export function base64ToBuffer(base64: string): ArrayBuffer { return toByteArray(base64).buffer; } + +/** + * Map a function onto a list in batches, resolving each batch of returned + * promises before moving to the next batch. + */ +export async function mapBatch( + list: T[], + step: number, + fn: (value: T, index: number, array: T[]) => Promise | U, + thisArg?: any +): Promise { + const results = []; + for (let i = 0; i < list.length; i += step) { + results.push( + ...(await Promise.all(list.slice(i, i + step).map(fn, thisArg))) + ); + } + return results; +} diff --git a/packages/jupyterlab-manager/src/manager.ts b/packages/jupyterlab-manager/src/manager.ts index c1b306339a..aaa363c63f 100644 --- a/packages/jupyterlab-manager/src/manager.ts +++ b/packages/jupyterlab-manager/src/manager.ts @@ -19,7 +19,8 @@ import { import { ManagerBase, serialize_state, - IStateOptions + IStateOptions, + mapBatch } from '@jupyter-widgets/base-manager'; import { IDisposable } from '@lumino/disposable'; @@ -144,9 +145,14 @@ export abstract class LabWidgetManager extends ManagerBase } const comm_ids = await this._get_comm_info(); - // For each comm id that we do not know about, create the comm, and request the state. - const widgets_info = await Promise.all( - Object.keys(comm_ids).map(async comm_id => { + // For each comm id that we do not know about, create the comm, and + // request the state. We must do this in batches to make sure we do not + // exceed the ZMQ high water mark limiting messages from the kernel. See + // https://github.com/voila-dashboards/voila/issues/534 for more details. + const widgets_info = await mapBatch( + Object.keys(comm_ids), + 100, + async comm_id => { try { await this.get_model(comm_id); // If we successfully get the model, do no more. @@ -192,7 +198,7 @@ export abstract class LabWidgetManager extends ManagerBase return info.promise; } - }) + } ); // We put in a synchronization barrier here so that we don't have to diff --git a/widgetsnbextension/src/manager.js b/widgetsnbextension/src/manager.js index 78dd66eb49..4d1aef0e76 100644 --- a/widgetsnbextension/src/manager.js +++ b/widgetsnbextension/src/manager.js @@ -3,7 +3,7 @@ 'use strict'; var base = require('@jupyter-widgets/base'); -var ManagerBase = require('@jupyter-widgets/base-manager').ManagerBase; +var baseManager = require('@jupyter-widgets/base-manager'); var widgets = require('@jupyter-widgets/controls'); var outputWidgets = require('./widget_output'); var saveState = require('./save_state'); @@ -74,7 +74,7 @@ function new_comm( // WidgetManager class //-------------------------------------------------------------------- -export class WidgetManager extends ManagerBase { +export class WidgetManager extends baseManager.ManagerBase { constructor(comm_manager, notebook) { super(); // Managers are stored in *reverse* order, so that _managers[0] is the most recent. @@ -111,34 +111,32 @@ export class WidgetManager extends ManagerBase { // for the responses (2). return Promise.all(comm_promises) .then(function(comms) { - return Promise.all( - comms.map(function(comm) { - var update_promise = new Promise(function(resolve, reject) { - comm.on_msg(function(msg) { - base.put_buffers( - msg.content.data.state, - msg.content.data.buffer_paths, - msg.buffers - ); - // A suspected response was received, check to see if - // it's a state update. If so, resolve. - if (msg.content.data.method === 'update') { - resolve({ - comm: comm, - msg: msg - }); - } - }); + return baseManager.mapBatch(comms, 100, function(comm) { + var update_promise = new Promise(function(resolve, reject) { + comm.on_msg(function(msg) { + base.put_buffers( + msg.content.data.state, + msg.content.data.buffer_paths, + msg.buffers + ); + // A suspected response was received, check to see if + // it's a state update. If so, resolve. + if (msg.content.data.method === 'update') { + resolve({ + comm: comm, + msg: msg + }); + } }); - comm.send( - { - method: 'request_state' - }, - that.callbacks() - ); - return update_promise; - }) - ); + }); + comm.send( + { + method: 'request_state' + }, + that.callbacks() + ); + return update_promise; + }); }) .then(function(widgets_info) { return Promise.all( @@ -411,7 +409,10 @@ export class WidgetManager extends ManagerBase { * Callback handlers for a specific view */ callbacks(view) { - var callbacks = ManagerBase.prototype.callbacks.call(this, view); + var callbacks = baseManager.ManagerBase.prototype.callbacks.call( + this, + view + ); if (view && view.options.iopub_callbacks) { callbacks.iopub = view.options.iopub_callbacks; } From e744957ed5108b1785082cd0b1b51b863c4f195a Mon Sep 17 00:00:00 2001 From: Jason Grout Date: Thu, 30 Jan 2020 04:20:37 -0800 Subject: [PATCH 02/10] Add explanatory comment to classic widget manager --- widgetsnbextension/src/manager.js | 3 +++ 1 file changed, 3 insertions(+) diff --git a/widgetsnbextension/src/manager.js b/widgetsnbextension/src/manager.js index 4d1aef0e76..c4a0c7bf74 100644 --- a/widgetsnbextension/src/manager.js +++ b/widgetsnbextension/src/manager.js @@ -111,6 +111,9 @@ export class WidgetManager extends baseManager.ManagerBase { // for the responses (2). return Promise.all(comm_promises) .then(function(comms) { + // We must do this in batches to make sure we do not + // exceed the ZMQ high water mark limiting messages from the kernel. See + // https://github.com/voila-dashboards/voila/issues/534 for more details. return baseManager.mapBatch(comms, 100, function(comm) { var update_promise = new Promise(function(resolve, reject) { comm.on_msg(function(msg) { From 24cf363770cbea59d876d862f0535f82b00b0708 Mon Sep 17 00:00:00 2001 From: Jason Grout Date: Thu, 30 Jan 2020 09:00:39 -0800 Subject: [PATCH 03/10] Introduce concurrency and tweak interface. --- packages/base-manager/src/utils.ts | 68 +++++++++++++++++++--- packages/jupyterlab-manager/src/manager.ts | 15 ++--- widgetsnbextension/src/manager.js | 56 +++++++++--------- 3 files changed, 98 insertions(+), 41 deletions(-) diff --git a/packages/base-manager/src/utils.ts b/packages/base-manager/src/utils.ts index c49f3fc369..a8bb34a1e1 100644 --- a/packages/base-manager/src/utils.ts +++ b/packages/base-manager/src/utils.ts @@ -300,20 +300,72 @@ export function base64ToBuffer(base64: string): ArrayBuffer { } /** - * Map a function onto a list in batches, resolving each batch of returned - * promises before moving to the next batch. + * A map that chunks the list into chunkSize pieces and evaluates chunks + * concurrently. + * + * @param list - The list to map over + * @param chunkOptions - The options for chunking and evaluating. + * @param fn - The function to map, with the same arguments as an array map + * @param thisArg - An optional thisArg for the function + * @param chunkSize - The maximum size of each chunk, default to 1 + * @param concurrency - The maximum number of chunks to evaluate concurently, + * default to 1 + * + * @returns - the equivalent of `Promise.all(list.map(fn, thisArg))` */ -export async function mapBatch( +export async function chunkMap( list: T[], - step: number, + chunkOptions: chunkMap.IOptions, fn: (value: T, index: number, array: T[]) => Promise | U, thisArg?: any ): Promise { - const results = []; - for (let i = 0; i < list.length; i += step) { - results.push( - ...(await Promise.all(list.slice(i, i + step).map(fn, thisArg))) + // Default to equivalent to Promise.all(list.map(fn, thisarg)) + const chunkSize = chunkOptions.chunkSize ?? list.length; + const concurrency = chunkOptions.concurrency ?? 1; + + const results = new Array(list.length); + const chunks: (() => Promise)[] = []; + + // Process a single chunk and resolve to the next chunk if available + async function processChunk(chunk: any[], start: number): Promise { + const chunkResult = await Promise.all( + chunk.map((v, i) => fn.call(thisArg, v, start + i, list)) ); + + // Splice the chunk results into the results array. We use + // chunkResult.length because the last chunk may not be full size. + results.splice(start, chunkResult.length, ...chunkResult); + + // Start the next work item by processing it + if (chunks.length > 0) { + return chunks.shift()!(); + } + } + + // Make closures for each batch of work. + for (let i = 0; i < list.length; i += chunkSize) { + chunks.push(() => processChunk(list.slice(i, i + chunkSize), i)); } + + // Start the first concurrent chunks. Each chunk will automatically start + // the next available chunk when it finishes. + await Promise.all(chunks.splice(0, concurrency).map(f => f())); return results; } + +export namespace chunkMap { + /** + * The options for chunking and evaluating. + */ + export interface IOptions { + /** + * The maximum size of a chunk. Defaults to the list size. + */ + chunkSize?: number; + + /** + * The maximum number of chunks to evaluate simultaneously. Defaults to 1. + */ + concurrency?: number; + } +} diff --git a/packages/jupyterlab-manager/src/manager.ts b/packages/jupyterlab-manager/src/manager.ts index aaa363c63f..629e8ab5e0 100644 --- a/packages/jupyterlab-manager/src/manager.ts +++ b/packages/jupyterlab-manager/src/manager.ts @@ -20,7 +20,7 @@ import { ManagerBase, serialize_state, IStateOptions, - mapBatch + chunkMap } from '@jupyter-widgets/base-manager'; import { IDisposable } from '@lumino/disposable'; @@ -146,13 +146,14 @@ export abstract class LabWidgetManager extends ManagerBase const comm_ids = await this._get_comm_info(); // For each comm id that we do not know about, create the comm, and - // request the state. We must do this in batches to make sure we do not - // exceed the ZMQ high water mark limiting messages from the kernel. See - // https://github.com/voila-dashboards/voila/issues/534 for more details. - const widgets_info = await mapBatch( + // request the state. We must do this processing in chunksto make sure we + // do not exceed the ZMQ high water mark limiting messages from the + // kernel. See https://github.com/voila-dashboards/voila/issues/534 for + // more details. + const widgets_info = await chunkMap( Object.keys(comm_ids), - 100, - async comm_id => { + { chunkSize: 10, concurrency: 10 }, + async (comm_id: string) => { try { await this.get_model(comm_id); // If we successfully get the model, do no more. diff --git a/widgetsnbextension/src/manager.js b/widgetsnbextension/src/manager.js index c4a0c7bf74..d352c72e59 100644 --- a/widgetsnbextension/src/manager.js +++ b/widgetsnbextension/src/manager.js @@ -111,35 +111,39 @@ export class WidgetManager extends baseManager.ManagerBase { // for the responses (2). return Promise.all(comm_promises) .then(function(comms) { - // We must do this in batches to make sure we do not + // We must do this in chunks to make sure we do not // exceed the ZMQ high water mark limiting messages from the kernel. See // https://github.com/voila-dashboards/voila/issues/534 for more details. - return baseManager.mapBatch(comms, 100, function(comm) { - var update_promise = new Promise(function(resolve, reject) { - comm.on_msg(function(msg) { - base.put_buffers( - msg.content.data.state, - msg.content.data.buffer_paths, - msg.buffers - ); - // A suspected response was received, check to see if - // it's a state update. If so, resolve. - if (msg.content.data.method === 'update') { - resolve({ - comm: comm, - msg: msg - }); - } + return baseManager.chunkMap( + comms, + { chunkSize: 10, concurrency: 10 }, + function(comm) { + var update_promise = new Promise(function(resolve, reject) { + comm.on_msg(function(msg) { + base.put_buffers( + msg.content.data.state, + msg.content.data.buffer_paths, + msg.buffers + ); + // A suspected response was received, check to see if + // it's a state update. If so, resolve. + if (msg.content.data.method === 'update') { + resolve({ + comm: comm, + msg: msg + }); + } + }); }); - }); - comm.send( - { - method: 'request_state' - }, - that.callbacks() - ); - return update_promise; - }); + comm.send( + { + method: 'request_state' + }, + that.callbacks() + ); + return update_promise; + } + ); }) .then(function(widgets_info) { return Promise.all( From 978fbceed379929a8ee4e65de5f5b6a4e5a797cc Mon Sep 17 00:00:00 2001 From: Jason Grout Date: Thu, 30 Jan 2020 09:11:11 -0800 Subject: [PATCH 04/10] Clean up documentation --- packages/base-manager/src/utils.ts | 4 ---- 1 file changed, 4 deletions(-) diff --git a/packages/base-manager/src/utils.ts b/packages/base-manager/src/utils.ts index a8bb34a1e1..7876ff9caa 100644 --- a/packages/base-manager/src/utils.ts +++ b/packages/base-manager/src/utils.ts @@ -307,10 +307,6 @@ export function base64ToBuffer(base64: string): ArrayBuffer { * @param chunkOptions - The options for chunking and evaluating. * @param fn - The function to map, with the same arguments as an array map * @param thisArg - An optional thisArg for the function - * @param chunkSize - The maximum size of each chunk, default to 1 - * @param concurrency - The maximum number of chunks to evaluate concurently, - * default to 1 - * * @returns - the equivalent of `Promise.all(list.map(fn, thisArg))` */ export async function chunkMap( From 08493db5da7b122065224494557175cf86993558 Mon Sep 17 00:00:00 2001 From: Jason Grout Date: Thu, 30 Jan 2020 10:23:03 -0800 Subject: [PATCH 05/10] bump chunk size --- packages/jupyterlab-manager/src/manager.ts | 2 +- widgetsnbextension/src/manager.js | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/packages/jupyterlab-manager/src/manager.ts b/packages/jupyterlab-manager/src/manager.ts index 629e8ab5e0..332968d76e 100644 --- a/packages/jupyterlab-manager/src/manager.ts +++ b/packages/jupyterlab-manager/src/manager.ts @@ -152,7 +152,7 @@ export abstract class LabWidgetManager extends ManagerBase // more details. const widgets_info = await chunkMap( Object.keys(comm_ids), - { chunkSize: 10, concurrency: 10 }, + { chunkSize: 20, concurrency: 5 }, async (comm_id: string) => { try { await this.get_model(comm_id); diff --git a/widgetsnbextension/src/manager.js b/widgetsnbextension/src/manager.js index d352c72e59..3a8f7a5499 100644 --- a/widgetsnbextension/src/manager.js +++ b/widgetsnbextension/src/manager.js @@ -116,7 +116,7 @@ export class WidgetManager extends baseManager.ManagerBase { // https://github.com/voila-dashboards/voila/issues/534 for more details. return baseManager.chunkMap( comms, - { chunkSize: 10, concurrency: 10 }, + { chunkSize: 20, concurrency: 5 }, function(comm) { var update_promise = new Promise(function(resolve, reject) { comm.on_msg(function(msg) { From f4abd92bdf881076f494e24a23eff67ef3f0d372 Mon Sep 17 00:00:00 2001 From: Jason Grout Date: Thu, 30 Jan 2020 10:23:28 -0800 Subject: [PATCH 06/10] Stop chunk map on error --- packages/base-manager/src/utils.ts | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/packages/base-manager/src/utils.ts b/packages/base-manager/src/utils.ts index 7876ff9caa..cda031b228 100644 --- a/packages/base-manager/src/utils.ts +++ b/packages/base-manager/src/utils.ts @@ -321,6 +321,7 @@ export async function chunkMap( const results = new Array(list.length); const chunks: (() => Promise)[] = []; + let stop = false; // Process a single chunk and resolve to the next chunk if available async function processChunk(chunk: any[], start: number): Promise { @@ -333,7 +334,7 @@ export async function chunkMap( results.splice(start, chunkResult.length, ...chunkResult); // Start the next work item by processing it - if (chunks.length > 0) { + if (chunks.length > 0 && !stop) { return chunks.shift()!(); } } @@ -345,7 +346,13 @@ export async function chunkMap( // Start the first concurrent chunks. Each chunk will automatically start // the next available chunk when it finishes. - await Promise.all(chunks.splice(0, concurrency).map(f => f())); + try { + await Promise.all(chunks.splice(0, concurrency).map(f => f())); + } catch (e) { + // Flag that there is an error to stop all other processing. + stop = true; + throw e; + } return results; } From 32c915c6624f86f711baf2d3ec789ecd93c90465 Mon Sep 17 00:00:00 2001 From: Jason Grout Date: Thu, 30 Jan 2020 10:35:48 -0800 Subject: [PATCH 07/10] =?UTF-8?q?Let=E2=80=99s=20just=20use=20p-map.?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit If there is a problem, we can revert back to our implementation. --- packages/base-manager/src/utils.ts | 74 ---------------------- packages/jupyterlab-manager/package.json | 1 + packages/jupyterlab-manager/src/manager.ts | 10 +-- widgetsnbextension/package.json | 3 +- widgetsnbextension/src/manager.js | 7 +- 5 files changed, 13 insertions(+), 82 deletions(-) diff --git a/packages/base-manager/src/utils.ts b/packages/base-manager/src/utils.ts index cda031b228..0e0c7f59ba 100644 --- a/packages/base-manager/src/utils.ts +++ b/packages/base-manager/src/utils.ts @@ -298,77 +298,3 @@ export function bufferToBase64(buffer: ArrayBuffer): string { export function base64ToBuffer(base64: string): ArrayBuffer { return toByteArray(base64).buffer; } - -/** - * A map that chunks the list into chunkSize pieces and evaluates chunks - * concurrently. - * - * @param list - The list to map over - * @param chunkOptions - The options for chunking and evaluating. - * @param fn - The function to map, with the same arguments as an array map - * @param thisArg - An optional thisArg for the function - * @returns - the equivalent of `Promise.all(list.map(fn, thisArg))` - */ -export async function chunkMap( - list: T[], - chunkOptions: chunkMap.IOptions, - fn: (value: T, index: number, array: T[]) => Promise | U, - thisArg?: any -): Promise { - // Default to equivalent to Promise.all(list.map(fn, thisarg)) - const chunkSize = chunkOptions.chunkSize ?? list.length; - const concurrency = chunkOptions.concurrency ?? 1; - - const results = new Array(list.length); - const chunks: (() => Promise)[] = []; - let stop = false; - - // Process a single chunk and resolve to the next chunk if available - async function processChunk(chunk: any[], start: number): Promise { - const chunkResult = await Promise.all( - chunk.map((v, i) => fn.call(thisArg, v, start + i, list)) - ); - - // Splice the chunk results into the results array. We use - // chunkResult.length because the last chunk may not be full size. - results.splice(start, chunkResult.length, ...chunkResult); - - // Start the next work item by processing it - if (chunks.length > 0 && !stop) { - return chunks.shift()!(); - } - } - - // Make closures for each batch of work. - for (let i = 0; i < list.length; i += chunkSize) { - chunks.push(() => processChunk(list.slice(i, i + chunkSize), i)); - } - - // Start the first concurrent chunks. Each chunk will automatically start - // the next available chunk when it finishes. - try { - await Promise.all(chunks.splice(0, concurrency).map(f => f())); - } catch (e) { - // Flag that there is an error to stop all other processing. - stop = true; - throw e; - } - return results; -} - -export namespace chunkMap { - /** - * The options for chunking and evaluating. - */ - export interface IOptions { - /** - * The maximum size of a chunk. Defaults to the list size. - */ - chunkSize?: number; - - /** - * The maximum number of chunks to evaluate simultaneously. Defaults to 1. - */ - concurrency?: number; - } -} diff --git a/packages/jupyterlab-manager/package.json b/packages/jupyterlab-manager/package.json index 709959b3bc..55b899a2df 100644 --- a/packages/jupyterlab-manager/package.json +++ b/packages/jupyterlab-manager/package.json @@ -58,6 +58,7 @@ "@lumino/widgets": "^1.3.0", "@types/backbone": "^1.4.1", "jquery": "^3.1.1", + "p-map": "^3.0.0", "semver": "^6.1.1" }, "devDependencies": { diff --git a/packages/jupyterlab-manager/src/manager.ts b/packages/jupyterlab-manager/src/manager.ts index 332968d76e..0737fe7810 100644 --- a/packages/jupyterlab-manager/src/manager.ts +++ b/packages/jupyterlab-manager/src/manager.ts @@ -39,6 +39,8 @@ import { DocumentRegistry } from '@jupyterlab/docregistry'; import { ISignal, Signal } from '@lumino/signaling'; +import pMap from 'p-map'; + import { valid } from 'semver'; import { SemVerCache } from './semvercache'; @@ -146,13 +148,12 @@ export abstract class LabWidgetManager extends ManagerBase const comm_ids = await this._get_comm_info(); // For each comm id that we do not know about, create the comm, and - // request the state. We must do this processing in chunksto make sure we + // request the state. We must do this processing in chunks to make sure we // do not exceed the ZMQ high water mark limiting messages from the // kernel. See https://github.com/voila-dashboards/voila/issues/534 for // more details. - const widgets_info = await chunkMap( + const widgets_info = await pMap( Object.keys(comm_ids), - { chunkSize: 20, concurrency: 5 }, async (comm_id: string) => { try { await this.get_model(comm_id); @@ -199,7 +200,8 @@ export abstract class LabWidgetManager extends ManagerBase return info.promise; } - } + }, + { concurrency: 100 } ); // We put in a synchronization barrier here so that we don't have to diff --git a/widgetsnbextension/package.json b/widgetsnbextension/package.json index c8164d5281..6c39a1ce80 100644 --- a/widgetsnbextension/package.json +++ b/widgetsnbextension/package.json @@ -29,7 +29,8 @@ "@jupyterlab/services": "^5.0.0-beta.2", "@lumino/messaging": "^1.2.1", "@lumino/widgets": "^1.3.0", - "backbone": "1.2.3" + "backbone": "1.2.3", + "p-map": "^3.0.0" }, "devDependencies": { "css-loader": "^3.4.0", diff --git a/widgetsnbextension/src/manager.js b/widgetsnbextension/src/manager.js index 3a8f7a5499..31554d79f6 100644 --- a/widgetsnbextension/src/manager.js +++ b/widgetsnbextension/src/manager.js @@ -8,6 +8,7 @@ var widgets = require('@jupyter-widgets/controls'); var outputWidgets = require('./widget_output'); var saveState = require('./save_state'); var embedWidgets = require('./embed_widgets'); +var pMap = require('p-map'); var MIME_TYPE = 'application/vnd.jupyter.widget-view+json'; @@ -114,9 +115,8 @@ export class WidgetManager extends baseManager.ManagerBase { // We must do this in chunks to make sure we do not // exceed the ZMQ high water mark limiting messages from the kernel. See // https://github.com/voila-dashboards/voila/issues/534 for more details. - return baseManager.chunkMap( + return pMap( comms, - { chunkSize: 20, concurrency: 5 }, function(comm) { var update_promise = new Promise(function(resolve, reject) { comm.on_msg(function(msg) { @@ -142,7 +142,8 @@ export class WidgetManager extends baseManager.ManagerBase { that.callbacks() ); return update_promise; - } + }, + { concurrency: 100 } ); }) .then(function(widgets_info) { From 6aadcf7299a0bfae421eb27b1ca278b62c356421 Mon Sep 17 00:00:00 2001 From: Jason Grout Date: Thu, 30 Jan 2020 11:01:03 -0800 Subject: [PATCH 08/10] Add throttling to respect the server default iopub rate limit of 1 msg/ms. --- packages/jupyterlab-manager/package.json | 1 + packages/jupyterlab-manager/src/manager.ts | 105 ++++++++++++--------- widgetsnbextension/src/manager.js | 63 +++++++------ yarn.lock | 5 + 4 files changed, 99 insertions(+), 75 deletions(-) diff --git a/packages/jupyterlab-manager/package.json b/packages/jupyterlab-manager/package.json index 55b899a2df..2b8435ffe8 100644 --- a/packages/jupyterlab-manager/package.json +++ b/packages/jupyterlab-manager/package.json @@ -59,6 +59,7 @@ "@types/backbone": "^1.4.1", "jquery": "^3.1.1", "p-map": "^3.0.0", + "p-throttle": "^3.1.0", "semver": "^6.1.1" }, "devDependencies": { diff --git a/packages/jupyterlab-manager/src/manager.ts b/packages/jupyterlab-manager/src/manager.ts index 0737fe7810..b51cde085a 100644 --- a/packages/jupyterlab-manager/src/manager.ts +++ b/packages/jupyterlab-manager/src/manager.ts @@ -40,6 +40,7 @@ import { DocumentRegistry } from '@jupyterlab/docregistry'; import { ISignal, Signal } from '@lumino/signaling'; import pMap from 'p-map'; +import pThrottle from 'p-throttle'; import { valid } from 'semver'; @@ -151,56 +152,66 @@ export abstract class LabWidgetManager extends ManagerBase // request the state. We must do this processing in chunks to make sure we // do not exceed the ZMQ high water mark limiting messages from the // kernel. See https://github.com/voila-dashboards/voila/issues/534 for - // more details. + // more details. We also throttle the function to respect the default + // iopub message rate limit in the notebook server. const widgets_info = await pMap( Object.keys(comm_ids), - async (comm_id: string) => { - try { - await this.get_model(comm_id); - // If we successfully get the model, do no more. - return; - } catch (e) { - // If we have the widget model not found error, then we can create the - // widget. Otherwise, rethrow the error. We have to check the error - // message text explicitly because the get_model function in this - // class throws a generic error with this specific text. - if (e.message !== 'widget model not found') { - throw e; - } - const comm = await this._create_comm(this.comm_target_name, comm_id); - - let msg_id = ''; - const info = new PromiseDelegate(); - comm.on_msg((msg: KernelMessage.ICommMsgMsg) => { - if ( - (msg.parent_header as any).msg_id === msg_id && - msg.header.msg_type === 'comm_msg' && - msg.content.data.method === 'update' - ) { - const data = msg.content.data as any; - const buffer_paths = data.buffer_paths || []; - // Make sure the buffers are DataViews - const buffers = (msg.buffers || []).map(b => { - if (b instanceof DataView) { - return b; - } else { - return new DataView(b instanceof ArrayBuffer ? b : b.buffer); - } - }); - put_buffers(data.state, buffer_paths, buffers); - info.resolve({ comm, msg }); + pThrottle( + async (comm_id: string) => { + try { + await this.get_model(comm_id); + // If we successfully get the model, do no more. + return; + } catch (e) { + // If we have the widget model not found error, then we can create the + // widget. Otherwise, rethrow the error. We have to check the error + // message text explicitly because the get_model function in this + // class throws a generic error with this specific text. + if (e.message !== 'widget model not found') { + throw e; } - }); - msg_id = comm.send( - { - method: 'request_state' - }, - this.callbacks(undefined) - ); - - return info.promise; - } - }, + const comm = await this._create_comm( + this.comm_target_name, + comm_id + ); + + let msg_id = ''; + const info = new PromiseDelegate(); + comm.on_msg((msg: KernelMessage.ICommMsgMsg) => { + if ( + (msg.parent_header as any).msg_id === msg_id && + msg.header.msg_type === 'comm_msg' && + msg.content.data.method === 'update' + ) { + const data = msg.content.data as any; + const buffer_paths = data.buffer_paths || []; + // Make sure the buffers are DataViews + const buffers = (msg.buffers || []).map(b => { + if (b instanceof DataView) { + return b; + } else { + return new DataView( + b instanceof ArrayBuffer ? b : b.buffer + ); + } + }); + put_buffers(data.state, buffer_paths, buffers); + info.resolve({ comm, msg }); + } + }); + msg_id = comm.send( + { + method: 'request_state' + }, + this.callbacks(undefined) + ); + + return info.promise; + } + }, + 4 /* calls */, + 5 /* ms */ + ), { concurrency: 100 } ); diff --git a/widgetsnbextension/src/manager.js b/widgetsnbextension/src/manager.js index 31554d79f6..a0b93b595b 100644 --- a/widgetsnbextension/src/manager.js +++ b/widgetsnbextension/src/manager.js @@ -9,6 +9,7 @@ var outputWidgets = require('./widget_output'); var saveState = require('./save_state'); var embedWidgets = require('./embed_widgets'); var pMap = require('p-map'); +var pThrottle = require('p-throttle'); var MIME_TYPE = 'application/vnd.jupyter.widget-view+json'; @@ -112,37 +113,43 @@ export class WidgetManager extends baseManager.ManagerBase { // for the responses (2). return Promise.all(comm_promises) .then(function(comms) { - // We must do this in chunks to make sure we do not - // exceed the ZMQ high water mark limiting messages from the kernel. See - // https://github.com/voila-dashboards/voila/issues/534 for more details. + // We must do this in chunks to make sure we do not exceed the ZMQ + // high water mark limiting messages from the kernel. See + // https://github.com/voila-dashboards/voila/issues/534 for more + // details. We also throttle calls to respect the server's default + // iopub rate limit. return pMap( comms, - function(comm) { - var update_promise = new Promise(function(resolve, reject) { - comm.on_msg(function(msg) { - base.put_buffers( - msg.content.data.state, - msg.content.data.buffer_paths, - msg.buffers - ); - // A suspected response was received, check to see if - // it's a state update. If so, resolve. - if (msg.content.data.method === 'update') { - resolve({ - comm: comm, - msg: msg - }); - } + pThrottle( + function(comm) { + var update_promise = new Promise(function(resolve, reject) { + comm.on_msg(function(msg) { + base.put_buffers( + msg.content.data.state, + msg.content.data.buffer_paths, + msg.buffers + ); + // A suspected response was received, check to see if + // it's a state update. If so, resolve. + if (msg.content.data.method === 'update') { + resolve({ + comm: comm, + msg: msg + }); + } + }); }); - }); - comm.send( - { - method: 'request_state' - }, - that.callbacks() - ); - return update_promise; - }, + comm.send( + { + method: 'request_state' + }, + that.callbacks() + ); + return update_promise; + }, + 4 /* calls*/, + 5 /* ms */ + ), { concurrency: 100 } ); }) diff --git a/yarn.lock b/yarn.lock index 5f8424ac5d..f1042ed2e1 100644 --- a/yarn.lock +++ b/yarn.lock @@ -7542,6 +7542,11 @@ p-reduce@^1.0.0: resolved "https://registry.npmjs.org/p-reduce/-/p-reduce-1.0.0.tgz#18c2b0dd936a4690a529f8231f58a0fdb6a47dfa" integrity sha1-GMKw3ZNqRpClKfgjH1ig/bakffo= +p-throttle@^3.1.0: + version "3.1.0" + resolved "https://registry.npmjs.org/p-throttle/-/p-throttle-3.1.0.tgz#dee34ce4e77d7cc2dfdc1fea0daedccc64147214" + integrity sha512-rLo81NXBihs3GJQhq89IXa0Egj/sbW1zW8/qnyadOwUhIUrZSUvyGdQ46ISRKELFBkVvmMJ4JUqWki4oAh30Qw== + p-try@^1.0.0: version "1.0.0" resolved "https://registry.npmjs.org/p-try/-/p-try-1.0.0.tgz#cbc79cdbaf8fd4228e13f621f2b1a237c1b207b3" From cc23b0898bbd16dbe7516149d19c7fb3c1ffb1ac Mon Sep 17 00:00:00 2001 From: Jason Grout Date: Thu, 30 Jan 2020 11:03:09 -0800 Subject: [PATCH 09/10] Fix import error --- packages/jupyterlab-manager/src/manager.ts | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/packages/jupyterlab-manager/src/manager.ts b/packages/jupyterlab-manager/src/manager.ts index b51cde085a..b450c1ba7e 100644 --- a/packages/jupyterlab-manager/src/manager.ts +++ b/packages/jupyterlab-manager/src/manager.ts @@ -19,8 +19,7 @@ import { import { ManagerBase, serialize_state, - IStateOptions, - chunkMap + IStateOptions } from '@jupyter-widgets/base-manager'; import { IDisposable } from '@lumino/disposable'; From 2b3bd0902e6b438b9362c8db8f40afe307798df9 Mon Sep 17 00:00:00 2001 From: Jason Grout Date: Thu, 30 Jan 2020 11:04:27 -0800 Subject: [PATCH 10/10] Fix integrity issue --- widgetsnbextension/package.json | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/widgetsnbextension/package.json b/widgetsnbextension/package.json index 6c39a1ce80..4c866ed079 100644 --- a/widgetsnbextension/package.json +++ b/widgetsnbextension/package.json @@ -30,7 +30,8 @@ "@lumino/messaging": "^1.2.1", "@lumino/widgets": "^1.3.0", "backbone": "1.2.3", - "p-map": "^3.0.0" + "p-map": "^3.0.0", + "p-throttle": "^3.1.0" }, "devDependencies": { "css-loader": "^3.4.0",