Skip to content

Commit

Permalink
Use binary message RPC protocol for plugin API
Browse files Browse the repository at this point in the history
Refactors the plugin RPC protocol to make use of the new message-rpc introduced with eclipse-theia#11011/eclipse-theia#11228.
- Refactor plugin-ext RpcProtocol API to reuse the new message-rpc protocol
  - Remove custom RPC message encoding and handling reuse message-rpc
  - Implement `QueuingChannelMultiplexer` that queues messages and sends them accumulated on the next process.tick (replaces the old Multiplexer 
    implementation)
   - Refactors proxy handlers and remote target handlers
   - Use `Channel` instead of `MessageConnection` for creating new instances of RPCProtocol
   - Refactor `RpcMessageEncoder`/`RpcMessageDecoder` to enable overwritting of already registered value encoders/decoders. 
   - Add mode property to  base `RpcProtocol` to enable switching from a bidirectional RPC protocol to a client-only or server-only variant.
- Implement special message encoders and decoders for the plugin communication. (Replacement for the old `ObjectTransferrer` JSON replacers/revivers)
- Adapt `HostedPluginServer` and `HostedPluginClient` API to send/receive messages in binary format instead of strings. This enables direct writethrough of the binary messages received from the hosted plugin process.
- Adapt `hosted-plugin-process` and `plugin-host` to directly send binary messages via  `IpcChannel`/`BinaryMessagePipe`

- Remove incorrect (and unused) notification proxy identifiers and instantiation
  - NotificationExt was instantiated in the main context
  - There were unused notification proxy identifiers for main and ext in the wrong contexts

Part of eclipse-theia#10684
Fixes eclipse-theia#9514

Contributed on behalf of STMicroelectronics
  • Loading branch information
tortmayr committed Nov 1, 2022
1 parent 000988a commit cffeda8
Show file tree
Hide file tree
Showing 21 changed files with 586 additions and 561 deletions.
20 changes: 14 additions & 6 deletions packages/core/src/common/message-rpc/channel.ts
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,10 @@ export class ChannelMultiplexer implements Disposable {

}

protected getUnderlyingWriteBuffer(): WriteBuffer {
return this.underlyingChannel.getWriteBuffer();
}

