diff --git a/packages/core/package.json b/packages/core/package.json index 2e9e27f04d2ab..c9b5d9f15ab48 100644 --- a/packages/core/package.json +++ b/packages/core/package.json @@ -61,7 +61,6 @@ "react-dom": "^16.8.0", "react-tooltip": "^4.2.21", "react-virtualized": "^9.20.0", - "reconnecting-websocket": "^4.2.0", "reflect-metadata": "^0.1.10", "route-parser": "^0.0.5", "safer-buffer": "^2.1.2", diff --git a/packages/core/src/browser/messaging/ws-connection-provider.ts b/packages/core/src/browser/messaging/ws-connection-provider.ts index 905515a70392c..b44e34841ee2c 100644 --- a/packages/core/src/browser/messaging/ws-connection-provider.ts +++ b/packages/core/src/browser/messaging/ws-connection-provider.ts @@ -35,8 +35,6 @@ export interface WebSocketOptions { export class WebSocketConnectionProvider extends AbstractConnectionProvider { protected readonly onSocketDidOpenEmitter: Emitter = new Emitter(); - // Socket that is used by the main channel - protected socket: Socket; get onSocketDidOpen(): Event { return this.onSocketDidOpenEmitter.event; } @@ -50,18 +48,27 @@ export class WebSocketConnectionProvider extends AbstractConnectionProvider(path, arg); } - protected createMainChannel(): Channel { + protected readonly socket: Socket; + + constructor() { + super(); const url = this.createWebSocketUrl(WebSocketChannel.wsPath); - const socket = this.createWebSocket(url); - const channel = new WebSocketChannel(this.toIWebSocket(socket)); - socket.on('connect', () => { + this.socket = this.createWebSocket(url); + this.socket.on('connect', () => { + this.initializeMultiplexer(); + if (this.reconnectChannelOpeners.length > 0) { + this.reconnectChannelOpeners.forEach(opener => opener()); + this.reconnectChannelOpeners = []; + } + this.socket.on('disconnect', () => this.fireSocketDidClose()); + this.socket.on('message', () => this.onIncomingMessageActivityEmitter.fire(undefined)); this.fireSocketDidOpen(); }); - channel.onClose(() => this.fireSocketDidClose()); - socket.connect(); - this.socket = socket; + this.socket.connect(); + } - return channel; + protected createMainChannel(): Channel { + return new WebSocketChannel(this.toIWebSocket(this.socket)); } protected toIWebSocket(socket: Socket): IWebSocket { @@ -70,7 +77,6 @@ export class WebSocketConnectionProvider extends AbstractConnectionProvider socket.connected, onClose: cb => socket.on('disconnect', reason => cb(reason)), diff --git a/packages/core/src/common/message-rpc/channel.ts b/packages/core/src/common/message-rpc/channel.ts index e4279b43d5173..5d9fc6985cc07 100644 --- a/packages/core/src/common/message-rpc/channel.ts +++ b/packages/core/src/common/message-rpc/channel.ts @@ -14,6 +14,7 @@ // SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0 // ***************************************************************************** +import { Disposable, DisposableCollection } from '../disposable'; import { Emitter, Event } from '../event'; import { ReadBuffer, WriteBuffer } from './message-buffer'; @@ -72,7 +73,9 @@ export type MessageProvider = () => ReadBuffer; */ export class ForwardingChannel implements Channel { + protected toDispose = new DisposableCollection(); constructor(readonly id: string, protected readonly closeHandler: () => void, protected readonly writeBufferSource: () => WriteBuffer) { + this.toDispose.pushAll([this.onCloseEmitter, this.onErrorEmitter, this.onMessageEmitter]); } onCloseEmitter: Emitter = new Emitter(); @@ -94,14 +97,9 @@ export class ForwardingChannel implements Channel { return this.writeBufferSource(); } - send(message: Uint8Array): void { - const writeBuffer = this.getWriteBuffer(); - writeBuffer.writeBytes(message); - writeBuffer.commit(); - } - close(): void { this.closeHandler(); + this.toDispose.dispose(); } } @@ -120,7 +118,7 @@ export enum MessageTypes { * channel, so we rely on writers to the multiplexed channels to always commit their * messages and always in one go. */ -export class ChannelMultiplexer { +export class ChannelMultiplexer implements Disposable { protected pendingOpen: Map void> = new Map(); protected openChannels: Map = new Map(); @@ -129,10 +127,15 @@ export class ChannelMultiplexer { return this.onOpenChannelEmitter.event; } + protected toDispose = new DisposableCollection(); + constructor(protected readonly underlyingChannel: Channel) { - this.underlyingChannel.onMessage(buffer => this.handleMessage(buffer())); - this.underlyingChannel.onClose(event => this.closeUnderlyingChannel(event)); - this.underlyingChannel.onError(error => this.handleError(error)); + this.toDispose.pushAll([ + this.underlyingChannel.onMessage(buffer => this.handleMessage(buffer())), + this.underlyingChannel.onClose(event => this.onUnderlyingChannelClose(event)), + this.underlyingChannel.onError(error => this.handleError(error)), + this.onOpenChannelEmitter + ]); } protected handleError(error: unknown): void { @@ -141,14 +144,19 @@ export class ChannelMultiplexer { }); } - closeUnderlyingChannel(event?: ChannelCloseEvent): void { - - this.pendingOpen.clear(); - this.openChannels.forEach(channel => { - channel.onCloseEmitter.fire(event ?? { reason: 'Multiplexer main channel has been closed from the remote side!' }); - }); + onUnderlyingChannelClose(event?: ChannelCloseEvent): void { + if (!this.toDispose.disposed) { + this.toDispose.push(Disposable.create(() => { + this.pendingOpen.clear(); + this.openChannels.forEach(channel => { + channel.onCloseEmitter.fire(event ?? { reason: 'Multiplexer main channel has been closed from the remote side!' }); + }); + + this.openChannels.clear(); + })); + this.dispose(); + } - this.openChannels.clear(); } protected handleMessage(buffer: ReadBuffer): void { @@ -244,5 +252,9 @@ export class ChannelMultiplexer { getOpenChannel(id: string): Channel | undefined { return this.openChannels.get(id); } + + dispose(): void { + this.toDispose.dispose(); + } } diff --git a/packages/core/src/common/message-rpc/rpc-protocol.ts b/packages/core/src/common/message-rpc/rpc-protocol.ts index 4854fa124d612..6e037e6e8befd 100644 --- a/packages/core/src/common/message-rpc/rpc-protocol.ts +++ b/packages/core/src/common/message-rpc/rpc-protocol.ts @@ -16,6 +16,7 @@ /* eslint-disable @typescript-eslint/no-explicit-any */ import { CancellationToken, CancellationTokenSource } from '../cancellation'; +import { DisposableCollection } from '../disposable'; import { Emitter, Event } from '../event'; import { Deferred } from '../promise-util'; import { Channel } from './channel'; @@ -40,6 +41,7 @@ export interface RpcProtocolOptions { */ decoder?: RpcMessageDecoder } + /** * Establish a bi-directional RPC protocol on top of a given channel. Bi-directional means to send * sends requests and notifications to the remote side as well as receiving requests and notifications from the remote side. @@ -64,12 +66,14 @@ export class RpcProtocol { return this.onNotificationEmitter.event; } + protected toDispose = new DisposableCollection(); + constructor(public readonly channel: Channel, public readonly requestHandler: RequestHandler, options: RpcProtocolOptions = {}) { this.encoder = options.encoder ?? new RpcMessageEncoder(); this.decoder = options.decoder ?? new RpcMessageDecoder(); - const registration = channel.onMessage(readBuffer => this.handleMessage(this.decoder.parse(readBuffer()))); - channel.onClose(() => registration.dispose()); - + this.toDispose.push(this.onNotificationEmitter); + this.toDispose.push(channel.onMessage(readBuffer => this.handleMessage(this.decoder.parse(readBuffer())))); + channel.onClose(() => this.toDispose.dispose()); } handleMessage(message: RpcMessage): void { @@ -103,7 +107,7 @@ export class RpcProtocol { this.pendingRequests.delete(id); replyHandler.resolve(value); } else { - console.warn(`reply: no handler for message: ${id}`); + throw new Error(`No reply handler for reply with id: ${id}`); } } @@ -114,7 +118,7 @@ export class RpcProtocol { this.pendingRequests.delete(id); replyHandler.reject(error); } else { - console.warn(`error: no handler for message: ${id}`); + throw new Error(`No reply handler for error reply with id: ${id}`); } } catch (err) { throw err; @@ -122,7 +126,6 @@ export class RpcProtocol { } sendRequest(method: string, args: any[]): Promise { - const id = this.nextMessageId++; const reply = new Deferred(); @@ -176,7 +179,6 @@ export class RpcProtocol { } protected async handleRequest(id: number, method: string, args: any[]): Promise { - const output = this.channel.getWriteBuffer(); // Check if the last argument of the received args is the key for indicating that a cancellation token should be used @@ -207,6 +209,9 @@ export class RpcProtocol { } protected async handleNotify(id: number, method: string, args: any[]): Promise { + if (this.toDispose.disposed) { + return; + } this.onNotificationEmitter.fire({ method, args }); } } diff --git a/packages/core/src/common/messaging/abstract-connection-provider.ts b/packages/core/src/common/messaging/abstract-connection-provider.ts index d003fe86fc683..64f29bf185178 100644 --- a/packages/core/src/common/messaging/abstract-connection-provider.ts +++ b/packages/core/src/common/messaging/abstract-connection-provider.ts @@ -71,14 +71,16 @@ export abstract class AbstractConnectionProvider return factory.createProxy(); } - protected channelMultiPlexer: ChannelMultiplexer; + protected channelMultiPlexer?: ChannelMultiplexer; - constructor() { - this.channelMultiPlexer = this.createMultiplexer(); - } + // A set of channel opening functions that are executed if the backend reconnects to restore the + // the channels that were open before the disconnect occurred. + protected reconnectChannelOpeners: Array<() => Promise> = []; - protected createMultiplexer(): ChannelMultiplexer { - return new ChannelMultiplexer(this.createMainChannel()); + protected initializeMultiplexer(): void { + const mainChannel = this.createMainChannel(); + mainChannel.onMessage(() => this.onIncomingMessageActivityEmitter.fire()); + this.channelMultiPlexer = new ChannelMultiplexer(mainChannel); } /** @@ -91,18 +93,22 @@ export abstract class AbstractConnectionProvider } async openChannel(path: string, handler: (channel: Channel) => void, options?: AbstractOptions): Promise { + if (!this.channelMultiPlexer) { + throw new Error('The channel multiplexer has not been initialized yet!'); + } const newChannel = await this.channelMultiPlexer.open(path); newChannel.onClose(() => { const { reconnecting } = { reconnecting: true, ...options }; if (reconnecting) { - this.openChannel(path, handler, options); + this.reconnectChannelOpeners.push(() => this.openChannel(path, handler, options)); } }); + handler(newChannel); } /** - * Create the main connection that is used for multiplexing all channels. + * Create the main connection that is used for multiplexing all service channels. */ protected abstract createMainChannel(): Channel; diff --git a/packages/core/src/common/messaging/proxy-factory.ts b/packages/core/src/common/messaging/proxy-factory.ts index 765b70dd1a472..0562ada54e1bb 100644 --- a/packages/core/src/common/messaging/proxy-factory.ts +++ b/packages/core/src/common/messaging/proxy-factory.ts @@ -127,9 +127,11 @@ export class JsonRpcProxyFactory implements ProxyHandler { this.connectionPromiseResolve = resolve ); this.connectionPromise.then(connection => { - connection.channel.onClose(() => - this.onDidCloseConnectionEmitter.fire(undefined) - ); + connection.channel.onClose(() => { + this.onDidCloseConnectionEmitter.fire(undefined); + // Wait for connection in case the backend reconnects + this.waitForConnection(); + }); this.onDidOpenConnectionEmitter.fire(undefined); }); } diff --git a/packages/core/src/common/messaging/web-socket-channel.ts b/packages/core/src/common/messaging/web-socket-channel.ts index 74f7503e0b997..66b90eed33739 100644 --- a/packages/core/src/common/messaging/web-socket-channel.ts +++ b/packages/core/src/common/messaging/web-socket-channel.ts @@ -20,6 +20,7 @@ import { Emitter, Event } from '../event'; import { WriteBuffer } from '../message-rpc'; import { Uint8ArrayReadBuffer, Uint8ArrayWriteBuffer } from '../message-rpc/uint8-array-message-buffer'; import { Channel, MessageProvider, ChannelCloseEvent } from '../message-rpc/channel'; +import { DisposableCollection } from '../disposable'; /** * A channel that manages the main websocket connection between frontend and backend. All service channels @@ -44,8 +45,12 @@ export class WebSocketChannel implements Channel { return this.onErrorEmitter.event; } + protected toDispose = new DisposableCollection(); + constructor(protected readonly socket: IWebSocket) { + this.toDispose.pushAll([this.onCloseEmitter, this.onMessageEmitter, this.onErrorEmitter]); socket.onClose((reason, code) => this.onCloseEmitter.fire({ reason, code })); + socket.onClose(() => this.close()); socket.onError(error => this.onErrorEmitter.fire(error)); // eslint-disable-next-line arrow-body-style socket.onMessage(data => this.onMessageEmitter.fire(() => { @@ -62,8 +67,6 @@ export class WebSocketChannel implements Channel { result.onCommit(buffer => { if (this.socket.isConnected()) { this.socket.send(buffer); - } else { - console.warn('Could not send message. Websocket is not connected'); } }); @@ -71,10 +74,8 @@ export class WebSocketChannel implements Channel { } close(): void { + this.toDispose.dispose(); this.socket.close(); - this.onCloseEmitter.dispose(); - this.onMessageEmitter.dispose(); - this.onErrorEmitter.dispose(); } } diff --git a/packages/core/src/electron-browser/messaging/electron-ipc-connection-provider.ts b/packages/core/src/electron-browser/messaging/electron-ipc-connection-provider.ts index 6aa06861d2e93..3cce9e75c3951 100644 --- a/packages/core/src/electron-browser/messaging/electron-ipc-connection-provider.ts +++ b/packages/core/src/electron-browser/messaging/electron-ipc-connection-provider.ts @@ -36,6 +36,11 @@ export class ElectronIpcConnectionProvider extends AbstractConnectionProvider(path, arg); } + constructor() { + super(); + this.initializeMultiplexer(); + } + protected createMainChannel(): Channel { const onMessageEmitter = new Emitter(); ipcRenderer.on(THEIA_ELECTRON_IPC_CHANNEL_NAME, (_event: ElectronEvent, data: Uint8Array) => { @@ -46,7 +51,6 @@ export class ElectronIpcConnectionProvider extends AbstractConnectionProvider { const writer = new Uint8ArrayWriteBuffer(); writer.onCommit(buffer => - // The ipcRenderer cannot handle ArrayBuffers directly=> we have to convert to Uint8Array. ipcRenderer.send(THEIA_ELECTRON_IPC_CHANNEL_NAME, buffer) ); return writer; diff --git a/packages/core/src/electron-browser/messaging/electron-ws-connection-provider.ts b/packages/core/src/electron-browser/messaging/electron-ws-connection-provider.ts index ac99ee85bbab8..f29776df9fa48 100644 --- a/packages/core/src/electron-browser/messaging/electron-ws-connection-provider.ts +++ b/packages/core/src/electron-browser/messaging/electron-ws-connection-provider.ts @@ -37,7 +37,7 @@ export class ElectronWebSocketConnectionProvider extends WebSocketConnectionProv // Manually close the websocket connections `onStop`. Otherwise, the channels will be closed with 30 sec (`MessagingContribution#checkAliveTimeout`) delay. // https://github.com/eclipse-theia/theia/issues/6499 // `1001` indicates that an endpoint is "going away", such as a server going down or a browser having navigated away from a page. - this.channelMultiPlexer.closeUnderlyingChannel({ reason: 'The frontend is "going away"', code: 1001 }); + this.channelMultiPlexer?.onUnderlyingChannelClose({ reason: 'The frontend is "going away"', code: 1001 }); } override async openChannel(path: string, handler: (channel: Channel) => void, options?: WebSocketOptions): Promise { diff --git a/packages/core/src/electron-main/messaging/electron-messaging-contribution.ts b/packages/core/src/electron-main/messaging/electron-messaging-contribution.ts index dfd8f6115c8ce..b2e4b00e2e1a3 100644 --- a/packages/core/src/electron-main/messaging/electron-messaging-contribution.ts +++ b/packages/core/src/electron-main/messaging/electron-messaging-contribution.ts @@ -78,8 +78,8 @@ export class ElectronMessagingContribution implements ElectronMainApplicationCon } }); - sender.once('did-navigate', () => multiPlexer.closeUnderlyingChannel({ reason: 'Window was refreshed' })); // When refreshing the browser window. - sender.once('destroyed', () => multiPlexer.closeUnderlyingChannel({ reason: 'Window was closed' })); // When closing the browser window. + sender.once('did-navigate', () => multiPlexer.onUnderlyingChannelClose({ reason: 'Window was refreshed' })); // When refreshing the browser window. + sender.once('destroyed', () => multiPlexer.onUnderlyingChannelClose({ reason: 'Window was closed' })); // When closing the browser window. const data = { channel: mainChannel, multiPlexer }; this.windowChannelMultiplexer.set(sender.id, data); return data; diff --git a/packages/core/src/node/messaging/ipc-channel.ts b/packages/core/src/node/messaging/ipc-channel.ts index 71fc65d774f08..f6cf7724e8e81 100644 --- a/packages/core/src/node/messaging/ipc-channel.ts +++ b/packages/core/src/node/messaging/ipc-channel.ts @@ -56,7 +56,6 @@ export class IPCChannel implements Channel { this.setupProcess(); } this.messagePipe.onMessage(message => { - console.log(`IPChannel: fire on message with ${message.length}`); this.onMessageEmitter.fire(() => new Uint8ArrayReadBuffer(message)); }); this.toDispose.pushAll([this.onCloseEmitter, this.onMessageEmitter, this.onErrorEmitter]); diff --git a/packages/terminal/src/browser/terminal-widget-impl.ts b/packages/terminal/src/browser/terminal-widget-impl.ts index 2471b912246b5..04228a13c6275 100644 --- a/packages/terminal/src/browser/terminal-widget-impl.ts +++ b/packages/terminal/src/browser/terminal-widget-impl.ts @@ -26,7 +26,6 @@ import { terminalsPath } from '../common/terminal-protocol'; import { IBaseTerminalServer, TerminalProcessInfo } from '../common/base-terminal-protocol'; import { TerminalWatcher } from '../common/terminal-watcher'; import { TerminalWidgetOptions, TerminalWidget, TerminalDimensions, TerminalExitStatus } from './base/terminal-widget'; -import { MessageConnection } from '@theia/core/shared/vscode-ws-jsonrpc'; import { Deferred } from '@theia/core/lib/common/promise-util'; import { TerminalPreferences, TerminalRendererType, isTerminalRendererType, DEFAULT_TERMINAL_RENDERER_TYPE, CursorStyle } from './terminal-preferences'; import { TerminalContribution } from './terminal-contribution'; diff --git a/yarn.lock b/yarn.lock index 6838954c6c89a..407302ba38351 100644 --- a/yarn.lock +++ b/yarn.lock @@ -1591,10 +1591,6 @@ version "4.0.0" resolved "https://registry.yarnpkg.com/@lerna/resolve-symlink/-/resolve-symlink-4.0.0.tgz#6d006628a210c9b821964657a9e20a8c9a115e14" integrity sha512-RtX8VEUzqT+uLSCohx8zgmjc6zjyRlh6i/helxtZTMmc4+6O4FS9q5LJas2uGO2wKvBlhcD6siibGt7dIC3xZA== - dependencies: - fs-extra "^9.1.0" - npmlog "^4.1.2" - read-cmd-shim "^2.0.0" "@lerna/rimraf-dir@4.0.0": version "4.0.0" @@ -9695,11 +9691,6 @@ react@^16.8.0: object-assign "^4.1.1" prop-types "^15.6.2" -read-cmd-shim@^2.0.0: - version "2.0.0" - resolved "https://registry.yarnpkg.com/read-cmd-shim/-/read-cmd-shim-2.0.0.tgz#4a50a71d6f0965364938e9038476f7eede3928d9" - integrity sha512-HJpV9bQpkl6KwjxlJcBoqu9Ba0PQg8TqSNIOrulGt54a0uup0HtevreFHzYzkm0lpnleRdNBzXznKrgxglEHQw== - read-package-json-fast@^2.0.1: version "2.0.3" resolved "https://registry.yarnpkg.com/read-package-json-fast/-/read-package-json-fast-2.0.3.tgz#323ca529630da82cb34b36cc0b996693c98c2b83" @@ -9856,11 +9847,6 @@ rechoir@^0.7.0: dependencies: resolve "^1.9.0" -reconnecting-websocket@^4.2.0: - version "4.4.0" - resolved "https://registry.yarnpkg.com/reconnecting-websocket/-/reconnecting-websocket-4.4.0.tgz#3b0e5b96ef119e78a03135865b8bb0af1b948783" - integrity sha512-D2E33ceRPga0NvTDhJmphEgJ7FUYF0v4lr1ki0csq06OdlxKfugGzN0dSkxM/NfqCxYELK4KcaTOUOjTV6Dcng== - redent@^3.0.0: version "3.0.0" resolved "https://registry.yarnpkg.com/redent/-/redent-3.0.0.tgz#e557b7998316bb53c9f1f56fa626352c6963059f" @@ -11722,6 +11708,13 @@ vscode-uri@^2.1.1: resolved "https://registry.yarnpkg.com/vscode-uri/-/vscode-uri-2.1.2.tgz#c8d40de93eb57af31f3c715dd650e2ca2c096f1c" integrity sha512-8TEXQxlldWAuIODdukIb+TR5s+9Ds40eSJrw+1iDDA9IFORPjMELarNQE3myz5XIkWWpdprmJjm1/SxMlWOC8A== +vscode-windows-ca-certs@^0.3.0: + version "0.3.0" + resolved "https://registry.yarnpkg.com/vscode-windows-ca-certs/-/vscode-windows-ca-certs-0.3.0.tgz#324e1f8ba842bbf048a39e7c0ee8fe655e9adfcc" + integrity sha512-CYrpCEKmAFQJoZNReOrelNL+VKyebOVRCqL9evrBlVcpWQDliliJgU5RggGS8FPGtQ3jAKLQt9frF0qlxYYPKA== + dependencies: + node-addon-api "^3.0.2" + w3c-hr-time@^1.0.1: version "1.0.2" resolved "https://registry.yarnpkg.com/w3c-hr-time/-/w3c-hr-time-1.0.2.tgz#0a89cdf5cc15822df9c360543676963e0cc308cd"