Skip to content

Commit

Permalink
eclipse-theiaGH-11159 Use msgpackR for default encoding of rpc messages
Browse files Browse the repository at this point in the history
- Refactor RpcMessageEncoder/RpcMessageDecoder`
   - Extract generic  interfaces
   - Provide default implementation based on msgpackR
   - Rename existing "old" implementation into `RecursiveMessageEncoder/Decoder
   - Update message encoder test cases
- Introduce reusable `AbstractChannel` implementation to reduce boilerplate code for channel setup

Fixes eclipse-theia#11159

Contributed on behalf of STMicroelectronics
  • Loading branch information
tortmayr committed Jul 20, 2022
1 parent b000f96 commit b51bae8
Show file tree
Hide file tree
Showing 11 changed files with 473 additions and 347 deletions.
1 change: 1 addition & 0 deletions packages/core/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
"lodash.debounce": "^4.0.8",
"lodash.throttle": "^4.1.1",
"markdown-it": "^12.3.2",
"msgpackr": "^1.6.1",
"nsfw": "^2.1.2",
"p-debounce": "^2.1.0",
"perfect-scrollbar": "^1.3.0",
Expand Down
47 changes: 37 additions & 10 deletions packages/core/src/common/message-rpc/channel.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
// SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0
// *****************************************************************************