protected handleMessage(buffer: ReadBuffer): void {
const type = buffer.readUint8();
const id = buffer.readString();
Expand All @@ -206,7 +210,7 @@ export class ChannelMultiplexer implements Disposable {
const channel = this.createChannel(id);
this.pendingOpen.delete(id);
this.openChannels.set(id, channel);
resolve!(channel);
resolve(channel);
this.onOpenChannelEmitter.fire({ id, channel });
}
}
Expand All @@ -220,7 +224,7 @@ export class ChannelMultiplexer implements Disposable {
// edge case: both side try to open a channel at the same time.
resolve(channel);
}
this.underlyingChannel.getWriteBuffer().writeUint8(MessageTypes.AckOpen).writeString(id).commit();
this.getUnderlyingWriteBuffer().writeUint8(MessageTypes.AckOpen).writeString(id).commit();
this.onOpenChannelEmitter.fire({ id, channel });
}
}
Expand All @@ -236,7 +240,7 @@ export class ChannelMultiplexer implements Disposable {
protected handleData(id: string, data: ReadBuffer): void {
const channel = this.openChannels.get(id);
if (channel) {
channel.onMessageEmitter.fire(() => data);
channel.onMessageEmitter.fire(() => data.sliceAtReadPosition());
}
}

Expand All @@ -247,14 +251,14 @@ export class ChannelMultiplexer implements Disposable {
// Prepare the write buffer for the channel with the give, id. The channel id has to be encoded
// and written to the buffer before the actual message.
protected prepareWriteBuffer(id: string): WriteBuffer {
const underlying = this.underlyingChannel.getWriteBuffer();
const underlying = this.getUnderlyingWriteBuffer();
underlying.writeUint8(MessageTypes.Data);
underlying.writeString(id);
return underlying;
}

protected closeChannel(id: string): void {
this.underlyingChannel.getWriteBuffer()
this.getUnderlyingWriteBuffer()
.writeUint8(MessageTypes.Close)
.writeString(id)
.commit();
Expand All @@ -263,10 +267,14 @@ export class ChannelMultiplexer implements Disposable {
}

open(id: string): Promise<Channel> {
const existingChannel = this.getOpenChannel(id);
if (existingChannel) {
return Promise.resolve(existingChannel);
}
const result = new Promise<Channel>((resolve, reject) => {
this.pendingOpen.set(id, resolve);
});
this.underlyingChannel.getWriteBuffer().writeUint8(MessageTypes.Open).writeString(id).commit();
this.getUnderlyingWriteBuffer().writeUint8(MessageTypes.Open).writeString(id).commit();
return result;
}

Expand Down
87 changes: 87 additions & 0 deletions packages/core/src/common/message-rpc/msg-pack-extension-manager.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
// *****************************************************************************
// Copyright (C) 2022 STMicroelectronics and others.
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License v. 2.0 which is available at
// http://www.eclipse.org/legal/epl-2.0.
//
// This Source Code may also be made available under the following Secondary
// Licenses when the conditions for such availability set forth in the Eclipse
// Public License v. 2.0 are satisfied: GNU General Public License, version 2
// with the GNU Classpath Exception which is available at
// https://www.gnu.org/software/classpath/license.html.
//
// SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0
// *****************************************************************************

import { addExtension } from 'msgpackr';
import { ResponseError } from './rpc-message-encoder';

/**
* Handles the global registration of custom MsgPackR extensions
* required for the default RPC communication. MsgPackR extensions
* are installed globally on both ends of the communication channel.
* (frontend-backend, pluginExt-pluginMain).
* Is implemented as singleton as it is also used in plugin child processes which have no access to inversify.
*/
export class MsgPackExtensionManager {
private static readonly INSTANCE = new MsgPackExtensionManager();
public static getInstance(): MsgPackExtensionManager {
return this.INSTANCE;
}

private extensions = new Map<number, MsgPackExtension>();

private constructor() {
}

registerExtensions(...extensions: MsgPackExtension[]): void {
extensions.forEach(extension => {
if (extension.tag < 1 || extension.tag > 100) {
// MsgPackR reserves the tag range 1-100 for custom extensions.
throw new Error(`MsgPack extension tag should be a number from 1-100 but was '${extension.tag}'`);
}
if (this.extensions.has(extension.tag)) {
throw new Error(`Another MsgPack extension with the tag '${extension.tag}' is already registered`);
}
this.extensions.set(extension.tag, extension);
addExtension({
Class: extension.class,
type: extension.tag,
write: extension.serialize,
read: extension.deserialize
});
});
}

getExtension(tag: number): MsgPackExtension | undefined {
return this.extensions.get(tag);
}
}

// Register custom msgPack extension for ResponseErrors.
MsgPackExtensionManager.getInstance().registerExtensions({
class: ResponseError,
tag: 1,
serialize: (instance: ResponseError) => {
const { code, data, message, name, stack } = instance;
return { code, data, message, name, stack };
},
deserialize: data => {
const error = new ResponseError(data.code, data.message, data.data);
error.name = data.name;
error.stack = data.stack;
return error;
}
});

export interface MsgPackExtension {
class: Function,
tag: number,
serialize(instance: unknown): unknown,
// eslint-disable-next-line @typescript-eslint/no-explicit-any
deserialize(serialized: any): unknown
}

export type Constructor<T> = new (...params: unknown[]) => T;

26 changes: 3 additions & 23 deletions packages/core/src/common/message-rpc/rpc-message-encoder.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
// *****************************************************************************
/* eslint-disable @typescript-eslint/no-explicit-any */

import { addExtension, Packr as MsgPack } from 'msgpackr';
import { Packr as MsgPack } from 'msgpackr';
import { ReadBuffer, WriteBuffer } from './message-buffer';

/**
Expand Down Expand Up @@ -121,27 +121,10 @@ export interface RpcMessageEncoder {
}

export const defaultMsgPack = new MsgPack({ moreTypes: true, encodeUndefinedAsNil: false, bundleStrings: false });
// Add custom msgpackR extension for ResponseErrors.
addExtension({
Class: ResponseError,
type: 1,
write: (instance: ResponseError) => {
const { code, data, message, name, stack } = instance;
return { code, data, message, name, stack };
},
read: data => {
const error = new ResponseError(data.code, data.message, data.data);
error.name = data.name;
error.stack = data.stack;
return error;
}
});

export class MsgPackMessageEncoder implements RpcMessageEncoder {

constructor(protected readonly msgPack: MsgPack = defaultMsgPack) {

}
constructor(protected readonly msgPack: MsgPack = defaultMsgPack) { }

cancel(buf: WriteBuffer, requestId: number): void {
this.encode<CancelMessage>(buf, { type: RpcMessageType.Cancel, id: requestId });
Expand Down Expand Up @@ -169,13 +152,11 @@ export class MsgPackMessageEncoder implements RpcMessageEncoder {
throw err;
}
}

}

export class MsgPackMessageDecoder implements RpcMessageDecoder {
constructor(protected readonly msgPack: MsgPack = defaultMsgPack) {
constructor(protected readonly msgPack: MsgPack = defaultMsgPack) { }

}
decode<T = any>(buf: ReadBuffer): T {
const bytes = buf.readBytes();
return this.msgPack.decode(bytes);
Expand All @@ -184,5 +165,4 @@ export class MsgPackMessageDecoder implements RpcMessageDecoder {
parse(buffer: ReadBuffer): RpcMessage {
return this.decode(buffer);
}

}
90 changes: 57 additions & 33 deletions packages/core/src/common/message-rpc/rpc-protocol.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,14 @@
/* eslint-disable @typescript-eslint/no-explicit-any */

import { CancellationToken, CancellationTokenSource } from '../cancellation';
import { DisposableCollection } from '../disposable';
import { Disposable, DisposableCollection } from '../disposable';
import { Emitter, Event } from '../event';
import { Deferred } from '../promise-util';
import { Channel } from './channel';
import { MsgPackMessageDecoder, MsgPackMessageEncoder, RpcMessage, RpcMessageDecoder, RpcMessageEncoder, RpcMessageType } from './rpc-message-encoder';
import { Uint8ArrayWriteBuffer } from './uint8-array-message-buffer';

/**
* Handles request messages received by the {@link RpcServer}.
* Handles request messages received by the {@link RPCProtocol}.
*/
export type RequestHandler = (method: string, args: any[]) => Promise<any>;

Expand All @@ -39,15 +38,20 @@ export interface RpcProtocolOptions {
/**
* The message decoder that should be used. If `undefined` the default {@link RpcMessageDecoder} will be used.
*/
decoder?: RpcMessageDecoder
decoder?: RpcMessageDecoder,
/**
* The runtime mode determines whether the RPC protocol is bi-directional (default) or acts as a client or server only.
*/
mode?: 'default' | 'clientOnly' | 'serverOnly'
}

/**
* Establish a bi-directional RPC protocol on top of a given channel. Bi-directional means to send
* sends requests and notifications to the remote side as well as receiving requests and notifications from the remote side.
* Establish a RPC protocol on top of a given channel. By default the rpc protocol is bi-directional, meaning it is possible to send
* requests and notifications to the remote side (i.e. acts as client) as well as receiving requests and notifications from the remote side (i.e. acts as a server).
* Clients can get a promise for a remote request result that will be either resolved or
* rejected depending on the success of the request. Keeps track of outstanding requests and matches replies to the appropriate request
* Currently, there is no timeout handling for long running requests implemented.
* The bi-directional mode can be reconfigured using the {@link RpcProtocolOptions} to construct an RPC protocol instance that acts only as client or server instead.
*/
export class RpcProtocol {
static readonly CANCELLATION_TOKEN_KEY = 'add.cancellation.token';
Expand All @@ -58,6 +62,7 @@ export class RpcProtocol {

protected readonly encoder: RpcMessageEncoder;
protected readonly decoder: RpcMessageDecoder;
protected readonly mode: 'default' | 'clientOnly' | 'serverOnly';

protected readonly onNotificationEmitter: Emitter<{ method: string; args: any[]; }> = new Emitter();
protected readonly cancellationTokenSources = new Map<number, CancellationTokenSource>();
Expand All @@ -68,37 +73,50 @@ export class RpcProtocol {

protected toDispose = new DisposableCollection();

constructor(public readonly channel: Channel, public readonly requestHandler: RequestHandler, options: RpcProtocolOptions = {}) {
constructor(public readonly channel: Channel, public readonly requestHandler: RequestHandler | undefined, options: RpcProtocolOptions = {}) {
this.encoder = options.encoder ?? new MsgPackMessageEncoder();
this.decoder = options.decoder ?? new MsgPackMessageDecoder();
this.toDispose.push(this.onNotificationEmitter);
this.toDispose.push(channel.onMessage(readBuffer => this.handleMessage(this.decoder.parse(readBuffer()))));
channel.onClose(() => this.toDispose.dispose());
this.toDispose.push(channel.onMessage(readBuffer => this.handleMessage(this.decoder.parse(readBuffer()))));
this.mode = options.mode ?? 'default';

if (this.mode !== 'clientOnly' && requestHandler === undefined) {
console.error('RPCProtocol was initialized without a request handler but was not set to clientOnly mode.');
}
}

handleMessage(message: RpcMessage): void {
switch (message.type) {
case RpcMessageType.Cancel: {
this.handleCancel(message.id);
break;
}
case RpcMessageType.Request: {
this.handleRequest(message.id, message.method, message.args);
break;
}
case RpcMessageType.Notification: {
this.handleNotify(message.id, message.method, message.args);
break;
if (this.mode !== 'clientOnly') {
switch (message.type) {
case RpcMessageType.Cancel: {
this.handleCancel(message.id);
return;
}
case RpcMessageType.Request: {
this.handleRequest(message.id, message.method, message.args);
return;
}
case RpcMessageType.Notification: {
this.handleNotify(message.id, message.method, message.args);
return;
}
}
case RpcMessageType.Reply: {
this.handleReply(message.id, message.res);
break;
}
case RpcMessageType.ReplyErr: {
this.handleReplyErr(message.id, message.err);
break;
}
if (this.mode !== 'serverOnly') {
switch (message.type) {
case RpcMessageType.Reply: {
this.handleReply(message.id, message.res);
return;
}
case RpcMessageType.ReplyErr: {
this.handleReplyErr(message.id, message.err);
return;
}
}
}
// If the message was not handled until here, it is incompatible with the mode.
console.warn(`Received message incompatible with this RPCProtocol's mode '${this.mode}'. Type: ${message.type}. ID: ${message.id}.`);
}

protected handleReply(id: number, value: any): void {
Expand Down Expand Up @@ -126,13 +144,13 @@ export class RpcProtocol {
}

sendRequest<T>(method: string, args: any[]): Promise<T> {
const id = this.nextMessageId++;
const reply = new Deferred<T>();

// The last element of the request args might be a cancellation token. As these tokens are not serializable we have to remove it from the
// args array and the `CANCELLATION_TOKEN_KEY` string instead.
const cancellationToken: CancellationToken | undefined = args.length && CancellationToken.is(args[args.length - 1]) ? args.pop() : undefined;

const id = this.nextMessageId++;
const reply = new Deferred<T>();

if (cancellationToken) {
args.push(RpcProtocol.CANCELLATION_TOKEN_KEY);
}
Expand All @@ -153,6 +171,13 @@ export class RpcProtocol {
}

sendNotification(method: string, args: any[]): void {
// If the notification supports a CancellationToken, it needs to be treated like a request
// because cancellation does not work with the simplified "fire and forget" approach of simple notifications.
if (args.length && CancellationToken.is(args[args.length - 1])) {
this.sendRequest(method, args);
return;
}

const output = this.channel.getWriteBuffer();
this.encoder.notification(output, this.nextMessageId++, method, args);
output.commit();
Expand All @@ -167,7 +192,6 @@ export class RpcProtocol {
protected handleCancel(id: number): void {
const cancellationTokenSource = this.cancellationTokenSources.get(id);
if (cancellationTokenSource) {
this.cancellationTokenSources.delete(id);
cancellationTokenSource.cancel();
}
}
Expand All @@ -185,14 +209,14 @@ export class RpcProtocol {
}

try {
const result = await this.requestHandler(method, args);
const result = await this.requestHandler!(method, args);
this.cancellationTokenSources.delete(id);
this.encoder.replyOK(output, id, result);
output.commit();
} catch (err) {
// In case of an error the output buffer might already contains parts of an message.
// => Dispose the current buffer and retrieve a new, clean one for writing the response error.
if (output instanceof Uint8ArrayWriteBuffer) {
if (Disposable.is(output)) {
output.dispose();
}
const errorOutput = this.channel.getWriteBuffer();
Expand Down
Loading

0 comments on commit cffeda8

Please sign in to comment.