Skip to content

Commit

Permalink
Integrate new message-rpc prototype into core messaging API (extensio…
Browse files Browse the repository at this point in the history
…ns) (eclipse-theia#11011)

Refactors and improves the prototype of a faster JSON-RPC protocol initially contributed by @tsmaeder (See also eclipse-theia#10781).
The encoding approach used in the initial POC has some performance drawbacks when encoding plain JSON objects. We refactored the protocol to improve the performance for JSON objects whilst maintaining the excellent performance for encoding objects that contain binary data.

Integrates the new message-rpc prototype into the core messaging API (replacing vscode-ws-jsonrpc).
This has major impacts on the Messaging API as we no longer expose a  `Connection` object (which was provided by vscode-ws-jsonrpc) and directly rely on a generic transport `Channel` implementation instead.

- Introduce `Channel` as the main transport concept for messages (instead of the dedicated `Connection` from vscode-jsonrpc)
- Replace usage of  `vscode-ws-jsonrpc` with a custom binary RPC protocol.
- Remove 'vscode-ws-jsonrpc' depdency from "@theia/core/shared".
- Refactor all connection providers to use the new binary protocol.
- Ensure that the `RemoteFileSystemProvider` API uses  `Uint8Arrays` over plain number arrays. This enables direct serialization as buffers and reduces the overhead  of unnecessarily converting from and to `Uint8Arrays`.
- Refactor terminal widget and terminal backend contribution so that the widgets communicates with the underlying terminal process using the new rpc protocol.
- Rework the IPC bootstrap protocol so that it uses a binary pipe for message transmission instead of the `ipc` pipe which only supports string encoding.
- Extend the `JsonRpcProxyFactory` with an optional `RpcConnectionFactory` that enables adopter to creates proxies with a that use a custom `RpcProtocol`/`RpcConnection`.

The plugin API still uses its own RPC protocol implementation. This will be addressed in a follow-up PR. (See eclipse-theia#11093)

Fix critical bugs

- eclipse-theiaGH-11196 Remove dev/debug logoutput from IPCChannel
- eclipse-theiaGH-11199 Refactor connection provider and channel multiplexer to properly handle a reconnecting backend
   -  Remove console log in websocket-channel if the socket is not connected. Otherwise we end up in an endless loop.
   -  Ensure that channels & RpcProtocol instances proplery dispose all resources if the backend disconnects.
   -  Ensure that all previously open channels and RpcProtocol instances are properly restored once the backend reconnects.

- eclipse-theia#11203 Ensure that debugging is handled gracefully (implicitly fixed with the fix for eclipse-theia#11199)

- Remove dependency to `reconnecting-websocket` which is no longer needed since the swap to socket.io

Fixes eclipse-theia#11196
Fixes eclipse-theia#11199

Contributed on behalf of STMicroelectronics.

Co-authored-by: Thomas Mäder <[email protected]>
  • Loading branch information
tortmayr and tsmaeder committed Jun 2, 2022
1 parent 53a272e commit 982dda7
Show file tree
Hide file tree
Showing 51 changed files with 2,281 additions and 642 deletions.
7 changes: 7 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,13 @@
- Added the following method to the interface `DebugService`: `fetchDynamicDebugConfiguration` as well as the property `onDidChangedDebugConfigurationProviders`.
- Removed method `DebugPrefixConfiguration#runDynamicConfiguration`
- [core] The interface `SelectComponentProps` was updated to rename a property from `value` to `defaultValue`
- [core] Refactored the core messaging API. Replaced `vscode-ws-jsonrpc` with a custom RPC protocol that is better suited for handling binary data and enables message tunneling.
This impacts all main concepts of the messaging API. The API no longer exposes a `Connection` object and uses a generic `Channel` implementation instead.
- Replaces usage of `vscode-json-rpc`'s `Connection` with the new generic `Channel`. Affects `AbstractConnectionProvider`, `MessagingService`, `IPCConnectionProvider`, `ElectronMessagingService`
- `MessagingService`: No longer offers the `listen` and `forward` method. Use `wsChannel` instead.
- `RemoteFileSystemServer`: Use `UInt8Array` instead of plain number arrays for all arguments and return type that store binary data
- `DebugAdapter`: Replaced the debug-service internal `Channel` implementation with the newly introduced generic `Channel`.
[#11228](https://github.com/eclipse-theia/theia/pull/11228) - Contributed on behalf of STMicroelectronics.

## v1.26.0 - 5/26/2022

Expand Down
4 changes: 4 additions & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
"**/@types/node": "12"
},
"devDependencies": {
"@types/chai": "4.3.0",
"@types/chai-spies": "1.0.3",
"@types/chai-string": "^1.4.0",
"@types/jsdom": "^11.0.4",
"@types/node": "12",
Expand All @@ -20,6 +22,8 @@
"@typescript-eslint/eslint-plugin": "^4.8.1",
"@typescript-eslint/eslint-plugin-tslint": "^4.8.1",
"@typescript-eslint/parser": "^4.8.1",
"chai": "4.3.4",
"chai-spies": "1.0.0",
"chai-string": "^1.4.0",
"chalk": "4.0.0",
"concurrently": "^3.5.0",
Expand Down
1 change: 0 additions & 1 deletion packages/core/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,6 @@ export class SomeClass {
- `react-virtualized` (from [`react-virtualized@^9.20.0`](https://www.npmjs.com/package/react-virtualized))
- `vscode-languageserver-protocol` (from [`vscode-languageserver-protocol@~3.15.3`](https://www.npmjs.com/package/vscode-languageserver-protocol))
- `vscode-uri` (from [`vscode-uri@^2.1.1`](https://www.npmjs.com/package/vscode-uri))
- `vscode-ws-jsonrpc` (from [`vscode-ws-jsonrpc@^0.2.0`](https://www.npmjs.com/package/vscode-ws-jsonrpc))
- `dompurify` (from [`dompurify@^2.2.9`](https://www.npmjs.com/package/dompurify))
- `express` (from [`express@^4.16.3`](https://www.npmjs.com/package/express))
- `lodash.debounce` (from [`lodash.debounce@^4.0.8`](https://www.npmjs.com/package/lodash.debounce))
Expand Down
5 changes: 1 addition & 4 deletions packages/core/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,6 @@
"react-dom": "^16.8.0",
"react-tooltip": "^4.2.21",
"react-virtualized": "^9.20.0",
"reconnecting-websocket": "^4.2.0",
"reflect-metadata": "^0.1.10",
"route-parser": "^0.0.5",
"safer-buffer": "^2.1.2",
Expand All @@ -70,7 +69,6 @@
"uuid": "^8.3.2",
"vscode-languageserver-protocol": "~3.15.3",
"vscode-uri": "^2.1.1",
"vscode-ws-jsonrpc": "^0.2.0",
"ws": "^7.1.2",
"yargs": "^15.3.1"
},
Expand Down Expand Up @@ -113,8 +111,7 @@
"react-dom",
"react-virtualized",
"vscode-languageserver-protocol",
"vscode-uri",
"vscode-ws-jsonrpc"
"vscode-uri"
],
"export =": [
"dompurify as DOMPurify",
Expand Down
1 change: 0 additions & 1 deletion packages/core/shared/vscode-ws-jsonrpc/index.d.ts

This file was deleted.

1 change: 0 additions & 1 deletion packages/core/shared/vscode-ws-jsonrpc/index.js

This file was deleted.

59 changes: 34 additions & 25 deletions packages/core/src/browser/messaging/ws-connection-provider.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,11 @@
// *****************************************************************************

import { injectable, interfaces, decorate, unmanaged } from 'inversify';
import { JsonRpcProxyFactory, JsonRpcProxy, Emitter, Event } from '../../common';
import { WebSocketChannel } from '../../common/messaging/web-socket-channel';
import { JsonRpcProxyFactory, JsonRpcProxy, Emitter, Event, Channel } from '../../common';
import { Endpoint } from '../endpoint';
import { AbstractConnectionProvider } from '../../common/messaging/abstract-connection-provider';
import { io, Socket } from 'socket.io-client';
import { IWebSocket, WebSocketChannel } from '../../common/messaging/web-socket-channel';

decorate(injectable(), JsonRpcProxyFactory);
decorate(unmanaged(), JsonRpcProxyFactory, 0);
Expand Down Expand Up @@ -53,26 +53,42 @@ export class WebSocketConnectionProvider extends AbstractConnectionProvider<WebS
constructor() {
super();
const url = this.createWebSocketUrl(WebSocketChannel.wsPath);
const socket = this.createWebSocket(url);
socket.on('connect', () => {
this.fireSocketDidOpen();
});
socket.on('disconnect', reason => {
for (const channel of [...this.channels.values()]) {
channel.close(undefined, reason);
this.socket = this.createWebSocket(url);
this.socket.on('connect', () => {
this.initializeMultiplexer();
if (this.reconnectChannelOpeners.length > 0) {
this.reconnectChannelOpeners.forEach(opener => opener());
this.reconnectChannelOpeners = [];
}
this.fireSocketDidClose();
});
socket.on('message', data => {
this.handleIncomingRawMessage(data);
this.socket.on('disconnect', () => this.fireSocketDidClose());
this.socket.on('message', () => this.onIncomingMessageActivityEmitter.fire(undefined));
this.fireSocketDidOpen();
});
socket.connect();
this.socket = socket;
this.socket.connect();
}

override openChannel(path: string, handler: (channel: WebSocketChannel) => void, options?: WebSocketOptions): void {
protected createMainChannel(): Channel {
return new WebSocketChannel(this.toIWebSocket(this.socket));
}

protected toIWebSocket(socket: Socket): IWebSocket {
return {
close: () => {
socket.removeAllListeners('disconnect');
socket.removeAllListeners('error');
socket.removeAllListeners('message');
},
isConnected: () => socket.connected,
onClose: cb => socket.on('disconnect', reason => cb(reason)),
onError: cb => socket.on('error', reason => cb(reason)),
onMessage: cb => socket.on('message', data => cb(data)),
send: message => socket.emit('message', message)
};
}

override async openChannel(path: string, handler: (channel: Channel) => void, options?: WebSocketOptions): Promise<void> {
if (this.socket.connected) {
super.openChannel(path, handler, options);
return super.openChannel(path, handler, options);
} else {
const openChannel = () => {
this.socket.off('connect', openChannel);
Expand All @@ -82,14 +98,6 @@ export class WebSocketConnectionProvider extends AbstractConnectionProvider<WebS
}
}

protected createChannel(id: number): WebSocketChannel {
return new WebSocketChannel(id, content => {
if (this.socket.connected) {
this.socket.send(content);
}
});
}

/**
* @param path The handler to reach in the backend.
*/
Expand Down Expand Up @@ -143,3 +151,4 @@ export class WebSocketConnectionProvider extends AbstractConnectionProvider<WebS
this.onSocketDidCloseEmitter.fire(undefined);
}
}

2 changes: 1 addition & 1 deletion packages/core/src/browser/progress-status-bar-item.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
// *****************************************************************************

import { injectable, inject } from 'inversify';
import { CancellationToken } from 'vscode-ws-jsonrpc';
import { CancellationToken } from '../../shared/vscode-languageserver-protocol';
import { ProgressClient, ProgressMessage, ProgressUpdate } from '../common';
import { StatusBar, StatusBarAlignment } from './status-bar';
import { Deferred } from '../common/promise-util';
Expand Down
8 changes: 8 additions & 0 deletions packages/core/src/common/cancellation.ts
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,14 @@ export namespace CancellationToken {
isCancellationRequested: true,
onCancellationRequested: shortcutEvent
});

// eslint-disable-next-line @typescript-eslint/no-explicit-any
export function is(value: any): value is CancellationToken {
const candidate = value as CancellationToken;
return candidate && (candidate === CancellationToken.None
|| candidate === CancellationToken.Cancelled
|| (typeof candidate.isCancellationRequested === 'boolean' && !!candidate.onCancellationRequested));
}
}

export class CancellationError extends Error {
Expand Down
1 change: 1 addition & 0 deletions packages/core/src/common/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ export * from './contribution-provider';
export * from './path';
export * from './logger';
export * from './messaging';
export * from './message-rpc';
export * from './message-service';
export * from './message-service-protocol';
export * from './progress-service';
Expand Down
88 changes: 88 additions & 0 deletions packages/core/src/common/message-rpc/channel.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
// *****************************************************************************
// Copyright (C) 2021 Red Hat, Inc. and others.
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License v. 2.0 which is available at
// http://www.eclipse.org/legal/epl-2.0.
//
// This Source Code may also be made available under the following Secondary
// Licenses when the conditions for such availability set forth in the Eclipse
// Public License v. 2.0 are satisfied: GNU General Public License, version 2
// with the GNU Classpath Exception which is available at
// https://www.gnu.org/software/classpath/license.html.
//
// SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0
// *****************************************************************************

import { assert, expect, spy, use } from 'chai';
import * as spies from 'chai-spies';
import { Uint8ArrayReadBuffer, Uint8ArrayWriteBuffer } from './uint8-array-message-buffer';
import { ChannelMultiplexer, ForwardingChannel, MessageProvider } from './channel';

use(spies);

/**
* A pipe with two channels at each end for testing.
*/
export class ChannelPipe {
readonly left: ForwardingChannel = new ForwardingChannel('left', () => this.right.onCloseEmitter.fire({ reason: 'Left channel has been closed' }), () => {
const leftWrite = new Uint8ArrayWriteBuffer();
leftWrite.onCommit(buffer => {
this.right.onMessageEmitter.fire(() => new Uint8ArrayReadBuffer(buffer));
});
return leftWrite;
});
readonly right: ForwardingChannel = new ForwardingChannel('right', () => this.left.onCloseEmitter.fire({ reason: 'Right channel has been closed' }), () => {
const rightWrite = new Uint8ArrayWriteBuffer();
rightWrite.onCommit(buffer => {
this.left.onMessageEmitter.fire(() => new Uint8ArrayReadBuffer(buffer));
});
return rightWrite;
});
}
describe('Message Channel', () => {
describe('Channel multiplexer', () => {
it('should forward messages to intended target channel', async () => {
const pipe = new ChannelPipe();

const leftMultiplexer = new ChannelMultiplexer(pipe.left);
const rightMultiplexer = new ChannelMultiplexer(pipe.right);
const openChannelSpy = spy(() => {
});

rightMultiplexer.onDidOpenChannel(openChannelSpy);
leftMultiplexer.onDidOpenChannel(openChannelSpy);

const leftFirst = await leftMultiplexer.open('first');
const leftSecond = await leftMultiplexer.open('second');

const rightFirst = rightMultiplexer.getOpenChannel('first');
const rightSecond = rightMultiplexer.getOpenChannel('second');

assert.isNotNull(rightFirst);
assert.isNotNull(rightSecond);

const leftSecondSpy = spy((buf: MessageProvider) => {
const message = buf().readString();
expect(message).equal('message for second');
});

leftSecond.onMessage(leftSecondSpy);

const rightFirstSpy = spy((buf: MessageProvider) => {
const message = buf().readString();
expect(message).equal('message for first');
});

rightFirst!.onMessage(rightFirstSpy);

leftFirst.getWriteBuffer().writeString('message for first').commit();
rightSecond!.getWriteBuffer().writeString('message for second').commit();

expect(leftSecondSpy).to.be.called();
expect(rightFirstSpy).to.be.called();

expect(openChannelSpy).to.be.called.exactly(4);
});
});
});
Loading

0 comments on commit 982dda7

Please sign in to comment.