From 5d146afa0bf5a44bbbef0b4aa0c79de9d764a1a1 Mon Sep 17 00:00:00 2001 From: Jean-Baptiste Date: Sun, 30 Jan 2022 12:48:51 +0100 Subject: [PATCH 1/2] refactor(microservices): move tcp socket logic to an abstract class --- packages/microservices/helpers/json-socket.ts | 87 +++---------------- packages/microservices/helpers/tcp-socket.ts | 78 +++++++++++++++++ 2 files changed, 90 insertions(+), 75 deletions(-) create mode 100644 packages/microservices/helpers/tcp-socket.ts diff --git a/packages/microservices/helpers/json-socket.ts b/packages/microservices/helpers/json-socket.ts index 0320e0dbc4b..5d017f9b52b 100644 --- a/packages/microservices/helpers/json-socket.ts +++ b/packages/microservices/helpers/json-socket.ts @@ -1,83 +1,29 @@ -import { Socket } from 'net'; +import { Buffer } from 'buffer'; import { StringDecoder } from 'string_decoder'; -import { - CLOSE_EVENT, - CONNECT_EVENT, - DATA_EVENT, - ERROR_EVENT, - MESSAGE_EVENT, -} from '../constants'; import { CorruptedPacketLengthException } from '../errors/corrupted-packet-length.exception'; -import { InvalidJSONFormatException } from '../errors/invalid-json-format.exception'; -import { NetSocketClosedException } from '../errors/net-socket-closed.exception'; +import { TcpSocket } from './tcp-socket'; -export class JsonSocket { +export class JsonSocket extends TcpSocket { private contentLength: number | null = null; - private isClosed = false; private buffer = ''; private readonly stringDecoder = new StringDecoder(); - private readonly delimeter = '#'; + private readonly delimiter = '#'; - public get netSocket() { - return this.socket; - } - - constructor(public readonly socket: Socket) { - this.socket.on(DATA_EVENT, this.onData.bind(this)); - this.socket.on(CONNECT_EVENT, () => (this.isClosed = false)); - this.socket.on(CLOSE_EVENT, () => (this.isClosed = true)); - this.socket.on(ERROR_EVENT, () => (this.isClosed = true)); - } - - public connect(port: number, host: string) { - this.socket.connect(port, host); - return this; - } - - public on(event: string, callback: (err?: any) => void) { - this.socket.on(event, callback); - return this; - } - - public once(event: string, callback: (err?: any) => void) { - this.socket.once(event, callback); - return this; - } - - public end() { - this.socket.end(); - return this; - } - - public sendMessage(message: any, callback?: (err?: any) => void) { - if (this.isClosed) { - callback && callback(new NetSocketClosedException()); - return; - } + protected handleSend(message: any, callback?: (err?: any) => void) { this.socket.write(this.formatMessageData(message), 'utf-8', callback); } - private onData(dataRaw: Buffer | string) { + protected handleData(dataRaw: Buffer | string) { const data = Buffer.isBuffer(dataRaw) ? this.stringDecoder.write(dataRaw) : dataRaw; - - try { - this.handleData(data); - } catch (e) { - this.socket.emit(ERROR_EVENT, e.message); - this.socket.end(); - } - } - - private handleData(data: string) { this.buffer += data; if (this.contentLength == null) { - const i = this.buffer.indexOf(this.delimeter); + const i = this.buffer.indexOf(this.delimiter); /** - * Check if the buffer has the delimeter (#), + * Check if the buffer has the delimiter (#), * if not, the end of the buffer string might be in the middle of a content length string */ if (i !== -1) { @@ -95,36 +41,27 @@ export class JsonSocket { if (this.contentLength !== null) { const length = this.buffer.length; - if (length === this.contentLength) { this.handleMessage(this.buffer); } else if (length > this.contentLength) { const message = this.buffer.substring(0, this.contentLength); const rest = this.buffer.substring(this.contentLength); this.handleMessage(message); - this.onData(rest); + this.handleData(rest); } } } - private handleMessage(data: string) { + private handleMessage(message: any) { this.contentLength = null; this.buffer = ''; - - let message: Record; - try { - message = JSON.parse(data); - } catch (e) { - throw new InvalidJSONFormatException(e, data); - } - message = message || {}; - this.socket.emit(MESSAGE_EVENT, message); + this.emitMessage(message); } private formatMessageData(message: any) { const messageData = JSON.stringify(message); const length = messageData.length; - const data = length + this.delimeter + messageData; + const data = length + this.delimiter + messageData; return data; } } diff --git a/packages/microservices/helpers/tcp-socket.ts b/packages/microservices/helpers/tcp-socket.ts new file mode 100644 index 00000000000..cca8fe94dfb --- /dev/null +++ b/packages/microservices/helpers/tcp-socket.ts @@ -0,0 +1,78 @@ +import { Buffer } from 'buffer'; +import { Socket } from 'net'; +import { + CLOSE_EVENT, + CONNECT_EVENT, + DATA_EVENT, + ERROR_EVENT, + MESSAGE_EVENT, +} from '../constants'; +import { NetSocketClosedException } from '../errors/net-socket-closed.exception'; +import { InvalidJSONFormatException } from '../errors/invalid-json-format.exception'; + +export abstract class TcpSocket { + private isClosed = false; + + public get netSocket() { + return this.socket; + } + + constructor(public readonly socket: Socket) { + this.socket.on(DATA_EVENT, this.onData.bind(this)); + this.socket.on(CONNECT_EVENT, () => (this.isClosed = false)); + this.socket.on(CLOSE_EVENT, () => (this.isClosed = true)); + this.socket.on(ERROR_EVENT, () => (this.isClosed = true)); + } + + public connect(port: number, host: string) { + this.socket.connect(port, host); + return this; + } + + public on(event: string, callback: (err?: any) => void) { + this.socket.on(event, callback); + return this; + } + + public once(event: string, callback: (err?: any) => void) { + this.socket.once(event, callback); + return this; + } + + public end() { + this.socket.end(); + return this; + } + + public sendMessage(message: any, callback?: (err?: any) => void) { + if (this.isClosed) { + callback && callback(new NetSocketClosedException()); + return; + } + this.handleSend(message, callback); + } + + protected abstract handleSend(message: any, callback?: (err?: any) => void); + + private onData(data: Buffer) { + try { + this.handleData(data); + } catch (e) { + this.socket.emit(ERROR_EVENT, e.message); + this.socket.end(); + } + } + + protected abstract handleData(data: Buffer | string); + + protected emitMessage(data: string) { + let message: Record; + try { + message = JSON.parse(data); + } catch (e) { + throw new InvalidJSONFormatException(e, data); + } + message = message || {}; + this.socket.emit(MESSAGE_EVENT, message); + } +} From e8abf50812ee27ab5dcde8bc90d72435f6a1e607 Mon Sep 17 00:00:00 2001 From: Jean-Baptiste Date: Sun, 30 Jan 2022 14:16:31 +0100 Subject: [PATCH 2/2] feat(microservices): allow use of custom tcp sockets --- packages/microservices/client/client-tcp.ts | 15 +++++++++------ packages/microservices/ctx-host/tcp.context.ts | 4 ++-- packages/microservices/helpers/index.ts | 1 + .../interfaces/client-metadata.interface.ts | 2 ++ .../microservice-configuration.interface.ts | 3 +++ packages/microservices/server/server-tcp.ts | 12 ++++++++---- 6 files changed, 25 insertions(+), 12 deletions(-) diff --git a/packages/microservices/client/client-tcp.ts b/packages/microservices/client/client-tcp.ts index f16c6407c2c..8c3fc22417d 100644 --- a/packages/microservices/client/client-tcp.ts +++ b/packages/microservices/client/client-tcp.ts @@ -1,4 +1,4 @@ -import { Logger } from '@nestjs/common'; +import { Logger, Type } from '@nestjs/common'; import * as net from 'net'; import { EmptyError, lastValueFrom } from 'rxjs'; import { share, tap } from 'rxjs/operators'; @@ -10,7 +10,7 @@ import { TCP_DEFAULT_HOST, TCP_DEFAULT_PORT, } from '../constants'; -import { JsonSocket } from '../helpers/json-socket'; +import { JsonSocket, TcpSocket } from '../helpers'; import { PacketId, ReadPacket, WritePacket } from '../interfaces'; import { TcpClientOptions } from '../interfaces/client-metadata.interface'; import { ClientProxy } from './client-proxy'; @@ -20,13 +20,16 @@ export class ClientTCP extends ClientProxy { private readonly logger = new Logger(ClientTCP.name); private readonly port: number; private readonly host: string; + private readonly socketClass: Type; private isConnected = false; - private socket: JsonSocket; + private socket: TcpSocket; constructor(options: TcpClientOptions['options']) { super(); this.port = this.getOptionsProp(options, 'port') || TCP_DEFAULT_PORT; this.host = this.getOptionsProp(options, 'host') || TCP_DEFAULT_HOST; + this.socketClass = + this.getOptionsProp(options, 'socketClass') || JsonSocket; this.initializeSerializer(options); this.initializeDeserializer(options); @@ -80,8 +83,8 @@ export class ClientTCP extends ClientProxy { }); } - public createSocket(): JsonSocket { - return new JsonSocket(new net.Socket()); + public createSocket(): TcpSocket { + return new this.socketClass(new net.Socket()); } public close() { @@ -89,7 +92,7 @@ export class ClientTCP extends ClientProxy { this.handleClose(); } - public bindEvents(socket: JsonSocket) { + public bindEvents(socket: TcpSocket) { socket.on( ERROR_EVENT, (err: any) => err.code !== ECONNREFUSED && this.handleError(err), diff --git a/packages/microservices/ctx-host/tcp.context.ts b/packages/microservices/ctx-host/tcp.context.ts index a858ea442e2..1f1c6a28d1a 100644 --- a/packages/microservices/ctx-host/tcp.context.ts +++ b/packages/microservices/ctx-host/tcp.context.ts @@ -1,7 +1,7 @@ -import { JsonSocket } from '../helpers/json-socket'; +import { TcpSocket } from '../helpers'; import { BaseRpcContext } from './base-rpc.context'; -type TcpContextArgs = [JsonSocket, string]; +type TcpContextArgs = [TcpSocket, string]; export class TcpContext extends BaseRpcContext { constructor(args: TcpContextArgs) { diff --git a/packages/microservices/helpers/index.ts b/packages/microservices/helpers/index.ts index a02fe778e1b..ff6118b517b 100644 --- a/packages/microservices/helpers/index.ts +++ b/packages/microservices/helpers/index.ts @@ -2,3 +2,4 @@ export * from './json-socket'; export * from './kafka-logger'; export * from './kafka-parser'; export * from './kafka-reply-partition-assigner'; +export * from './tcp-socket'; diff --git a/packages/microservices/interfaces/client-metadata.interface.ts b/packages/microservices/interfaces/client-metadata.interface.ts index 4962eecc12d..ace44c26578 100644 --- a/packages/microservices/interfaces/client-metadata.interface.ts +++ b/packages/microservices/interfaces/client-metadata.interface.ts @@ -1,5 +1,6 @@ import { Type } from '@nestjs/common'; import { ClientProxy } from '../client'; +import { TcpSocket } from '../helpers'; import { Transport } from '../enums/transport.enum'; import { Deserializer } from './deserializer.interface'; import { @@ -33,5 +34,6 @@ export interface TcpClientOptions { port?: number; serializer?: Serializer; deserializer?: Deserializer; + socketClass?: Type; }; } diff --git a/packages/microservices/interfaces/microservice-configuration.interface.ts b/packages/microservices/interfaces/microservice-configuration.interface.ts index 9942b8b5580..9fd257e69bf 100644 --- a/packages/microservices/interfaces/microservice-configuration.interface.ts +++ b/packages/microservices/interfaces/microservice-configuration.interface.ts @@ -1,3 +1,5 @@ +import { Type } from '@nestjs/common'; +import { TcpSocket } from '../helpers'; import { Transport } from '../enums/transport.enum'; import { ChannelOptions } from '../external/grpc-options.interface'; import { @@ -80,6 +82,7 @@ export interface TcpOptions { retryDelay?: number; serializer?: Serializer; deserializer?: Deserializer; + socketClass?: Type; }; } diff --git a/packages/microservices/server/server-tcp.ts b/packages/microservices/server/server-tcp.ts index 067a3b19afa..c4ef47a3bd5 100644 --- a/packages/microservices/server/server-tcp.ts +++ b/packages/microservices/server/server-tcp.ts @@ -1,3 +1,4 @@ +import { Type } from '@nestjs/common'; import { isString, isUndefined } from '@nestjs/common/utils/shared.utils'; import * as net from 'net'; import { Server as NetSocket, Socket } from 'net'; @@ -14,7 +15,7 @@ import { } from '../constants'; import { TcpContext } from '../ctx-host/tcp.context'; import { Transport } from '../enums'; -import { JsonSocket } from '../helpers/json-socket'; +import { JsonSocket, TcpSocket } from '../helpers'; import { CustomTransportStrategy, IncomingRequest, @@ -30,6 +31,7 @@ export class ServerTCP extends Server implements CustomTransportStrategy { private readonly port: number; private readonly host: string; + private readonly socketClass: Type; private server: NetSocket; private isExplicitlyTerminated = false; private retryAttemptsCount = 0; @@ -38,6 +40,8 @@ export class ServerTCP extends Server implements CustomTransportStrategy { super(); this.port = this.getOptionsProp(options, 'port') || TCP_DEFAULT_PORT; this.host = this.getOptionsProp(options, 'host') || TCP_DEFAULT_HOST; + this.socketClass = + this.getOptionsProp(options, 'socketClass') || JsonSocket; this.init(); this.initializeSerializer(options); @@ -68,7 +72,7 @@ export class ServerTCP extends Server implements CustomTransportStrategy { readSocket.on(ERROR_EVENT, this.handleError.bind(this)); } - public async handleMessage(socket: JsonSocket, rawMessage: unknown) { + public async handleMessage(socket: TcpSocket, rawMessage: unknown) { const packet = await this.deserializer.deserialize(rawMessage); const pattern = !isString(packet.pattern) ? JSON.stringify(packet.pattern) @@ -124,7 +128,7 @@ export class ServerTCP extends Server implements CustomTransportStrategy { this.server.on(CLOSE_EVENT, this.handleClose.bind(this)); } - private getSocketInstance(socket: Socket): JsonSocket { - return new JsonSocket(socket); + private getSocketInstance(socket: Socket): TcpSocket { + return new this.socketClass(socket); } }