Skip to content

Commit

Permalink
[WIP] Integrate new message-rpc protocol into plugin API
Browse files Browse the repository at this point in the history
- Refactor plugin-ext rpc protocol to reuse the new binary message-rpc protocol.
- - Remove custom RPC message encoding and handling reuse message-rpc
-- Implement `QueuingChannelMultiplexer`that queues messages and sends them accumlated on the next process.tick (replaces the old `Multiplexer` implementation
-- Refactors proxy handlers and remote target handlers
-- Use `Channel` instead of `MessageConnection` for creating new instances of `RPCProtocol`
-- Implement special message encoders and decoders for the plugin communication

- Adapt the `HostedPluginServer` and `HostedPluginClient` API to send/receive messages in binary format instead of strings.
- Implement a message protocol for process pipe streams to be able to send messages in binary format between the hosted-plugin-process and the plugin-host.

Contributed on behalf of STMicroelectronics
Part of eclipse-theia#10684
  • Loading branch information
tortmayr committed Apr 29, 2022
1 parent fccfd5b commit ce98274
Show file tree
Hide file tree
Showing 21 changed files with 603 additions and 429 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,17 +13,21 @@
//
// SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0
// *****************************************************************************
import { Disposable } from '../disposable';
import { Emitter, Event } from '../event';
import { getUintType, UintType, ReadBuffer, WriteBuffer } from './message-buffer';

export class ArrayBufferWriteBuffer implements WriteBuffer {
export class ArrayBufferWriteBuffer implements WriteBuffer, Disposable {

constructor(private buffer: ArrayBuffer = new ArrayBuffer(1024), private offset: number = 0) {
}

private get msg(): DataView {
return new DataView(this.buffer);
}

protected isDisposed = false;

ensureCapacity(value: number): WriteBuffer {
let newLength = this.buffer.byteLength;
while (newLength < this.offset + value) {
Expand Down Expand Up @@ -97,12 +101,22 @@ export class ArrayBufferWriteBuffer implements WriteBuffer {
}

commit(): void {
if (this.isDisposed) {
throw new Error('Error during commit. The Writebuffer is already disposed')
}
this.onCommitEmitter.fire(this.getCurrentContents());
this.dispose();
}

getCurrentContents(): ArrayBuffer {
return this.buffer.slice(0, this.offset);
}

dispose(): void {
if (!this.isDisposed) {
this.onCommitEmitter.dispose();
this.isDisposed = true;
}
}
}

Expand Down
18 changes: 13 additions & 5 deletions packages/core/src/common/message-rpc/channel.ts
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,10 @@ export class ChannelMultiplexer {
this.openChannels.clear();
}

protected getUnderlyingWriteBuffer(): WriteBuffer {
return this.underlyingChannel.getWriteBuffer();
}

protected handleMessage(buffer: ReadBuffer): void {
const type = buffer.readUint8();
const id = buffer.readString();
Expand All @@ -171,7 +175,7 @@ export class ChannelMultiplexer {
const channel = this.createChannel(id);
this.pendingOpen.delete(id);
this.openChannels.set(id, channel);
resolve!(channel);
resolve(channel);
this.onOpenChannelEmitter.fire({ id, channel });
}
}
Expand All @@ -185,7 +189,7 @@ export class ChannelMultiplexer {
// edge case: both side try to open a channel at the same time.
resolve(channel);
}
this.underlyingChannel.getWriteBuffer().writeUint8(MessageTypes.AckOpen).writeString(id).commit();
this.getUnderlyingWriteBuffer().writeUint8(MessageTypes.AckOpen).writeString(id).commit();
this.onOpenChannelEmitter.fire({ id, channel });
}
}
Expand All @@ -212,14 +216,14 @@ export class ChannelMultiplexer {
// Prepare the write buffer for the channel with the give, id. The channel id has to be encoded
// and written to the buffer before the actual message.
protected prepareWriteBuffer(id: string): WriteBuffer {
const underlying = this.underlyingChannel.getWriteBuffer();
const underlying = this.getUnderlyingWriteBuffer();
underlying.writeUint8(MessageTypes.Data);
underlying.writeString(id);
return underlying;
}

protected closeChannel(id: string): void {
this.underlyingChannel.getWriteBuffer()
this.getUnderlyingWriteBuffer()
.writeUint8(MessageTypes.Close)
.writeString(id)
.commit();
Expand All @@ -228,10 +232,14 @@ export class ChannelMultiplexer {
}

open(id: string): Promise<Channel> {
const existingChannel = this.getOpenChannel(id);
if (existingChannel) {
return Promise.resolve(existingChannel);
}
const result = new Promise<Channel>((resolve, reject) => {
this.pendingOpen.set(id, resolve);
});
this.underlyingChannel.getWriteBuffer().writeUint8(MessageTypes.Open).writeString(id).commit();
this.getUnderlyingWriteBuffer().writeUint8(MessageTypes.Open).writeString(id).commit();
return result;
}

Expand Down
48 changes: 29 additions & 19 deletions packages/core/src/common/message-rpc/rpc-message-encoder.ts
Original file line number Diff line number Diff line change
Expand Up @@ -98,13 +98,12 @@ export function transformErrorForSerialization(error: Error): SerializedError {

export enum ObjectType {
JSON = 1,
ArrayBuffer = 2,
ByteArray = 3,
ARRAY_BUFFER = 2,
BYTE_ARRAY = 3,
UNDEFINED = 4,
ObjectArray = 5,
OBJECT_ARRAY = 5,
RESPONSE_ERROR = 6,
ERROR = 7

}

/**
Expand Down Expand Up @@ -158,6 +157,10 @@ export class RpcMessageDecoder {
protected tagIntType: UintType;

constructor() {
this.registerDecoders();
}

protected registerDecoders(): void {
this.registerDecoder(ObjectType.JSON, {
read: buf => JSON.parse(buf.readString())
});
Expand All @@ -182,15 +185,15 @@ export class RpcMessageDecoder {
}
});

this.registerDecoder(ObjectType.ByteArray, {
this.registerDecoder(ObjectType.BYTE_ARRAY, {
read: buf => new Uint8Array(buf.readBytes())
});

this.registerDecoder(ObjectType.ArrayBuffer, {
this.registerDecoder(ObjectType.ARRAY_BUFFER, {
read: buf => buf.readBytes()
});

this.registerDecoder(ObjectType.ObjectArray, {
this.registerDecoder(ObjectType.OBJECT_ARRAY, {
read: buf => {
const encodedSeparately = buf.readUint8() === 1;

Expand All @@ -213,9 +216,10 @@ export class RpcMessageDecoder {
* by retrieving the highest tag value and calculating the required Uint size to store it.
* @param tag the tag for which the decoder should be registered.
* @param decoder the decoder that should be registered.
* @param overwrite flag to indicate wether an existing registration with the same tag should be overwritten with the new registration.
*/
registerDecoder(tag: number, decoder: ValueDecoder): void {
if (this.decoders.has(tag)) {
registerDecoder(tag: number, decoder: ValueDecoder, overwrite = false): void {
if (!overwrite && this.decoders.has(tag)) {
throw new Error(`Decoder already registered: ${tag}`);
}
this.decoders.set(tag, decoder);
Expand Down Expand Up @@ -356,7 +360,7 @@ export class RpcMessageEncoder {
write: (buf, value) => buf.writeString(JSON.stringify(value))
});

this.registerEncoder(ObjectType.ByteArray, {
this.registerEncoder(ObjectType.BYTE_ARRAY, {
is: value => value instanceof Uint8Array,
write: (buf, value: Uint8Array) => {
/* When running in a nodejs context the received Uint8Array might be
Expand All @@ -367,22 +371,22 @@ export class RpcMessageEncoder {
}
});

this.registerEncoder(ObjectType.ArrayBuffer, {
this.registerEncoder(ObjectType.ARRAY_BUFFER, {
is: value => value instanceof ArrayBuffer,
write: (buf, value: ArrayBuffer) => buf.writeBytes(value)
});

this.registerEncoder(ObjectType.ObjectArray, {
this.registerEncoder(ObjectType.OBJECT_ARRAY, {
is: value => Array.isArray(value),
write: (buf, args: any[]) => {
const encodeSeparately = this.requiresSeparateEncoding(args);
buf.writeUint8(encodeSeparately ? 1 : 0);
if (!encodeSeparately) {
this.writeTypedValue(buf, args, ObjectType.ObjectArray);
this.writeTypedValue(buf, args, ObjectType.OBJECT_ARRAY);
} else {
buf.writeInteger(args.length);
for (let i = 0; i < args.length; i++) {
this.writeTypedValue(buf, args[i], ObjectType.ObjectArray);
this.writeTypedValue(buf, args[i], ObjectType.OBJECT_ARRAY);
}
}
}
Expand All @@ -394,14 +398,20 @@ export class RpcMessageEncoder {
* After the successful registration the {@link tagIntType} is recomputed
* by retrieving the highest tag value and calculating the required Uint size to store it.
* @param tag the tag for which the encoder should be registered.
* @param decoder the encoder that should be registered.
* @param encoder the encoder that should be registered.
* @param overwrite to indicate wether an existing registration with the same tag should be overwritten with the new registration.
*/
registerEncoder<T>(tag: number, encoder: ValueEncoder): void {
if (this.registeredTags.has(tag)) {
registerEncoder<T>(tag: number, encoder: ValueEncoder, overwrite = false): void {
if (!overwrite && this.registeredTags.has(tag)) {
throw new Error(`Tag already registered: ${tag}`);
}
this.registeredTags.add(tag);
this.encoders.push([tag, encoder]);
if (!overwrite) {
this.registeredTags.add(tag);
this.encoders.push([tag, encoder]);
} else {
const overrideIndex = this.encoders.findIndex(existingEncoder => existingEncoder[0] === tag);
this.encoders[overrideIndex] = [tag, encoder];
}
const maxTagId = this.encoders.map(value => value[0]).sort().reverse()[0];
this.tagIntType = getUintType(maxTagId);
}
Expand Down
12 changes: 10 additions & 2 deletions packages/core/src/common/message-rpc/rpc-protocol.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import { Channel, MessageProvider } from './channel';
import { ReadBuffer } from './message-buffer';
import { RpcMessage, RpcMessageDecoder, RpcMessageEncoder, RpcMessageType } from './rpc-message-encoder';
import { CancellationToken, CancellationTokenSource } from '../../../shared/vscode-languageserver-protocol';
import { Disposable } from '../disposable';

/**
* Handles request messages received by the {@link RpcServer}.
Expand Down Expand Up @@ -104,11 +105,18 @@ export class RpcServer {
const result = await this.requestHandler(method, args);
this.cancellationTokenSources.delete(id);
this.encoder.replyOK(output, id, result);
output.commit();
} catch (err) {
// In case of an error the output write buffer might already contain a partially written message.
// So we dispose the output buffer and create a new clean write buffer
if (Disposable.is(output)) {
output.dispose();
}
const errorOutput = this.channel.getWriteBuffer();
this.cancellationTokenSources.delete(id);
this.encoder.replyErr(output, id, err);
this.encoder.replyErr(errorOutput, id, err);
errorOutput.commit();
}
output.commit();
}

protected async handleNotify(id: number, method: string, args: any[]): Promise<void> {
Expand Down
8 changes: 4 additions & 4 deletions packages/plugin-ext/src/common/plugin-protocol.ts
Original file line number Diff line number Diff line change
Expand Up @@ -812,7 +812,7 @@ export function buildFrontendModuleName(plugin: PluginPackage | PluginModel): st

export const HostedPluginClient = Symbol('HostedPluginClient');
export interface HostedPluginClient {
postMessage(pluginHost: string, message: string): Promise<void>;
postMessage(pluginHost: string, buffer: ArrayBuffer): Promise<void>;

log(logPart: LogPart): void;

Expand Down Expand Up @@ -857,7 +857,7 @@ export interface HostedPluginServer extends JsonRpcServer<HostedPluginClient> {

getExtPluginAPI(): Promise<ExtPluginApi[]>;

onMessage(targetHost: string, message: string): Promise<void>;
onMessage(targetHost: string, message: ArrayBuffer): Promise<void>;

}

Expand Down Expand Up @@ -894,9 +894,9 @@ export interface PluginServer {
export const ServerPluginRunner = Symbol('ServerPluginRunner');
export interface ServerPluginRunner {
// eslint-disable-next-line @typescript-eslint/no-explicit-any
acceptMessage(pluginHostId: string, jsonMessage: string): boolean;
acceptMessage(pluginHostId: string, jsonMessage: ArrayBuffer): boolean;
// eslint-disable-next-line @typescript-eslint/no-explicit-any
onMessage(pluginHostId: string, jsonMessage: string): void;
onMessage(pluginHostId: string, jsonMessage: ArrayBuffer): void;
setClient(client: HostedPluginClient): void;
setDefault(defaultRunner: ServerPluginRunner): void;
clientClosed(): void;
Expand Down
107 changes: 107 additions & 0 deletions packages/plugin-ext/src/common/proxy-handler.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
/********************************************************************************
* Copyright (C) 2021 Red Hat, Inc. and others.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License v. 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0.
*
* This Source Code may also be made available under the following Secondary
* Licenses when the conditions for such availability set forth in the Eclipse
* Public License v. 2.0 are satisfied: GNU General Public License, version 2
* with the GNU Classpath Exception which is available at
* https://www.gnu.org/software/classpath/license.html.
*
* SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0
********************************************************************************/
/* eslint-disable @typescript-eslint/no-explicit-any */
import { Channel } from '@theia/core/';
import { RpcClient, RpcServer } from '@theia/core';
import { Deferred } from '@theia/core/lib/common/promise-util';
import { RpcMessageDecoder, RpcMessageEncoder } from '@theia/core/lib/common/message-rpc/rpc-message-encoder';

export interface RpcMessageParser {
encoder: RpcMessageEncoder,
decoder: RpcMessageDecoder
}
/**
* A proxy handler that will send any method invocation on the proxied object
* as a rcp protocol message over a channel.
*/
export class ClientProxyHandler<T extends object> implements ProxyHandler<T> {
private channelDeferred: Deferred<RpcClient> = new Deferred();

constructor(protected readonly id: string, protected readonly parser: RpcMessageParser) {
}

listen(channel: Channel): void {
const client = new RpcClient(channel, this.parser);
this.channelDeferred.resolve(client);
}

get(target: any, name: string, receiver: any): any {
if (target[name] || name.charCodeAt(0) !== 36 /* CharCode.DollarSign */) {
return target[name];
}
const isNotify = this.isNotification(name);
return (...args: any[]) => {
const method = name.toString();
return this.channelDeferred.promise.then((connection: RpcClient) =>
new Promise((resolve, reject) => {
try {
if (isNotify) {
connection.sendNotification(method, args);
resolve(undefined);
} else {
const resultPromise = connection.sendRequest(method, args) as Promise<any>;
resultPromise.then((result: any) => {
resolve(result);
}).catch(e => {
reject(e);
});
}
} catch (err) {
reject(err);
}
})
);
};
}

/**
* Return whether the given property represents a notification. If true,
* the promise returned from the invocation will resolve immediately to `undefined`
*
* A property leads to a notification rather than a method call if its name
* begins with `notify` or `on`.
*
* @param p - The property being called on the proxy.
* @return Whether `p` represents a notification.
*/
protected isNotification(p: PropertyKey): boolean {
let propertyString = p.toString();
if (propertyString.charCodeAt(0) === 36/* CharCode.DollarSign */) {
propertyString = propertyString.substring(1);
}
return propertyString.startsWith('notify') || propertyString.startsWith('on');
}
}

export class RpcInvocationHandler {

constructor(readonly id: string, readonly target: any, protected readonly parser: RpcMessageParser) {
}

listen(channel: Channel): void {
const server = new RpcServer(channel, (method: string, args: any[]) => this.handleRequest(method, args), this.parser);
server.onNotification((e: { method: string, args: any }) => this.onNotification(e.method, e.args));
}

protected async handleRequest(method: string, args: any[]): Promise<any> {
return this.target[method](...args);
}

protected onNotification(method: string, args: any[]): void {
this.target[method](...args);
}
}

Loading

0 comments on commit ce98274

Please sign in to comment.