Skip to content

Commit

Permalink
Integrate new message-rpc prototype into core messaging API (extensions)
Browse files Browse the repository at this point in the history
Refactors and improves the prototype of a faster JSON-RPC protocol initially contributed by @tsmaeder (See also #10781).
The encoding approach used in the initial POC has some performance drawbacks when encoding plain JSON objects. We refactored the protocol to improve the performance for JSON objects whilst maintaining the excellent performance for encoding objects that contain binary data. 

Integrates the new message-rpc prototype into the core messaging API (replacing vscode-ws-jsonrpc).
This has major impacts on the Messaging API as we no longer expose a  `Connection` object (which was provided by vscode-ws-jsonrpc) and directly rely on a generic transport `Channel` implementation instead. 

- Introduce `Channel` as the main transport concept for messages (instead of the dedicated `Connection` from vscode-jsonrpc)
- Replace usage of  `vscode-ws-jsonrpc` with a custom binary RPC protocol.
- Refactor all connection providers to use the new binary protocol.
- Ensure that the `RemoteFileSystemProvider` API uses  `Uint8Arrays` over plain number arrays. This enables direct serialization as buffers and reduces the overhead  of unnecessarily converting from and to `Uint8Arrays`.
- Refactor terminal widget and terminal backend contribution so that the widgets communicates with the underlying terminal process using the new rpc protocol.
- Rework the IPC bootstrap protocol so that it uses a binary pipe for message transmission instead of the `ipc` pipe which only supports string encoding.
- Extend the `JsonRpcProxyFactory` with an optional `RpcConnectionFactory` that enables adopter to creates proxies with a that use a custom `RpcProtocol`/`RpcConnection`.

The plugin API still uses its own RPC protocol implementation. Currently we have to encode/decode between binary data to handle RPC calls from a plugin context. Aligning the two protocols and zero-copy tunneling of RPC messages is planned for a follow-up PR.

Contributed on behalf of STMicroelectronics.
Closes #10684
  • Loading branch information
tortmayr committed Apr 12, 2022
1 parent 7c928aa commit e3e2ff9
Show file tree
Hide file tree
Showing 49 changed files with 1,691 additions and 1,759 deletions.
4 changes: 4 additions & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
"**/@types/node": "12"
},
"devDependencies": {
"@types/chai": "4.3.0",
"@types/chai-spies": "1.0.3",
"@types/chai-string": "^1.4.0",
"@types/jsdom": "^11.0.4",
"@types/node": "12",
Expand All @@ -20,6 +22,8 @@
"@typescript-eslint/eslint-plugin": "^4.8.1",
"@typescript-eslint/eslint-plugin-tslint": "^4.8.1",
"@typescript-eslint/parser": "^4.8.1",
"chai": "4.3.4",
"chai-spies": "1.0.0",
"chai-string": "^1.4.0",
"chalk": "4.0.0",
"concurrently": "^3.5.0",
Expand Down
60 changes: 32 additions & 28 deletions packages/core/src/browser/messaging/ws-connection-provider.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,13 @@
// SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0
// *****************************************************************************

import { injectable, interfaces, decorate, unmanaged } from 'inversify';
import { JsonRpcProxyFactory, JsonRpcProxy, Emitter, Event } from '../../common';
import { WebSocketChannel } from '../../common/messaging/web-socket-channel';
import { Endpoint } from '../endpoint';
import { AbstractConnectionProvider } from '../../common/messaging/abstract-connection-provider';
import { decorate, injectable, interfaces, unmanaged } from 'inversify';
import { io, Socket } from 'socket.io-client';
import { Emitter, Event, JsonRpcProxy, JsonRpcProxyFactory } from '../../common';
import { Channel } from '../../common/message-rpc/channel';
import { AbstractConnectionProvider } from '../../common/messaging/abstract-connection-provider';
import { IWebSocket, WebSocketChannel } from '../../common/messaging/web-socket-channel';
import { Endpoint } from '../endpoint';

decorate(injectable(), JsonRpcProxyFactory);
decorate(unmanaged(), JsonRpcProxyFactory, 0);
Expand All @@ -35,6 +36,8 @@ export interface WebSocketOptions {
export class WebSocketConnectionProvider extends AbstractConnectionProvider<WebSocketOptions> {

protected readonly onSocketDidOpenEmitter: Emitter<void> = new Emitter();
// Socket that is used by the main channel
protected socket: Socket;
get onSocketDidOpen(): Event<void> {
return this.onSocketDidOpenEmitter.event;
}
Expand All @@ -48,31 +51,23 @@ export class WebSocketConnectionProvider extends AbstractConnectionProvider<WebS
return container.get(WebSocketConnectionProvider).createProxy<T>(path, arg);
}

protected readonly socket: Socket;

constructor() {
super();
protected createMainChannel(): Channel {
const url = this.createWebSocketUrl(WebSocketChannel.wsPath);
const socket = this.createWebSocket(url);
const channel = new WebSocketChannel(toIWebSocket(socket));
socket.on('connect', () => {
this.fireSocketDidOpen();
});
socket.on('disconnect', reason => {
for (const channel of [...this.channels.values()]) {
channel.close(undefined, reason);
}
this.fireSocketDidClose();
});
socket.on('message', data => {
this.handleIncomingRawMessage(data);
});
channel.onClose(() => this.fireSocketDidClose());
socket.connect();
this.socket = socket;

return channel;
}

override openChannel(path: string, handler: (channel: WebSocketChannel) => void, options?: WebSocketOptions): void {
override async openChannel(path: string, handler: (channel: Channel) => void, options?: WebSocketOptions): Promise<void> {
if (this.socket.connected) {
super.openChannel(path, handler, options);
return super.openChannel(path, handler, options);
} else {
const openChannel = () => {
this.socket.off('connect', openChannel);
Expand All @@ -82,14 +77,6 @@ export class WebSocketConnectionProvider extends AbstractConnectionProvider<WebS
}
}

protected createChannel(id: number): WebSocketChannel {
return new WebSocketChannel(id, content => {
if (this.socket.connected) {
this.socket.send(content);
}
});
}

/**
* @param path The handler to reach in the backend.
*/
Expand Down Expand Up @@ -143,3 +130,20 @@ export class WebSocketConnectionProvider extends AbstractConnectionProvider<WebS
this.onSocketDidCloseEmitter.fire(undefined);
}
}

function toIWebSocket(socket: Socket): IWebSocket {
return {
close: () => {
socket.removeAllListeners('disconnect');
socket.removeAllListeners('error');
socket.removeAllListeners('message');
socket.close();
},
isConnected: () => socket.connected,
onClose: cb => socket.on('disconnect', reason => cb(reason)),
onError: cb => socket.on('error', reason => cb(reason)),
onMessage: cb => socket.on('message', data => cb(data)),
send: message => socket.emit('message', message)
};
}

1 change: 1 addition & 0 deletions packages/core/src/common/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ export * from './contribution-provider';
export * from './path';
export * from './logger';
export * from './messaging';
export * from './message-rpc';
export * from './message-service';
export * from './message-service-protocol';
export * from './progress-service';
Expand Down
10 changes: 0 additions & 10 deletions packages/core/src/common/message-rpc/README.md

This file was deleted.

Original file line number Diff line number Diff line change
@@ -1,28 +1,28 @@
/********************************************************************************
* Copyright (C) 2021 Red Hat, Inc. and others.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License v. 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0.
*
* This Source Code may also be made available under the following Secondary
* Licenses when the conditions for such availability set forth in the Eclipse
* Public License v. 2.0 are satisfied: GNU General Public License, version 2
* with the GNU Classpath Exception which is available at
* https://www.gnu.org/software/classpath/license.html.
*
* SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0
********************************************************************************/
// *****************************************************************************
// Copyright (C) 2021 Red Hat, Inc. and others.
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License v. 2.0 which is available at
// http://www.eclipse.org/legal/epl-2.0.
//
// This Source Code may also be made available under the following Secondary
// Licenses when the conditions for such availability set forth in the Eclipse
// Public License v. 2.0 are satisfied: GNU General Public License, version 2
// with the GNU Classpath Exception which is available at
// https://www.gnu.org/software/classpath/license.html.
//
// SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0
// *****************************************************************************
import { expect } from 'chai';
import { ArrayBufferReadBuffer, ArrrayBufferWriteBuffer } from './array-buffer-message-buffer';
import { ArrayBufferReadBuffer, ArrayBufferWriteBuffer } from './array-buffer-message-buffer';

describe('array message buffer tests', () => {
it('basic read write test', () => {
const buffer = new ArrayBuffer(1024);
const writer = new ArrrayBufferWriteBuffer(buffer);
const writer = new ArrayBufferWriteBuffer(buffer);

writer.writeByte(8);
writer.writeInt(10000);
writer.writeUint8(8);
writer.writeUint32(10000);
writer.writeBytes(new Uint8Array([1, 2, 3, 4]));
writer.writeString('this is a string');
writer.writeString('another string');
Expand All @@ -32,8 +32,8 @@ describe('array message buffer tests', () => {

const reader = new ArrayBufferReadBuffer(written);

expect(reader.readByte()).equal(8);
expect(reader.readInt()).equal(10000);
expect(reader.readUint8()).equal(8);
expect(reader.readUint32()).equal(10000);
expect(reader.readBytes()).deep.equal(new Uint8Array([1, 2, 3, 4]).buffer);
expect(reader.readString()).equal('this is a string');
expect(reader.readString()).equal('another string');
Expand Down
120 changes: 92 additions & 28 deletions packages/core/src/common/message-rpc/array-buffer-message-buffer.ts
Original file line number Diff line number Diff line change
@@ -1,22 +1,22 @@
/********************************************************************************
* Copyright (C) 2021 Red Hat, Inc. and others.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License v. 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0.
*
* This Source Code may also be made available under the following Secondary
* Licenses when the conditions for such availability set forth in the Eclipse
* Public License v. 2.0 are satisfied: GNU General Public License, version 2
* with the GNU Classpath Exception which is available at
* https://www.gnu.org/software/classpath/license.html.
*
* SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0
********************************************************************************/
// *****************************************************************************
// Copyright (C) 2021 Red Hat, Inc. and others.
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License v. 2.0 which is available at
// http://www.eclipse.org/legal/epl-2.0.
//
// This Source Code may also be made available under the following Secondary
// Licenses when the conditions for such availability set forth in the Eclipse
// Public License v. 2.0 are satisfied: GNU General Public License, version 2
// with the GNU Classpath Exception which is available at
// https://www.gnu.org/software/classpath/license.html.
//
// SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0
// *****************************************************************************
import { Emitter, Event } from '../event';
import { ReadBuffer, WriteBuffer } from './message-buffer';
import { getUintType, UintType, ReadBuffer, WriteBuffer } from './message-buffer';

export class ArrrayBufferWriteBuffer implements WriteBuffer {
export class ArrayBufferWriteBuffer implements WriteBuffer {
constructor(private buffer: ArrayBuffer = new ArrayBuffer(1024), private offset: number = 0) {
}

Expand All @@ -37,19 +37,42 @@ export class ArrrayBufferWriteBuffer implements WriteBuffer {
return this;
}

writeByte(value: number): WriteBuffer {
writeUint8(value: number): WriteBuffer {
this.ensureCapacity(1);
this.msg.setUint8(this.offset++, value);
return this;
}

writeInt(value: number): WriteBuffer {
writeUint16(value: number): WriteBuffer {
this.ensureCapacity(2);
this.msg.setUint16(this.offset, value);
this.offset += 2;
return this;
}

writeUint32(value: number): WriteBuffer {
this.ensureCapacity(4);
this.msg.setUint32(this.offset, value);
this.offset += 4;
return this;
}

writeInteger(value: number): WriteBuffer {
const type = getUintType(value);
this.writeUint8(type);
switch (type) {
case UintType.Uint8:
this.writeUint8(value);
return this;
case UintType.Uint16:
this.writeUint16(value);
return this;
default:
this.writeUint32(value);
return this;
}
}

writeString(value: string): WriteBuffer {
const encoded = this.encodeString(value);
this.writeBytes(encoded);
Expand All @@ -61,8 +84,8 @@ export class ArrrayBufferWriteBuffer implements WriteBuffer {
}

writeBytes(value: ArrayBuffer): WriteBuffer {
this.ensureCapacity(value.byteLength + 4);
this.writeInt(value.byteLength);
this.writeInteger(value.byteLength);
this.ensureCapacity(value.byteLength);
new Uint8Array(this.buffer).set(new Uint8Array(value), this.offset);
this.offset += value.byteLength;
return this;
Expand All @@ -79,32 +102,51 @@ export class ArrrayBufferWriteBuffer implements WriteBuffer {

getCurrentContents(): ArrayBuffer {
return this.buffer.slice(0, this.offset);

}
}

export class ArrayBufferReadBuffer implements ReadBuffer {
private offset: number = 0;

constructor(private readonly buffer: ArrayBuffer) {
constructor(private readonly buffer: ArrayBuffer, readPosition = 0) {
this.offset = readPosition;
}

private get msg(): DataView {
return new DataView(this.buffer);
}

readByte(): number {
readUint8(): number {
return this.msg.getUint8(this.offset++);
}

readInt(): number {
readUint16(): number {
const result = this.msg.getUint16(this.offset);
this.offset += 2;
return result;
}

readUint32(): number {
const result = this.msg.getInt32(this.offset);
this.offset += 4;
return result;
}

readInteger(): number {
const type = this.readUint8();
switch (type) {
case UintType.Uint8:
return this.readUint8();
case UintType.Uint16:
return this.readUint16();
default:
return this.readUint32();
}
}

readString(): string {
const len = this.msg.getUint32(this.offset);
this.offset += 4;
const len = this.readInteger();
const result = this.decodeString(this.buffer.slice(this.offset, this.offset + len));
this.offset += len;
return result;
Expand All @@ -115,10 +157,32 @@ export class ArrayBufferReadBuffer implements ReadBuffer {
}

readBytes(): ArrayBuffer {
const length = this.msg.getUint32(this.offset);
this.offset += 4;
const length = this.readInteger();
const result = this.buffer.slice(this.offset, this.offset + length);
this.offset += length;
return result;
}

sliceAtCurrentPosition(): ReadBuffer {
return new ArrayBufferReadBuffer(this.buffer, this.offset);
}
}

/**
* Retrieve an {@link ArrayBuffer} view for the given buffer. Some {@link Uint8Array} buffer implementations e.g node's {@link Buffer}
* are using shared memory array buffers under the hood. Therefore we need to check the buffers `byteOffset` and `length` and slice
* the underlying array buffer if needed.
* @param buffer The Uint8Array or ArrayBuffer that should be converted.
* @returns a trimmed `ArrayBuffer` representation for the given buffer.
*/
export function toArrayBuffer(buffer: Uint8Array | ArrayBuffer): ArrayBuffer {
if (buffer instanceof ArrayBuffer) {
return buffer;
}
if (buffer.byteOffset === 0 && buffer.byteLength === buffer.buffer.byteLength) {
return buffer.buffer;
}

return buffer.buffer.slice(buffer.byteOffset, buffer.byteOffset + buffer.byteLength);
}

Loading

0 comments on commit e3e2ff9

Please sign in to comment.