Skip to content

Commit

Permalink
eclipse-theiaGH-11159 Use msgpackR for default encoding of rpc messages
Browse files Browse the repository at this point in the history
- Refactor RpcMessageEncoder/RpcMessageDecoder`
   - Extract generic  interfaces
   - Provide default implementation based on msgpackR
   - Update message encoder test cases
- Introduce reusable `AbstractChannel` implementation to reduce boilerplate code for channel setup

Fixes eclipse-theia#11159

Contributed on behalf of STMicroelectronics
  • Loading branch information
tortmayr committed Aug 2, 2022
1 parent 0c0f8c7 commit 00064f1
Show file tree
Hide file tree
Showing 11 changed files with 264 additions and 537 deletions.
1 change: 1 addition & 0 deletions packages/core/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
"lodash.debounce": "^4.0.8",
"lodash.throttle": "^4.1.1",
"markdown-it": "^12.3.2",
"msgpackr": "^1.6.1",
"nsfw": "^2.1.2",
"p-debounce": "^2.1.0",
"perfect-scrollbar": "^1.3.0",
Expand Down
47 changes: 37 additions & 10 deletions packages/core/src/common/message-rpc/channel.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
// SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0
// *****************************************************************************

import { injectable } from 'inversify';
import { Disposable, DisposableCollection } from '../disposable';
import { Emitter, Event } from '../event';
import { ReadBuffer, WriteBuffer } from './message-buffer';
Expand Down Expand Up @@ -68,15 +69,11 @@ export interface ChannelCloseEvent {
export type MessageProvider = () => ReadBuffer;

/**
* Helper class to implement the single channels on a {@link ChannelMultiplexer}. Simply forwards write requests to
* the given write buffer source i.e. the main channel of the {@link ChannelMultiplexer}.
* Reusable abstract {@link Channel} implementation that sets up
* the basic channel event listeners and offers a generic close method.
*/
export class ForwardingChannel implements Channel {

protected toDispose = new DisposableCollection();
constructor(readonly id: string, protected readonly closeHandler: () => void, protected readonly writeBufferSource: () => WriteBuffer) {
this.toDispose.pushAll([this.onCloseEmitter, this.onErrorEmitter, this.onMessageEmitter]);
}
@injectable()
export abstract class AbstractChannel implements Channel {

onCloseEmitter: Emitter<ChannelCloseEvent> = new Emitter();
get onClose(): Event<ChannelCloseEvent> {
Expand All @@ -93,13 +90,43 @@ export class ForwardingChannel implements Channel {
return this.onMessageEmitter.event;
};

protected toDispose: DisposableCollection = new DisposableCollection();

constructor() {
this.toDispose.pushAll([this.onCloseEmitter, this.onErrorEmitter, this.onMessageEmitter]);
}

close(): void {
this.toDispose.dispose();
}

abstract getWriteBuffer(): WriteBuffer;

}

/**
* Helper class to implement the single channels on a {@link ChannelMultiplexer}. Simply forwards write requests to
* the given write buffer source i.e. the main channel of the {@link ChannelMultiplexer}.
*/
export class ForwardingChannel extends AbstractChannel {

constructor(readonly id: string, protected readonly closeHandler: () => void, protected readonly writeBufferSource: () => WriteBuffer) {
super();
this.toDispose.pushAll([this.onCloseEmitter, this.onErrorEmitter, this.onMessageEmitter]);
}

// Override event listener as public readonly so that the can be accessed from an outer scope (i.e. the channel multiplexer).
override readonly onCloseEmitter: Emitter<ChannelCloseEvent> = new Emitter();
override readonly onErrorEmitter: Emitter<unknown> = new Emitter();
override readonly onMessageEmitter: Emitter<MessageProvider> = new Emitter();

getWriteBuffer(): WriteBuffer {
return this.writeBufferSource();
}

close(): void {
override close(): void {
super.close();
this.closeHandler();
this.toDispose.dispose();
}
}

Expand Down
2 changes: 1 addition & 1 deletion packages/core/src/common/message-rpc/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,5 +14,5 @@
// SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0
// *****************************************************************************
export { RequestHandler, RpcProtocol, RpcProtocolOptions } from './rpc-protocol';
export { Channel, ChannelCloseEvent, MessageProvider } from './channel';
export { Channel, AbstractChannel, ChannelCloseEvent, MessageProvider } from './channel';
export { ReadBuffer, WriteBuffer } from './message-buffer';
44 changes: 28 additions & 16 deletions packages/core/src/common/message-rpc/rpc-message-encoder.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,41 +15,53 @@
// *****************************************************************************

import { expect } from 'chai';
import {
EncodingError, MsgPackMessageDecoder, MsgPackMessageEncoder
} from './rpc-message-encoder';
import { Uint8ArrayReadBuffer, Uint8ArrayWriteBuffer } from './uint8-array-message-buffer';
import { EncodingError, RpcMessageDecoder, RpcMessageEncoder } from './rpc-message-encoder';

describe('PPC Message Codex', () => {
describe('RPC Message Encoder & Decoder', () => {
describe('PPC Message Encoder & Decoder', () => {
describe('Msgpack Encoder & Decoder', () => {
it('should encode object into binary message and decode the message back into the original object', () => {
const buffer = new Uint8Array(1024);
const writer = new Uint8ArrayWriteBuffer(buffer);
const testObject = {
string: 'string',
boolean: true,
integer: 5,
float: 14.5,
array: ['1', 2, { three: 'three' }],
set: new Set([1, 2, 3]),
map: new Map([[1, 1], [2, 2], [3, 3]]),
buffer: new TextEncoder().encode('ThisIsAUint8Array'),
object: { foo: 'bar', baz: true },
undefined: undefined,
// eslint-disable-next-line no-null/no-null
null: null
};

const encoder = new RpcMessageEncoder();
const jsonMangled = JSON.parse(JSON.stringify(encoder));
// The RpcMessageEncoder can decode/encode collections, whereas JSON.parse can't. => We have to manually restore the set
// eslint-disable-next-line @typescript-eslint/no-explicit-any
jsonMangled.registeredTags = (encoder as any).registeredTags;

encoder.writeTypedValue(writer, encoder, new WeakSet());

const encoder = new MsgPackMessageEncoder();
encoder.encode(writer, testObject);
const written = writer.getCurrentContents();

const reader = new Uint8ArrayReadBuffer(written);

const decoder = new RpcMessageDecoder();
const decoded = decoder.readTypedValue(reader);
const decoder = new MsgPackMessageDecoder();
const decoded = decoder.decode(reader);

expect(decoded).deep.equal(jsonMangled);
expect(decoded).deep.equal(testObject);
});
it('should fail with an EncodingError when trying to encode the object ', () => {
const x = new Set();
const y = new Set();
x.add(y);
y.add(x);
const encoder = new RpcMessageEncoder();

const writer = new Uint8ArrayWriteBuffer();
expect(() => encoder.writeTypedValue(writer, x, new WeakSet())).to.throw(EncodingError);
const encoder = new MsgPackMessageEncoder();
expect(() => encoder.encode(writer, x)).to.throw(EncodingError);
});
});
});


Loading

0 comments on commit 00064f1

Please sign in to comment.