diff --git a/packages/core/src/browser/logger-frontend-module.ts b/packages/core/src/browser/logger-frontend-module.ts index dd1051a3e0e6d..4806d4ddec9dd 100644 --- a/packages/core/src/browser/logger-frontend-module.ts +++ b/packages/core/src/browser/logger-frontend-module.ts @@ -20,7 +20,7 @@ import { ILogger, Logger, LoggerFactory, setRootLogger, LoggerName, rootLoggerNa import { LoggerWatcher } from '../common/logger-watcher'; import { WebSocketConnectionProvider } from './messaging'; import { FrontendApplicationContribution } from './frontend-application'; -import { EncodingError } from '../common/message-rpc/rpc-message-encoder'; +import { EncodingError } from '../common/messaging/message-codec'; export const loggerFrontendModule = new ContainerModule(bind => { bind(FrontendApplicationContribution).toDynamicValue(ctx => ({ diff --git a/packages/core/src/browser/messaging/ws-connection-provider.ts b/packages/core/src/browser/messaging/ws-connection-provider.ts index b44e34841ee2c..af343ae1163a9 100644 --- a/packages/core/src/browser/messaging/ws-connection-provider.ts +++ b/packages/core/src/browser/messaging/ws-connection-provider.ts @@ -14,17 +14,17 @@ // SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0 // ***************************************************************************** -import { injectable, interfaces, decorate, unmanaged } from 'inversify'; -import { JsonRpcProxyFactory, JsonRpcProxy, Emitter, Event, Channel } from '../../common'; -import { Endpoint } from '../endpoint'; -import { AbstractConnectionProvider } from '../../common/messaging/abstract-connection-provider'; +import { decorate, injectable, interfaces, unmanaged } from 'inversify'; import { io, Socket } from 'socket.io-client'; -import { IWebSocket, WebSocketChannel } from '../../common/messaging/web-socket-channel'; +import { Channel, Emitter, Event, JsonRpcProxy, JsonRpcProxyFactory } from '../../common'; +import { AbstractConnectionProvider } from '../../common/messaging/abstract-connection-provider'; +import { IWebSocket, WebSocketChannel, wsServicePath } from '../../common/messaging/web-socket-channel'; +import { Endpoint } from '../endpoint'; decorate(injectable(), JsonRpcProxyFactory); decorate(unmanaged(), JsonRpcProxyFactory, 0); -export interface WebSocketOptions { +export interface WebsocketOptions { /** * True by default. */ @@ -32,7 +32,7 @@ export interface WebSocketOptions { } @injectable() -export class WebSocketConnectionProvider extends AbstractConnectionProvider { +export class WebSocketConnectionProvider extends AbstractConnectionProvider { protected readonly onSocketDidOpenEmitter: Emitter = new Emitter(); get onSocketDidOpen(): Event { @@ -52,7 +52,7 @@ export class WebSocketConnectionProvider extends AbstractConnectionProvider { this.initializeMultiplexer(); @@ -81,14 +81,15 @@ export class WebSocketConnectionProvider extends AbstractConnectionProvider socket.connected, onClose: cb => socket.on('disconnect', reason => cb(reason)), onError: cb => socket.on('error', reason => cb(reason)), - onMessage: cb => socket.on('message', data => cb(data)), + onMessage: cb => socket.on('message', data => cb(new Uint8Array(data))), send: message => socket.emit('message', message) }; } - override async openChannel(path: string, handler: (channel: Channel) => void, options?: WebSocketOptions): Promise { + // eslint-disable-next-line @typescript-eslint/no-explicit-any + override async openChannel(path: string, handler: (channel: Channel) => void, options?: WebsocketOptions): Promise { if (this.socket.connected) { - return super.openChannel(path, handler, options); + return super.openChannel(path, handler, options); } else { const openChannel = () => { this.socket.off('connect', openChannel); diff --git a/packages/core/src/common/index.ts b/packages/core/src/common/index.ts index cedfd254d0009..b78da80e6b6b8 100644 --- a/packages/core/src/common/index.ts +++ b/packages/core/src/common/index.ts @@ -29,7 +29,6 @@ export * from './contribution-provider'; export * from './path'; export * from './logger'; export * from './messaging'; -export * from './message-rpc'; export * from './message-service'; export * from './message-service-protocol'; export * from './progress-service'; diff --git a/packages/core/src/common/message-rpc/channel.ts b/packages/core/src/common/message-rpc/channel.ts deleted file mode 100644 index 5d9fc6985cc07..0000000000000 --- a/packages/core/src/common/message-rpc/channel.ts +++ /dev/null @@ -1,260 +0,0 @@ -// ***************************************************************************** -// Copyright (C) 2021 Red Hat, Inc. and others. -// -// This program and the accompanying materials are made available under the -// terms of the Eclipse Public License v. 2.0 which is available at -// http://www.eclipse.org/legal/epl-2.0. -// -// This Source Code may also be made available under the following Secondary -// Licenses when the conditions for such availability set forth in the Eclipse -// Public License v. 2.0 are satisfied: GNU General Public License, version 2 -// with the GNU Classpath Exception which is available at -// https://www.gnu.org/software/classpath/license.html. -// -// SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0 -// ***************************************************************************** - -import { Disposable, DisposableCollection } from '../disposable'; -import { Emitter, Event } from '../event'; -import { ReadBuffer, WriteBuffer } from './message-buffer'; - -/** - * A channel is a bidirectional communications channel with lifecycle and - * error signalling. Note that creation of channels is specific to particular - * implementations and thus not part of the protocol. - */ -export interface Channel { - - /** - * The remote side has closed the channel - */ - onClose: Event; - - /** - * An error has occurred while writing to or reading from the channel - */ - onError: Event; - - /** - * A message has arrived and can be read by listeners using a {@link MessageProvider}. - */ - onMessage: Event; - - /** - * Obtain a {@link WriteBuffer} to write a message to the channel. - */ - getWriteBuffer(): WriteBuffer; - - /** - * Close this channel. No {@link onClose} event should be sent - */ - close(): void; -} - -/** - * The event that is emitted when a channel is closed from the remote side. - */ -export interface ChannelCloseEvent { - reason: string, - code?: number -}; - -/** - * The `MessageProvider` is emitted when a channel receives a new message. - * Listeners can invoke the provider to obtain a new {@link ReadBuffer} for the received message. - * This ensures that each listener has its own isolated {@link ReadBuffer} instance. - * - */ -export type MessageProvider = () => ReadBuffer; - -/** - * Helper class to implement the single channels on a {@link ChannelMultiplexer}. Simply forwards write requests to - * the given write buffer source i.e. the main channel of the {@link ChannelMultiplexer}. - */ -export class ForwardingChannel implements Channel { - - protected toDispose = new DisposableCollection(); - constructor(readonly id: string, protected readonly closeHandler: () => void, protected readonly writeBufferSource: () => WriteBuffer) { - this.toDispose.pushAll([this.onCloseEmitter, this.onErrorEmitter, this.onMessageEmitter]); - } - - onCloseEmitter: Emitter = new Emitter(); - get onClose(): Event { - return this.onCloseEmitter.event; - }; - - onErrorEmitter: Emitter = new Emitter(); - get onError(): Event { - return this.onErrorEmitter.event; - }; - - onMessageEmitter: Emitter = new Emitter(); - get onMessage(): Event { - return this.onMessageEmitter.event; - }; - - getWriteBuffer(): WriteBuffer { - return this.writeBufferSource(); - } - - close(): void { - this.closeHandler(); - this.toDispose.dispose(); - } -} - -/** - * The different message types used in the messaging protocol of the {@link ChannelMultiplexer} - */ -export enum MessageTypes { - Open = 1, - Close = 2, - AckOpen = 3, - Data = 4 -} - -/** - * The write buffers in this implementation immediately write to the underlying - * channel, so we rely on writers to the multiplexed channels to always commit their - * messages and always in one go. - */ -export class ChannelMultiplexer implements Disposable { - protected pendingOpen: Map void> = new Map(); - protected openChannels: Map = new Map(); - - protected readonly onOpenChannelEmitter = new Emitter<{ id: string, channel: Channel }>(); - get onDidOpenChannel(): Event<{ id: string, channel: Channel }> { - return this.onOpenChannelEmitter.event; - } - - protected toDispose = new DisposableCollection(); - - constructor(protected readonly underlyingChannel: Channel) { - this.toDispose.pushAll([ - this.underlyingChannel.onMessage(buffer => this.handleMessage(buffer())), - this.underlyingChannel.onClose(event => this.onUnderlyingChannelClose(event)), - this.underlyingChannel.onError(error => this.handleError(error)), - this.onOpenChannelEmitter - ]); - } - - protected handleError(error: unknown): void { - this.openChannels.forEach(channel => { - channel.onErrorEmitter.fire(error); - }); - } - - onUnderlyingChannelClose(event?: ChannelCloseEvent): void { - if (!this.toDispose.disposed) { - this.toDispose.push(Disposable.create(() => { - this.pendingOpen.clear(); - this.openChannels.forEach(channel => { - channel.onCloseEmitter.fire(event ?? { reason: 'Multiplexer main channel has been closed from the remote side!' }); - }); - - this.openChannels.clear(); - })); - this.dispose(); - } - - } - - protected handleMessage(buffer: ReadBuffer): void { - const type = buffer.readUint8(); - const id = buffer.readString(); - switch (type) { - case MessageTypes.AckOpen: { - return this.handleAckOpen(id); - } - case MessageTypes.Open: { - return this.handleOpen(id); - } - case MessageTypes.Close: { - return this.handleClose(id); - } - case MessageTypes.Data: { - return this.handleData(id, buffer.sliceAtReadPosition()); - } - } - } - - protected handleAckOpen(id: string): void { - // edge case: both side try to open a channel at the same time. - const resolve = this.pendingOpen.get(id); - if (resolve) { - const channel = this.createChannel(id); - this.pendingOpen.delete(id); - this.openChannels.set(id, channel); - resolve!(channel); - this.onOpenChannelEmitter.fire({ id, channel }); - } - } - - protected handleOpen(id: string): void { - if (!this.openChannels.has(id)) { - const channel = this.createChannel(id); - this.openChannels.set(id, channel); - const resolve = this.pendingOpen.get(id); - if (resolve) { - // edge case: both side try to open a channel at the same time. - resolve(channel); - } - this.underlyingChannel.getWriteBuffer().writeUint8(MessageTypes.AckOpen).writeString(id).commit(); - this.onOpenChannelEmitter.fire({ id, channel }); - } - } - - protected handleClose(id: string): void { - const channel = this.openChannels.get(id); - if (channel) { - channel.onCloseEmitter.fire({ reason: 'Channel has been closed from the remote side' }); - this.openChannels.delete(id); - } - } - - protected handleData(id: string, data: ReadBuffer): void { - const channel = this.openChannels.get(id); - if (channel) { - channel.onMessageEmitter.fire(() => data); - } - } - - protected createChannel(id: string): ForwardingChannel { - return new ForwardingChannel(id, () => this.closeChannel(id), () => this.prepareWriteBuffer(id)); - } - - // Prepare the write buffer for the channel with the give, id. The channel id has to be encoded - // and written to the buffer before the actual message. - protected prepareWriteBuffer(id: string): WriteBuffer { - const underlying = this.underlyingChannel.getWriteBuffer(); - underlying.writeUint8(MessageTypes.Data); - underlying.writeString(id); - return underlying; - } - - protected closeChannel(id: string): void { - this.underlyingChannel.getWriteBuffer() - .writeUint8(MessageTypes.Close) - .writeString(id) - .commit(); - - this.openChannels.delete(id); - } - - open(id: string): Promise { - const result = new Promise((resolve, reject) => { - this.pendingOpen.set(id, resolve); - }); - this.underlyingChannel.getWriteBuffer().writeUint8(MessageTypes.Open).writeString(id).commit(); - return result; - } - - getOpenChannel(id: string): Channel | undefined { - return this.openChannels.get(id); - } - - dispose(): void { - this.toDispose.dispose(); - } -} - diff --git a/packages/core/src/common/message-rpc/index.ts b/packages/core/src/common/message-rpc/index.ts deleted file mode 100644 index 671540a31c42b..0000000000000 --- a/packages/core/src/common/message-rpc/index.ts +++ /dev/null @@ -1,18 +0,0 @@ -// ***************************************************************************** -// Copyright (C) 2022 STMicroelectronics and others. -// -// This program and the accompanying materials are made available under the -// terms of the Eclipse Public License v. 2.0 which is available at -// http://www.eclipse.org/legal/epl-2.0. -// -// This Source Code may also be made available under the following Secondary -// Licenses when the conditions for such availability set forth in the Eclipse -// Public License v. 2.0 are satisfied: GNU General Public License, version 2 -// with the GNU Classpath Exception which is available at -// https://www.gnu.org/software/classpath/license.html. -// -// SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0 -// ***************************************************************************** -export { RequestHandler, RpcProtocol, RpcProtocolOptions } from './rpc-protocol'; -export { Channel, ChannelCloseEvent, MessageProvider } from './channel'; -export { ReadBuffer, WriteBuffer } from './message-buffer'; diff --git a/packages/core/src/common/message-rpc/message-buffer.ts b/packages/core/src/common/message-rpc/message-buffer.ts deleted file mode 100644 index 396ba95d93d73..0000000000000 --- a/packages/core/src/common/message-rpc/message-buffer.ts +++ /dev/null @@ -1,99 +0,0 @@ -// ***************************************************************************** -// Copyright (C) 2021 Red Hat, Inc. and others. -// -// This program and the accompanying materials are made available under the -// terms of the Eclipse Public License v. 2.0 which is available at -// http://www.eclipse.org/legal/epl-2.0. -// -// This Source Code may also be made available under the following Secondary -// Licenses when the conditions for such availability set forth in the Eclipse -// Public License v. 2.0 are satisfied: GNU General Public License, version 2 -// with the GNU Classpath Exception which is available at -// https://www.gnu.org/software/classpath/license.html. -// -// SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0 -// ***************************************************************************** - -/** - * A buffer maintaining a write position capable of writing primitive values - */ -export interface WriteBuffer { - writeUint8(byte: number): this - writeUint16(value: number): this - writeUint32(value: number): this - writeString(value: string): this - writeBytes(value: Uint8Array): this - writeNumber(value: number): this - writeLength(value: number): this - /** - * Makes any writes to the buffer permanent, for example by sending the writes over a channel. - * You must obtain a new write buffer after committing - */ - commit(): void; -} - -export class ForwardingWriteBuffer implements WriteBuffer { - constructor(protected readonly underlying: WriteBuffer) { - } - - writeUint8(byte: number): this { - this.underlying.writeUint8(byte); - return this; - } - - writeUint16(value: number): this { - this.underlying.writeUint16(value); - return this; - } - - writeUint32(value: number): this { - this.underlying.writeUint32(value); - return this; - } - - writeLength(value: number): this { - this.underlying.writeLength(value); - return this; - } - - writeString(value: string): this { - this.underlying.writeString(value); - return this; - } - - writeBytes(value: Uint8Array): this { - this.underlying.writeBytes(value); - return this; - } - - writeNumber(value: number): this { - this.underlying.writeNumber(value); - return this; - } - - commit(): void { - this.underlying.commit(); - } -} - -/** - * A buffer maintaining a read position in a buffer containing a received message capable of - * reading primitive values. - */ -export interface ReadBuffer { - readUint8(): number; - readUint16(): number; - readUint32(): number; - readString(): string; - readNumber(): number, - readLength(): number, - readBytes(): Uint8Array; - - /** - * Returns a new read buffer whose starting read position is the current read position of this buffer. - * This is useful to create read buffers sub messages. - * (e.g. when using a multiplexer the beginning of the message might contain some protocol overhead which should not be part - * of the message reader that is sent to the target channel) - */ - sliceAtReadPosition(): ReadBuffer -} diff --git a/packages/core/src/common/message-rpc/rpc-message-encoder.spec.ts b/packages/core/src/common/message-rpc/rpc-message-encoder.spec.ts deleted file mode 100644 index 174591f227833..0000000000000 --- a/packages/core/src/common/message-rpc/rpc-message-encoder.spec.ts +++ /dev/null @@ -1,55 +0,0 @@ -// ***************************************************************************** -// Copyright (C) 2021 Red Hat, Inc. and others. -// -// This program and the accompanying materials are made available under the -// terms of the Eclipse Public License v. 2.0 which is available at -// http://www.eclipse.org/legal/epl-2.0. -// -// This Source Code may also be made available under the following Secondary -// Licenses when the conditions for such availability set forth in the Eclipse -// Public License v. 2.0 are satisfied: GNU General Public License, version 2 -// with the GNU Classpath Exception which is available at -// https://www.gnu.org/software/classpath/license.html. -// -// SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0 -// ***************************************************************************** - -import { expect } from 'chai'; -import { Uint8ArrayReadBuffer, Uint8ArrayWriteBuffer } from './uint8-array-message-buffer'; -import { EncodingError, RpcMessageDecoder, RpcMessageEncoder } from './rpc-message-encoder'; - -describe('PPC Message Codex', () => { - describe('RPC Message Encoder & Decoder', () => { - it('should encode object into binary message and decode the message back into the original object', () => { - const buffer = new Uint8Array(1024); - const writer = new Uint8ArrayWriteBuffer(buffer); - - const encoder = new RpcMessageEncoder(); - const jsonMangled = JSON.parse(JSON.stringify(encoder)); - // The RpcMessageEncoder can decode/encode collections, whereas JSON.parse can't. => We have to manually restore the set - // eslint-disable-next-line @typescript-eslint/no-explicit-any - jsonMangled.registeredTags = (encoder as any).registeredTags; - - encoder.writeTypedValue(writer, encoder, new WeakSet()); - - const written = writer.getCurrentContents(); - - const reader = new Uint8ArrayReadBuffer(written); - - const decoder = new RpcMessageDecoder(); - const decoded = decoder.readTypedValue(reader); - - expect(decoded).deep.equal(jsonMangled); - }); - it('should fail with an EncodingError when trying to encode the object ', () => { - const x = new Set(); - const y = new Set(); - x.add(y); - y.add(x); - const encoder = new RpcMessageEncoder(); - - const writer = new Uint8ArrayWriteBuffer(); - expect(() => encoder.writeTypedValue(writer, x, new WeakSet())).to.throw(EncodingError); - }); - }); -}); diff --git a/packages/core/src/common/message-rpc/rpc-message-encoder.ts b/packages/core/src/common/message-rpc/rpc-message-encoder.ts deleted file mode 100644 index 202507621db09..0000000000000 --- a/packages/core/src/common/message-rpc/rpc-message-encoder.ts +++ /dev/null @@ -1,552 +0,0 @@ -// ***************************************************************************** -// Copyright (C) 2022 Red Hat, Inc. and others. -// -// This program and the accompanying materials are made available under the -// terms of the Eclipse Public License v. 2.0 which is available at -// http://www.eclipse.org/legal/epl-2.0. -// -// This Source Code may also be made available under the following Secondary -// Licenses when the conditions for such availability set forth in the Eclipse -// Public License v. 2.0 are satisfied: GNU General Public License, version 2 -// with the GNU Classpath Exception which is available at -// https://www.gnu.org/software/classpath/license.html. -// -// SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0 -// ***************************************************************************** -/* eslint-disable @typescript-eslint/no-explicit-any */ - -import { ReadBuffer, WriteBuffer } from './message-buffer'; - -/** - * This code lets you encode rpc protocol messages (request/reply/notification/error/cancel) - * into a channel write buffer and decode the same messages from a read buffer. - * Custom encoders/decoders can be registered to specially handling certain types of values - * to be encoded. Clients are responsible for ensuring that the set of tags for encoders - * is distinct and the same at both ends of a channel. - */ - -export type RpcMessage = RequestMessage | ReplyMessage | ReplyErrMessage | CancelMessage | NotificationMessage; - -export const enum RpcMessageType { - Request = 1, - Notification = 2, - Reply = 3, - ReplyErr = 4, - Cancel = 5, -} - -export interface CancelMessage { - type: RpcMessageType.Cancel; - id: number; -} - -export interface RequestMessage { - type: RpcMessageType.Request; - id: number; - method: string; - args: any[]; -} - -export interface NotificationMessage { - type: RpcMessageType.Notification; - id: number; - method: string; - args: any[]; -} - -export interface ReplyMessage { - type: RpcMessageType.Reply; - id: number; - res: any; -} - -export interface ReplyErrMessage { - type: RpcMessageType.ReplyErr; - id: number; - err: any; -} - -export interface SerializedError { - readonly $isError: true; - readonly name: string; - readonly message: string; - readonly stack: string; -} - -/** - * A special error that can be returned in case a request - * has failed. Provides additional information i.e. an error code - * and additional error data. - */ -export class ResponseError extends Error { - constructor(readonly code: number, message: string, readonly data: any) { - super(message); - } -} - -/** - * The tag values for the default {@link ValueEncoder}s & {@link ValueDecoder}s - */ - -export enum ObjectType { - JSON = 0, - ByteArray = 1, - ObjectArray = 2, - Undefined = 3, - Object = 4, - String = 5, - Boolean = 6, - Number = 7, - // eslint-disable-next-line @typescript-eslint/no-shadow - ResponseError = 8, - Error = 9, - Map = 10, - Set = 11, - Function = 12 - -} - -/** - * A value encoder writes javascript values to a write buffer. Encoders will be asked - * in turn (ordered by their tag value, descending) whether they can encode a given value - * This means encoders with higher tag values have priority. Since the default encoders - * have tag values from 1-12, they can be easily overridden. - */ -export interface ValueEncoder { - /** - * Returns true if this encoder wants to encode this value. - * @param value the value to be encoded - */ - is(value: any): boolean; - /** - * Write the given value to the buffer. Will only be called if {@link is(value)} returns true. - * @param buf The buffer to write to - * @param value The value to be written - * @param visitedObjects The collection of already visited (i.e. encoded) objects. Used to detect circular references - * @param recursiveEncode A function that will use the encoders registered on the {@link MessageEncoder} - * to write a value to the underlying buffer. This is used mostly to write structures like an array - * without having to know how to encode the values in the array - */ - write(buf: WriteBuffer, value: any, visitedObjects: WeakSet, recursiveEncode?: (buf: WriteBuffer, value: any, visitedObjects: WeakSet) => void): void; -} - -/** - * Reads javascript values from a read buffer - */ -export interface ValueDecoder { - /** - * Reads a value from a read buffer. This method will be called for the decoder that is - * registered for the tag associated with the value encoder that encoded this value. - * @param buf The read buffer to read from - * @param recursiveDecode A function that will use the decoders registered on the {@link RpcMessageDecoder} - * to read values from the underlying read buffer. This is used mostly to decode structures like an array - * without having to know how to decode the values in the array. - */ - read(buf: ReadBuffer, recursiveDecode: (buf: ReadBuffer) => unknown): unknown; -} - -/** - * Custom error thrown by the {@link RpcMessageEncoder} if an error occurred during the encoding and the - * object could not be written to the given {@link WriteBuffer} - */ -export class EncodingError extends Error { - constructor(msg: string) { - super(msg); - } -} - -/** - * A `RpcMessageDecoder` parses a a binary message received via {@link ReadBuffer} into a {@link RpcMessage} - */ -export class RpcMessageDecoder { - - protected decoders: Map = new Map(); - - constructor() { - this.registerDecoders(); - } - - registerDecoders(): void { - this.registerDecoder(ObjectType.JSON, { - read: buf => { - const json = buf.readString(); - return JSON.parse(json); - } - }); - this.registerDecoder(ObjectType.Error, { - read: buf => { - const serializedError: SerializedError = JSON.parse(buf.readString()); - const error = new Error(serializedError.message); - Object.assign(error, serializedError); - return error; - } - }); - - this.registerDecoder(ObjectType.ResponseError, { - read: buf => { - const error = JSON.parse(buf.readString()); - return new ResponseError(error.code, error.message, error.data); - } - }); - this.registerDecoder(ObjectType.ByteArray, { - read: buf => buf.readBytes() - }); - - this.registerDecoder(ObjectType.ObjectArray, { - read: buf => this.readArray(buf) - }); - - this.registerDecoder(ObjectType.Undefined, { - read: () => undefined - }); - - this.registerDecoder(ObjectType.Object, { - read: (buf, recursiveRead) => { - const propertyCount = buf.readLength(); - const result = Object.create({}); - for (let i = 0; i < propertyCount; i++) { - const key = buf.readString(); - const value = recursiveRead(buf); - result[key] = value; - } - return result; - } - }); - - this.registerDecoder(ObjectType.String, { - read: (buf, recursiveRead) => buf.readString() - }); - - this.registerDecoder(ObjectType.Boolean, { - read: buf => buf.readUint8() === 1 - }); - - this.registerDecoder(ObjectType.Number, { - read: buf => buf.readNumber() - }); - - this.registerDecoder(ObjectType.Map, { - read: buf => new Map(this.readArray(buf)) - }); - - this.registerDecoder(ObjectType.Set, { - read: buf => new Set(this.readArray(buf)) - }); - - this.registerDecoder(ObjectType.Function, { - read: () => ({}) - }); - - } - - /** - * Registers a new {@link ValueDecoder} for the given tag. - * After the successful registration the {@link tagIntType} is recomputed - * by retrieving the highest tag value and calculating the required Uint size to store it. - * @param tag the tag for which the decoder should be registered. - * @param decoder the decoder that should be registered. - */ - registerDecoder(tag: number, decoder: ValueDecoder): void { - if (this.decoders.has(tag)) { - throw new Error(`Decoder already registered: ${tag}`); - } - this.decoders.set(tag, decoder); - } - parse(buf: ReadBuffer): RpcMessage { - try { - const msgType = buf.readUint8(); - - switch (msgType) { - case RpcMessageType.Request: - return this.parseRequest(buf); - case RpcMessageType.Notification: - return this.parseNotification(buf); - case RpcMessageType.Reply: - return this.parseReply(buf); - case RpcMessageType.ReplyErr: - return this.parseReplyErr(buf); - case RpcMessageType.Cancel: - return this.parseCancel(buf); - } - throw new Error(`Unknown message type: ${msgType}`); - } catch (e) { - // exception does not show problematic content: log it! - console.log('failed to parse message: ' + buf); - throw e; - } - } - - protected parseCancel(msg: ReadBuffer): CancelMessage { - const callId = msg.readUint32(); - return { - type: RpcMessageType.Cancel, - id: callId - }; - } - - protected parseRequest(msg: ReadBuffer): RequestMessage { - const callId = msg.readUint32(); - const method = msg.readString(); - const args = this.readArray(msg); - - return { - type: RpcMessageType.Request, - id: callId, - method: method, - args: args - }; - } - - protected parseNotification(msg: ReadBuffer): NotificationMessage { - const callId = msg.readUint32(); - const method = msg.readString(); - const args = this.readArray(msg); - - return { - type: RpcMessageType.Notification, - id: callId, - method: method, - args: args - }; - } - - parseReply(msg: ReadBuffer): ReplyMessage { - const callId = msg.readUint32(); - const value = this.readTypedValue(msg); - return { - type: RpcMessageType.Reply, - id: callId, - res: value - }; - } - - parseReplyErr(msg: ReadBuffer): ReplyErrMessage { - const callId = msg.readUint32(); - - const err = this.readTypedValue(msg); - - return { - type: RpcMessageType.ReplyErr, - id: callId, - err - }; - } - - readArray(buf: ReadBuffer): any[] { - const length = buf.readLength(); - const result = new Array(length); - for (let i = 0; i < length; i++) { - result[i] = this.readTypedValue(buf); - } - return result; - } - - readTypedValue(buf: ReadBuffer): any { - const type = buf.readUint8(); - const decoder = this.decoders.get(type); - if (!decoder) { - throw new Error(`No decoder for tag ${type}`); - } - return decoder.read(buf, innerBuffer => this.readTypedValue(innerBuffer)); - } -} - -/** - * A `RpcMessageEncoder` writes {@link RpcMessage} objects to a {@link WriteBuffer}. Note that it is - * up to clients to commit the message. This allows for multiple messages being - * encoded before sending. - */ -export class RpcMessageEncoder { - - protected readonly encoders: [number, ValueEncoder][] = []; - protected readonly registeredTags: Set = new Set(); - - constructor() { - this.registerEncoders(); - } - - protected registerEncoders(): void { - // encoders will be consulted in reverse order of registration, so the JSON fallback needs to be last - this.registerEncoder(ObjectType.JSON, { - is: () => true, - write: (buf, value) => { - buf.writeString(JSON.stringify(value)); - } - }); - - this.registerEncoder(ObjectType.Function, { - is: value => typeof value === 'function', - write: () => { } - }); - - this.registerEncoder(ObjectType.Object, { - is: value => typeof value === 'object', - write: (buf, object, visitedObjects, recursiveEncode) => { - const properties = Object.keys(object); - const relevant = []; - for (const property of properties) { - const value = object[property]; - if (typeof value !== 'function') { - relevant.push([property, value]); - } - } - - buf.writeLength(relevant.length); - for (const [property, value] of relevant) { - buf.writeString(property); - recursiveEncode?.(buf, value, visitedObjects); - } - } - }); - - this.registerEncoder(ObjectType.Error, { - is: value => value instanceof Error, - write: (buf, error: Error) => { - const { name, message } = error; - const stack: string = (error).stacktrace || error.stack; - const serializedError = { - $isError: true, - name, - message, - stack - }; - buf.writeString(JSON.stringify(serializedError)); - } - }); - - this.registerEncoder(ObjectType.ResponseError, { - is: value => value instanceof ResponseError, - write: (buf, value) => buf.writeString(JSON.stringify(value)) - }); - - this.registerEncoder(ObjectType.Map, { - is: value => value instanceof Map, - write: (buf, value: Map, visitedObjects) => this.writeArray(buf, Array.from(value.entries()), visitedObjects) - }); - - this.registerEncoder(ObjectType.Set, { - is: value => value instanceof Set, - write: (buf, value: Set, visitedObjects) => this.writeArray(buf, [...value], visitedObjects) - }); - - this.registerEncoder(ObjectType.Undefined, { - // eslint-disable-next-line no-null/no-null - is: value => value == null, - write: () => { } - }); - - this.registerEncoder(ObjectType.ObjectArray, { - is: value => Array.isArray(value), - write: (buf, value, visitedObjects) => { - this.writeArray(buf, value, visitedObjects); - } - }); - - this.registerEncoder(ObjectType.ByteArray, { - is: value => value instanceof Uint8Array, - write: (buf, value) => { - buf.writeBytes(value); - } - }); - - this.registerEncoder(ObjectType.String, { - is: value => typeof value === 'string', - write: (buf, value) => { - buf.writeString(value); - } - }); - - this.registerEncoder(ObjectType.Boolean, { - is: value => typeof value === 'boolean', - write: (buf, value) => { - buf.writeUint8(value === true ? 1 : 0); - } - }); - - this.registerEncoder(ObjectType.Number, { - is: value => typeof value === 'number', - write: (buf, value) => { - buf.writeNumber(value); - } - }); - } - - /** - * Registers a new {@link ValueEncoder} for the given tag. - * After the successful registration the {@link tagIntType} is recomputed - * by retrieving the highest tag value and calculating the required Uint size to store it. - * @param tag the tag for which the encoder should be registered. - * @param decoder the encoder that should be registered. - */ - registerEncoder(tag: number, encoder: ValueEncoder): void { - if (this.registeredTags.has(tag)) { - throw new Error(`Tag already registered: ${tag}`); - } - this.registeredTags.add(tag); - this.encoders.push([tag, encoder]); - } - - cancel(buf: WriteBuffer, requestId: number): void { - buf.writeUint8(RpcMessageType.Cancel); - buf.writeUint32(requestId); - } - - notification(buf: WriteBuffer, requestId: number, method: string, args: any[]): void { - buf.writeUint8(RpcMessageType.Notification); - buf.writeUint32(requestId); - buf.writeString(method); - this.writeArray(buf, args, new WeakSet()); - } - - request(buf: WriteBuffer, requestId: number, method: string, args: any[]): void { - buf.writeUint8(RpcMessageType.Request); - buf.writeUint32(requestId); - buf.writeString(method); - this.writeArray(buf, args, new WeakSet()); - } - - replyOK(buf: WriteBuffer, requestId: number, res: any): void { - buf.writeUint8(RpcMessageType.Reply); - buf.writeUint32(requestId); - this.writeTypedValue(buf, res, new WeakSet()); - } - - replyErr(buf: WriteBuffer, requestId: number, err: any): void { - buf.writeUint8(RpcMessageType.ReplyErr); - buf.writeUint32(requestId); - this.writeTypedValue(buf, err, new WeakSet()); - } - - writeTypedValue(buf: WriteBuffer, value: any, visitedObjects: WeakSet): void { - if (value && typeof value === 'object') { - if (visitedObjects.has(value)) { - throw new EncodingError('Object to encode contains circular references!'); - } - visitedObjects.add(value); - } - - try { - for (let i: number = this.encoders.length - 1; i >= 0; i--) { - if (this.encoders[i][1].is(value)) { - buf.writeUint8(this.encoders[i][0]); - this.encoders[i][1].write(buf, value, visitedObjects, (innerBuffer, innerValue, _visitedObjects) => { - this.writeTypedValue(innerBuffer, innerValue, _visitedObjects); - }); - return; - } - } - throw new EncodingError(`No suitable value encoder found for ${value}`); - } finally { - if (value && typeof value === 'object') { - visitedObjects.delete(value); - } - } - } - - writeArray(buf: WriteBuffer, value: any[], visitedObjects: WeakSet): void { - buf.writeLength(value.length); - for (let i = 0; i < value.length; i++) { - this.writeTypedValue(buf, value[i], visitedObjects); - } - } -} diff --git a/packages/core/src/common/message-rpc/uint8-array-message-buffer.spec.ts b/packages/core/src/common/message-rpc/uint8-array-message-buffer.spec.ts deleted file mode 100644 index 59cccbf90a605..0000000000000 --- a/packages/core/src/common/message-rpc/uint8-array-message-buffer.spec.ts +++ /dev/null @@ -1,41 +0,0 @@ -// ***************************************************************************** -// Copyright (C) 2021 Red Hat, Inc. and others. -// -// This program and the accompanying materials are made available under the -// terms of the Eclipse Public License v. 2.0 which is available at -// http://www.eclipse.org/legal/epl-2.0. -// -// This Source Code may also be made available under the following Secondary -// Licenses when the conditions for such availability set forth in the Eclipse -// Public License v. 2.0 are satisfied: GNU General Public License, version 2 -// with the GNU Classpath Exception which is available at -// https://www.gnu.org/software/classpath/license.html. -// -// SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0 -// ***************************************************************************** -import { expect } from 'chai'; -import { Uint8ArrayReadBuffer, Uint8ArrayWriteBuffer } from './uint8-array-message-buffer'; - -describe('array message buffer tests', () => { - it('basic read write test', () => { - const buffer = new Uint8Array(1024); - const writer = new Uint8ArrayWriteBuffer(buffer); - - writer.writeUint8(8); - writer.writeUint32(10000); - writer.writeBytes(new Uint8Array([1, 2, 3, 4])); - writer.writeString('this is a string'); - writer.writeString('another string'); - writer.commit(); - - const written = writer.getCurrentContents(); - - const reader = new Uint8ArrayReadBuffer(written); - - expect(reader.readUint8()).equal(8); - expect(reader.readUint32()).equal(10000); - expect(reader.readBytes()).deep.equal(new Uint8Array([1, 2, 3, 4])); - expect(reader.readString()).equal('this is a string'); - expect(reader.readString()).equal('another string'); - }); -}); diff --git a/packages/core/src/common/messaging/abstract-connection-provider.ts b/packages/core/src/common/messaging/abstract-connection-provider.ts index 64f29bf185178..7a1dab2df771a 100644 --- a/packages/core/src/common/messaging/abstract-connection-provider.ts +++ b/packages/core/src/common/messaging/abstract-connection-provider.ts @@ -18,7 +18,8 @@ import { injectable, interfaces } from 'inversify'; import { Emitter, Event } from '../event'; import { ConnectionHandler } from './handler'; import { JsonRpcProxy, JsonRpcProxyFactory } from './proxy-factory'; -import { Channel, ChannelMultiplexer } from '../message-rpc/channel'; +import { ChannelMultiplexer } from './channel-multiplexer'; +import { Channel } from './channel'; /** * Factor common logic according to `ElectronIpcConnectionProvider` and @@ -92,11 +93,11 @@ export abstract class AbstractConnectionProvider }, options); } - async openChannel(path: string, handler: (channel: Channel) => void, options?: AbstractOptions): Promise { + async openChannel(path: string, handler: (channel: Channel) => void, options?: AbstractOptions): Promise { if (!this.channelMultiPlexer) { throw new Error('The channel multiplexer has not been initialized yet!'); } - const newChannel = await this.channelMultiPlexer.open(path); + const newChannel = await this.channelMultiPlexer.openChannel(path); newChannel.onClose(() => { const { reconnecting } = { reconnecting: true, ...options }; if (reconnecting) { diff --git a/packages/core/src/common/message-rpc/channel.spec.ts b/packages/core/src/common/messaging/channel-multiplexer.spec.ts similarity index 60% rename from packages/core/src/common/message-rpc/channel.spec.ts rename to packages/core/src/common/messaging/channel-multiplexer.spec.ts index 43579ec957c6c..c70c9d6b29333 100644 --- a/packages/core/src/common/message-rpc/channel.spec.ts +++ b/packages/core/src/common/messaging/channel-multiplexer.spec.ts @@ -13,11 +13,11 @@ // // SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0 // ***************************************************************************** +/* eslint-disable @typescript-eslint/no-explicit-any */ import { assert, expect, spy, use } from 'chai'; import * as spies from 'chai-spies'; -import { Uint8ArrayReadBuffer, Uint8ArrayWriteBuffer } from './uint8-array-message-buffer'; -import { ChannelMultiplexer, ForwardingChannel, MessageProvider } from './channel'; +import { ChannelMultiplexer, SubChannel, } from './channel-multiplexer'; use(spies); @@ -25,20 +25,12 @@ use(spies); * A pipe with two channels at each end for testing. */ export class ChannelPipe { - readonly left: ForwardingChannel = new ForwardingChannel('left', () => this.right.onCloseEmitter.fire({ reason: 'Left channel has been closed' }), () => { - const leftWrite = new Uint8ArrayWriteBuffer(); - leftWrite.onCommit(buffer => { - this.right.onMessageEmitter.fire(() => new Uint8ArrayReadBuffer(buffer)); - }); - return leftWrite; - }); - readonly right: ForwardingChannel = new ForwardingChannel('right', () => this.left.onCloseEmitter.fire({ reason: 'Right channel has been closed' }), () => { - const rightWrite = new Uint8ArrayWriteBuffer(); - rightWrite.onCommit(buffer => { - this.left.onMessageEmitter.fire(() => new Uint8ArrayReadBuffer(buffer)); - }); - return rightWrite; - }); + readonly left: SubChannel = new SubChannel('left', msg => { + this.right.onMessageEmitter.fire(msg); + }, + () => this.right.onCloseEmitter.fire({ reason: 'Left channel has been closed' })); + readonly right: SubChannel = new SubChannel('right', msg => this.left.onMessageEmitter.fire(msg), + () => this.left.onCloseEmitter.fire({ reason: 'Right channel has been closed' })); } describe('Message Channel', () => { describe('Channel multiplexer', () => { @@ -53,8 +45,8 @@ describe('Message Channel', () => { rightMultiplexer.onDidOpenChannel(openChannelSpy); leftMultiplexer.onDidOpenChannel(openChannelSpy); - const leftFirst = await leftMultiplexer.open('first'); - const leftSecond = await leftMultiplexer.open('second'); + const leftFirst = await leftMultiplexer.openChannel('first'); + const leftSecond = await leftMultiplexer.openChannel('second'); const rightFirst = rightMultiplexer.getOpenChannel('first'); const rightSecond = rightMultiplexer.getOpenChannel('second'); @@ -62,22 +54,20 @@ describe('Message Channel', () => { assert.isNotNull(rightFirst); assert.isNotNull(rightSecond); - const leftSecondSpy = spy((buf: MessageProvider) => { - const message = buf().readString(); + const leftSecondSpy = spy((message: any) => { expect(message).equal('message for second'); }); leftSecond.onMessage(leftSecondSpy); - const rightFirstSpy = spy((buf: MessageProvider) => { - const message = buf().readString(); + const rightFirstSpy = spy((message: any) => { expect(message).equal('message for first'); }); rightFirst!.onMessage(rightFirstSpy); - leftFirst.getWriteBuffer().writeString('message for first').commit(); - rightSecond!.getWriteBuffer().writeString('message for second').commit(); + leftFirst.send('message for first'); + rightSecond!.send('message for second'); expect(leftSecondSpy).to.be.called(); expect(rightFirstSpy).to.be.called(); diff --git a/packages/core/src/common/messaging/channel-multiplexer.ts b/packages/core/src/common/messaging/channel-multiplexer.ts new file mode 100644 index 0000000000000..2df8be3955b58 --- /dev/null +++ b/packages/core/src/common/messaging/channel-multiplexer.ts @@ -0,0 +1,198 @@ +// ***************************************************************************** +// Copyright (C) 2022 Red Hat, Inc. and others. +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License v. 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0. +// +// This Source Code may also be made available under the following Secondary +// Licenses when the conditions for such availability set forth in the Eclipse +// Public License v. 2.0 are satisfied: GNU General Public License, version 2 +// with the GNU Classpath Exception which is available at +// https://www.gnu.org/software/classpath/license.html. +// +// SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0 +// ***************************************************************************** +/* eslint-disable @typescript-eslint/no-explicit-any */ + +import { Disposable, DisposableCollection } from '../disposable'; +import { Emitter, Event } from '../event'; +import { AbstractChannel, Channel, ChannelCloseEvent } from './channel'; + +/** + * The different message types used in the messaging protocol of the {@link ChannelMultiplexer} + */ +export enum MultiplexerMessageType { + Open = 1, + Close = 2, + AckOpen = 3, + Data = 4 +} + +export interface MultiplexerMessage { + type: MultiplexerMessageType, + channelId: string, +} + +export namespace MultiplexerMessage { + export function is(object: any): object is MultiplexerMessage { + return typeof object?.type === 'number' && MultiplexerMessageType[object.type] !== undefined; + } +} + +export interface DataMessage extends MultiplexerMessage { + type: MultiplexerMessageType.Data + data: any +} + +/** + * Internal Helper class to implement the sub channels on a {@link ChannelMultiplexer}. All subchannels of an {@link ChannelMultiplexer} are reusing the same main channel for + * sending and receiving messages. + * The visibility of event emitters is set to public. This enables access from the container class (i.e. the {@link ChannelMultiplexer}). + */ +export class SubChannel extends AbstractChannel { + override readonly onCloseEmitter: Emitter; + override readonly onMessageEmitter: Emitter; + override readonly onErrorEmitter: Emitter; + + constructor(readonly id: string, public send: (message: any) => void, public override close: () => void) { + super(); + } +} + +export class ChannelMultiplexer implements Disposable { + + protected toDispose = new DisposableCollection(); + protected pendingOpen: Map void> = new Map(); + protected openChannels: Map = new Map(); + + protected readonly onOpenChannelEmitter = new Emitter<{ id: string, channel: Channel }>(); + get onDidOpenChannel(): Event<{ id: string, channel: Channel }> { + return this.onOpenChannelEmitter.event; + } + + constructor(protected readonly mainChannel: Channel) { + this.toDispose.pushAll([ + this.mainChannel.onMessage(msg => this.handleMainChannelMessage(msg)), + this.mainChannel.onClose(event => this.handleMainChannelClose(event)), + this.mainChannel.onError(error => this.handleError(error)), + this.onOpenChannelEmitter + ]); + } + + handleMainChannelClose(event: ChannelCloseEvent): void { + if (!this.toDispose.disposed) { + this.toDispose.push(Disposable.create(() => { + this.pendingOpen.clear(); + this.openChannels.forEach(channel => { + channel.onCloseEmitter.fire(event ?? { reason: 'Multiplexer main channel has been closed from the remote side!' }); + }); + + this.openChannels.clear(); + })); + this.dispose(); + } + } + + protected handleError(error: unknown): void { + this.openChannels.forEach(channel => { + channel.onErrorEmitter.fire(error); + }); + } + + protected handleMainChannelMessage(message: MultiplexerMessage): void { + switch (message.type) { + case MultiplexerMessageType.AckOpen: + return this.handleAckOpen(message.channelId); + case MultiplexerMessageType.Open: + return this.handleOpen(message.channelId); + case MultiplexerMessageType.Close: + return this.handleClose(message.channelId); + case MultiplexerMessageType.Data: { + const { channelId, data } = message as DataMessage; + return this.handleData(channelId, data); + } + } + } + + protected handleAckOpen(channelId: string): void { + // edge case: both side try to open a channel at the same time. + const resolve = this.pendingOpen.get(channelId); + if (resolve) { + const channel = this.createChannel(channelId); + this.pendingOpen.delete(channelId); + this.openChannels.set(channelId, channel); + resolve(channel); + this.onOpenChannelEmitter.fire({ id: channelId, channel }); + } + } + + protected handleOpen(channelId: string): void { + if (!this.openChannels.has(channelId)) { + const channel = this.createChannel(channelId); + this.openChannels.set(channelId, channel); + const resolve = this.pendingOpen.get(channelId); + if (resolve) { + // edge case: both side try to open a channel at the same time. + resolve(channel); + } else { + this.sendMessage({ channelId, type: MultiplexerMessageType.AckOpen }); + } + + this.onOpenChannelEmitter.fire({ id: channelId, channel }); + } + } + + protected handleClose(id: string): void { + const channel = this.openChannels.get(id); + if (channel) { + channel.onCloseEmitter.fire({ reason: 'Channel has been closed from the remote side' }); + this.openChannels.delete(id); + } + } + + protected handleData(channelId: string, data: any): void { + const channel = this.openChannels.get(channelId); + if (channel) { + channel.onMessageEmitter.fire(data); + } + } + + protected createChannel(channelId: string): SubChannel { + return new SubChannel(channelId, + data => this.sendMessage({ channelId: channelId, data, type: MultiplexerMessageType.Data }), + () => this.closeChannel(channelId)); + } + + protected sendMessage(message: MultiplexerMessage): void { + this.mainChannel.send(message); + } + + protected closeChannel(channelId: string): void { + if (this.openChannels.has(channelId)) { + this.sendMessage({ channelId: channelId, type: MultiplexerMessageType.Open }); + this.openChannels.delete(channelId); + } + } + + openChannel(channelId: string): Promise { + const existingChannel = this.openChannels.get(channelId); + if (existingChannel) { + return Promise.resolve(existingChannel); + } + const result = new Promise((resolve, reject) => { + this.pendingOpen.set(channelId, resolve); + }); + this.sendMessage({ channelId, type: MultiplexerMessageType.Open }); + return result; + } + + getOpenChannel(id: string): Channel | undefined { + return this.openChannels.get(id); + } + + dispose(): void { + this.toDispose.dispose(); + } +} + diff --git a/packages/core/src/common/messaging/channel.ts b/packages/core/src/common/messaging/channel.ts new file mode 100644 index 0000000000000..051f9279caa86 --- /dev/null +++ b/packages/core/src/common/messaging/channel.ts @@ -0,0 +1,100 @@ +// ***************************************************************************** +// Copyright (C) 2021 Red Hat, Inc. and others. +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License v. 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0. +// +// This Source Code may also be made available under the following Secondary +// Licenses when the conditions for such availability set forth in the Eclipse +// Public License v. 2.0 are satisfied: GNU General Public License, version 2 +// with the GNU Classpath Exception which is available at +// https://www.gnu.org/software/classpath/license.html. +// +// SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0 +// ***************************************************************************** +/* eslint-disable @typescript-eslint/no-explicit-any */ + +import { Disposable, DisposableCollection } from '../disposable'; +import { Emitter, Event } from '../event'; + +export interface ChannelCloseEvent { + reason: string, + code?: number +}; + +/** + * A channel is a bidirectional transport channel to send and receive Javascript objects with lifecycle and + * error signalling. + */ +export interface Channel { + + /** + * The remote side has closed the channel + */ + onClose: Event; + + /** + * An error has occurred while writing to or reading from the channel + */ + onError: Event; + + /** + * A message has arrived and can be read by listeners. + */ + onMessage: Event; + + /** + * Send a message over to the channel. + */ + send(message: T): void + + /** + * Close this channel. No {@link onClose} event should be sent + */ + close(): void; +} + +/** + * Common {@link Channel} base implementation that takes care of setup and proper + * disposal of the event emitters. + */ +export abstract class AbstractChannel implements Channel, Disposable { + + protected toDispose = new DisposableCollection(); + + protected onCloseEmitter = new Emitter(); + get onClose(): Event { + return this.onCloseEmitter.event; + } + + protected onErrorEmitter = new Emitter(); + get onError(): Event { + return this.onErrorEmitter.event; + } + + protected onMessageEmitter = new Emitter(); + get onMessage(): Event { + return this.onMessageEmitter.event; + } + + constructor() { + this.toDispose.pushAll([ + this.onCloseEmitter, + this.onErrorEmitter, + this.onMessageEmitter, + Disposable.create(() => this.close()) + ]); + } + + dispose(): void { + this.toDispose.dispose(); + } + + close(): void { + this.dispose(); + } + + abstract send(message: T): void; + +} diff --git a/packages/core/src/common/messaging/handler.ts b/packages/core/src/common/messaging/handler.ts index 1e790d38aeec3..0995ee91aa20e 100644 --- a/packages/core/src/common/messaging/handler.ts +++ b/packages/core/src/common/messaging/handler.ts @@ -14,7 +14,7 @@ // SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0 // ***************************************************************************** -import { Channel } from '../message-rpc/channel'; +import { Channel } from './channel'; export const ConnectionHandler = Symbol('ConnectionHandler'); diff --git a/packages/core/src/common/messaging/index.ts b/packages/core/src/common/messaging/index.ts index 34b87063db376..37aa4cfc5c945 100644 --- a/packages/core/src/common/messaging/index.ts +++ b/packages/core/src/common/messaging/index.ts @@ -17,3 +17,7 @@ export * from './handler'; export * from './proxy-factory'; export * from './connection-error-handler'; +export * from './channel'; +export { MessageCodec } from './message-codec'; +export { ChannelMultiplexer } from './channel-multiplexer'; +export { RpcProtocol, RequestHandler } from './rpc-protocol'; diff --git a/packages/core/src/common/message-rpc/uint8-array-message-buffer.ts b/packages/core/src/common/messaging/message-buffer.ts similarity index 79% rename from packages/core/src/common/message-rpc/uint8-array-message-buffer.ts rename to packages/core/src/common/messaging/message-buffer.ts index feec31dcd69cf..1cb7f059a0a69 100644 --- a/packages/core/src/common/message-rpc/uint8-array-message-buffer.ts +++ b/packages/core/src/common/messaging/message-buffer.ts @@ -13,22 +13,43 @@ // // SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0 // ***************************************************************************** -import { Disposable } from '../disposable'; -import { Emitter, Event } from '../event'; -import { ReadBuffer, WriteBuffer } from './message-buffer'; + +/** + * A buffer maintaining a write position capable of writing primitive values + */ +export interface WriteBuffer { + writeUint8(byte: number): this + writeUint16(value: number): this + writeUint32(value: number): this + writeString(value: string): this + writeBytes(value: Uint8Array): this + writeNumber(value: number): this + writeLength(value: number): this + getCurrentContents(): Uint8Array; + +} + +/** + * A buffer maintaining a read position in a buffer containing a received message capable of + * reading primitive values. + */ +export interface ReadBuffer { + readUint8(): number; + readUint16(): number; + readUint32(): number; + readString(): string; + readNumber(): number, + readLength(): number, + readBytes(): Uint8Array; +} /** * The default {@link WriteBuffer} implementation. Uses a {@link Uint8Array} for buffering. - * The {@link Uint8ArrayWriteBuffer.onCommit} hook can be used to rect to on-commit events. - * After the {@link Uint8ArrayWriteBuffer.commit} method has been called the buffer is disposed - * and can no longer be used for writing data. If the writer buffer is no longer needed but the message - * has not been committed yet it has to be disposed manually. */ -export class Uint8ArrayWriteBuffer implements WriteBuffer, Disposable { +export class WriteBufferImpl implements WriteBuffer { private encoder = new TextEncoder(); private msg: DataView; - private isDisposed = false; private offset: number; constructor(private buffer: Uint8Array = new Uint8Array(1024), writePosition: number = 0) { @@ -106,36 +127,16 @@ export class Uint8ArrayWriteBuffer implements WriteBuffer, Disposable { return this; } - private onCommitEmitter = new Emitter(); - get onCommit(): Event { - return this.onCommitEmitter.event; - } - - commit(): void { - if (this.isDisposed) { - throw new Error("Could not invoke 'commit'. The WriteBuffer is already disposed."); - } - this.onCommitEmitter.fire(this.getCurrentContents()); - this.dispose(); - } - getCurrentContents(): Uint8Array { return this.buffer.slice(this.buffer.byteOffset, this.offset); } - - dispose(): void { - if (!this.isDisposed) { - this.onCommitEmitter.dispose(); - this.isDisposed = true; - } - } - } + /** * The default {@link ReadBuffer} implementation. Uses a {@link Uint8Array} for buffering. * Is for single message read. A message can only be read once. */ -export class Uint8ArrayReadBuffer implements ReadBuffer { +export class ReadBufferImpl implements ReadBuffer { private offset: number = 0; private msg: DataView; private decoder = new TextDecoder(); @@ -198,9 +199,4 @@ export class Uint8ArrayReadBuffer implements ReadBuffer { this.offset += length; return result; } - - sliceAtReadPosition(): ReadBuffer { - const sliceOffset = this.offset - this.buffer.byteOffset; - return new Uint8ArrayReadBuffer(this.buffer, sliceOffset); - } } diff --git a/packages/core/src/common/messaging/message-codec.spec.ts b/packages/core/src/common/messaging/message-codec.spec.ts new file mode 100644 index 0000000000000..5357ff983211b --- /dev/null +++ b/packages/core/src/common/messaging/message-codec.spec.ts @@ -0,0 +1,60 @@ +// ***************************************************************************** +// Copyright (C) STMicroelectronics and others. +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License v. 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0. +// +// This Source Code may also be made available under the following Secondary +// Licenses when the conditions for such availability set forth in the Eclipse +// Public License v. 2.0 are satisfied: GNU General Public License, version 2 +// with the GNU Classpath Exception which is available at +// https://www.gnu.org/software/classpath/license.html. +// +// SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0 +// ***************************************************************************** + +import { expect } from 'chai'; +import { BinaryMessageCodec, EncodingError } from './message-codec'; + +describe('Binary Message Codec', () => { + describe('Encode & Decode', () => { + it('should encode object into binary message and decode the message back into the original object', () => { + const messageCodec = new BinaryMessageCodec(); + // Construct a simple test object that covers all different value types + const testObject = { + string: 'string', + boolean: true, + integer: 5, + float: 14.5, + array: ['1', 2, { three: 'three' }], + set: new Set([1, 2, 3]), + map: new Map([[1, 1], [2, 2], [3, 3]]), + buffer: new TextEncoder().encode('ThisIsAUint8Array'), + object: { foo: 'bar', baz: true }, + functionArray: [() => console.log()], + undefined: undefined, + // eslint-disable-next-line no-null/no-null + null: null + }; + + // Prepare the expected result. Functions array values should be encoded as empty objects, + // null as `undefined`. + const expected = { ...testObject, functionArray: [{}], null: undefined }; + const encoded = messageCodec.encode(testObject); + + const decoded = messageCodec.decode(encoded); + + expect(decoded).deep.equal(expected); + }); + it('should fail with an EncodingError when trying to encode the circular object structure', () => { + const x = new Set(); + const y = new Set(); + x.add(y); + y.add(x); + const codec = new BinaryMessageCodec(); + + expect(() => codec.encode(x)).to.throw(EncodingError); + }); + }); +}); diff --git a/packages/core/src/common/messaging/message-codec.ts b/packages/core/src/common/messaging/message-codec.ts new file mode 100644 index 0000000000000..040e51e3d27d9 --- /dev/null +++ b/packages/core/src/common/messaging/message-codec.ts @@ -0,0 +1,338 @@ +// ***************************************************************************** +// Copyright (C) 2022 Red Hat, Inc. and others. +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License v. 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0. +// +// This Source Code may also be made available under the following Secondary +// Licenses when the conditions for such availability set forth in the Eclipse +// Public License v. 2.0 are satisfied: GNU General Public License, version 2 +// with the GNU Classpath Exception which is available at +// https://www.gnu.org/software/classpath/license.html. +// +// SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0 +// ***************************************************************************** +/* eslint-disable @typescript-eslint/no-explicit-any */ + +import { ReadBuffer, ReadBufferImpl, WriteBuffer, WriteBufferImpl } from './message-buffer'; +import { ResponseError } from './rpc-protocol'; + +export interface MessageCodec { + encode(object: From): To + decode(encodedObject: To): From +} + +/** + * The tag values for the default {@link ValueCodec}s. + */ +export enum ObjectType { + JSON = 0, + Undefined = 10, + Function = 20, + Object = 30, + Error = 40, + // eslint-disable-next-line @typescript-eslint/no-shadow + ResponseError = 50, + UInt8Array = 60, + Map = 70, + Set = 80, + ObjectArray = 90, + Number = 100, + Boolean = 110, + String = 120, + +} + +export interface ValueCodec { + readonly tag: number; + /** + * Returns true if this encoder can encode this value. + * @param value the value to be encoded + */ + canEncode(value: any): boolean; + /** + * Write the given value to the buffer. Will only be called if {@link canEncode(value)} returns true. + * @param buf The buffer to write to + * @param value The value to be written + * @param visitedObjects The collection of already visited (i.e. encoded) objects. Used to detect circular references + * @param recursiveEncode A function that will use the encoders registered on the {@link MessageEncoder} + * to write a value to the underlying buffer. This is used mostly to write structures like an array + * without having to know how to encode the values in the array + */ + write(buf: WriteBuffer, value: any, visitedObjects: WeakSet, recursiveEncode?: (buf: WriteBuffer, value: any, visitedObjects: WeakSet) => void): void; + + /** + * Reads a value from a read buffer. This method will be called for the decoder that is + * registered for the tag associated with the value encoder that encoded this value. + * @param buf The read buffer to read from + * @param recursiveDecode A function that will use the decoders registered on the {@link RpcMessageDecoder} + * to read values from the underlying read buffer. This is used mostly to decode structures like an array + * without having to know how to decode the values in the array. + */ + read(buf: ReadBuffer, recursiveDecode: (buf: ReadBuffer) => unknown): unknown; +} + +export interface SerializedError { + readonly $isError: true; + readonly name: string; + readonly message: string; + readonly stack: string; +} + +export function serializeError(error: Error): SerializedError { + const { name, message } = error; + const stack: string = (error as any).stacktrace ?? error.stack; + return { + $isError: true, + name, + message, + stack + }; +} + +/** + * Custom error thrown by the {@link RpcMessageEncoder} if an error occurred during the encoding and the + * object could not be written to the given {@link WriteBuffer} + */ +export class EncodingError extends Error { + constructor(msg: string) { + super(msg); + } +} + +export class BinaryMessageCodec implements MessageCodec { + + protected valueCodecs = new Map(); + protected sortedValueCodecs: ValueCodec[]; + + constructor(customValueCodecs: ValueCodec[] = [], overrideExisting = false) { + this.registerDefaults(); + customValueCodecs.forEach(codec => this.registerValueCodec(codec, overrideExisting)); + // Sort registered codecs ascending by their tag value + this.sortedValueCodecs = [...this.valueCodecs].sort((a, b) => b[0] - a[0]).map(entry => entry[1]); + } + + protected registerDefaults(): void { + this.registerValueCodec({ + tag: ObjectType.JSON, + canEncode: () => true, + write: (buf, value) => buf.writeString(JSON.stringify(value)), + read: buf => { + const json = buf.readString(); + return JSON.parse(json); + } + }); + this.registerValueCodec({ + tag: ObjectType.Undefined, + // eslint-disable-next-line no-null/no-null + canEncode: value => value == null, + write: () => { }, + read: () => undefined + }); + + this.registerValueCodec({ + tag: ObjectType.Function, + canEncode: value => typeof value === 'function', + write: () => { }, + read: () => ({}) + }); + + this.registerValueCodec({ + tag: ObjectType.Object, + canEncode: value => !!value && typeof value === 'object', + write: (buf, object, visitedObjects, recursiveEncode) => { + const properties = Object.keys(object); + const relevant = []; + for (const property of properties) { + const value = object[property]; + if (typeof value !== 'function') { + relevant.push([property, value]); + } + } + + buf.writeLength(relevant.length); + for (const [property, value] of relevant) { + buf.writeString(property); + recursiveEncode?.(buf, value, visitedObjects); + } + + }, + read: (buf, recursiveRead) => { + const propertyCount = buf.readLength(); + const result = Object.create({}); + for (let i = 0; i < propertyCount; i++) { + const key = buf.readString(); + const value = recursiveRead(buf); + result[key] = value; + } + return result; + } + }); + + this.registerValueCodec({ + tag: ObjectType.Error, + canEncode: value => value instanceof Error, + write: (buf, error: Error) => buf.writeString(JSON.stringify(serializeError(error))) + , + read: buf => { + const serializedError: SerializedError = JSON.parse(buf.readString()); + const error = new Error(serializedError.message); + Object.assign(error, serializedError); + return error; + } + }); + + this.registerValueCodec({ + tag: ObjectType.ResponseError, + canEncode: value => value instanceof ResponseError, + write: (buf, error: ResponseError) => { + const serializedError = { ...serializeError(error), code: error.code, data: error.data }; + this.writeTypedValue(buf, serializedError, new WeakSet()); + }, + read: buf => { + const serializedError = this.readTypedValue(buf); + const error = new ResponseError(serializedError.code, serializedError.message, serializedError.data); + Object.assign(error, serializedError); + return error; + } + }); + + this.registerValueCodec({ + tag: ObjectType.UInt8Array, + canEncode: value => value instanceof Uint8Array, + write: (buf, value) => { + buf.writeBytes(value); + }, + read: buf => buf.readBytes() + }); + + this.registerValueCodec({ + tag: ObjectType.Map, + canEncode: value => value instanceof Map, + write: (buf, value: Map, visitedObjects) => this.writeArray(buf, Array.from(value.entries()), visitedObjects), + read: buf => new Map(this.readArray(buf)) + }); + + this.registerValueCodec({ + tag: ObjectType.Set, + canEncode: value => value instanceof Set, + write: (buf, value: Set, visitedObjects) => this.writeArray(buf, [...value], visitedObjects), + read: buf => new Set(this.readArray(buf)) + + }); + + this.registerValueCodec({ + tag: ObjectType.ObjectArray, + canEncode: value => Array.isArray(value), + write: (buf, value, visitedObjects) => { + this.writeArray(buf, value, visitedObjects); + }, + read: buf => this.readArray(buf) + }); + + this.registerValueCodec({ + tag: ObjectType.Number, + canEncode: value => typeof value === 'number', + write: (buf, value) => { + buf.writeNumber(value); + }, + read: buf => buf.readNumber() + + }); + + this.registerValueCodec({ + tag: ObjectType.Boolean, + canEncode: value => typeof value === 'boolean', + write: (buf, value) => { + buf.writeUint8(value === true ? 1 : 0); + }, + read: buf => buf.readUint8() === 1 + }); + + this.registerValueCodec({ + tag: ObjectType.String, + canEncode: value => typeof value === 'string', + write: (buf, value) => { + buf.writeString(value); + }, + read: buf => buf.readString() + }); + } + + /** + * Registers a new {@link ValueCodec}. + * @param decoder the codec that should be registered. + * @param override boolean flag to indicate wether an existing registration + * with the same tag should be overwritten. + */ + registerValueCodec(codec: ValueCodec, override = false): void { + if (override && this.valueCodecs.has(codec.tag)) { + throw new Error(`A value codec with the tag '${codec.tag}' is already registered`); + } + this.valueCodecs.set(codec.tag, codec); + } + + encode(object: any): Uint8Array { + const buffer = new WriteBufferImpl(); + this.writeTypedValue(buffer, object, new WeakSet()); + return buffer.getCurrentContents(); + } + + decode(buffer: Uint8Array): any { + return this.readTypedValue(new ReadBufferImpl(buffer)); + } + + protected readArray(buf: ReadBuffer): any[] { + const length = buf.readLength(); + const result = new Array(length); + for (let i = 0; i < length; i++) { + result[i] = this.readTypedValue(buf); + } + return result; + } + + protected readTypedValue(buf: ReadBuffer): any { + const type = buf.readLength(); + const decoder = this.valueCodecs.get(type); + if (!decoder) { + throw new Error(`No decoder for tag ${type}`); + } + return decoder.read(buf, innerBuffer => this.readTypedValue(innerBuffer)); + } + + protected writeTypedValue(buf: WriteBuffer, value: any, visitedObjects: WeakSet): void { + if (value && typeof value === 'object') { + if (visitedObjects.has(value)) { + throw new EncodingError('Object to encode contains circular references!'); + } + visitedObjects.add(value); + } + try { + for (let i = 0; i < this.sortedValueCodecs.length; i++) { + if (this.sortedValueCodecs[i].canEncode(value)) { + const codec = this.sortedValueCodecs[i]; + buf.writeLength(codec.tag); + codec.write(buf, value, visitedObjects, (innerBuffer, innerValue, _visitedObjects) => { + this.writeTypedValue(innerBuffer, innerValue, _visitedObjects); + }); + return; + } + } + throw new EncodingError(`No suitable value encoder found for ${value}`); + } catch (err) { + throw err; + } finally { + if (value && typeof value === 'object') { + visitedObjects.delete(value); + } + } + } + + protected writeArray(buf: WriteBuffer, value: any[], visitedObjects: WeakSet): void { + buf.writeLength(value.length); + for (let i = 0; i < value.length; i++) { + this.writeTypedValue(buf, value[i], visitedObjects); + } + } +} diff --git a/packages/core/src/common/messaging/proxy-factory.spec.ts b/packages/core/src/common/messaging/proxy-factory.spec.ts index 37280e4dbfdaa..2dba275081437 100644 --- a/packages/core/src/common/messaging/proxy-factory.spec.ts +++ b/packages/core/src/common/messaging/proxy-factory.spec.ts @@ -16,7 +16,7 @@ import * as chai from 'chai'; import { JsonRpcProxyFactory, JsonRpcProxy } from './proxy-factory'; -import { ChannelPipe } from '../message-rpc/channel.spec'; +import { ChannelPipe } from '../messaging/channel-multiplexer.spec'; const expect = chai.expect; diff --git a/packages/core/src/common/messaging/proxy-factory.ts b/packages/core/src/common/messaging/proxy-factory.ts index 0562ada54e1bb..aacd59fb36fdb 100644 --- a/packages/core/src/common/messaging/proxy-factory.ts +++ b/packages/core/src/common/messaging/proxy-factory.ts @@ -16,13 +16,13 @@ /* eslint-disable @typescript-eslint/no-explicit-any */ -import { ResponseError } from '../message-rpc/rpc-message-encoder'; import { ApplicationError } from '../application-error'; import { Disposable } from '../disposable'; import { Emitter, Event } from '../event'; -import { Channel } from '../message-rpc/channel'; -import { RequestHandler, RpcProtocol } from '../message-rpc/rpc-protocol'; +import { Channel } from '../messaging/channel'; +import { RequestHandler, RpcProtocol } from '../messaging/rpc-protocol'; import { ConnectionHandler } from './handler'; +import { ResponseError } from './rpc-protocol'; export type JsonRpcServer = Disposable & { /** @@ -55,11 +55,11 @@ export class JsonRpcConnectionHandler implements ConnectionHan } } /** - * Factory for creating a new {@link RpcConnection} for a given chanel and {@link RequestHandler}. + * Factory for creating a new {@link RpcProtocol} for a given channel and {@link RequestHandler}. */ -export type RpcConnectionFactory = (channel: Channel, requestHandler: RequestHandler) => RpcProtocol; +export type RpcFactory = (channel: Channel, requestHandler: RequestHandler) => RpcProtocol; -const defaultRPCConnectionFactory: RpcConnectionFactory = (channel, requestHandler) => new RpcProtocol(channel, requestHandler); +export const defaultRpcFactory: RpcFactory = (channel, requestHandler) => new RpcProtocol(channel, requestHandler); /** * Factory for JSON-RPC proxy objects. @@ -117,8 +117,10 @@ export class JsonRpcProxyFactory implements ProxyHandler { * * @param target - The object to expose to JSON-RPC methods calls. If this * is omitted, the proxy won't be able to handle requests, only send them. + * @param rpcFactory - The factory used to establish a {@link RpcProtocol} + * on top of the proxy channel. */ - constructor(public target?: any, protected rpcConnectionFactory = defaultRPCConnectionFactory) { + constructor(public target?: any, protected rpcFactory = defaultRpcFactory) { this.waitForConnection(); } @@ -143,7 +145,7 @@ export class JsonRpcProxyFactory implements ProxyHandler { * response. */ listen(channel: Channel): void { - const connection = this.rpcConnectionFactory(channel, (meth, args) => this.onRequest(meth, ...args)); + const connection = this.rpcFactory(channel, (meth, args) => this.onRequest(meth, ...args)); connection.onNotification(event => this.onNotification(event.method, ...event.args)); this.connectionPromiseResolve(connection); diff --git a/packages/core/src/common/message-rpc/rpc-protocol.ts b/packages/core/src/common/messaging/rpc-protocol.ts similarity index 69% rename from packages/core/src/common/message-rpc/rpc-protocol.ts rename to packages/core/src/common/messaging/rpc-protocol.ts index 6e037e6e8befd..dfef976944bda 100644 --- a/packages/core/src/common/message-rpc/rpc-protocol.ts +++ b/packages/core/src/common/messaging/rpc-protocol.ts @@ -15,38 +15,82 @@ // ***************************************************************************** /* eslint-disable @typescript-eslint/no-explicit-any */ -import { CancellationToken, CancellationTokenSource } from '../cancellation'; +import { CancellationError, CancellationToken, CancellationTokenSource } from '../cancellation'; import { DisposableCollection } from '../disposable'; import { Emitter, Event } from '../event'; import { Deferred } from '../promise-util'; import { Channel } from './channel'; -import { RpcMessage, RpcMessageDecoder, RpcMessageEncoder, RpcMessageType } from './rpc-message-encoder'; -import { Uint8ArrayWriteBuffer } from './uint8-array-message-buffer'; /** - * Handles request messages received by the {@link RpcServer}. + * This code lets you encode rpc protocol messages (request/reply/notification/error/cancel) + * into a channel write buffer and decode the same messages from a read buffer. + * Custom encoders/decoders can be registered to specially handling certain types of values + * to be encoded. Clients are responsible for ensuring that the set of tags for encoders + * is distinct and the same at both ends of a channel. */ -export type RequestHandler = (method: string, args: any[]) => Promise; + +export type RpcMessage = RequestMessage | ReplyMessage | ReplyErrMessage | CancelMessage | NotificationMessage; + +export const enum RpcMessageType { + Request = 1, + Notification = 2, + Reply = 3, + ReplyErr = 4, + Cancel = 5, +} + +export interface CancelMessage { + type: RpcMessageType.Cancel; + id: number; +} + +export interface RequestMessage { + type: RpcMessageType.Request; + id: number; + method: string; + args: any[]; +} + +export interface NotificationMessage { + type: RpcMessageType.Notification; + id: number; + method: string; + args: any[]; +} + +export interface ReplyMessage { + type: RpcMessageType.Reply; + id: number; + result: any; +} + +export interface ReplyErrMessage { + type: RpcMessageType.ReplyErr; + id: number; + error: any; +} /** - * Initialization options for a {@link RpcProtocol}. + * A special error that can be returned in case a request + * has failed. Provides additional information i.e. an error code + * and additional error data. */ -export interface RpcProtocolOptions { - /** - * The message encoder that should be used. If `undefined` the default {@link RpcMessageEncoder} will be used. - */ - encoder?: RpcMessageEncoder, - /** - * The message decoder that should be used. If `undefined` the default {@link RpcMessageDecoder} will be used. - */ - decoder?: RpcMessageDecoder +export class ResponseError extends Error { + constructor(readonly code: number, message: string, readonly data: any) { + super(message); + } } +/** + * Handles request messages received by the {@link RpcServer}. + */ +export type RequestHandler = (method: string, args: any[]) => Promise; + /** * Establish a bi-directional RPC protocol on top of a given channel. Bi-directional means to send * sends requests and notifications to the remote side as well as receiving requests and notifications from the remote side. * Clients can get a promise for a remote request result that will be either resolved or - * rejected depending on the success of the request. Keeps track of outstanding requests and matches replies to the appropriate request + * rejected depending on the success of the request. Keeps track of outstanding requests and matches replies to the appropriate request * * Currently, there is no timeout handling for long running requests implemented. */ export class RpcProtocol { @@ -56,9 +100,6 @@ export class RpcProtocol { protected nextMessageId: number = 0; - protected readonly encoder: RpcMessageEncoder; - protected readonly decoder: RpcMessageDecoder; - protected readonly onNotificationEmitter: Emitter<{ method: string; args: any[]; }> = new Emitter(); protected readonly cancellationTokenSources = new Map(); @@ -68,15 +109,13 @@ export class RpcProtocol { protected toDispose = new DisposableCollection(); - constructor(public readonly channel: Channel, public readonly requestHandler: RequestHandler, options: RpcProtocolOptions = {}) { - this.encoder = options.encoder ?? new RpcMessageEncoder(); - this.decoder = options.decoder ?? new RpcMessageDecoder(); + constructor(public readonly channel: Channel, public readonly requestHandler: RequestHandler) { this.toDispose.push(this.onNotificationEmitter); - this.toDispose.push(channel.onMessage(readBuffer => this.handleMessage(this.decoder.parse(readBuffer())))); + this.toDispose.push(channel.onMessage(message => this.handleMessage(message))); channel.onClose(() => this.toDispose.dispose()); } - handleMessage(message: RpcMessage): void { + protected handleMessage(message: RpcMessage): void { switch (message.type) { case RpcMessageType.Cancel: { this.handleCancel(message.id); @@ -91,11 +130,11 @@ export class RpcProtocol { break; } case RpcMessageType.Reply: { - this.handleReply(message.id, message.res); + this.handleReply(message.id, message.result); break; } case RpcMessageType.ReplyErr: { - this.handleReplyErr(message.id, message.err); + this.handleReplyErr(message.id, message.error); break; } } @@ -125,6 +164,10 @@ export class RpcProtocol { } } + protected sendRpcMessage(message: RpcMessage): void { + this.channel.send(message); + } + sendRequest(method: string, args: any[]): Promise { const id = this.nextMessageId++; const reply = new Deferred(); @@ -133,41 +176,28 @@ export class RpcProtocol { // args array and the `CANCELLATION_TOKEN_KEY` string instead. const cancellationToken: CancellationToken | undefined = args.length && CancellationToken.is(args[args.length - 1]) ? args.pop() : undefined; if (cancellationToken && cancellationToken.isCancellationRequested) { - return Promise.reject(this.cancelError()); + return Promise.reject(new CancellationError()); } if (cancellationToken) { args.push(RpcProtocol.CANCELLATION_TOKEN_KEY); cancellationToken.onCancellationRequested(() => { this.sendCancel(id); - this.pendingRequests.get(id)?.reject(this.cancelError()); } ); } this.pendingRequests.set(id, reply); - const output = this.channel.getWriteBuffer(); - this.encoder.request(output, id, method, args); - output.commit(); + this.sendRpcMessage({ type: RpcMessageType.Request, id, method, args }); return reply.promise; } sendNotification(method: string, args: any[]): void { - const output = this.channel.getWriteBuffer(); - this.encoder.notification(output, this.nextMessageId++, method, args); - output.commit(); + this.sendRpcMessage({ type: RpcMessageType.Notification, id: this.nextMessageId++, method, args }); } sendCancel(requestId: number): void { - const output = this.channel.getWriteBuffer(); - this.encoder.cancel(output, requestId); - output.commit(); - } - - cancelError(): Error { - const error = new Error('"Request has already been canceled by the sender"'); - error.name = 'Cancel'; - return error; + this.sendRpcMessage({ type: RpcMessageType.Cancel, id: requestId }); } protected handleCancel(id: number): void { @@ -179,7 +209,6 @@ export class RpcProtocol { } protected async handleRequest(id: number, method: string, args: any[]): Promise { - const output = this.channel.getWriteBuffer(); // Check if the last argument of the received args is the key for indicating that a cancellation token should be used // If so remove the key from the args and create a new cancellation token. @@ -193,19 +222,20 @@ export class RpcProtocol { try { const result = await this.requestHandler(method, args); this.cancellationTokenSources.delete(id); - this.encoder.replyOK(output, id, result); - output.commit(); + this.replyOK(id, result); } catch (err) { - // In case of an error the output buffer might already contains parts of an message. - // => Dispose the current buffer and retrieve a new, clean one for writing the response error. - if (output instanceof Uint8ArrayWriteBuffer) { - output.dispose(); - } - const errorOutput = this.channel.getWriteBuffer(); this.cancellationTokenSources.delete(id); - this.encoder.replyErr(errorOutput, id, err); - errorOutput.commit(); + this.replyError(id, err); } + + } + + protected replyOK(requestId: number, result: any): void { + this.sendRpcMessage({ type: RpcMessageType.Reply, id: requestId, result }); + } + + protected replyError(requestId: number, error: any): void { + this.sendRpcMessage({ type: RpcMessageType.ReplyErr, id: requestId, error }); } protected async handleNotify(id: number, method: string, args: any[]): Promise { diff --git a/packages/core/src/common/messaging/web-socket-channel.ts b/packages/core/src/common/messaging/web-socket-channel.ts index 66b90eed33739..4963273df5c85 100644 --- a/packages/core/src/common/messaging/web-socket-channel.ts +++ b/packages/core/src/common/messaging/web-socket-channel.ts @@ -16,65 +16,43 @@ /* eslint-disable @typescript-eslint/no-explicit-any */ -import { Emitter, Event } from '../event'; -import { WriteBuffer } from '../message-rpc'; -import { Uint8ArrayReadBuffer, Uint8ArrayWriteBuffer } from '../message-rpc/uint8-array-message-buffer'; -import { Channel, MessageProvider, ChannelCloseEvent } from '../message-rpc/channel'; -import { DisposableCollection } from '../disposable'; +import { AbstractChannel } from './channel'; +import { BinaryMessageCodec, MessageCodec } from './message-codec'; + +export const wsServicePath = '/services'; /** * A channel that manages the main websocket connection between frontend and backend. All service channels * are reusing this main channel. (multiplexing). An {@link IWebSocket} abstraction is used to keep the implementation * independent of the actual websocket implementation and its execution context (backend vs. frontend). + * For efficient transportation messages are binary encoded with the {@link BinaryMessageCodec}. */ -export class WebSocketChannel implements Channel { - static wsPath = '/services'; - - protected readonly onCloseEmitter: Emitter = new Emitter(); - get onClose(): Event { - return this.onCloseEmitter.event; - } - - protected readonly onMessageEmitter: Emitter = new Emitter(); - get onMessage(): Event { - return this.onMessageEmitter.event; - } +export class WebSocketChannel extends AbstractChannel { - protected readonly onErrorEmitter: Emitter = new Emitter(); - get onError(): Event { - return this.onErrorEmitter.event; - } - - protected toDispose = new DisposableCollection(); - - constructor(protected readonly socket: IWebSocket) { + constructor(protected readonly socket: IWebSocket, protected messageCodec: MessageCodec = new BinaryMessageCodec()) { + super(); this.toDispose.pushAll([this.onCloseEmitter, this.onMessageEmitter, this.onErrorEmitter]); socket.onClose((reason, code) => this.onCloseEmitter.fire({ reason, code })); socket.onClose(() => this.close()); socket.onError(error => this.onErrorEmitter.fire(error)); // eslint-disable-next-line arrow-body-style - socket.onMessage(data => this.onMessageEmitter.fire(() => { - // In the browser context socketIO receives binary messages as ArrayBuffers. - // So we have to convert them to a Uint8Array before delegating the message to the read buffer. - const buffer = data instanceof ArrayBuffer ? new Uint8Array(data) : data; - return new Uint8ArrayReadBuffer(buffer); - })); + socket.onMessage(message => this.handleMessage(message)); } - getWriteBuffer(): WriteBuffer { - const result = new Uint8ArrayWriteBuffer(); - - result.onCommit(buffer => { - if (this.socket.isConnected()) { - this.socket.send(buffer); - } - }); + protected handleMessage(message: any): void { + const decoded = this.messageCodec.decode(message) ?? message; + this.onMessageEmitter.fire(decoded); + } - return result; + send(message: T): void { + if (this.socket.isConnected()) { + const encoded = this.messageCodec.encode(message) ?? message; + this.socket.send(encoded); + } } - close(): void { - this.toDispose.dispose(); + override close(): void { + super.close(); this.socket.close(); } } @@ -85,10 +63,10 @@ export class WebSocketChannel implements Channel { */ export interface IWebSocket { /** - * Sends the given message over the web socket in binary format. + * Sends the given message over the web socket. * @param message The binary message. */ - send(message: Uint8Array): void; + send(message: any): void; /** * Closes the websocket from the local side. */ @@ -101,7 +79,7 @@ export interface IWebSocket { * Listener callback to handle incoming messages. * @param cb The callback. */ - onMessage(cb: (message: Uint8Array) => void): void; + onMessage(cb: (UInt8: any) => void): void; /** * Listener callback to handle socket errors. * @param cb The callback. diff --git a/packages/core/src/electron-browser/messaging/electron-ipc-connection-provider.ts b/packages/core/src/electron-browser/messaging/electron-ipc-connection-provider.ts index 3cce9e75c3951..c0a04b3956632 100644 --- a/packages/core/src/electron-browser/messaging/electron-ipc-connection-provider.ts +++ b/packages/core/src/electron-browser/messaging/electron-ipc-connection-provider.ts @@ -13,15 +13,15 @@ // // SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0 // ***************************************************************************** +/* eslint-disable @typescript-eslint/no-explicit-any */ import { Event as ElectronEvent, ipcRenderer } from '@theia/electron/shared/electron'; import { injectable, interfaces } from 'inversify'; -import { JsonRpcProxy } from '../../common/messaging'; +import { AbstractChannel, Channel, JsonRpcProxy, MessageCodec } from '../../common/messaging'; import { AbstractConnectionProvider } from '../../common/messaging/abstract-connection-provider'; import { THEIA_ELECTRON_IPC_CHANNEL_NAME } from '../../electron-common/messaging/electron-connection-handler'; -import { Emitter, Event } from '../../common'; -import { Uint8ArrayReadBuffer, Uint8ArrayWriteBuffer } from '../../common/message-rpc/uint8-array-message-buffer'; -import { Channel, MessageProvider } from '../../common/message-rpc/channel'; +import { BinaryMessageCodec } from '../../common/messaging/message-codec'; +import { Disposable } from 'vscode-languageserver-protocol'; export interface ElectronIpcOptions { } @@ -42,23 +42,30 @@ export class ElectronIpcConnectionProvider extends AbstractConnectionProvider(); - ipcRenderer.on(THEIA_ELECTRON_IPC_CHANNEL_NAME, (_event: ElectronEvent, data: Uint8Array) => { - onMessageEmitter.fire(() => new Uint8ArrayReadBuffer(data)); - }); - return { - close: () => Event.None, - getWriteBuffer: () => { - const writer = new Uint8ArrayWriteBuffer(); - writer.onCommit(buffer => - ipcRenderer.send(THEIA_ELECTRON_IPC_CHANNEL_NAME, buffer) - ); - return writer; - }, - onClose: Event.None, - onError: Event.None, - onMessage: onMessageEmitter.event - }; + return new ElectronIpcRendererChannel(); + } + +} + +export class ElectronIpcRendererChannel extends AbstractChannel { + + protected messageCodec: MessageCodec = new BinaryMessageCodec(); + + constructor() { + super(); + const ipcMessageListener = (_event: ElectronEvent, data: Uint8Array) => this.onMessageEmitter.fire(data); + ipcRenderer.on(THEIA_ELECTRON_IPC_CHANNEL_NAME, ipcMessageListener); + this.toDispose.push(Disposable.create(() => ipcRenderer.removeListener(THEIA_ELECTRON_IPC_CHANNEL_NAME, ipcMessageListener))); + } + + protected handleMessage(message: Uint8Array): void { + const decoded = this.messageCodec.decode(message); + this.onMessageEmitter.fire(decoded); + } + + send(message: any): void { + const encoded = this.messageCodec.encode(message); + ipcRenderer.send(THEIA_ELECTRON_IPC_CHANNEL_NAME, encoded); } } diff --git a/packages/core/src/electron-browser/messaging/electron-ws-connection-provider.ts b/packages/core/src/electron-browser/messaging/electron-ws-connection-provider.ts index f29776df9fa48..7bb24b36409fe 100644 --- a/packages/core/src/electron-browser/messaging/electron-ws-connection-provider.ts +++ b/packages/core/src/electron-browser/messaging/electron-ws-connection-provider.ts @@ -15,7 +15,7 @@ // ***************************************************************************** import { injectable } from 'inversify'; -import { WebSocketConnectionProvider, WebSocketOptions } from '../../browser/messaging/ws-connection-provider'; +import { WebSocketConnectionProvider, WebsocketOptions } from '../../browser/messaging/ws-connection-provider'; import { FrontendApplicationContribution } from '../../browser/frontend-application'; import { Channel } from '../../common'; @@ -37,10 +37,10 @@ export class ElectronWebSocketConnectionProvider extends WebSocketConnectionProv // Manually close the websocket connections `onStop`. Otherwise, the channels will be closed with 30 sec (`MessagingContribution#checkAliveTimeout`) delay. // https://github.com/eclipse-theia/theia/issues/6499 // `1001` indicates that an endpoint is "going away", such as a server going down or a browser having navigated away from a page. - this.channelMultiPlexer?.onUnderlyingChannelClose({ reason: 'The frontend is "going away"', code: 1001 }); + this.channelMultiPlexer?.handleMainChannelClose({ reason: 'The frontend is "going away"', code: 1001 }); } - override async openChannel(path: string, handler: (channel: Channel) => void, options?: WebSocketOptions): Promise { + override async openChannel(path: string, handler: (channel: Channel) => void, options?: WebsocketOptions): Promise { if (!this.stopping) { super.openChannel(path, handler, options); } diff --git a/packages/core/src/electron-main/messaging/electron-messaging-contribution.ts b/packages/core/src/electron-main/messaging/electron-messaging-contribution.ts index b2e4b00e2e1a3..ab15b70764d9b 100644 --- a/packages/core/src/electron-main/messaging/electron-messaging-contribution.ts +++ b/packages/core/src/electron-main/messaging/electron-messaging-contribution.ts @@ -21,9 +21,8 @@ import { MessagingContribution } from '../../node/messaging/messaging-contributi import { ElectronConnectionHandler, THEIA_ELECTRON_IPC_CHANNEL_NAME } from '../../electron-common/messaging/electron-connection-handler'; import { ElectronMainApplicationContribution } from '../electron-main-application'; import { ElectronMessagingService } from './electron-messaging-service'; -import { Channel, ChannelCloseEvent, ChannelMultiplexer, MessageProvider } from '../../common/message-rpc/channel'; -import { Emitter, Event, WriteBuffer } from '../../common'; -import { Uint8ArrayReadBuffer, Uint8ArrayWriteBuffer } from '../../common/message-rpc/uint8-array-message-buffer'; +import { AbstractChannel, Channel, ChannelMultiplexer, MessageCodec, } from '../../common/'; +import { BinaryMessageCodec } from '../../common/messaging/message-codec'; /** * This component replicates the role filled by `MessagingContribution` but for Electron. @@ -50,19 +49,19 @@ export class ElectronMessagingContribution implements ElectronMainApplicationCon @postConstruct() protected init(): void { - ipcMain.on(THEIA_ELECTRON_IPC_CHANNEL_NAME, (event: IpcMainEvent, data: Uint8Array) => { - this.handleIpcEvent(event, data); + ipcMain.on(THEIA_ELECTRON_IPC_CHANNEL_NAME, (event: IpcMainEvent, message: unknown) => { + this.handleIpcEvent(event, message); }); } - protected handleIpcEvent(event: IpcMainEvent, data: Uint8Array): void { + protected handleIpcEvent(event: IpcMainEvent, message: unknown): void { const sender = event.sender; // Get the multiplexer for a given window id try { const windowChannelData = this.windowChannelMultiplexer.get(sender.id) ?? this.createWindowChannelData(sender); - windowChannelData!.channel.onMessageEmitter.fire(() => new Uint8ArrayReadBuffer(data)); + windowChannelData!.channel.handleMessage(message); } catch (error) { - console.error('IPC: Failed to handle message', { error, data }); + console.error('IPC: Failed to handle message', { error, message }); } } @@ -78,8 +77,8 @@ export class ElectronMessagingContribution implements ElectronMainApplicationCon } }); - sender.once('did-navigate', () => multiPlexer.onUnderlyingChannelClose({ reason: 'Window was refreshed' })); // When refreshing the browser window. - sender.once('destroyed', () => multiPlexer.onUnderlyingChannelClose({ reason: 'Window was closed' })); // When closing the browser window. + sender.once('did-navigate', () => multiPlexer.handleMainChannelClose({ reason: 'Window was refreshed' })); // When refreshing the browser window. + sender.once('destroyed', () => multiPlexer.handleMainChannelClose({ reason: 'Window was closed' })); // When closing the browser window. const data = { channel: mainChannel, multiPlexer }; this.windowChannelMultiplexer.set(sender.id, data); return data; @@ -114,40 +113,25 @@ export class ElectronMessagingContribution implements ElectronMainApplicationCon * Used to establish a connection between the ipcMain and the Electron frontend (window). * Messages a transferred via electron IPC. */ -export class ElectronWebContentChannel implements Channel { - protected readonly onCloseEmitter: Emitter = new Emitter(); - get onClose(): Event { - return this.onCloseEmitter.event; - } - - // Make the message emitter public so that we can easily forward messages received from the ipcMain. - readonly onMessageEmitter: Emitter = new Emitter(); - get onMessage(): Event { - return this.onMessageEmitter.event; - } +export class ElectronWebContentChannel extends AbstractChannel { - protected readonly onErrorEmitter: Emitter = new Emitter(); - get onError(): Event { - return this.onErrorEmitter.event; - } + protected messageCodec: MessageCodec = new BinaryMessageCodec(); constructor(protected readonly sender: Electron.WebContents) { + super(); } - getWriteBuffer(): WriteBuffer { - const writer = new Uint8ArrayWriteBuffer(); - - writer.onCommit(buffer => { - if (!this.sender.isDestroyed()) { - this.sender.send(THEIA_ELECTRON_IPC_CHANNEL_NAME, buffer); - } - }); - - return writer; + handleMessage(message: unknown): void { + if (message instanceof Uint8Array) { + const decoded = this.messageCodec.decode(message); + this.onMessageEmitter.fire(decoded); + } } - close(): void { - this.onCloseEmitter.dispose(); - this.onMessageEmitter.dispose(); - this.onErrorEmitter.dispose(); + + send(message: unknown): void { + if (!this.sender.isDestroyed()) { + const encoded = this.messageCodec.encode(message); + this.sender.send(THEIA_ELECTRON_IPC_CHANNEL_NAME, encoded); + } } } diff --git a/packages/core/src/electron-main/messaging/electron-messaging-service.ts b/packages/core/src/electron-main/messaging/electron-messaging-service.ts index 874d51237b4fd..77ad5b920071b 100644 --- a/packages/core/src/electron-main/messaging/electron-messaging-service.ts +++ b/packages/core/src/electron-main/messaging/electron-messaging-service.ts @@ -14,7 +14,7 @@ // SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0 // ***************************************************************************** -import { Channel } from '../../common/message-rpc/channel'; +import { Channel } from '../../common/'; export interface ElectronMessagingService { /** diff --git a/packages/core/src/node/messaging/binary-message-pipe.ts b/packages/core/src/node/messaging/binary-message-pipe.ts index 1143aef9cf4cc..5b9afc91b4f3d 100644 --- a/packages/core/src/node/messaging/binary-message-pipe.ts +++ b/packages/core/src/node/messaging/binary-message-pipe.ts @@ -16,7 +16,7 @@ import { Duplex } from 'stream'; import { Disposable, Emitter, Event } from '../../common'; -import { Uint8ArrayReadBuffer, Uint8ArrayWriteBuffer } from '../../common/message-rpc/uint8-array-message-buffer'; +import { ReadBufferImpl, WriteBufferImpl } from '../../common/messaging/message-buffer'; /** * A `BinaryMessagePipe` is capable of sending and retrieving binary messages i.e. {@link Uint8Array}s over @@ -116,11 +116,10 @@ export class BinaryMessagePipe implements Disposable { * @returns the buffer contains the encoded message start */ protected encodeMessageStart(message: Uint8Array): Uint8Array { - const writer = new Uint8ArrayWriteBuffer() + const writer = new WriteBufferImpl() .writeString(BinaryMessagePipe.MESSAGE_START_IDENTIFIER) .writeUint32(message.length); const messageStart = writer.getCurrentContents(); - writer.dispose(); return messageStart; } @@ -140,7 +139,7 @@ export class BinaryMessagePipe implements Disposable { const messageData = this.cachedMessageData.partialMessageStart ? Buffer.concat([this.cachedMessageData.partialMessageStart, chunk]) : chunk; this.cachedMessageData.partialMessageStart = undefined; - const reader = new Uint8ArrayReadBuffer(messageData); + const reader = new ReadBufferImpl(messageData); const identifier = reader.readString(); if (identifier !== BinaryMessagePipe.MESSAGE_START_IDENTIFIER) { diff --git a/packages/core/src/node/messaging/ipc-channel.ts b/packages/core/src/node/messaging/ipc-channel.ts index f6cf7724e8e81..d01f42247295a 100644 --- a/packages/core/src/node/messaging/ipc-channel.ts +++ b/packages/core/src/node/messaging/ipc-channel.ts @@ -18,8 +18,7 @@ import * as cp from 'child_process'; import { Socket } from 'net'; import { Duplex } from 'stream'; -import { Channel, ChannelCloseEvent, Disposable, DisposableCollection, Emitter, Event, MessageProvider, WriteBuffer } from '../../common'; -import { Uint8ArrayReadBuffer, Uint8ArrayWriteBuffer } from '../../common/message-rpc/uint8-array-message-buffer'; +import { AbstractChannel, Disposable } from '../../common'; import { BinaryMessagePipe } from './binary-message-pipe'; /** @@ -27,36 +26,21 @@ import { BinaryMessagePipe } from './binary-message-pipe'; * This fd is opened as 5th channel in addition to the default stdios (stdin,stdout,stderr, ipc). This means the default channels * are not blocked and can be used by the respective process for additional custom message handling. */ -export class IPCChannel implements Channel { - - protected readonly onCloseEmitter: Emitter = new Emitter(); - get onClose(): Event { - return this.onCloseEmitter.event; - } - - protected readonly onMessageEmitter: Emitter = new Emitter(); - get onMessage(): Event { - return this.onMessageEmitter.event; - } - - protected readonly onErrorEmitter: Emitter = new Emitter(); - get onError(): Event { - return this.onErrorEmitter.event; - } +export class IPCChannel extends AbstractChannel { protected messagePipe: BinaryMessagePipe; - protected toDispose = new DisposableCollection(); protected ipcErrorListener: (error: Error) => void = error => this.onErrorEmitter.fire(error); constructor(childProcess?: cp.ChildProcess) { + super(); if (childProcess) { this.setupChildProcess(childProcess); } else { this.setupProcess(); } this.messagePipe.onMessage(message => { - this.onMessageEmitter.fire(() => new Uint8ArrayReadBuffer(message)); + this.onMessageEmitter.fire(message); }); this.toDispose.pushAll([this.onCloseEmitter, this.onMessageEmitter, this.onErrorEmitter]); } @@ -81,17 +65,8 @@ export class IPCChannel implements Channel { })); } - getWriteBuffer(): WriteBuffer { - const result = new Uint8ArrayWriteBuffer(); - result.onCommit(buffer => { - this.messagePipe.send(buffer); - }); - - return result; - } - - close(): void { - this.toDispose.dispose(); + send(message: Uint8Array): void { + this.messagePipe.send(message); } } diff --git a/packages/core/src/node/messaging/ipc-protocol.ts b/packages/core/src/node/messaging/ipc-protocol.ts index 03aa3944521c3..6560cc564d092 100644 --- a/packages/core/src/node/messaging/ipc-protocol.ts +++ b/packages/core/src/node/messaging/ipc-protocol.ts @@ -15,7 +15,7 @@ // SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0 // ***************************************************************************** -import { Channel } from '../../common/message-rpc/channel'; +import { Channel } from '../../common'; const THEIA_PARENT_PID = 'THEIA_PARENT_PID'; const THEIA_ENTRY_POINT = 'THEIA_ENTRY_POINT'; diff --git a/packages/core/src/node/messaging/messaging-contribution.ts b/packages/core/src/node/messaging/messaging-contribution.ts index 20dea50a483e4..7deaaa4feeeb2 100644 --- a/packages/core/src/node/messaging/messaging-contribution.ts +++ b/packages/core/src/node/messaging/messaging-contribution.ts @@ -18,15 +18,14 @@ import * as http from 'http'; import * as https from 'https'; import { Server, Socket } from 'socket.io'; import { injectable, inject, named, postConstruct, interfaces, Container } from 'inversify'; -import { ContributionProvider, ConnectionHandler, bindContributionProvider } from '../../common'; -import { IWebSocket, WebSocketChannel } from '../../common/messaging/web-socket-channel'; +import { ContributionProvider, ConnectionHandler, bindContributionProvider, Channel, ChannelMultiplexer } from '../../common'; +import { IWebSocket, WebSocketChannel, wsServicePath } from '../../common/messaging/web-socket-channel'; import { BackendApplicationContribution } from '../backend-application'; import { MessagingService } from './messaging-service'; import { ConnectionContainerModule } from './connection-container-module'; import Route = require('route-parser'); import { WsRequestValidator } from '../ws-request-validators'; import { MessagingListener } from './messaging-listeners'; -import { Channel, ChannelMultiplexer } from '../../common/message-rpc/channel'; export const MessagingContainer = Symbol('MessagingContainer'); @@ -53,7 +52,7 @@ export class MessagingContribution implements BackendApplicationContribution, Me @postConstruct() protected init(): void { - this.ws(WebSocketChannel.wsPath, (_, socket) => this.handleChannels(socket)); + this.ws(wsServicePath, (_, socket) => this.handleChannels(socket)); for (const contribution of this.contributions.getContributions()) { contribution.configure(this); } @@ -108,9 +107,9 @@ export class MessagingContribution implements BackendApplicationContribution, Me protected handleChannels(socket: Socket): void { const socketChannel = new WebSocketChannel(this.toIWebSocket(socket)); - const mulitplexer = new ChannelMultiplexer(socketChannel); + const multiplexer = new ChannelMultiplexer(socketChannel); const channelHandlers = this.getConnectionChannelHandlers(socket); - mulitplexer.onDidOpenChannel(event => { + multiplexer.onDidOpenChannel(event => { if (channelHandlers.route(event.id, event.channel)) { console.debug(`Opening channel for service path '${event.id}'.`); event.channel.onClose(() => console.debug(`Closing channel on service path '${event.id}'.`)); diff --git a/packages/core/src/node/messaging/messaging-service.ts b/packages/core/src/node/messaging/messaging-service.ts index 276b58734bcff..ed3a49cf6afb2 100644 --- a/packages/core/src/node/messaging/messaging-service.ts +++ b/packages/core/src/node/messaging/messaging-service.ts @@ -15,7 +15,7 @@ // ***************************************************************************** import { Socket } from 'socket.io'; -import { Channel } from '../../common/message-rpc/channel'; +import { Channel } from '../../common'; export interface MessagingService { /** diff --git a/packages/core/src/node/messaging/test/test-web-socket-channel.ts b/packages/core/src/node/messaging/test/test-web-socket-channel.ts index 0ef0c50186cee..fa5e961aa4dd9 100644 --- a/packages/core/src/node/messaging/test/test-web-socket-channel.ts +++ b/packages/core/src/node/messaging/test/test-web-socket-channel.ts @@ -18,8 +18,8 @@ import * as http from 'http'; import * as https from 'https'; import { AddressInfo } from 'net'; import { io, Socket } from 'socket.io-client'; -import { Channel, ChannelMultiplexer } from '../../../common/message-rpc/channel'; -import { IWebSocket, WebSocketChannel } from '../../../common/messaging/web-socket-channel'; +import { Channel, ChannelMultiplexer } from '../../../common'; +import { IWebSocket, WebSocketChannel, wsServicePath } from '../../../common/messaging/web-socket-channel'; export class TestWebSocketChannelSetup { public readonly multiplexer: ChannelMultiplexer; @@ -29,11 +29,11 @@ export class TestWebSocketChannelSetup { server: http.Server | https.Server, path: string }) { - const socket = io(`ws://localhost:${(server.address() as AddressInfo).port}${WebSocketChannel.wsPath}`); + const socket = io(`ws://localhost:${(server.address() as AddressInfo).port}${wsServicePath}`); this.channel = new WebSocketChannel(toIWebSocket(socket)); this.multiplexer = new ChannelMultiplexer(this.channel); socket.on('connect', () => { - this.multiplexer.open(path); + this.multiplexer.openChannel(path); }); socket.connect(); } diff --git a/packages/debug/src/browser/debug-session-contribution.ts b/packages/debug/src/browser/debug-session-contribution.ts index 456a32783f06b..01b01929f4779 100644 --- a/packages/debug/src/browser/debug-session-contribution.ts +++ b/packages/debug/src/browser/debug-session-contribution.ts @@ -26,7 +26,7 @@ import { DebugSessionOptions } from './debug-session-options'; import { OutputChannelManager, OutputChannel } from '@theia/output/lib/browser/output-channel'; import { DebugPreferences } from './debug-preferences'; import { DebugSessionConnection } from './debug-session-connection'; -import { DebugChannel, DebugAdapterPath, ForwardingDebugChannel } from '../common/debug-service'; +import { DebugChannel, DebugAdapterPath } from '../common/debug-service'; import { ContributionProvider } from '@theia/core/lib/common/contribution-provider'; import { FileService } from '@theia/filesystem/lib/browser/file-service'; import { DebugContribution } from './debug-contribution'; @@ -123,7 +123,7 @@ export class DefaultDebugSessionFactory implements DebugSessionFactory { sessionId, () => new Promise(resolve => this.connectionProvider.openChannel(`${DebugAdapterPath}/${sessionId}`, wsChannel => { - resolve(new ForwardingDebugChannel(wsChannel)); + resolve(wsChannel); }, { reconnecting: false }) ), this.getTraceOutputChannel()); diff --git a/packages/debug/src/common/debug-service.ts b/packages/debug/src/common/debug-service.ts index 78a81643b2fe1..4fb08a9557bf2 100644 --- a/packages/debug/src/common/debug-service.ts +++ b/packages/debug/src/common/debug-service.ts @@ -16,7 +16,7 @@ /* eslint-disable @typescript-eslint/no-explicit-any */ -import { Channel, Disposable, Emitter, Event } from '@theia/core'; +import { Channel, Disposable, Event } from '@theia/core'; import { ApplicationError } from '@theia/core/lib/common/application-error'; import { IJSONSchema, IJSONSchemaSnippet } from '@theia/core/lib/common/json-schema'; import { CommandIdVariables } from '@theia/variable-resolver/lib/common/variable-types'; @@ -37,6 +37,11 @@ export const DebugPath = '/services/debug'; */ export const DebugService = Symbol('DebugService'); +/** + * A {@link} Channel to stringified debug protocol messages over with error/close handling + */ +export type DebugChannel = Channel; + /** * This service provides functionality to configure and to start a new debug adapter session. * The workflow is the following. If user wants to debug an application and @@ -140,47 +145,3 @@ export namespace DebugError { data: { type } })); } - -/** - * A closeable channel to send debug protocol messages over with error/close handling - */ -export interface DebugChannel { - send(content: string): void; - onMessage(cb: (message: string) => void): void; - // eslint-disable-next-line @typescript-eslint/no-explicit-any - onError(cb: (reason: any) => void): void; - onClose(cb: (code: number, reason: string) => void): void; - close(): void; -} - -/** - * A {@link DebugChannel} wrapper implementation that sends and receives messages to/from an underlying {@link Channel}. - */ -export class ForwardingDebugChannel implements DebugChannel { - private onMessageEmitter = new Emitter(); - - constructor(private readonly underlyingChannel: Channel) { - this.underlyingChannel.onMessage(msg => this.onMessageEmitter.fire(msg().readString())); - } - - send(content: string): void { - this.underlyingChannel.getWriteBuffer().writeString(content).commit(); - } - - onMessage(cb: (message: string) => void): void { - this.onMessageEmitter.event(cb); - } - // eslint-disable-next-line @typescript-eslint/no-explicit-any - onError(cb: (reason: any) => void): void { - this.underlyingChannel.onError(cb); - } - onClose(cb: (code: number, reason: string) => void): void { - this.underlyingChannel.onClose(event => cb(event.code ?? -1, event.reason)); - } - - close(): void { - this.underlyingChannel.close(); - this.onMessageEmitter.dispose(); - } - -} diff --git a/packages/debug/src/node/debug-adapter-session-manager.ts b/packages/debug/src/node/debug-adapter-session-manager.ts index cf4f013fb381e..2f46e87b232fa 100644 --- a/packages/debug/src/node/debug-adapter-session-manager.ts +++ b/packages/debug/src/node/debug-adapter-session-manager.ts @@ -18,7 +18,7 @@ import { UUID } from '@theia/core/shared/@phosphor/coreutils'; import { injectable, inject } from '@theia/core/shared/inversify'; import { MessagingService } from '@theia/core/lib/node/messaging/messaging-service'; -import { DebugAdapterPath, ForwardingDebugChannel } from '../common/debug-service'; +import { DebugAdapterPath } from '../common/debug-service'; import { DebugConfiguration } from '../common/debug-configuration'; import { DebugAdapterSession, DebugAdapterSessionFactory, DebugAdapterFactory } from './debug-model'; import { DebugAdapterContributionRegistry } from './debug-adapter-contribution-registry'; @@ -43,7 +43,7 @@ export class DebugAdapterSessionManager implements MessagingService.Contribution wsChannel.close(); return; } - session.start(new ForwardingDebugChannel(wsChannel)); + session.start(wsChannel); }); } diff --git a/packages/plugin-ext/src/common/connection.ts b/packages/plugin-ext/src/common/connection.ts index 9c4c7640095f8..40a3e1b62f266 100644 --- a/packages/plugin-ext/src/common/connection.ts +++ b/packages/plugin-ext/src/common/connection.ts @@ -15,50 +15,37 @@ // ***************************************************************************** import { DebugChannel } from '@theia/debug/lib/common/debug-service'; import { ConnectionExt, ConnectionMain } from './plugin-api-rpc'; -import { Emitter } from '@theia/core/lib/common/event'; +import { AbstractChannel } from '@theia/core/lib/common/messaging/channel'; /** * A channel communicating with a counterpart in a plugin host. */ -export class PluginChannel implements DebugChannel { - private messageEmitter: Emitter = new Emitter(); - private errorEmitter: Emitter = new Emitter(); - private closedEmitter: Emitter = new Emitter(); +export class PluginChannel extends AbstractChannel implements DebugChannel { constructor( protected readonly id: string, - protected readonly connection: ConnectionExt | ConnectionMain) { } + protected readonly connection: ConnectionExt | ConnectionMain) { + super(); + } send(content: string): void { this.connection.$sendMessage(this.id, content); } fireMessageReceived(msg: string): void { - this.messageEmitter.fire(msg); + this.onMessageEmitter.fire(msg); } fireError(error: unknown): void { - this.errorEmitter.fire(error); + this.onErrorEmitter.fire(error); } fireClosed(): void { - this.closedEmitter.fire(); - } - - onMessage(cb: (message: string) => void): void { - this.messageEmitter.event(cb); - } - - // eslint-disable-next-line @typescript-eslint/no-explicit-any - onError(cb: (reason: any) => void): void { - this.errorEmitter.event(cb); - } - - onClose(cb: (code: number, reason: string) => void): void { - this.closedEmitter.event(() => cb(-1, 'closed')); + this.onCloseEmitter.fire({ reason: 'closed' }); } - close(): void { + override close(): void { + super.close(); this.connection.$deleteConnection(this.id); } }