import { injectable } from 'inversify';
import { Disposable, DisposableCollection } from '../disposable';
import { Emitter, Event } from '../event';
import { ReadBuffer, WriteBuffer } from './message-buffer';
Expand Down Expand Up @@ -68,15 +69,11 @@ export interface ChannelCloseEvent {
export type MessageProvider = () => ReadBuffer;

/**
* Helper class to implement the single channels on a {@link ChannelMultiplexer}. Simply forwards write requests to
* the given write buffer source i.e. the main channel of the {@link ChannelMultiplexer}.
* Reusable abstract {@link Channel} implementation that sets up
* the basic channel event listeners and offers a generic close method.
*/
export class ForwardingChannel implements Channel {

protected toDispose = new DisposableCollection();
constructor(readonly id: string, protected readonly closeHandler: () => void, protected readonly writeBufferSource: () => WriteBuffer) {
this.toDispose.pushAll([this.onCloseEmitter, this.onErrorEmitter, this.onMessageEmitter]);
}
@injectable()
export abstract class AbstractChannel implements Channel {

onCloseEmitter: Emitter<ChannelCloseEvent> = new Emitter();
get onClose(): Event<ChannelCloseEvent> {
Expand All @@ -93,13 +90,43 @@ export class ForwardingChannel implements Channel {
return this.onMessageEmitter.event;
};

protected toDispose: DisposableCollection = new DisposableCollection();

constructor() {
this.toDispose.pushAll([this.onCloseEmitter, this.onErrorEmitter, this.onMessageEmitter]);
}

close(): void {
this.toDispose.dispose();
}

abstract getWriteBuffer(): WriteBuffer;

}

/**
* Helper class to implement the single channels on a {@link ChannelMultiplexer}. Simply forwards write requests to
* the given write buffer source i.e. the main channel of the {@link ChannelMultiplexer}.
*/
export class ForwardingChannel extends AbstractChannel {

constructor(readonly id: string, protected readonly closeHandler: () => void, protected readonly writeBufferSource: () => WriteBuffer) {
super();
this.toDispose.pushAll([this.onCloseEmitter, this.onErrorEmitter, this.onMessageEmitter]);
}

// Override event listener as public readonly so that the can be accessed from an outer scope (i.e. the channel multiplexer).
override readonly onCloseEmitter: Emitter<ChannelCloseEvent> = new Emitter();
override readonly onErrorEmitter: Emitter<unknown> = new Emitter();
override readonly onMessageEmitter: Emitter<MessageProvider> = new Emitter();

getWriteBuffer(): WriteBuffer {
return this.writeBufferSource();
}

close(): void {
override close(): void {
super.close();
this.closeHandler();
this.toDispose.dispose();
}
}

Expand Down
2 changes: 1 addition & 1 deletion packages/core/src/common/message-rpc/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,5 +14,5 @@
// SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0
// *****************************************************************************
export { RequestHandler, RpcProtocol, RpcProtocolOptions } from './rpc-protocol';
export { Channel, ChannelCloseEvent, MessageProvider } from './channel';
export { Channel, AbstractChannel, ChannelCloseEvent, MessageProvider } from './channel';
export { ReadBuffer, WriteBuffer } from './message-buffer';
75 changes: 46 additions & 29 deletions packages/core/src/common/message-rpc/rpc-message-encoder.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,41 +15,58 @@
// *****************************************************************************

import { expect } from 'chai';
import { ReadBuffer, WriteBuffer } from './message-buffer';
import {
EncodingError, MsgPackMessageDecoder, MsgPackMessageEncoder, RecursiveMessageEncoder,
RecursiveRpcMessageDecoder
} from './rpc-message-encoder';
import { Uint8ArrayReadBuffer, Uint8ArrayWriteBuffer } from './uint8-array-message-buffer';
import { EncodingError, RpcMessageDecoder, RpcMessageEncoder } from './rpc-message-encoder';

describe('PPC Message Codex', () => {
describe('RPC Message Encoder & Decoder', () => {
it('should encode object into binary message and decode the message back into the original object', () => {
const buffer = new Uint8Array(1024);
const writer = new Uint8ArrayWriteBuffer(buffer);

const encoder = new RpcMessageEncoder();
const jsonMangled = JSON.parse(JSON.stringify(encoder));
// The RpcMessageEncoder can decode/encode collections, whereas JSON.parse can't. => We have to manually restore the set
// eslint-disable-next-line @typescript-eslint/no-explicit-any
jsonMangled.registeredTags = (encoder as any).registeredTags;
describe('PPC Message Encoder & Decoder', () => {
describe('Msgpack Encoder & Decoder', () => {
baseEncodingTests((buf, value) => new RecursiveMessageEncoder().writeTypedValue(buf, value, new WeakSet()), buf => new RecursiveRpcMessageDecoder().readTypedValue(buf));
});
describe('Recursive Encoder & Decoder', () => {
baseEncodingTests((buf, value) => new MsgPackMessageEncoder().encode(buf, value), buf => new MsgPackMessageDecoder().decode(buf));
});
});

encoder.writeTypedValue(writer, encoder, new WeakSet());
// eslint-disable-next-line @typescript-eslint/no-explicit-any
function baseEncodingTests(encode: (buf: WriteBuffer, value: any) => void, decode: (buf: ReadBuffer) => any): void {
it('should encode object into binary message and decode the message back into the original object', () => {
const buffer = new Uint8Array(1024);
const writer = new Uint8ArrayWriteBuffer(buffer);
const testObject = {
string: 'string',
boolean: true,
integer: 5,
float: 14.5,
array: ['1', 2, { three: 'three' }],
set: new Set([1, 2, 3]),
map: new Map([[1, 1], [2, 2], [3, 3]]),
buffer: new TextEncoder().encode('ThisIsAUint8Array'),
object: { foo: 'bar', baz: true },
undefined: undefined,
// eslint-disable-next-line no-null/no-null
null: null
};

const written = writer.getCurrentContents();
encode(writer, testObject);
const written = writer.getCurrentContents();

const reader = new Uint8ArrayReadBuffer(written);
const reader = new Uint8ArrayReadBuffer(written);

const decoder = new RpcMessageDecoder();
const decoded = decoder.readTypedValue(reader);
const decoded = decode(reader);

expect(decoded).deep.equal(jsonMangled);
});
it('should fail with an EncodingError when trying to encode the object ', () => {
const x = new Set();
const y = new Set();
x.add(y);
y.add(x);
const encoder = new RpcMessageEncoder();
expect(decoded).deep.equal(testObject);
});
it('should fail with an EncodingError when trying to encode the object ', () => {
const x = new Set();
const y = new Set();
x.add(y);
y.add(x);

const writer = new Uint8ArrayWriteBuffer();
expect(() => encoder.writeTypedValue(writer, x, new WeakSet())).to.throw(EncodingError);
});
const writer = new Uint8ArrayWriteBuffer();
expect(() => encode(writer, x)).to.throw(EncodingError);
});
});
}
Loading

0 comments on commit b51bae8

Please sign in to comment.