diff --git a/packages/core/package.json b/packages/core/package.json index 1ba952a1854e6..84a44378afaa7 100644 --- a/packages/core/package.json +++ b/packages/core/package.json @@ -55,6 +55,7 @@ "lodash.debounce": "^4.0.8", "lodash.throttle": "^4.1.1", "markdown-it": "^12.3.2", + "msgpackr": "^1.6.1", "nsfw": "^2.1.2", "p-debounce": "^2.1.0", "perfect-scrollbar": "^1.3.0", diff --git a/packages/core/src/common/message-rpc/channel.ts b/packages/core/src/common/message-rpc/channel.ts index 5d9fc6985cc07..cff56221df2a8 100644 --- a/packages/core/src/common/message-rpc/channel.ts +++ b/packages/core/src/common/message-rpc/channel.ts @@ -14,6 +14,7 @@ // SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0 // ***************************************************************************** +import { injectable } from '../../../shared/inversify'; import { Disposable, DisposableCollection } from '../disposable'; import { Emitter, Event } from '../event'; import { ReadBuffer, WriteBuffer } from './message-buffer'; @@ -68,15 +69,11 @@ export interface ChannelCloseEvent { export type MessageProvider = () => ReadBuffer; /** - * Helper class to implement the single channels on a {@link ChannelMultiplexer}. Simply forwards write requests to - * the given write buffer source i.e. the main channel of the {@link ChannelMultiplexer}. + * Reusable abstract {@link Channel} implementation that sets up + * the basic channel event listeners and offers a generic close method. */ -export class ForwardingChannel implements Channel { - - protected toDispose = new DisposableCollection(); - constructor(readonly id: string, protected readonly closeHandler: () => void, protected readonly writeBufferSource: () => WriteBuffer) { - this.toDispose.pushAll([this.onCloseEmitter, this.onErrorEmitter, this.onMessageEmitter]); - } +@injectable() +export abstract class AbstractChannel implements Channel { onCloseEmitter: Emitter = new Emitter(); get onClose(): Event { @@ -93,13 +90,37 @@ export class ForwardingChannel implements Channel { return this.onMessageEmitter.event; }; + protected toDispose: DisposableCollection = new DisposableCollection(); + + constructor() { + this.toDispose.pushAll([this.onCloseEmitter, this.onErrorEmitter, this.onMessageEmitter]); + } + + close(): void { + this.toDispose.dispose(); + } + + abstract getWriteBuffer(): WriteBuffer; + +} + +/** + * Helper class to implement the single channels on a {@link ChannelMultiplexer}. Simply forwards write requests to + * the given write buffer source i.e. the main channel of the {@link ChannelMultiplexer}. + */ +export class ForwardingChannel extends AbstractChannel { + + constructor(readonly id: string, protected readonly closeHandler: () => void, protected readonly writeBufferSource: () => WriteBuffer) { + super(); + } + getWriteBuffer(): WriteBuffer { return this.writeBufferSource(); } - close(): void { + override close(): void { + super.close(); this.closeHandler(); - this.toDispose.dispose(); } } diff --git a/packages/core/src/common/message-rpc/index.ts b/packages/core/src/common/message-rpc/index.ts index 671540a31c42b..39a19eb44fa2f 100644 --- a/packages/core/src/common/message-rpc/index.ts +++ b/packages/core/src/common/message-rpc/index.ts @@ -14,5 +14,5 @@ // SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0 // ***************************************************************************** export { RequestHandler, RpcProtocol, RpcProtocolOptions } from './rpc-protocol'; -export { Channel, ChannelCloseEvent, MessageProvider } from './channel'; +export { Channel, AbstractChannel, ChannelCloseEvent, MessageProvider } from './channel'; export { ReadBuffer, WriteBuffer } from './message-buffer'; diff --git a/packages/core/src/common/message-rpc/rpc-message-encoder.spec.ts b/packages/core/src/common/message-rpc/rpc-message-encoder.spec.ts index 174591f227833..2db800530d6ee 100644 --- a/packages/core/src/common/message-rpc/rpc-message-encoder.spec.ts +++ b/packages/core/src/common/message-rpc/rpc-message-encoder.spec.ts @@ -15,41 +15,51 @@ // ***************************************************************************** import { expect } from 'chai'; +import { + EncodingError, MsgPackMessageDecoder, MsgPackMessageEncoder +} from './rpc-message-encoder'; import { Uint8ArrayReadBuffer, Uint8ArrayWriteBuffer } from './uint8-array-message-buffer'; -import { EncodingError, RpcMessageDecoder, RpcMessageEncoder } from './rpc-message-encoder'; -describe('PPC Message Codex', () => { - describe('RPC Message Encoder & Decoder', () => { +describe('PPC Message Encoder & Decoder', () => { + describe('Msgpack Encoder & Decoder', () => { it('should encode object into binary message and decode the message back into the original object', () => { const buffer = new Uint8Array(1024); const writer = new Uint8ArrayWriteBuffer(buffer); + const testObject = { + string: 'string', + boolean: true, + integer: 5, + float: 14.5, + array: ['1', 2, { three: 'three' }], + set: new Set([1, 2, 3]), + map: new Map([[1, 1], [2, 2], [3, 3]]), + buffer: new TextEncoder().encode('ThisIsAUint8Array'), + object: { foo: 'bar', baz: true }, + undefined: undefined, + // eslint-disable-next-line no-null/no-null + null: null + }; - const encoder = new RpcMessageEncoder(); - const jsonMangled = JSON.parse(JSON.stringify(encoder)); - // The RpcMessageEncoder can decode/encode collections, whereas JSON.parse can't. => We have to manually restore the set - // eslint-disable-next-line @typescript-eslint/no-explicit-any - jsonMangled.registeredTags = (encoder as any).registeredTags; - - encoder.writeTypedValue(writer, encoder, new WeakSet()); - + const encoder = new MsgPackMessageEncoder(); + encoder.encode(writer, testObject); const written = writer.getCurrentContents(); const reader = new Uint8ArrayReadBuffer(written); - const decoder = new RpcMessageDecoder(); - const decoded = decoder.readTypedValue(reader); + const decoder = new MsgPackMessageDecoder(); + const decoded = decoder.decode(reader); - expect(decoded).deep.equal(jsonMangled); + expect(decoded).deep.equal(testObject); }); it('should fail with an EncodingError when trying to encode the object ', () => { const x = new Set(); const y = new Set(); x.add(y); y.add(x); - const encoder = new RpcMessageEncoder(); const writer = new Uint8ArrayWriteBuffer(); - expect(() => encoder.writeTypedValue(writer, x, new WeakSet())).to.throw(EncodingError); + const encoder = new MsgPackMessageEncoder(); + expect(() => encoder.encode(writer, x)).to.throw(EncodingError); }); }); }); diff --git a/packages/core/src/common/message-rpc/rpc-message-encoder.ts b/packages/core/src/common/message-rpc/rpc-message-encoder.ts index 3d93b660c0349..f588269357132 100644 --- a/packages/core/src/common/message-rpc/rpc-message-encoder.ts +++ b/packages/core/src/common/message-rpc/rpc-message-encoder.ts @@ -15,6 +15,7 @@ // ***************************************************************************** /* eslint-disable @typescript-eslint/no-explicit-any */ +import { addExtension, Packr as MsgPack } from 'msgpackr'; import { ReadBuffer, WriteBuffer } from './message-buffer'; /** @@ -84,74 +85,12 @@ export class ResponseError extends Error { } } -/** - * The tag values for the default {@link ValueEncoder}s & {@link ValueDecoder}s - */ - -export enum ObjectType { - JSON = 0, - ByteArray = 1, - ObjectArray = 2, - Null = 3, - Undefined = 4, - Object = 5, - String = 6, - Boolean = 7, - Number = 8, - // eslint-disable-next-line @typescript-eslint/no-shadow - ResponseError = 9, - Error = 10, - Map = 11, - Set = 12, - Function = 13 - -} - -/** - * A value encoder writes javascript values to a write buffer. Encoders will be asked - * in turn (ordered by their tag value, descending) whether they can encode a given value - * This means encoders with higher tag values have priority. Since the default encoders - * have tag values from 1-13, they can be easily overridden. - */ -export interface ValueEncoder { - /** - * Returns true if this encoder wants to encode this value. - * @param value the value to be encoded - */ - is(value: any): boolean; - /** - * Write the given value to the buffer. Will only be called if {@link is(value)} returns true. - * @param buf The buffer to write to - * @param value The value to be written - * @param visitedObjects The collection of already visited (i.e. encoded) objects. Used to detect circular references - * @param recursiveEncode A function that will use the encoders registered on the {@link MessageEncoder} - * to write a value to the underlying buffer. This is used mostly to write structures like an array - * without having to know how to encode the values in the array - */ - write(buf: WriteBuffer, value: any, visitedObjects: WeakSet, recursiveEncode?: (buf: WriteBuffer, value: any, visitedObjects: WeakSet) => void): void; -} - -/** - * Reads javascript values from a read buffer - */ -export interface ValueDecoder { - /** - * Reads a value from a read buffer. This method will be called for the decoder that is - * registered for the tag associated with the value encoder that encoded this value. - * @param buf The read buffer to read from - * @param recursiveDecode A function that will use the decoders registered on the {@link RpcMessageDecoder} - * to read values from the underlying read buffer. This is used mostly to decode structures like an array - * without having to know how to decode the values in the array. - */ - read(buf: ReadBuffer, recursiveDecode: (buf: ReadBuffer) => unknown): unknown; -} - /** * Custom error thrown by the {@link RpcMessageEncoder} if an error occurred during the encoding and the * object could not be written to the given {@link WriteBuffer} */ export class EncodingError extends Error { - constructor(msg: string) { + constructor(msg: string, public cause?: Error) { super(msg); } } @@ -159,202 +98,8 @@ export class EncodingError extends Error { /** * A `RpcMessageDecoder` parses a a binary message received via {@link ReadBuffer} into a {@link RpcMessage} */ -export class RpcMessageDecoder { - - protected decoders: Map = new Map(); - - constructor() { - this.registerDecoders(); - } - - registerDecoders(): void { - this.registerDecoder(ObjectType.JSON, { - read: buf => { - const json = buf.readString(); - return JSON.parse(json); - } - }); - this.registerDecoder(ObjectType.Error, { - read: buf => { - const serializedError: SerializedError = JSON.parse(buf.readString()); - const error = new Error(serializedError.message); - Object.assign(error, serializedError); - return error; - } - }); - - this.registerDecoder(ObjectType.ResponseError, { - read: buf => { - const error = JSON.parse(buf.readString()); - return new ResponseError(error.code, error.message, error.data); - } - }); - this.registerDecoder(ObjectType.ByteArray, { - read: buf => buf.readBytes() - }); - - this.registerDecoder(ObjectType.ObjectArray, { - read: buf => this.readArray(buf) - }); - - this.registerDecoder(ObjectType.Undefined, { - read: () => undefined - }); - - this.registerDecoder(ObjectType.Null, { - // eslint-disable-next-line no-null/no-null - read: () => null - }); - - this.registerDecoder(ObjectType.Object, { - read: (buf, recursiveRead) => { - const propertyCount = buf.readLength(); - const result = Object.create({}); - for (let i = 0; i < propertyCount; i++) { - const key = buf.readString(); - const value = recursiveRead(buf); - result[key] = value; - } - return result; - } - }); - - this.registerDecoder(ObjectType.String, { - read: (buf, recursiveRead) => buf.readString() - }); - - this.registerDecoder(ObjectType.Boolean, { - read: buf => buf.readUint8() === 1 - }); - - this.registerDecoder(ObjectType.Number, { - read: buf => buf.readNumber() - }); - - this.registerDecoder(ObjectType.Map, { - read: buf => new Map(this.readArray(buf)) - }); - - this.registerDecoder(ObjectType.Set, { - read: buf => new Set(this.readArray(buf)) - }); - - this.registerDecoder(ObjectType.Function, { - read: () => ({}) - }); - - } - - /** - * Registers a new {@link ValueDecoder} for the given tag. - * After the successful registration the {@link tagIntType} is recomputed - * by retrieving the highest tag value and calculating the required Uint size to store it. - * @param tag the tag for which the decoder should be registered. - * @param decoder the decoder that should be registered. - */ - registerDecoder(tag: number, decoder: ValueDecoder): void { - if (this.decoders.has(tag)) { - throw new Error(`Decoder already registered: ${tag}`); - } - this.decoders.set(tag, decoder); - } - parse(buf: ReadBuffer): RpcMessage { - try { - const msgType = buf.readUint8(); - - switch (msgType) { - case RpcMessageType.Request: - return this.parseRequest(buf); - case RpcMessageType.Notification: - return this.parseNotification(buf); - case RpcMessageType.Reply: - return this.parseReply(buf); - case RpcMessageType.ReplyErr: - return this.parseReplyErr(buf); - case RpcMessageType.Cancel: - return this.parseCancel(buf); - } - throw new Error(`Unknown message type: ${msgType}`); - } catch (e) { - // exception does not show problematic content: log it! - console.log('failed to parse message: ' + buf); - throw e; - } - } - - protected parseCancel(msg: ReadBuffer): CancelMessage { - const callId = msg.readUint32(); - return { - type: RpcMessageType.Cancel, - id: callId - }; - } - - protected parseRequest(msg: ReadBuffer): RequestMessage { - const callId = msg.readUint32(); - const method = msg.readString(); - const args = this.readArray(msg); - - return { - type: RpcMessageType.Request, - id: callId, - method: method, - args: args - }; - } - - protected parseNotification(msg: ReadBuffer): NotificationMessage { - const callId = msg.readUint32(); - const method = msg.readString(); - const args = this.readArray(msg); - - return { - type: RpcMessageType.Notification, - id: callId, - method: method, - args: args - }; - } - - parseReply(msg: ReadBuffer): ReplyMessage { - const callId = msg.readUint32(); - const value = this.readTypedValue(msg); - return { - type: RpcMessageType.Reply, - id: callId, - res: value - }; - } - - parseReplyErr(msg: ReadBuffer): ReplyErrMessage { - const callId = msg.readUint32(); - - const err = this.readTypedValue(msg); - - return { - type: RpcMessageType.ReplyErr, - id: callId, - err - }; - } - - readArray(buf: ReadBuffer): any[] { - const length = buf.readLength(); - const result = new Array(length); - for (let i = 0; i < length; i++) { - result[i] = this.readTypedValue(buf); - } - return result; - } - - readTypedValue(buf: ReadBuffer): any { - const type = buf.readUint8(); - const decoder = this.decoders.get(type); - if (!decoder) { - throw new Error(`No decoder for tag ${type}`); - } - return decoder.read(buf, innerBuffer => this.readTypedValue(innerBuffer)); - } +export interface RpcMessageDecoder { + parse(buffer: ReadBuffer): RpcMessage; } /** @@ -362,202 +107,82 @@ export class RpcMessageDecoder { * up to clients to commit the message. This allows for multiple messages being * encoded before sending. */ -export class RpcMessageEncoder { - - protected readonly encoders: [number, ValueEncoder][] = []; - protected readonly registeredTags: Set = new Set(); - - constructor() { - this.registerEncoders(); - } - - protected registerEncoders(): void { - // encoders will be consulted in reverse order of registration, so the JSON fallback needs to be last - this.registerEncoder(ObjectType.JSON, { - is: () => true, - write: (buf, value) => { - buf.writeString(JSON.stringify(value)); - } - }); - - this.registerEncoder(ObjectType.Function, { - is: value => typeof value === 'function', - write: () => { } - }); - - this.registerEncoder(ObjectType.Object, { - is: value => typeof value === 'object', - write: (buf, object, visitedObjects, recursiveEncode) => { - const properties = Object.keys(object); - const relevant = []; - for (const property of properties) { - const value = object[property]; - if (typeof value !== 'function') { - relevant.push([property, value]); - } - } - - buf.writeLength(relevant.length); - for (const [property, value] of relevant) { - buf.writeString(property); - recursiveEncode?.(buf, value, visitedObjects); - } - } - }); - - this.registerEncoder(ObjectType.Error, { - is: value => value instanceof Error, - write: (buf, error: Error) => { - const { name, message } = error; - const stack: string = (error).stacktrace || error.stack; - const serializedError = { - $isError: true, - name, - message, - stack - }; - buf.writeString(JSON.stringify(serializedError)); - } - }); - - this.registerEncoder(ObjectType.ResponseError, { - is: value => value instanceof ResponseError, - write: (buf, value) => buf.writeString(JSON.stringify(value)) - }); - - this.registerEncoder(ObjectType.Map, { - is: value => value instanceof Map, - write: (buf, value: Map, visitedObjects) => this.writeArray(buf, Array.from(value.entries()), visitedObjects) - }); +export interface RpcMessageEncoder { + cancel(buf: WriteBuffer, requestId: number): void; - this.registerEncoder(ObjectType.Set, { - is: value => value instanceof Set, - write: (buf, value: Set, visitedObjects) => this.writeArray(buf, [...value], visitedObjects) - }); + notification(buf: WriteBuffer, requestId: number, method: string, args: any[]): void - this.registerEncoder(ObjectType.Null, { - // eslint-disable-next-line no-null/no-null - is: value => value === null, - write: () => { } - }); + request(buf: WriteBuffer, requestId: number, method: string, args: any[]): void - this.registerEncoder(ObjectType.Undefined, { - is: value => value === undefined, - write: () => { } - }); + replyOK(buf: WriteBuffer, requestId: number, res: any): void - this.registerEncoder(ObjectType.ObjectArray, { - is: value => Array.isArray(value), - write: (buf, value, visitedObjects) => { - this.writeArray(buf, value, visitedObjects); - } - }); + replyErr(buf: WriteBuffer, requestId: number, err: any): void - this.registerEncoder(ObjectType.ByteArray, { - is: value => value instanceof Uint8Array, - write: (buf, value) => { - buf.writeBytes(value); - } - }); +} - this.registerEncoder(ObjectType.String, { - is: value => typeof value === 'string', - write: (buf, value) => { - buf.writeString(value); - } - }); +export const defaultMsgPack = new MsgPack({ moreTypes: true, encodeUndefinedAsNil: false, bundleStrings: true }); +// Add custom msgpackR extension for ResponseErrors. +addExtension({ + Class: ResponseError, + type: 1, + write: (instance: ResponseError) => { + const { code, data, message, name, stack } = instance; + return { code, data, message, name, stack }; + }, + read: data => { + const error = new ResponseError(data.code, data.message, data.data); + error.name = data.name; + error.stack = data.stack; + return error; + } +}); - this.registerEncoder(ObjectType.Boolean, { - is: value => typeof value === 'boolean', - write: (buf, value) => { - buf.writeUint8(value === true ? 1 : 0); - } - }); +export class MsgPackMessageEncoder implements RpcMessageEncoder { - this.registerEncoder(ObjectType.Number, { - is: value => typeof value === 'number', - write: (buf, value) => { - buf.writeNumber(value); - } - }); - } + constructor(protected readonly msgPack: MsgPack = defaultMsgPack) { - /** - * Registers a new {@link ValueEncoder} for the given tag. - * After the successful registration the {@link tagIntType} is recomputed - * by retrieving the highest tag value and calculating the required Uint size to store it. - * @param tag the tag for which the encoder should be registered. - * @param decoder the encoder that should be registered. - */ - registerEncoder(tag: number, encoder: ValueEncoder): void { - if (this.registeredTags.has(tag)) { - throw new Error(`Tag already registered: ${tag}`); - } - this.registeredTags.add(tag); - this.encoders.push([tag, encoder]); } cancel(buf: WriteBuffer, requestId: number): void { - buf.writeUint8(RpcMessageType.Cancel); - buf.writeUint32(requestId); + this.encode(buf, { type: RpcMessageType.Cancel, id: requestId }); } - notification(buf: WriteBuffer, requestId: number, method: string, args: any[]): void { - buf.writeUint8(RpcMessageType.Notification); - buf.writeUint32(requestId); - buf.writeString(method); - this.writeArray(buf, args, new WeakSet()); + this.encode(buf, { type: RpcMessageType.Notification, id: requestId, method, args }); } - request(buf: WriteBuffer, requestId: number, method: string, args: any[]): void { - buf.writeUint8(RpcMessageType.Request); - buf.writeUint32(requestId); - buf.writeString(method); - this.writeArray(buf, args, new WeakSet()); + this.encode(buf, { type: RpcMessageType.Request, id: requestId, method, args }); } - replyOK(buf: WriteBuffer, requestId: number, res: any): void { - buf.writeUint8(RpcMessageType.Reply); - buf.writeUint32(requestId); - this.writeTypedValue(buf, res, new WeakSet()); + this.encode(buf, { type: RpcMessageType.Reply, id: requestId, res }); } - replyErr(buf: WriteBuffer, requestId: number, err: any): void { - buf.writeUint8(RpcMessageType.ReplyErr); - buf.writeUint32(requestId); - this.writeTypedValue(buf, err, new WeakSet()); + this.encode(buf, { type: RpcMessageType.ReplyErr, id: requestId, err }); } - writeTypedValue(buf: WriteBuffer, value: any, visitedObjects: WeakSet): void { - if (value && typeof value === 'object') { - if (visitedObjects.has(value)) { - throw new EncodingError('Object to encode contains circular references!'); - } - visitedObjects.add(value); - } - + encode(buf: WriteBuffer, value: T): void { try { - for (let i: number = this.encoders.length - 1; i >= 0; i--) { - if (this.encoders[i][1].is(value)) { - buf.writeUint8(this.encoders[i][0]); - this.encoders[i][1].write(buf, value, visitedObjects, (innerBuffer, innerValue, _visitedObjects) => { - this.writeTypedValue(innerBuffer, innerValue, _visitedObjects); - }); - return; - } - } - throw new EncodingError(`No suitable value encoder found for ${value}`); - } finally { - if (value && typeof value === 'object') { - visitedObjects.delete(value); + buf.writeBytes(this.msgPack.encode(value)); + } catch (err) { + if (err instanceof Error) { + throw new EncodingError(`Error during encoding: '${err.message}'`, err); } + throw err; } } - writeArray(buf: WriteBuffer, value: any[], visitedObjects: WeakSet): void { - buf.writeLength(value.length); - for (let i = 0; i < value.length; i++) { - this.writeTypedValue(buf, value[i], visitedObjects); - } +} + +export class MsgPackMessageDecoder implements RpcMessageDecoder { + constructor(protected readonly msgPack: MsgPack = defaultMsgPack) { + + } + decode(buf: ReadBuffer): T { + const bytes = buf.readBytes(); + return this.msgPack.decode(bytes); } + + parse(buffer: ReadBuffer): RpcMessage { + return this.decode(buffer); + } + } diff --git a/packages/core/src/common/message-rpc/rpc-protocol.ts b/packages/core/src/common/message-rpc/rpc-protocol.ts index 6e037e6e8befd..6dfdbb125dad9 100644 --- a/packages/core/src/common/message-rpc/rpc-protocol.ts +++ b/packages/core/src/common/message-rpc/rpc-protocol.ts @@ -20,7 +20,7 @@ import { DisposableCollection } from '../disposable'; import { Emitter, Event } from '../event'; import { Deferred } from '../promise-util'; import { Channel } from './channel'; -import { RpcMessage, RpcMessageDecoder, RpcMessageEncoder, RpcMessageType } from './rpc-message-encoder'; +import { MsgPackMessageDecoder, MsgPackMessageEncoder, RpcMessage, RpcMessageDecoder, RpcMessageEncoder, RpcMessageType } from './rpc-message-encoder'; import { Uint8ArrayWriteBuffer } from './uint8-array-message-buffer'; /** @@ -69,8 +69,8 @@ export class RpcProtocol { protected toDispose = new DisposableCollection(); constructor(public readonly channel: Channel, public readonly requestHandler: RequestHandler, options: RpcProtocolOptions = {}) { - this.encoder = options.encoder ?? new RpcMessageEncoder(); - this.decoder = options.decoder ?? new RpcMessageDecoder(); + this.encoder = options.encoder ?? new MsgPackMessageEncoder(); + this.decoder = options.decoder ?? new MsgPackMessageDecoder(); this.toDispose.push(this.onNotificationEmitter); this.toDispose.push(channel.onMessage(readBuffer => this.handleMessage(this.decoder.parse(readBuffer())))); channel.onClose(() => this.toDispose.dispose()); diff --git a/packages/core/src/common/messaging/web-socket-channel.ts b/packages/core/src/common/messaging/web-socket-channel.ts index 66b90eed33739..65df8fe85fafe 100644 --- a/packages/core/src/common/messaging/web-socket-channel.ts +++ b/packages/core/src/common/messaging/web-socket-channel.ts @@ -16,43 +16,25 @@ /* eslint-disable @typescript-eslint/no-explicit-any */ -import { Emitter, Event } from '../event'; import { WriteBuffer } from '../message-rpc'; import { Uint8ArrayReadBuffer, Uint8ArrayWriteBuffer } from '../message-rpc/uint8-array-message-buffer'; -import { Channel, MessageProvider, ChannelCloseEvent } from '../message-rpc/channel'; -import { DisposableCollection } from '../disposable'; +import { AbstractChannel } from '../message-rpc/channel'; +import { Disposable } from '../disposable'; /** * A channel that manages the main websocket connection between frontend and backend. All service channels * are reusing this main channel. (multiplexing). An {@link IWebSocket} abstraction is used to keep the implementation * independent of the actual websocket implementation and its execution context (backend vs. frontend). */ -export class WebSocketChannel implements Channel { +export class WebSocketChannel extends AbstractChannel { static wsPath = '/services'; - protected readonly onCloseEmitter: Emitter = new Emitter(); - get onClose(): Event { - return this.onCloseEmitter.event; - } - - protected readonly onMessageEmitter: Emitter = new Emitter(); - get onMessage(): Event { - return this.onMessageEmitter.event; - } - - protected readonly onErrorEmitter: Emitter = new Emitter(); - get onError(): Event { - return this.onErrorEmitter.event; - } - - protected toDispose = new DisposableCollection(); - constructor(protected readonly socket: IWebSocket) { - this.toDispose.pushAll([this.onCloseEmitter, this.onMessageEmitter, this.onErrorEmitter]); + super(); + this.toDispose.push(Disposable.create(() => socket.close())); socket.onClose((reason, code) => this.onCloseEmitter.fire({ reason, code })); socket.onClose(() => this.close()); socket.onError(error => this.onErrorEmitter.fire(error)); - // eslint-disable-next-line arrow-body-style socket.onMessage(data => this.onMessageEmitter.fire(() => { // In the browser context socketIO receives binary messages as ArrayBuffers. // So we have to convert them to a Uint8Array before delegating the message to the read buffer. @@ -72,11 +54,6 @@ export class WebSocketChannel implements Channel { return result; } - - close(): void { - this.toDispose.dispose(); - this.socket.close(); - } } /** diff --git a/packages/core/src/electron-browser/messaging/electron-ipc-connection-provider.ts b/packages/core/src/electron-browser/messaging/electron-ipc-connection-provider.ts index 3cce9e75c3951..7e4410e6be60d 100644 --- a/packages/core/src/electron-browser/messaging/electron-ipc-connection-provider.ts +++ b/packages/core/src/electron-browser/messaging/electron-ipc-connection-provider.ts @@ -19,9 +19,8 @@ import { injectable, interfaces } from 'inversify'; import { JsonRpcProxy } from '../../common/messaging'; import { AbstractConnectionProvider } from '../../common/messaging/abstract-connection-provider'; import { THEIA_ELECTRON_IPC_CHANNEL_NAME } from '../../electron-common/messaging/electron-connection-handler'; -import { Emitter, Event } from '../../common'; +import { AbstractChannel, Channel, Disposable, WriteBuffer } from '../../common'; import { Uint8ArrayReadBuffer, Uint8ArrayWriteBuffer } from '../../common/message-rpc/uint8-array-message-buffer'; -import { Channel, MessageProvider } from '../../common/message-rpc/channel'; export interface ElectronIpcOptions { } @@ -42,23 +41,26 @@ export class ElectronIpcConnectionProvider extends AbstractConnectionProvider(); - ipcRenderer.on(THEIA_ELECTRON_IPC_CHANNEL_NAME, (_event: ElectronEvent, data: Uint8Array) => { - onMessageEmitter.fire(() => new Uint8ArrayReadBuffer(data)); - }); - return { - close: () => Event.None, - getWriteBuffer: () => { - const writer = new Uint8ArrayWriteBuffer(); - writer.onCommit(buffer => - ipcRenderer.send(THEIA_ELECTRON_IPC_CHANNEL_NAME, buffer) - ); - return writer; - }, - onClose: Event.None, - onError: Event.None, - onMessage: onMessageEmitter.event - }; + return new ElectronIpcRendererChannel(); + } + +} + +export class ElectronIpcRendererChannel extends AbstractChannel { + + constructor() { + super(); + const ipcMessageHandler = (_event: ElectronEvent, data: Uint8Array) => this.onMessageEmitter.fire(() => new Uint8ArrayReadBuffer(data)); + ipcRenderer.on(THEIA_ELECTRON_IPC_CHANNEL_NAME, ipcMessageHandler); + this.toDispose.push(Disposable.create(() => ipcRenderer.removeListener(THEIA_ELECTRON_IPC_CHANNEL_NAME, ipcMessageHandler))); + } + + getWriteBuffer(): WriteBuffer { + const writer = new Uint8ArrayWriteBuffer(); + writer.onCommit(buffer => + ipcRenderer.send(THEIA_ELECTRON_IPC_CHANNEL_NAME, buffer) + ); + return writer; } } diff --git a/packages/core/src/electron-main/messaging/electron-messaging-contribution.ts b/packages/core/src/electron-main/messaging/electron-messaging-contribution.ts index e7aa0b511c711..7dbb31cd9140e 100644 --- a/packages/core/src/electron-main/messaging/electron-messaging-contribution.ts +++ b/packages/core/src/electron-main/messaging/electron-messaging-contribution.ts @@ -21,8 +21,8 @@ import { MessagingContribution } from '../../node/messaging/messaging-contributi import { ElectronConnectionHandler, THEIA_ELECTRON_IPC_CHANNEL_NAME } from '../../electron-common/messaging/electron-connection-handler'; import { ElectronMainApplicationContribution } from '../electron-main-application'; import { ElectronMessagingService } from './electron-messaging-service'; -import { Channel, ChannelCloseEvent, ChannelMultiplexer, MessageProvider } from '../../common/message-rpc/channel'; -import { Emitter, Event, WriteBuffer } from '../../common'; +import { AbstractChannel, Channel, ChannelMultiplexer, MessageProvider } from '../../common/message-rpc/channel'; +import { Emitter, WriteBuffer } from '../../common'; import { Uint8ArrayReadBuffer, Uint8ArrayWriteBuffer } from '../../common/message-rpc/uint8-array-message-buffer'; /** @@ -114,24 +114,13 @@ export class ElectronMessagingContribution implements ElectronMainApplicationCon * Used to establish a connection between the ipcMain and the Electron frontend (window). * Messages a transferred via electron IPC. */ -export class ElectronWebContentChannel implements Channel { - protected readonly onCloseEmitter: Emitter = new Emitter(); - get onClose(): Event { - return this.onCloseEmitter.event; - } +export class ElectronWebContentChannel extends AbstractChannel { // Make the message emitter public so that we can easily forward messages received from the ipcMain. - readonly onMessageEmitter: Emitter = new Emitter(); - get onMessage(): Event { - return this.onMessageEmitter.event; - } - - protected readonly onErrorEmitter: Emitter = new Emitter(); - get onError(): Event { - return this.onErrorEmitter.event; - } + override readonly onMessageEmitter: Emitter = new Emitter(); constructor(protected readonly sender: Electron.WebContents) { + super(); } getWriteBuffer(): WriteBuffer { @@ -145,9 +134,5 @@ export class ElectronWebContentChannel implements Channel { return writer; } - close(): void { - this.onCloseEmitter.dispose(); - this.onMessageEmitter.dispose(); - this.onErrorEmitter.dispose(); - } + } diff --git a/packages/core/src/node/messaging/ipc-channel.ts b/packages/core/src/node/messaging/ipc-channel.ts index 1098aa1215771..acc65e5c6f38b 100644 --- a/packages/core/src/node/messaging/ipc-channel.ts +++ b/packages/core/src/node/messaging/ipc-channel.ts @@ -18,7 +18,7 @@ import * as cp from 'child_process'; import { Socket } from 'net'; import { Duplex } from 'stream'; -import { Channel, ChannelCloseEvent, Disposable, DisposableCollection, Emitter, Event, MessageProvider, WriteBuffer } from '../../common'; +import { AbstractChannel, Disposable, WriteBuffer } from '../../common'; import { Uint8ArrayReadBuffer, Uint8ArrayWriteBuffer } from '../../common/message-rpc/uint8-array-message-buffer'; import { BinaryMessagePipe } from './binary-message-pipe'; @@ -27,29 +27,14 @@ import { BinaryMessagePipe } from './binary-message-pipe'; * This fd is opened as 5th channel in addition to the default stdios (stdin, stdout, stderr, ipc). This means the default channels * are not blocked and can be used by the respective process for additional custom message handling. */ -export class IPCChannel implements Channel { - - protected readonly onCloseEmitter: Emitter = new Emitter(); - get onClose(): Event { - return this.onCloseEmitter.event; - } - - protected readonly onMessageEmitter: Emitter = new Emitter(); - get onMessage(): Event { - return this.onMessageEmitter.event; - } - - protected readonly onErrorEmitter: Emitter = new Emitter(); - get onError(): Event { - return this.onErrorEmitter.event; - } +export class IPCChannel extends AbstractChannel { protected messagePipe: BinaryMessagePipe; - protected toDispose = new DisposableCollection(); protected ipcErrorListener: (error: Error) => void = error => this.onErrorEmitter.fire(error); constructor(childProcess?: cp.ChildProcess) { + super(); if (childProcess) { this.setupChildProcess(childProcess); } else { @@ -58,7 +43,6 @@ export class IPCChannel implements Channel { this.messagePipe.onMessage(message => { this.onMessageEmitter.fire(() => new Uint8ArrayReadBuffer(message)); }); - this.toDispose.pushAll([this.onCloseEmitter, this.onMessageEmitter, this.onErrorEmitter]); } protected setupChildProcess(childProcess: cp.ChildProcess): void { @@ -90,8 +74,4 @@ export class IPCChannel implements Channel { return result; } - close(): void { - this.toDispose.dispose(); - } - } diff --git a/yarn.lock b/yarn.lock index a05751759db62..c964d5217fe04 100644 --- a/yarn.lock +++ b/yarn.lock @@ -1722,6 +1722,36 @@ dependencies: cross-spawn "^7.0.1" +"@msgpackr-extract/msgpackr-extract-darwin-arm64@2.0.2": + version "2.0.2" + resolved "https://registry.yarnpkg.com/@msgpackr-extract/msgpackr-extract-darwin-arm64/-/msgpackr-extract-darwin-arm64-2.0.2.tgz#01e3669b8b2dc01f6353f2c87e1ec94faf52c587" + integrity sha512-FMX5i7a+ojIguHpWbzh5MCsCouJkwf4z4ejdUY/fsgB9Vkdak4ZnoIEskOyOUMMB4lctiZFGszFQJXUeFL8tRg== + +"@msgpackr-extract/msgpackr-extract-darwin-x64@2.0.2": + version "2.0.2" + resolved "https://registry.yarnpkg.com/@msgpackr-extract/msgpackr-extract-darwin-x64/-/msgpackr-extract-darwin-x64-2.0.2.tgz#5ca32f16e6f1b7854001a1a2345b61d4e26a0931" + integrity sha512-DznYtF3lHuZDSRaIOYeif4JgO0NtO2Xf8DsngAugMx/bUdTFbg86jDTmkVJBNmV+cxszz6OjGvinnS8AbJ342g== + +"@msgpackr-extract/msgpackr-extract-linux-arm64@2.0.2": + version "2.0.2" + resolved "https://registry.yarnpkg.com/@msgpackr-extract/msgpackr-extract-linux-arm64/-/msgpackr-extract-linux-arm64-2.0.2.tgz#ff629f94379981bf476dffb1439a7c1d3dba2d72" + integrity sha512-b0jMEo566YdM2K+BurSed7bswjo3a6bcdw5ETqoIfSuxKuRLPfAiOjVbZyZBgx3J/TAM/QrvEQ/VN89A0ZAxSg== + +"@msgpackr-extract/msgpackr-extract-linux-arm@2.0.2": + version "2.0.2" + resolved "https://registry.yarnpkg.com/@msgpackr-extract/msgpackr-extract-linux-arm/-/msgpackr-extract-linux-arm-2.0.2.tgz#5f6fd30d266c4a90cf989049c7f2e50e5d4fcd4c" + integrity sha512-Gy9+c3Wj+rUlD3YvCZTi92gs+cRX7ZQogtwq0IhRenloTTlsbpezNgk6OCkt59V4ATEWSic9rbU92H/l7XsRvA== + +"@msgpackr-extract/msgpackr-extract-linux-x64@2.0.2": + version "2.0.2" + resolved "https://registry.yarnpkg.com/@msgpackr-extract/msgpackr-extract-linux-x64/-/msgpackr-extract-linux-x64-2.0.2.tgz#167faa553b9dbffac8b03bf27de9b6f846f0e1bc" + integrity sha512-zrBHaePwcv4cQXxzYgNj0+A8I1uVN97E7/3LmkRocYZ+rMwUsnPpp4RuTAHSRoKlTQV3nSdCQW4Qdt4MXw/iHw== + +"@msgpackr-extract/msgpackr-extract-win32-x64@2.0.2": + version "2.0.2" + resolved "https://registry.yarnpkg.com/@msgpackr-extract/msgpackr-extract-win32-x64/-/msgpackr-extract-win32-x64-2.0.2.tgz#baea7764b1adf201ce4a792fe971fd7211dad2e4" + integrity sha512-fpnI00dt+yO1cKx9qBXelKhPBdEgvc8ZPav1+0r09j0woYQU2N79w/jcGawSY5UGlgQ3vjaJsFHnGbGvvqdLzg== + "@nodelib/fs.scandir@2.1.5": version "2.1.5" resolved "https://registry.yarnpkg.com/@nodelib/fs.scandir/-/fs.scandir-2.1.5.tgz#7619c2eb21b25483f6d167548b4cfd5a7488c3d5" @@ -8050,6 +8080,27 @@ ms@2.1.3, ms@^2.0.0, ms@^2.1.1: resolved "https://registry.yarnpkg.com/ms/-/ms-2.1.3.tgz#574c8138ce1d2b5861f0b44579dbadd60c6615b2" integrity sha512-6FlzubTLZG3J2a/NVCAleEhjzq5oxgHyaCU9yYXvcLsvoVaHJq/s5xXI6/XXP6tz7R9xAOtHnSO/tXtF3WRTlA== +msgpackr-extract@^2.0.2: + version "2.0.2" + resolved "https://registry.yarnpkg.com/msgpackr-extract/-/msgpackr-extract-2.0.2.tgz#201a8d7ade47e99b3ba277c45736b00e195d4670" + integrity sha512-coskCeJG2KDny23zWeu+6tNy7BLnAiOGgiwzlgdm4oeSsTpqEJJPguHIuKZcCdB7tzhZbXNYSg6jZAXkZErkJA== + dependencies: + node-gyp-build-optional-packages "5.0.2" + optionalDependencies: + "@msgpackr-extract/msgpackr-extract-darwin-arm64" "2.0.2" + "@msgpackr-extract/msgpackr-extract-darwin-x64" "2.0.2" + "@msgpackr-extract/msgpackr-extract-linux-arm" "2.0.2" + "@msgpackr-extract/msgpackr-extract-linux-arm64" "2.0.2" + "@msgpackr-extract/msgpackr-extract-linux-x64" "2.0.2" + "@msgpackr-extract/msgpackr-extract-win32-x64" "2.0.2" + +msgpackr@^1.6.1: + version "1.6.1" + resolved "https://registry.yarnpkg.com/msgpackr/-/msgpackr-1.6.1.tgz#4f3c94d6a5b819b838ffc736eddaf60eba436d20" + integrity sha512-Je+xBEfdjtvA4bKaOv8iRhjC8qX2oJwpYH4f7JrG4uMVJVmnmkAT4pjKdbztKprGj3iwjcxPzb5umVZ02Qq3tA== + optionalDependencies: + msgpackr-extract "^2.0.2" + multer@1.4.4-lts.1: version "1.4.4-lts.1" resolved "https://registry.yarnpkg.com/multer/-/multer-1.4.4-lts.1.tgz#24100f701a4611211cfae94ae16ea39bb314e04d" @@ -8197,6 +8248,11 @@ node-fetch@^2.6.1, node-fetch@^2.6.7: dependencies: whatwg-url "^5.0.0" +node-gyp-build-optional-packages@5.0.2: + version "5.0.2" + resolved "https://registry.yarnpkg.com/node-gyp-build-optional-packages/-/node-gyp-build-optional-packages-5.0.2.tgz#3de7d30bd1f9057b5dfbaeab4a4442b7fe9c5901" + integrity sha512-PiN4NWmlQPqvbEFcH/omQsswWQbe5Z9YK/zdB23irp5j2XibaA2IrGvpSWmVVG4qMZdmPdwPctSy4a86rOMn6g== + node-gyp-build@^4.2.1: version "4.5.0" resolved "https://registry.yarnpkg.com/node-gyp-build/-/node-gyp-build-4.5.0.tgz#7a64eefa0b21112f89f58379da128ac177f20e40"