From b51bae80a1644d7b63203f532d196b93696618cb Mon Sep 17 00:00:00 2001 From: Tobias Ortmayr Date: Wed, 20 Jul 2022 14:54:35 +0200 Subject: [PATCH] GH-11159 Use msgpackR for default encoding of rpc messages - Refactor RpcMessageEncoder/RpcMessageDecoder` - Extract generic interfaces - Provide default implementation based on msgpackR - Rename existing "old" implementation into `RecursiveMessageEncoder/Decoder - Update message encoder test cases - Introduce reusable `AbstractChannel` implementation to reduce boilerplate code for channel setup Fixes #11159 Contributed on behalf of STMicroelectronics --- packages/core/package.json | 1 + .../core/src/common/message-rpc/channel.ts | 47 +- packages/core/src/common/message-rpc/index.ts | 2 +- .../message-rpc/rpc-message-encoder.spec.ts | 75 ++- .../common/message-rpc/rpc-message-encoder.ts | 507 ++++++++++-------- .../src/common/message-rpc/rpc-protocol.ts | 6 +- .../common/messaging/web-socket-channel.ts | 33 +- .../electron-ipc-connection-provider.ts | 40 +- .../electron-messaging-contribution.ts | 27 +- .../core/src/node/messaging/ipc-channel.ts | 26 +- yarn.lock | 56 ++ 11 files changed, 473 insertions(+), 347 deletions(-) diff --git a/packages/core/package.json b/packages/core/package.json index 20302e7ac149e..543d54017083e 100644 --- a/packages/core/package.json +++ b/packages/core/package.json @@ -54,6 +54,7 @@ "lodash.debounce": "^4.0.8", "lodash.throttle": "^4.1.1", "markdown-it": "^12.3.2", + "msgpackr": "^1.6.1", "nsfw": "^2.1.2", "p-debounce": "^2.1.0", "perfect-scrollbar": "^1.3.0", diff --git a/packages/core/src/common/message-rpc/channel.ts b/packages/core/src/common/message-rpc/channel.ts index 5d9fc6985cc07..8a709cd49ef59 100644 --- a/packages/core/src/common/message-rpc/channel.ts +++ b/packages/core/src/common/message-rpc/channel.ts @@ -14,6 +14,7 @@ // SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0 // ***************************************************************************** +import { injectable } from 'inversify'; import { Disposable, DisposableCollection } from '../disposable'; import { Emitter, Event } from '../event'; import { ReadBuffer, WriteBuffer } from './message-buffer'; @@ -68,15 +69,11 @@ export interface ChannelCloseEvent { export type MessageProvider = () => ReadBuffer; /** - * Helper class to implement the single channels on a {@link ChannelMultiplexer}. Simply forwards write requests to - * the given write buffer source i.e. the main channel of the {@link ChannelMultiplexer}. + * Reusable abstract {@link Channel} implementation that sets up + * the basic channel event listeners and offers a generic close method. */ -export class ForwardingChannel implements Channel { - - protected toDispose = new DisposableCollection(); - constructor(readonly id: string, protected readonly closeHandler: () => void, protected readonly writeBufferSource: () => WriteBuffer) { - this.toDispose.pushAll([this.onCloseEmitter, this.onErrorEmitter, this.onMessageEmitter]); - } +@injectable() +export abstract class AbstractChannel implements Channel { onCloseEmitter: Emitter = new Emitter(); get onClose(): Event { @@ -93,13 +90,43 @@ export class ForwardingChannel implements Channel { return this.onMessageEmitter.event; }; + protected toDispose: DisposableCollection = new DisposableCollection(); + + constructor() { + this.toDispose.pushAll([this.onCloseEmitter, this.onErrorEmitter, this.onMessageEmitter]); + } + + close(): void { + this.toDispose.dispose(); + } + + abstract getWriteBuffer(): WriteBuffer; + +} + +/** + * Helper class to implement the single channels on a {@link ChannelMultiplexer}. Simply forwards write requests to + * the given write buffer source i.e. the main channel of the {@link ChannelMultiplexer}. + */ +export class ForwardingChannel extends AbstractChannel { + + constructor(readonly id: string, protected readonly closeHandler: () => void, protected readonly writeBufferSource: () => WriteBuffer) { + super(); + this.toDispose.pushAll([this.onCloseEmitter, this.onErrorEmitter, this.onMessageEmitter]); + } + + // Override event listener as public readonly so that the can be accessed from an outer scope (i.e. the channel multiplexer). + override readonly onCloseEmitter: Emitter = new Emitter(); + override readonly onErrorEmitter: Emitter = new Emitter(); + override readonly onMessageEmitter: Emitter = new Emitter(); + getWriteBuffer(): WriteBuffer { return this.writeBufferSource(); } - close(): void { + override close(): void { + super.close(); this.closeHandler(); - this.toDispose.dispose(); } } diff --git a/packages/core/src/common/message-rpc/index.ts b/packages/core/src/common/message-rpc/index.ts index 671540a31c42b..39a19eb44fa2f 100644 --- a/packages/core/src/common/message-rpc/index.ts +++ b/packages/core/src/common/message-rpc/index.ts @@ -14,5 +14,5 @@ // SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0 // ***************************************************************************** export { RequestHandler, RpcProtocol, RpcProtocolOptions } from './rpc-protocol'; -export { Channel, ChannelCloseEvent, MessageProvider } from './channel'; +export { Channel, AbstractChannel, ChannelCloseEvent, MessageProvider } from './channel'; export { ReadBuffer, WriteBuffer } from './message-buffer'; diff --git a/packages/core/src/common/message-rpc/rpc-message-encoder.spec.ts b/packages/core/src/common/message-rpc/rpc-message-encoder.spec.ts index 174591f227833..30fee1b178382 100644 --- a/packages/core/src/common/message-rpc/rpc-message-encoder.spec.ts +++ b/packages/core/src/common/message-rpc/rpc-message-encoder.spec.ts @@ -15,41 +15,58 @@ // ***************************************************************************** import { expect } from 'chai'; +import { ReadBuffer, WriteBuffer } from './message-buffer'; +import { + EncodingError, MsgPackMessageDecoder, MsgPackMessageEncoder, RecursiveMessageEncoder, + RecursiveRpcMessageDecoder +} from './rpc-message-encoder'; import { Uint8ArrayReadBuffer, Uint8ArrayWriteBuffer } from './uint8-array-message-buffer'; -import { EncodingError, RpcMessageDecoder, RpcMessageEncoder } from './rpc-message-encoder'; -describe('PPC Message Codex', () => { - describe('RPC Message Encoder & Decoder', () => { - it('should encode object into binary message and decode the message back into the original object', () => { - const buffer = new Uint8Array(1024); - const writer = new Uint8ArrayWriteBuffer(buffer); - - const encoder = new RpcMessageEncoder(); - const jsonMangled = JSON.parse(JSON.stringify(encoder)); - // The RpcMessageEncoder can decode/encode collections, whereas JSON.parse can't. => We have to manually restore the set - // eslint-disable-next-line @typescript-eslint/no-explicit-any - jsonMangled.registeredTags = (encoder as any).registeredTags; +describe('PPC Message Encoder & Decoder', () => { + describe('Msgpack Encoder & Decoder', () => { + baseEncodingTests((buf, value) => new RecursiveMessageEncoder().writeTypedValue(buf, value, new WeakSet()), buf => new RecursiveRpcMessageDecoder().readTypedValue(buf)); + }); + describe('Recursive Encoder & Decoder', () => { + baseEncodingTests((buf, value) => new MsgPackMessageEncoder().encode(buf, value), buf => new MsgPackMessageDecoder().decode(buf)); + }); +}); - encoder.writeTypedValue(writer, encoder, new WeakSet()); +// eslint-disable-next-line @typescript-eslint/no-explicit-any +function baseEncodingTests(encode: (buf: WriteBuffer, value: any) => void, decode: (buf: ReadBuffer) => any): void { + it('should encode object into binary message and decode the message back into the original object', () => { + const buffer = new Uint8Array(1024); + const writer = new Uint8ArrayWriteBuffer(buffer); + const testObject = { + string: 'string', + boolean: true, + integer: 5, + float: 14.5, + array: ['1', 2, { three: 'three' }], + set: new Set([1, 2, 3]), + map: new Map([[1, 1], [2, 2], [3, 3]]), + buffer: new TextEncoder().encode('ThisIsAUint8Array'), + object: { foo: 'bar', baz: true }, + undefined: undefined, + // eslint-disable-next-line no-null/no-null + null: null + }; - const written = writer.getCurrentContents(); + encode(writer, testObject); + const written = writer.getCurrentContents(); - const reader = new Uint8ArrayReadBuffer(written); + const reader = new Uint8ArrayReadBuffer(written); - const decoder = new RpcMessageDecoder(); - const decoded = decoder.readTypedValue(reader); + const decoded = decode(reader); - expect(decoded).deep.equal(jsonMangled); - }); - it('should fail with an EncodingError when trying to encode the object ', () => { - const x = new Set(); - const y = new Set(); - x.add(y); - y.add(x); - const encoder = new RpcMessageEncoder(); + expect(decoded).deep.equal(testObject); + }); + it('should fail with an EncodingError when trying to encode the object ', () => { + const x = new Set(); + const y = new Set(); + x.add(y); + y.add(x); - const writer = new Uint8ArrayWriteBuffer(); - expect(() => encoder.writeTypedValue(writer, x, new WeakSet())).to.throw(EncodingError); - }); + const writer = new Uint8ArrayWriteBuffer(); + expect(() => encode(writer, x)).to.throw(EncodingError); }); -}); +} diff --git a/packages/core/src/common/message-rpc/rpc-message-encoder.ts b/packages/core/src/common/message-rpc/rpc-message-encoder.ts index 3d93b660c0349..5a27e43d003d0 100644 --- a/packages/core/src/common/message-rpc/rpc-message-encoder.ts +++ b/packages/core/src/common/message-rpc/rpc-message-encoder.ts @@ -15,6 +15,7 @@ // ***************************************************************************** /* eslint-disable @typescript-eslint/no-explicit-any */ +import { Packr as MsgPack } from 'msgpackr'; import { ReadBuffer, WriteBuffer } from './message-buffer'; /** @@ -84,6 +85,92 @@ export class ResponseError extends Error { } } +/** + * Custom error thrown by the {@link RpcMessageEncoder} if an error occurred during the encoding and the + * object could not be written to the given {@link WriteBuffer} + */ +export class EncodingError extends Error { + constructor(msg: string, public cause?: Error) { + super(msg); + } +} + +/** + * A `RpcMessageDecoder` parses a a binary message received via {@link ReadBuffer} into a {@link RpcMessage} + */ +export interface RpcMessageDecoder { + parse(buffer: ReadBuffer): RpcMessage; +} + +/** + * A `RpcMessageEncoder` writes {@link RpcMessage} objects to a {@link WriteBuffer}. Note that it is + * up to clients to commit the message. This allows for multiple messages being + * encoded before sending. + */ +export interface RpcMessageEncoder { + cancel(buf: WriteBuffer, requestId: number): void; + + notification(buf: WriteBuffer, requestId: number, method: string, args: any[]): void + + request(buf: WriteBuffer, requestId: number, method: string, args: any[]): void + + replyOK(buf: WriteBuffer, requestId: number, res: any): void + + replyErr(buf: WriteBuffer, requestId: number, err: any): void + +} + +export const defaultMsgPack = new MsgPack({ moreTypes: true, encodeUndefinedAsNil: false, bundleStrings: true }); +export class MsgPackMessageEncoder implements RpcMessageEncoder { + + constructor(protected readonly msgPack: MsgPack = defaultMsgPack) { + + } + + cancel(buf: WriteBuffer, requestId: number): void { + this.encode(buf, { type: RpcMessageType.Cancel, id: requestId }); + } + notification(buf: WriteBuffer, requestId: number, method: string, args: any[]): void { + this.encode(buf, { type: RpcMessageType.Notification, id: requestId, method, args }); + } + request(buf: WriteBuffer, requestId: number, method: string, args: any[]): void { + this.encode(buf, { type: RpcMessageType.Request, id: requestId, method, args }); + } + replyOK(buf: WriteBuffer, requestId: number, res: any): void { + this.encode(buf, { type: RpcMessageType.Reply, id: requestId, res }); + } + replyErr(buf: WriteBuffer, requestId: number, err: any): void { + this.encode(buf, { type: RpcMessageType.ReplyErr, id: requestId, err }); + } + + encode(buf: WriteBuffer, value: T): void { + try { + buf.writeBytes(this.msgPack.encode(value)); + } catch (err) { + if (err instanceof Error) { + throw new EncodingError(`Error during encoding: '${err.message}'`, err); + } + throw err; + } + } + +} + +export class MsgPackMessageDecoder implements RpcMessageDecoder { + constructor(protected readonly msgPack: MsgPack = defaultMsgPack) { + + } + decode(buf: ReadBuffer): T { + const bytes = buf.readBytes(); + return this.msgPack.decode(bytes); + } + + parse(buffer: ReadBuffer): RpcMessage { + return this.decode(buffer); + } + +} + /** * The tag values for the default {@link ValueEncoder}s & {@link ValueDecoder}s */ @@ -147,19 +234,216 @@ export interface ValueDecoder { } /** - * Custom error thrown by the {@link RpcMessageEncoder} if an error occurred during the encoding and the - * object could not be written to the given {@link WriteBuffer} + * Highly customizable {@link RpcMessageEncoder} implementation. Clients can register custom + * {@link ValueEncoder}s and {@link ValueEncoder}s to specially handling certain types of values. + * Objects are encoded recursively with the corresponding {@link ValueEncoder}. + * Compared to the default {@link MsgPackMessageEncoder} encoding times are higher (especially for large objects) so + * it's recommended to only use this encoder for edge cases where the capabilities of the default encoder don't suffice. */ -export class EncodingError extends Error { - constructor(msg: string) { - super(msg); +export class RecursiveMessageEncoder implements RpcMessageEncoder { + + protected readonly encoders: [number, ValueEncoder][] = []; + protected readonly registeredTags: Set = new Set(); + + constructor() { + this.registerEncoders(); + } + + protected registerEncoders(): void { + // encoders will be consulted in reverse order of registration, so the JSON fallback needs to be last + this.registerEncoder(ObjectType.JSON, { + is: () => true, + write: (buf, value) => { + buf.writeString(JSON.stringify(value)); + } + }); + + this.registerEncoder(ObjectType.Function, { + is: value => typeof value === 'function', + write: () => { } + }); + + this.registerEncoder(ObjectType.Object, { + is: value => typeof value === 'object', + write: (buf, object, visitedObjects, recursiveEncode) => { + const properties = Object.keys(object); + const relevant = []; + for (const property of properties) { + const value = object[property]; + if (typeof value !== 'function') { + relevant.push([property, value]); + } + } + + buf.writeLength(relevant.length); + for (const [property, value] of relevant) { + buf.writeString(property); + recursiveEncode?.(buf, value, visitedObjects); + } + } + }); + + this.registerEncoder(ObjectType.Error, { + is: value => value instanceof Error, + write: (buf, error: Error) => { + const { name, message } = error; + const stack: string = (error).stacktrace || error.stack; + const serializedError = { + $isError: true, + name, + message, + stack + }; + buf.writeString(JSON.stringify(serializedError)); + } + }); + + this.registerEncoder(ObjectType.ResponseError, { + is: value => value instanceof ResponseError, + write: (buf, value) => buf.writeString(JSON.stringify(value)) + }); + + this.registerEncoder(ObjectType.Map, { + is: value => value instanceof Map, + write: (buf, value: Map, visitedObjects) => this.writeArray(buf, Array.from(value.entries()), visitedObjects) + }); + + this.registerEncoder(ObjectType.Set, { + is: value => value instanceof Set, + write: (buf, value: Set, visitedObjects) => this.writeArray(buf, [...value], visitedObjects) + }); + + this.registerEncoder(ObjectType.Null, { + // eslint-disable-next-line no-null/no-null + is: value => value === null, + write: () => { } + }); + + this.registerEncoder(ObjectType.Undefined, { + is: value => value === undefined, + write: () => { } + }); + + this.registerEncoder(ObjectType.ObjectArray, { + is: value => Array.isArray(value), + write: (buf, value, visitedObjects) => { + this.writeArray(buf, value, visitedObjects); + } + }); + + this.registerEncoder(ObjectType.ByteArray, { + is: value => value instanceof Uint8Array, + write: (buf, value) => { + buf.writeBytes(value); + } + }); + + this.registerEncoder(ObjectType.String, { + is: value => typeof value === 'string', + write: (buf, value) => { + buf.writeString(value); + } + }); + + this.registerEncoder(ObjectType.Boolean, { + is: value => typeof value === 'boolean', + write: (buf, value) => { + buf.writeUint8(value === true ? 1 : 0); + } + }); + + this.registerEncoder(ObjectType.Number, { + is: value => typeof value === 'number', + write: (buf, value) => { + buf.writeNumber(value); + } + }); + } + + /** + * Registers a new {@link ValueEncoder} for the given tag. + * After the successful registration the {@link tagIntType} is recomputed + * by retrieving the highest tag value and calculating the required Uint size to store it. + * @param tag the tag for which the encoder should be registered. + * @param decoder the encoder that should be registered. + */ + registerEncoder(tag: number, encoder: ValueEncoder): void { + if (this.registeredTags.has(tag)) { + throw new Error(`Tag already registered: ${tag}`); + } + this.registeredTags.add(tag); + this.encoders.push([tag, encoder]); + } + + cancel(buf: WriteBuffer, requestId: number): void { + buf.writeUint8(RpcMessageType.Cancel); + buf.writeUint32(requestId); + } + + notification(buf: WriteBuffer, requestId: number, method: string, args: any[]): void { + buf.writeUint8(RpcMessageType.Notification); + buf.writeUint32(requestId); + buf.writeString(method); + this.writeArray(buf, args, new WeakSet()); + } + + request(buf: WriteBuffer, requestId: number, method: string, args: any[]): void { + buf.writeUint8(RpcMessageType.Request); + buf.writeUint32(requestId); + buf.writeString(method); + this.writeArray(buf, args, new WeakSet()); + } + + replyOK(buf: WriteBuffer, requestId: number, res: any): void { + buf.writeUint8(RpcMessageType.Reply); + buf.writeUint32(requestId); + this.writeTypedValue(buf, res, new WeakSet()); + } + + replyErr(buf: WriteBuffer, requestId: number, err: any): void { + buf.writeUint8(RpcMessageType.ReplyErr); + buf.writeUint32(requestId); + this.writeTypedValue(buf, err, new WeakSet()); + } + + writeTypedValue(buf: WriteBuffer, value: any, visitedObjects: WeakSet): void { + if (value && typeof value === 'object') { + if (visitedObjects.has(value)) { + throw new EncodingError('Object to encode contains circular references!'); + } + visitedObjects.add(value); + } + + try { + for (let i: number = this.encoders.length - 1; i >= 0; i--) { + if (this.encoders[i][1].is(value)) { + buf.writeUint8(this.encoders[i][0]); + this.encoders[i][1].write(buf, value, visitedObjects, (innerBuffer, innerValue, _visitedObjects) => { + this.writeTypedValue(innerBuffer, innerValue, _visitedObjects); + }); + return; + } + } + throw new EncodingError(`No suitable value encoder found for ${value}`); + } finally { + if (value && typeof value === 'object') { + visitedObjects.delete(value); + } + } + } + + writeArray(buf: WriteBuffer, value: any[], visitedObjects: WeakSet): void { + buf.writeLength(value.length); + for (let i = 0; i < value.length; i++) { + this.writeTypedValue(buf, value[i], visitedObjects); + } } } /** - * A `RpcMessageDecoder` parses a a binary message received via {@link ReadBuffer} into a {@link RpcMessage} + * A {@link RpcMessageDecoder} implementation that can decode messages which have been encoded with a {@link RecursiveMessageEncoder}. */ -export class RpcMessageDecoder { +export class RecursiveRpcMessageDecoder implements RpcMessageDecoder { protected decoders: Map = new Map(); @@ -258,6 +542,7 @@ export class RpcMessageDecoder { } this.decoders.set(tag, decoder); } + parse(buf: ReadBuffer): RpcMessage { try { const msgType = buf.readUint8(); @@ -316,7 +601,7 @@ export class RpcMessageDecoder { }; } - parseReply(msg: ReadBuffer): ReplyMessage { + protected parseReply(msg: ReadBuffer): ReplyMessage { const callId = msg.readUint32(); const value = this.readTypedValue(msg); return { @@ -326,7 +611,7 @@ export class RpcMessageDecoder { }; } - parseReplyErr(msg: ReadBuffer): ReplyErrMessage { + protected parseReplyErr(msg: ReadBuffer): ReplyErrMessage { const callId = msg.readUint32(); const err = this.readTypedValue(msg); @@ -357,207 +642,3 @@ export class RpcMessageDecoder { } } -/** - * A `RpcMessageEncoder` writes {@link RpcMessage} objects to a {@link WriteBuffer}. Note that it is - * up to clients to commit the message. This allows for multiple messages being - * encoded before sending. - */ -export class RpcMessageEncoder { - - protected readonly encoders: [number, ValueEncoder][] = []; - protected readonly registeredTags: Set = new Set(); - - constructor() { - this.registerEncoders(); - } - - protected registerEncoders(): void { - // encoders will be consulted in reverse order of registration, so the JSON fallback needs to be last - this.registerEncoder(ObjectType.JSON, { - is: () => true, - write: (buf, value) => { - buf.writeString(JSON.stringify(value)); - } - }); - - this.registerEncoder(ObjectType.Function, { - is: value => typeof value === 'function', - write: () => { } - }); - - this.registerEncoder(ObjectType.Object, { - is: value => typeof value === 'object', - write: (buf, object, visitedObjects, recursiveEncode) => { - const properties = Object.keys(object); - const relevant = []; - for (const property of properties) { - const value = object[property]; - if (typeof value !== 'function') { - relevant.push([property, value]); - } - } - - buf.writeLength(relevant.length); - for (const [property, value] of relevant) { - buf.writeString(property); - recursiveEncode?.(buf, value, visitedObjects); - } - } - }); - - this.registerEncoder(ObjectType.Error, { - is: value => value instanceof Error, - write: (buf, error: Error) => { - const { name, message } = error; - const stack: string = (error).stacktrace || error.stack; - const serializedError = { - $isError: true, - name, - message, - stack - }; - buf.writeString(JSON.stringify(serializedError)); - } - }); - - this.registerEncoder(ObjectType.ResponseError, { - is: value => value instanceof ResponseError, - write: (buf, value) => buf.writeString(JSON.stringify(value)) - }); - - this.registerEncoder(ObjectType.Map, { - is: value => value instanceof Map, - write: (buf, value: Map, visitedObjects) => this.writeArray(buf, Array.from(value.entries()), visitedObjects) - }); - - this.registerEncoder(ObjectType.Set, { - is: value => value instanceof Set, - write: (buf, value: Set, visitedObjects) => this.writeArray(buf, [...value], visitedObjects) - }); - - this.registerEncoder(ObjectType.Null, { - // eslint-disable-next-line no-null/no-null - is: value => value === null, - write: () => { } - }); - - this.registerEncoder(ObjectType.Undefined, { - is: value => value === undefined, - write: () => { } - }); - - this.registerEncoder(ObjectType.ObjectArray, { - is: value => Array.isArray(value), - write: (buf, value, visitedObjects) => { - this.writeArray(buf, value, visitedObjects); - } - }); - - this.registerEncoder(ObjectType.ByteArray, { - is: value => value instanceof Uint8Array, - write: (buf, value) => { - buf.writeBytes(value); - } - }); - - this.registerEncoder(ObjectType.String, { - is: value => typeof value === 'string', - write: (buf, value) => { - buf.writeString(value); - } - }); - - this.registerEncoder(ObjectType.Boolean, { - is: value => typeof value === 'boolean', - write: (buf, value) => { - buf.writeUint8(value === true ? 1 : 0); - } - }); - - this.registerEncoder(ObjectType.Number, { - is: value => typeof value === 'number', - write: (buf, value) => { - buf.writeNumber(value); - } - }); - } - - /** - * Registers a new {@link ValueEncoder} for the given tag. - * After the successful registration the {@link tagIntType} is recomputed - * by retrieving the highest tag value and calculating the required Uint size to store it. - * @param tag the tag for which the encoder should be registered. - * @param decoder the encoder that should be registered. - */ - registerEncoder(tag: number, encoder: ValueEncoder): void { - if (this.registeredTags.has(tag)) { - throw new Error(`Tag already registered: ${tag}`); - } - this.registeredTags.add(tag); - this.encoders.push([tag, encoder]); - } - - cancel(buf: WriteBuffer, requestId: number): void { - buf.writeUint8(RpcMessageType.Cancel); - buf.writeUint32(requestId); - } - - notification(buf: WriteBuffer, requestId: number, method: string, args: any[]): void { - buf.writeUint8(RpcMessageType.Notification); - buf.writeUint32(requestId); - buf.writeString(method); - this.writeArray(buf, args, new WeakSet()); - } - - request(buf: WriteBuffer, requestId: number, method: string, args: any[]): void { - buf.writeUint8(RpcMessageType.Request); - buf.writeUint32(requestId); - buf.writeString(method); - this.writeArray(buf, args, new WeakSet()); - } - - replyOK(buf: WriteBuffer, requestId: number, res: any): void { - buf.writeUint8(RpcMessageType.Reply); - buf.writeUint32(requestId); - this.writeTypedValue(buf, res, new WeakSet()); - } - - replyErr(buf: WriteBuffer, requestId: number, err: any): void { - buf.writeUint8(RpcMessageType.ReplyErr); - buf.writeUint32(requestId); - this.writeTypedValue(buf, err, new WeakSet()); - } - - writeTypedValue(buf: WriteBuffer, value: any, visitedObjects: WeakSet): void { - if (value && typeof value === 'object') { - if (visitedObjects.has(value)) { - throw new EncodingError('Object to encode contains circular references!'); - } - visitedObjects.add(value); - } - - try { - for (let i: number = this.encoders.length - 1; i >= 0; i--) { - if (this.encoders[i][1].is(value)) { - buf.writeUint8(this.encoders[i][0]); - this.encoders[i][1].write(buf, value, visitedObjects, (innerBuffer, innerValue, _visitedObjects) => { - this.writeTypedValue(innerBuffer, innerValue, _visitedObjects); - }); - return; - } - } - throw new EncodingError(`No suitable value encoder found for ${value}`); - } finally { - if (value && typeof value === 'object') { - visitedObjects.delete(value); - } - } - } - - writeArray(buf: WriteBuffer, value: any[], visitedObjects: WeakSet): void { - buf.writeLength(value.length); - for (let i = 0; i < value.length; i++) { - this.writeTypedValue(buf, value[i], visitedObjects); - } - } -} diff --git a/packages/core/src/common/message-rpc/rpc-protocol.ts b/packages/core/src/common/message-rpc/rpc-protocol.ts index 6e037e6e8befd..6dfdbb125dad9 100644 --- a/packages/core/src/common/message-rpc/rpc-protocol.ts +++ b/packages/core/src/common/message-rpc/rpc-protocol.ts @@ -20,7 +20,7 @@ import { DisposableCollection } from '../disposable'; import { Emitter, Event } from '../event'; import { Deferred } from '../promise-util'; import { Channel } from './channel'; -import { RpcMessage, RpcMessageDecoder, RpcMessageEncoder, RpcMessageType } from './rpc-message-encoder'; +import { MsgPackMessageDecoder, MsgPackMessageEncoder, RpcMessage, RpcMessageDecoder, RpcMessageEncoder, RpcMessageType } from './rpc-message-encoder'; import { Uint8ArrayWriteBuffer } from './uint8-array-message-buffer'; /** @@ -69,8 +69,8 @@ export class RpcProtocol { protected toDispose = new DisposableCollection(); constructor(public readonly channel: Channel, public readonly requestHandler: RequestHandler, options: RpcProtocolOptions = {}) { - this.encoder = options.encoder ?? new RpcMessageEncoder(); - this.decoder = options.decoder ?? new RpcMessageDecoder(); + this.encoder = options.encoder ?? new MsgPackMessageEncoder(); + this.decoder = options.decoder ?? new MsgPackMessageDecoder(); this.toDispose.push(this.onNotificationEmitter); this.toDispose.push(channel.onMessage(readBuffer => this.handleMessage(this.decoder.parse(readBuffer())))); channel.onClose(() => this.toDispose.dispose()); diff --git a/packages/core/src/common/messaging/web-socket-channel.ts b/packages/core/src/common/messaging/web-socket-channel.ts index 66b90eed33739..2a6fcbb2e31fe 100644 --- a/packages/core/src/common/messaging/web-socket-channel.ts +++ b/packages/core/src/common/messaging/web-socket-channel.ts @@ -16,43 +16,25 @@ /* eslint-disable @typescript-eslint/no-explicit-any */ -import { Emitter, Event } from '../event'; import { WriteBuffer } from '../message-rpc'; import { Uint8ArrayReadBuffer, Uint8ArrayWriteBuffer } from '../message-rpc/uint8-array-message-buffer'; -import { Channel, MessageProvider, ChannelCloseEvent } from '../message-rpc/channel'; -import { DisposableCollection } from '../disposable'; +import { AbstractChannel } from '../message-rpc/channel'; +import { Disposable } from 'vscode-languageserver-protocol'; /** * A channel that manages the main websocket connection between frontend and backend. All service channels * are reusing this main channel. (multiplexing). An {@link IWebSocket} abstraction is used to keep the implementation * independent of the actual websocket implementation and its execution context (backend vs. frontend). */ -export class WebSocketChannel implements Channel { +export class WebSocketChannel extends AbstractChannel { static wsPath = '/services'; - protected readonly onCloseEmitter: Emitter = new Emitter(); - get onClose(): Event { - return this.onCloseEmitter.event; - } - - protected readonly onMessageEmitter: Emitter = new Emitter(); - get onMessage(): Event { - return this.onMessageEmitter.event; - } - - protected readonly onErrorEmitter: Emitter = new Emitter(); - get onError(): Event { - return this.onErrorEmitter.event; - } - - protected toDispose = new DisposableCollection(); - constructor(protected readonly socket: IWebSocket) { - this.toDispose.pushAll([this.onCloseEmitter, this.onMessageEmitter, this.onErrorEmitter]); + super(); + this.toDispose.push(Disposable.create(() => socket.close())); socket.onClose((reason, code) => this.onCloseEmitter.fire({ reason, code })); socket.onClose(() => this.close()); socket.onError(error => this.onErrorEmitter.fire(error)); - // eslint-disable-next-line arrow-body-style socket.onMessage(data => this.onMessageEmitter.fire(() => { // In the browser context socketIO receives binary messages as ArrayBuffers. // So we have to convert them to a Uint8Array before delegating the message to the read buffer. @@ -72,11 +54,6 @@ export class WebSocketChannel implements Channel { return result; } - - close(): void { - this.toDispose.dispose(); - this.socket.close(); - } } /** diff --git a/packages/core/src/electron-browser/messaging/electron-ipc-connection-provider.ts b/packages/core/src/electron-browser/messaging/electron-ipc-connection-provider.ts index 3cce9e75c3951..7e4410e6be60d 100644 --- a/packages/core/src/electron-browser/messaging/electron-ipc-connection-provider.ts +++ b/packages/core/src/electron-browser/messaging/electron-ipc-connection-provider.ts @@ -19,9 +19,8 @@ import { injectable, interfaces } from 'inversify'; import { JsonRpcProxy } from '../../common/messaging'; import { AbstractConnectionProvider } from '../../common/messaging/abstract-connection-provider'; import { THEIA_ELECTRON_IPC_CHANNEL_NAME } from '../../electron-common/messaging/electron-connection-handler'; -import { Emitter, Event } from '../../common'; +import { AbstractChannel, Channel, Disposable, WriteBuffer } from '../../common'; import { Uint8ArrayReadBuffer, Uint8ArrayWriteBuffer } from '../../common/message-rpc/uint8-array-message-buffer'; -import { Channel, MessageProvider } from '../../common/message-rpc/channel'; export interface ElectronIpcOptions { } @@ -42,23 +41,26 @@ export class ElectronIpcConnectionProvider extends AbstractConnectionProvider(); - ipcRenderer.on(THEIA_ELECTRON_IPC_CHANNEL_NAME, (_event: ElectronEvent, data: Uint8Array) => { - onMessageEmitter.fire(() => new Uint8ArrayReadBuffer(data)); - }); - return { - close: () => Event.None, - getWriteBuffer: () => { - const writer = new Uint8ArrayWriteBuffer(); - writer.onCommit(buffer => - ipcRenderer.send(THEIA_ELECTRON_IPC_CHANNEL_NAME, buffer) - ); - return writer; - }, - onClose: Event.None, - onError: Event.None, - onMessage: onMessageEmitter.event - }; + return new ElectronIpcRendererChannel(); + } + +} + +export class ElectronIpcRendererChannel extends AbstractChannel { + + constructor() { + super(); + const ipcMessageHandler = (_event: ElectronEvent, data: Uint8Array) => this.onMessageEmitter.fire(() => new Uint8ArrayReadBuffer(data)); + ipcRenderer.on(THEIA_ELECTRON_IPC_CHANNEL_NAME, ipcMessageHandler); + this.toDispose.push(Disposable.create(() => ipcRenderer.removeListener(THEIA_ELECTRON_IPC_CHANNEL_NAME, ipcMessageHandler))); + } + + getWriteBuffer(): WriteBuffer { + const writer = new Uint8ArrayWriteBuffer(); + writer.onCommit(buffer => + ipcRenderer.send(THEIA_ELECTRON_IPC_CHANNEL_NAME, buffer) + ); + return writer; } } diff --git a/packages/core/src/electron-main/messaging/electron-messaging-contribution.ts b/packages/core/src/electron-main/messaging/electron-messaging-contribution.ts index b2e4b00e2e1a3..28dffd746a23d 100644 --- a/packages/core/src/electron-main/messaging/electron-messaging-contribution.ts +++ b/packages/core/src/electron-main/messaging/electron-messaging-contribution.ts @@ -21,8 +21,8 @@ import { MessagingContribution } from '../../node/messaging/messaging-contributi import { ElectronConnectionHandler, THEIA_ELECTRON_IPC_CHANNEL_NAME } from '../../electron-common/messaging/electron-connection-handler'; import { ElectronMainApplicationContribution } from '../electron-main-application'; import { ElectronMessagingService } from './electron-messaging-service'; -import { Channel, ChannelCloseEvent, ChannelMultiplexer, MessageProvider } from '../../common/message-rpc/channel'; -import { Emitter, Event, WriteBuffer } from '../../common'; +import { AbstractChannel, Channel, ChannelMultiplexer, MessageProvider } from '../../common/message-rpc/channel'; +import { Emitter, WriteBuffer } from '../../common'; import { Uint8ArrayReadBuffer, Uint8ArrayWriteBuffer } from '../../common/message-rpc/uint8-array-message-buffer'; /** @@ -114,24 +114,13 @@ export class ElectronMessagingContribution implements ElectronMainApplicationCon * Used to establish a connection between the ipcMain and the Electron frontend (window). * Messages a transferred via electron IPC. */ -export class ElectronWebContentChannel implements Channel { - protected readonly onCloseEmitter: Emitter = new Emitter(); - get onClose(): Event { - return this.onCloseEmitter.event; - } +export class ElectronWebContentChannel extends AbstractChannel { // Make the message emitter public so that we can easily forward messages received from the ipcMain. - readonly onMessageEmitter: Emitter = new Emitter(); - get onMessage(): Event { - return this.onMessageEmitter.event; - } - - protected readonly onErrorEmitter: Emitter = new Emitter(); - get onError(): Event { - return this.onErrorEmitter.event; - } + override readonly onMessageEmitter: Emitter = new Emitter(); constructor(protected readonly sender: Electron.WebContents) { + super(); } getWriteBuffer(): WriteBuffer { @@ -145,9 +134,5 @@ export class ElectronWebContentChannel implements Channel { return writer; } - close(): void { - this.onCloseEmitter.dispose(); - this.onMessageEmitter.dispose(); - this.onErrorEmitter.dispose(); - } + } diff --git a/packages/core/src/node/messaging/ipc-channel.ts b/packages/core/src/node/messaging/ipc-channel.ts index 1098aa1215771..acc65e5c6f38b 100644 --- a/packages/core/src/node/messaging/ipc-channel.ts +++ b/packages/core/src/node/messaging/ipc-channel.ts @@ -18,7 +18,7 @@ import * as cp from 'child_process'; import { Socket } from 'net'; import { Duplex } from 'stream'; -import { Channel, ChannelCloseEvent, Disposable, DisposableCollection, Emitter, Event, MessageProvider, WriteBuffer } from '../../common'; +import { AbstractChannel, Disposable, WriteBuffer } from '../../common'; import { Uint8ArrayReadBuffer, Uint8ArrayWriteBuffer } from '../../common/message-rpc/uint8-array-message-buffer'; import { BinaryMessagePipe } from './binary-message-pipe'; @@ -27,29 +27,14 @@ import { BinaryMessagePipe } from './binary-message-pipe'; * This fd is opened as 5th channel in addition to the default stdios (stdin, stdout, stderr, ipc). This means the default channels * are not blocked and can be used by the respective process for additional custom message handling. */ -export class IPCChannel implements Channel { - - protected readonly onCloseEmitter: Emitter = new Emitter(); - get onClose(): Event { - return this.onCloseEmitter.event; - } - - protected readonly onMessageEmitter: Emitter = new Emitter(); - get onMessage(): Event { - return this.onMessageEmitter.event; - } - - protected readonly onErrorEmitter: Emitter = new Emitter(); - get onError(): Event { - return this.onErrorEmitter.event; - } +export class IPCChannel extends AbstractChannel { protected messagePipe: BinaryMessagePipe; - protected toDispose = new DisposableCollection(); protected ipcErrorListener: (error: Error) => void = error => this.onErrorEmitter.fire(error); constructor(childProcess?: cp.ChildProcess) { + super(); if (childProcess) { this.setupChildProcess(childProcess); } else { @@ -58,7 +43,6 @@ export class IPCChannel implements Channel { this.messagePipe.onMessage(message => { this.onMessageEmitter.fire(() => new Uint8ArrayReadBuffer(message)); }); - this.toDispose.pushAll([this.onCloseEmitter, this.onMessageEmitter, this.onErrorEmitter]); } protected setupChildProcess(childProcess: cp.ChildProcess): void { @@ -90,8 +74,4 @@ export class IPCChannel implements Channel { return result; } - close(): void { - this.toDispose.dispose(); - } - } diff --git a/yarn.lock b/yarn.lock index 6d0faa852efe4..dc279c736e489 100644 --- a/yarn.lock +++ b/yarn.lock @@ -1722,6 +1722,36 @@ dependencies: cross-spawn "^7.0.1" +"@msgpackr-extract/msgpackr-extract-darwin-arm64@2.0.2": + version "2.0.2" + resolved "https://registry.yarnpkg.com/@msgpackr-extract/msgpackr-extract-darwin-arm64/-/msgpackr-extract-darwin-arm64-2.0.2.tgz#01e3669b8b2dc01f6353f2c87e1ec94faf52c587" + integrity sha512-FMX5i7a+ojIguHpWbzh5MCsCouJkwf4z4ejdUY/fsgB9Vkdak4ZnoIEskOyOUMMB4lctiZFGszFQJXUeFL8tRg== + +"@msgpackr-extract/msgpackr-extract-darwin-x64@2.0.2": + version "2.0.2" + resolved "https://registry.yarnpkg.com/@msgpackr-extract/msgpackr-extract-darwin-x64/-/msgpackr-extract-darwin-x64-2.0.2.tgz#5ca32f16e6f1b7854001a1a2345b61d4e26a0931" + integrity sha512-DznYtF3lHuZDSRaIOYeif4JgO0NtO2Xf8DsngAugMx/bUdTFbg86jDTmkVJBNmV+cxszz6OjGvinnS8AbJ342g== + +"@msgpackr-extract/msgpackr-extract-linux-arm64@2.0.2": + version "2.0.2" + resolved "https://registry.yarnpkg.com/@msgpackr-extract/msgpackr-extract-linux-arm64/-/msgpackr-extract-linux-arm64-2.0.2.tgz#ff629f94379981bf476dffb1439a7c1d3dba2d72" + integrity sha512-b0jMEo566YdM2K+BurSed7bswjo3a6bcdw5ETqoIfSuxKuRLPfAiOjVbZyZBgx3J/TAM/QrvEQ/VN89A0ZAxSg== + +"@msgpackr-extract/msgpackr-extract-linux-arm@2.0.2": + version "2.0.2" + resolved "https://registry.yarnpkg.com/@msgpackr-extract/msgpackr-extract-linux-arm/-/msgpackr-extract-linux-arm-2.0.2.tgz#5f6fd30d266c4a90cf989049c7f2e50e5d4fcd4c" + integrity sha512-Gy9+c3Wj+rUlD3YvCZTi92gs+cRX7ZQogtwq0IhRenloTTlsbpezNgk6OCkt59V4ATEWSic9rbU92H/l7XsRvA== + +"@msgpackr-extract/msgpackr-extract-linux-x64@2.0.2": + version "2.0.2" + resolved "https://registry.yarnpkg.com/@msgpackr-extract/msgpackr-extract-linux-x64/-/msgpackr-extract-linux-x64-2.0.2.tgz#167faa553b9dbffac8b03bf27de9b6f846f0e1bc" + integrity sha512-zrBHaePwcv4cQXxzYgNj0+A8I1uVN97E7/3LmkRocYZ+rMwUsnPpp4RuTAHSRoKlTQV3nSdCQW4Qdt4MXw/iHw== + +"@msgpackr-extract/msgpackr-extract-win32-x64@2.0.2": + version "2.0.2" + resolved "https://registry.yarnpkg.com/@msgpackr-extract/msgpackr-extract-win32-x64/-/msgpackr-extract-win32-x64-2.0.2.tgz#baea7764b1adf201ce4a792fe971fd7211dad2e4" + integrity sha512-fpnI00dt+yO1cKx9qBXelKhPBdEgvc8ZPav1+0r09j0woYQU2N79w/jcGawSY5UGlgQ3vjaJsFHnGbGvvqdLzg== + "@nodelib/fs.scandir@2.1.5": version "2.1.5" resolved "https://registry.yarnpkg.com/@nodelib/fs.scandir/-/fs.scandir-2.1.5.tgz#7619c2eb21b25483f6d167548b4cfd5a7488c3d5" @@ -8041,6 +8071,27 @@ ms@2.1.3, ms@^2.0.0, ms@^2.1.1: resolved "https://registry.yarnpkg.com/ms/-/ms-2.1.3.tgz#574c8138ce1d2b5861f0b44579dbadd60c6615b2" integrity sha512-6FlzubTLZG3J2a/NVCAleEhjzq5oxgHyaCU9yYXvcLsvoVaHJq/s5xXI6/XXP6tz7R9xAOtHnSO/tXtF3WRTlA== +msgpackr-extract@^2.0.2: + version "2.0.2" + resolved "https://registry.yarnpkg.com/msgpackr-extract/-/msgpackr-extract-2.0.2.tgz#201a8d7ade47e99b3ba277c45736b00e195d4670" + integrity sha512-coskCeJG2KDny23zWeu+6tNy7BLnAiOGgiwzlgdm4oeSsTpqEJJPguHIuKZcCdB7tzhZbXNYSg6jZAXkZErkJA== + dependencies: + node-gyp-build-optional-packages "5.0.2" + optionalDependencies: + "@msgpackr-extract/msgpackr-extract-darwin-arm64" "2.0.2" + "@msgpackr-extract/msgpackr-extract-darwin-x64" "2.0.2" + "@msgpackr-extract/msgpackr-extract-linux-arm" "2.0.2" + "@msgpackr-extract/msgpackr-extract-linux-arm64" "2.0.2" + "@msgpackr-extract/msgpackr-extract-linux-x64" "2.0.2" + "@msgpackr-extract/msgpackr-extract-win32-x64" "2.0.2" + +msgpackr@^1.6.1: + version "1.6.1" + resolved "https://registry.yarnpkg.com/msgpackr/-/msgpackr-1.6.1.tgz#4f3c94d6a5b819b838ffc736eddaf60eba436d20" + integrity sha512-Je+xBEfdjtvA4bKaOv8iRhjC8qX2oJwpYH4f7JrG4uMVJVmnmkAT4pjKdbztKprGj3iwjcxPzb5umVZ02Qq3tA== + optionalDependencies: + msgpackr-extract "^2.0.2" + multer@1.4.4-lts.1: version "1.4.4-lts.1" resolved "https://registry.yarnpkg.com/multer/-/multer-1.4.4-lts.1.tgz#24100f701a4611211cfae94ae16ea39bb314e04d" @@ -8188,6 +8239,11 @@ node-fetch@^2.6.1, node-fetch@^2.6.7: dependencies: whatwg-url "^5.0.0" +node-gyp-build-optional-packages@5.0.2: + version "5.0.2" + resolved "https://registry.yarnpkg.com/node-gyp-build-optional-packages/-/node-gyp-build-optional-packages-5.0.2.tgz#3de7d30bd1f9057b5dfbaeab4a4442b7fe9c5901" + integrity sha512-PiN4NWmlQPqvbEFcH/omQsswWQbe5Z9YK/zdB23irp5j2XibaA2IrGvpSWmVVG4qMZdmPdwPctSy4a86rOMn6g== + node-gyp-build@^4.2.1: version "4.4.0" resolved "https://registry.yarnpkg.com/node-gyp-build/-/node-gyp-build-4.4.0.tgz#42e99687ce87ddeaf3a10b99dc06abc11021f3f4"