Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] Replace Node.js packages in botframework-streaming #2617

Closed
wants to merge 6 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ export class StreamingHttpClient implements HttpClient {
request: httpRequest,
status: res.statusCode,
headers: httpRequest.headers,
readableStreamBody: res.streams.length > 0 ? res.streams[0].getStream() : undefined
readableStreamBody: res.streams.length > 0 ? res.streams[0].getStream() as any : undefined
};
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,23 +4,10 @@

```ts

import { Duplex } from 'stream';
import { DuplexOptions } from 'stream';
import { Duplex } from 'stream-browserify';
import { DuplexOptions } from 'stream-browserify';
import * as WebSocket from 'ws';

// @public (undocumented)
export class BrowserWebSocket implements ISocket {
// Warning: (ae-forgotten-export) The symbol "IBrowserWebSocket" needs to be exported by the entry point index.d.ts
constructor(socket?: IBrowserWebSocket);
close(): void;
connect(serverAddress: string): Promise<void>;
readonly isConnected: boolean;
setOnCloseHandler(handler: (x: any) => void): void;
setOnErrorHandler(handler: (x: any) => void): void;
setOnMessageHandler(handler: (x: any) => void): void;
write(buffer: INodeBuffer): void;
}

// @public (undocumented)
export class ContentStream {
// Warning: (ae-forgotten-export) The symbol "PayloadAssembler" needs to be exported by the entry point index.d.ts
Expand Down Expand Up @@ -320,6 +307,8 @@ export interface INodeSocket {
// (undocumented)
readable: boolean;
// (undocumented)
readonly readableFlowing: boolean | null;
// (undocumented)
readonly readableHighWaterMark: number;
// (undocumented)
readonly readableLength: number;
Expand Down Expand Up @@ -516,7 +505,31 @@ export class StreamingResponse {
}

// @public (undocumented)
export class SubscribableStream extends Duplex {
export interface SubscribableStream {
// (undocumented)
end?(cb?: () => void): void;
// (undocumented)
end?(chunk: any, cb?: () => void): void;
// (undocumented)
end?(chunk: any, encoding?: string, cb?: () => void): void;
// (undocumented)
push?(chunk: any): boolean;
// (undocumented)
push?(chunk: any, encoding: string): boolean;
// (undocumented)
push?(chunk: any, encoding?: string): boolean;
// (undocumented)
read(size?: number): any;
// (undocumented)
write?(chunk: any): boolean;
// (undocumented)
write?(chunk: any, encoding: string): boolean;
// (undocumented)
write?(chunk: any, encoding: string, callback: (error?: Error | null) => void): boolean;
}

// @public (undocumented)
export class SubscribableStream extends Duplex implements SubscribableStream {
constructor(options?: DuplexOptions);
// (undocumented)
length: number;
Expand Down
1 change: 1 addition & 0 deletions libraries/botframework-streaming/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
"typings": "lib/index.d.ts",
"dependencies": {
"@types/ws": "^6.0.3",
"stream-browserify": "^3.0.0",
"uuid": "^3.4.0",
"ws": "^7.1.2"
},
Expand Down
1 change: 0 additions & 1 deletion libraries/botframework-streaming/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ export { StreamingRequest } from './streamingRequest';
export { StreamingResponse } from './streamingResponse';
export { SubscribableStream } from './subscribableStream';
export {
BrowserWebSocket,
NodeWebSocket,
NodeWebSocketFactory,
NodeWebSocketFactoryBase,
Expand Down
27 changes: 15 additions & 12 deletions libraries/botframework-streaming/src/namedPipe/namedPipeClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,19 @@
* Copyright (c) Microsoft Corporation. All rights reserved.
* Licensed under the MIT License.
*/

import { connect } from 'net';
import { ProtocolAdapter } from '../protocolAdapter';
import { RequestHandler } from '../requestHandler';
import { StreamingRequest } from '../streamingRequest';
import { RequestManager } from '../payloads';

import { IStreamingTransportClient, IReceiveResponse } from '../interfaces';
import { NamedPipeTransport } from './namedPipeTransport';
import {
PayloadReceiver,
PayloadSender
} from '../payloadTransport';
import { NamedPipeTransport } from './namedPipeTransport';
import { IStreamingTransportClient, IReceiveResponse } from '../interfaces';
import { ProtocolAdapter } from '../protocolAdapter';
import { RequestHandler } from '../requestHandler';
import { RequestManager } from '../payloads';
import { StreamingRequest } from '../streamingRequest';

/**
* Streaming transport client implementation that uses named pipes for inter-process communication.
Expand Down Expand Up @@ -53,12 +55,13 @@ export class NamedPipeClient implements IStreamingTransportClient {
* Establish a connection with no custom headers.
*/
public async connect(): Promise<void> {
let outgoingPipeName: string = NamedPipeTransport.PipePath + this._baseName + NamedPipeTransport.ServerIncomingPath;
let outgoing = connect(outgoingPipeName);
let incomingPipeName: string = NamedPipeTransport.PipePath + this._baseName + NamedPipeTransport.ServerOutgoingPath;
let incoming = connect(incomingPipeName);
this._sender.connect(new NamedPipeTransport(outgoing));
this._receiver.connect(new NamedPipeTransport(incoming));
const outgoingPipeName: string = NamedPipeTransport.PipePath + this._baseName + NamedPipeTransport.ServerIncomingPath;
const outgoing = connect(outgoingPipeName);
const incomingPipeName: string = NamedPipeTransport.PipePath + this._baseName + NamedPipeTransport.ServerOutgoingPath;
const incoming = connect(incomingPipeName);
// Cast Sockets as any to conform with original INodeSocket interface.
this._sender.connect(new NamedPipeTransport(outgoing as any));
this._receiver.connect(new NamedPipeTransport(incoming as any));
}

/**
Expand Down
26 changes: 20 additions & 6 deletions libraries/botframework-streaming/src/subscribableStream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,23 @@
* Copyright (c) Microsoft Corporation. All rights reserved.
* Licensed under the MIT License.
*/
import { Duplex, DuplexOptions } from 'stream';

export class SubscribableStream extends Duplex {
import { Duplex, DuplexOptions, Readable } from 'stream-browserify';

export interface SubscribableStream {
end?(cb?: () => void): void;
end?(chunk: any, cb?: () => void): void;
end?(chunk: any, encoding?: string, cb?: () => void): void;
push?(chunk: any): boolean;
push?(chunk: any, encoding: string): boolean;
push?(chunk: any, encoding?: string): boolean;
read(size?: number): any;
write?(chunk: any): boolean;
write?(chunk: any, encoding: string): boolean;
write?(chunk: any, encoding: string, callback: (error?: Error | null) => void): boolean;
}

export class SubscribableStream extends Duplex implements SubscribableStream {
public length: number = 0;

private readonly bufferList: Buffer[] = [];
Expand All @@ -18,7 +32,7 @@ export class SubscribableStream extends Duplex {
}

public _write(chunk: any, encoding: string, callback: (error?: Error | null) => void): void {
let buffer = Buffer.from(chunk);
const buffer = Buffer.from(chunk);
this.bufferList.push(buffer);
this.length += chunk.length;
if (this._onData) {
Expand All @@ -30,12 +44,12 @@ export class SubscribableStream extends Duplex {
public _read(size: number): void {
if (this.bufferList.length === 0) {
// null signals end of stream
this.push(null);
(this as Readable).push(null);
} else {
let total = 0;
while (total < size && this.bufferList.length > 0) {
let buffer = this.bufferList[0];
this.push(buffer);
const buffer = this.bufferList[0];
(this as Readable).push(buffer);
this.bufferList.splice(0, 1);
total += buffer.length;
}
Expand Down
10 changes: 6 additions & 4 deletions libraries/botframework-streaming/src/webSocket/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,11 @@
* Licensed under the MIT License.
*/

export * from './browserWebSocket';
export * from './factories';
export * from './nodeWebSocket';
export * from './webSocketClient';
export {
NodeWebSocket,
NodeWebSocketFactory,
NodeWebSocketFactoryBase,
WebSocketClient
} from './node';
export * from './webSocketServer';
export * from './webSocketTransport';
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
/**
* @module botframework-streaming
*/
/**
* Copyright (c) Microsoft Corporation. All rights reserved.
* Licensed under the MIT License.
*/
export * from './nodeWebSocketFactory';
export * from './nodeWebSocketFactoryBase';
/**
* @module botframework-streaming
*/
/**
* Copyright (c) Microsoft Corporation. All rights reserved.
* Licensed under the MIT License.
*/

export * from './nodeWebSocketFactory';
export * from './nodeWebSocketFactoryBase';
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
* Licensed under the MIT License.
*/

import { INodeIncomingMessage, INodeBuffer, INodeSocket } from '../../interfaces';
import { INodeIncomingMessage, INodeBuffer, INodeSocket } from '../../../interfaces';
import { NodeWebSocket } from '../nodeWebSocket';
import { NodeWebSocketFactoryBase } from './nodeWebSocketFactoryBase';

Expand Down
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
/**
* @module botframework-streaming
*/
/**
* Copyright (c) Microsoft Corporation. All rights reserved.
* Licensed under the MIT License.
*/
import { INodeIncomingMessage, INodeBuffer, INodeSocket, ISocket } from '../../interfaces';
export abstract class NodeWebSocketFactoryBase {
public abstract createWebSocket(req: INodeIncomingMessage, socket: INodeSocket, head: INodeBuffer): Promise<ISocket>;
}
/**
* @module botframework-streaming
*/
/**
* Copyright (c) Microsoft Corporation. All rights reserved.
* Licensed under the MIT License.
*/

import { INodeIncomingMessage, INodeBuffer, INodeSocket, ISocket } from '../../../interfaces';

export abstract class NodeWebSocketFactoryBase {
public abstract createWebSocket(req: INodeIncomingMessage, socket: INodeSocket, head: INodeBuffer): Promise<ISocket>;
}
11 changes: 11 additions & 0 deletions libraries/botframework-streaming/src/webSocket/node/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
/**
* @module botframework-streaming
*/
/**
* Copyright (c) Microsoft Corporation. All rights reserved.
* Licensed under the MIT License.
*/

export { NodeWebSocketFactory, NodeWebSocketFactoryBase } from './factories';
export { NodeWebSocket } from './nodeWebSocket';
export { WebSocketClient } from './nodeWebSocketClient';
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import * as crypto from 'crypto';
import { IncomingMessage, request } from 'http';
import * as WebSocket from 'ws';

import { INodeIncomingMessage, INodeBuffer, INodeSocket, ISocket } from '../interfaces';
import { INodeIncomingMessage, INodeBuffer, INodeSocket, ISocket } from '../../interfaces';
const NONCE_LENGTH = 16;

export class NodeWebSocket implements ISocket {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
/**
* @module botframework-streaming
*/
/**
* Copyright (c) Microsoft Corporation. All rights reserved.
* Licensed under the MIT License.
*/
import { ProtocolAdapter } from '../../protocolAdapter';
import { RequestHandler } from '../../requestHandler';
import { StreamingRequest } from '../../streamingRequest';
import { RequestManager } from '../../payloads';
import {
PayloadReceiver,
PayloadSender,
TransportDisconnectedEvent
} from '../../payloadTransport';
import { WebSocketTransport } from '../webSocketTransport';
import { IStreamingTransportClient, IReceiveResponse } from '../../interfaces';
import { NodeWebSocket } from './nodeWebSocket';

/**
* Web socket based client to be used as streaming transport.
*/
export class WebSocketClient implements IStreamingTransportClient {
private readonly _url: string;
private readonly _requestHandler: RequestHandler;
private readonly _sender: PayloadSender;
private readonly _receiver: PayloadReceiver;
private readonly _requestManager: RequestManager;
private readonly _protocolAdapter: ProtocolAdapter;
private readonly _disconnectionHandler: (message: string) => void;

/**
* Creates a new instance of the [WebSocketClient](xref:botframework-streaming.WebSocketClient) class.
*
* @param url The URL of the remote server to connect to.
* @param requestHandler Optional [RequestHandler](xref:botframework-streaming.RequestHandler) to process incoming messages received by this server.
* @param disconnectionHandler Optional function to handle the disconnection message.
*/
public constructor({ url, requestHandler, disconnectionHandler = null}) {
this._url = url;
this._requestHandler = requestHandler;
this._disconnectionHandler = disconnectionHandler;

this._requestManager = new RequestManager();

this._sender = new PayloadSender();
this._sender.disconnected = this.onConnectionDisconnected.bind(this);
this._receiver = new PayloadReceiver();
this._receiver.disconnected = this.onConnectionDisconnected.bind(this);

this._protocolAdapter = new ProtocolAdapter(this._requestHandler, this._requestManager, this._sender, this._receiver);
}

/**
* Establish a connection with no custom headers.
*
* @returns A promise that will not resolve until the client stops listening for incoming messages.
*/
public async connect(): Promise<void> {
const ws = new NodeWebSocket();
try {
await ws.connect(this._url);
const transport = new WebSocketTransport(ws);
this._sender.connect(transport);
this._receiver.connect(transport);
} catch (error) {
throw(new Error(`Unable to connect client to Node transport.`));
}
}

/**
* Stop this client from listening.
*/
public disconnect(): void {
this._sender.disconnect(new TransportDisconnectedEvent('Disconnect was called.'));
this._receiver.disconnect(new TransportDisconnectedEvent('Disconnect was called.'));
}

/**
* Task used to send data over this client connection.
*
* @param request The streaming request to send.
* @returns A promise that will produce an instance of receive response on completion of the send operation.
*/
public async send(request: StreamingRequest): Promise<IReceiveResponse> {
return this._protocolAdapter.sendRequest(request);
}

private onConnectionDisconnected(sender: object, args: any): void {
if (this._disconnectionHandler != null) {
this._disconnectionHandler('Disconnected');
return;
}

throw(new Error(`Unable to re-connect client to Node transport for url ${ this._url }. Sender: '${ sender }'. Args:' ${ args }`));
}
}
Loading