Skip to content

Commit

Permalink
Merge pull request #9059 from jeanbmar/raw-tcp
Browse files Browse the repository at this point in the history
feat(microservices): add tcp raw data processing capabilities
  • Loading branch information
kamilmysliwiec authored Mar 1, 2022
2 parents 61c5c06 + e8abf50 commit 0a90cbe
Show file tree
Hide file tree
Showing 8 changed files with 115 additions and 87 deletions.
15 changes: 9 additions & 6 deletions packages/microservices/client/client-tcp.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { Logger } from '@nestjs/common';
import { Logger, Type } from '@nestjs/common';
import * as net from 'net';
import { EmptyError, lastValueFrom } from 'rxjs';
import { share, tap } from 'rxjs/operators';
Expand All @@ -10,7 +10,7 @@ import {
TCP_DEFAULT_HOST,
TCP_DEFAULT_PORT,
} from '../constants';
import { JsonSocket } from '../helpers/json-socket';
import { JsonSocket, TcpSocket } from '../helpers';
import { PacketId, ReadPacket, WritePacket } from '../interfaces';
import { TcpClientOptions } from '../interfaces/client-metadata.interface';
import { ClientProxy } from './client-proxy';
Expand All @@ -20,13 +20,16 @@ export class ClientTCP extends ClientProxy {
private readonly logger = new Logger(ClientTCP.name);
private readonly port: number;
private readonly host: string;
private readonly socketClass: Type<TcpSocket>;
private isConnected = false;
private socket: JsonSocket;
private socket: TcpSocket;

constructor(options: TcpClientOptions['options']) {
super();
this.port = this.getOptionsProp(options, 'port') || TCP_DEFAULT_PORT;
this.host = this.getOptionsProp(options, 'host') || TCP_DEFAULT_HOST;
this.socketClass =
this.getOptionsProp(options, 'socketClass') || JsonSocket;

this.initializeSerializer(options);
this.initializeDeserializer(options);
Expand Down Expand Up @@ -80,16 +83,16 @@ export class ClientTCP extends ClientProxy {
});
}

public createSocket(): JsonSocket {
return new JsonSocket(new net.Socket());
public createSocket(): TcpSocket {
return new this.socketClass(new net.Socket());
}

public close() {
this.socket && this.socket.end();
this.handleClose();
}

public bindEvents(socket: JsonSocket) {
public bindEvents(socket: TcpSocket) {
socket.on(
ERROR_EVENT,
(err: any) => err.code !== ECONNREFUSED && this.handleError(err),
Expand Down
4 changes: 2 additions & 2 deletions packages/microservices/ctx-host/tcp.context.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { JsonSocket } from '../helpers/json-socket';
import { TcpSocket } from '../helpers';
import { BaseRpcContext } from './base-rpc.context';

type TcpContextArgs = [JsonSocket, string];
type TcpContextArgs = [TcpSocket, string];

export class TcpContext extends BaseRpcContext<TcpContextArgs> {
constructor(args: TcpContextArgs) {
Expand Down
1 change: 1 addition & 0 deletions packages/microservices/helpers/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,4 @@ export * from './json-socket';
export * from './kafka-logger';
export * from './kafka-parser';
export * from './kafka-reply-partition-assigner';
export * from './tcp-socket';
87 changes: 12 additions & 75 deletions packages/microservices/helpers/json-socket.ts
Original file line number Diff line number Diff line change
@@ -1,83 +1,29 @@
import { Socket } from 'net';
import { Buffer } from 'buffer';
import { StringDecoder } from 'string_decoder';
import {
CLOSE_EVENT,
CONNECT_EVENT,
DATA_EVENT,
ERROR_EVENT,
MESSAGE_EVENT,
} from '../constants';
import { CorruptedPacketLengthException } from '../errors/corrupted-packet-length.exception';
import { InvalidJSONFormatException } from '../errors/invalid-json-format.exception';
import { NetSocketClosedException } from '../errors/net-socket-closed.exception';
import { TcpSocket } from './tcp-socket';

export class JsonSocket {
export class JsonSocket extends TcpSocket {
private contentLength: number | null = null;
private isClosed = false;
private buffer = '';

private readonly stringDecoder = new StringDecoder();
private readonly delimeter = '#';
private readonly delimiter = '#';

public get netSocket() {
return this.socket;
}

constructor(public readonly socket: Socket) {
this.socket.on(DATA_EVENT, this.onData.bind(this));
this.socket.on(CONNECT_EVENT, () => (this.isClosed = false));
this.socket.on(CLOSE_EVENT, () => (this.isClosed = true));
this.socket.on(ERROR_EVENT, () => (this.isClosed = true));
}

public connect(port: number, host: string) {
this.socket.connect(port, host);
return this;
}

public on(event: string, callback: (err?: any) => void) {
this.socket.on(event, callback);
return this;
}

public once(event: string, callback: (err?: any) => void) {
this.socket.once(event, callback);
return this;
}

public end() {
this.socket.end();
return this;
}

public sendMessage(message: any, callback?: (err?: any) => void) {
if (this.isClosed) {
callback && callback(new NetSocketClosedException());
return;
}
protected handleSend(message: any, callback?: (err?: any) => void) {
this.socket.write(this.formatMessageData(message), 'utf-8', callback);
}

private onData(dataRaw: Buffer | string) {
protected handleData(dataRaw: Buffer | string) {
const data = Buffer.isBuffer(dataRaw)
? this.stringDecoder.write(dataRaw)
: dataRaw;

try {
this.handleData(data);
} catch (e) {
this.socket.emit(ERROR_EVENT, e.message);
this.socket.end();
}
}

private handleData(data: string) {
this.buffer += data;

if (this.contentLength == null) {
const i = this.buffer.indexOf(this.delimeter);
const i = this.buffer.indexOf(this.delimiter);
/**
* Check if the buffer has the delimeter (#),
* Check if the buffer has the delimiter (#),
* if not, the end of the buffer string might be in the middle of a content length string
*/
if (i !== -1) {
Expand All @@ -95,36 +41,27 @@ export class JsonSocket {

if (this.contentLength !== null) {
const length = this.buffer.length;

if (length === this.contentLength) {
this.handleMessage(this.buffer);
} else if (length > this.contentLength) {
const message = this.buffer.substring(0, this.contentLength);
const rest = this.buffer.substring(this.contentLength);
this.handleMessage(message);
this.onData(rest);
this.handleData(rest);
}
}
}

private handleMessage(data: string) {
private handleMessage(message: any) {
this.contentLength = null;
this.buffer = '';

let message: Record<string, unknown>;
try {
message = JSON.parse(data);
} catch (e) {
throw new InvalidJSONFormatException(e, data);
}
message = message || {};
this.socket.emit(MESSAGE_EVENT, message);
this.emitMessage(message);
}

private formatMessageData(message: any) {
const messageData = JSON.stringify(message);
const length = messageData.length;
const data = length + this.delimeter + messageData;
const data = length + this.delimiter + messageData;
return data;
}
}
78 changes: 78 additions & 0 deletions packages/microservices/helpers/tcp-socket.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
import { Buffer } from 'buffer';
import { Socket } from 'net';
import {
CLOSE_EVENT,
CONNECT_EVENT,
DATA_EVENT,
ERROR_EVENT,
MESSAGE_EVENT,
} from '../constants';
import { NetSocketClosedException } from '../errors/net-socket-closed.exception';
import { InvalidJSONFormatException } from '../errors/invalid-json-format.exception';

export abstract class TcpSocket {
private isClosed = false;

public get netSocket() {
return this.socket;
}

constructor(public readonly socket: Socket) {
this.socket.on(DATA_EVENT, this.onData.bind(this));
this.socket.on(CONNECT_EVENT, () => (this.isClosed = false));
this.socket.on(CLOSE_EVENT, () => (this.isClosed = true));
this.socket.on(ERROR_EVENT, () => (this.isClosed = true));
}

public connect(port: number, host: string) {
this.socket.connect(port, host);
return this;
}

public on(event: string, callback: (err?: any) => void) {
this.socket.on(event, callback);
return this;
}

public once(event: string, callback: (err?: any) => void) {
this.socket.once(event, callback);
return this;
}

public end() {
this.socket.end();
return this;
}

public sendMessage(message: any, callback?: (err?: any) => void) {
if (this.isClosed) {
callback && callback(new NetSocketClosedException());
return;
}
this.handleSend(message, callback);
}

protected abstract handleSend(message: any, callback?: (err?: any) => void);

private onData(data: Buffer) {
try {
this.handleData(data);
} catch (e) {
this.socket.emit(ERROR_EVENT, e.message);
this.socket.end();
}
}

protected abstract handleData(data: Buffer | string);

protected emitMessage(data: string) {
let message: Record<string, unknown>;
try {
message = JSON.parse(data);
} catch (e) {
throw new InvalidJSONFormatException(e, data);
}
message = message || {};
this.socket.emit(MESSAGE_EVENT, message);
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { Type } from '@nestjs/common';
import { ClientProxy } from '../client';
import { TcpSocket } from '../helpers';
import { Transport } from '../enums/transport.enum';
import { Deserializer } from './deserializer.interface';
import {
Expand Down Expand Up @@ -33,5 +34,6 @@ export interface TcpClientOptions {
port?: number;
serializer?: Serializer;
deserializer?: Deserializer;
socketClass?: Type<TcpSocket>;
};
}
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import { Type } from '@nestjs/common';
import { TcpSocket } from '../helpers';
import { Transport } from '../enums/transport.enum';
import { ChannelOptions } from '../external/grpc-options.interface';
import {
Expand Down Expand Up @@ -80,6 +82,7 @@ export interface TcpOptions {
retryDelay?: number;
serializer?: Serializer;
deserializer?: Deserializer;
socketClass?: Type<TcpSocket>;
};
}

Expand Down
12 changes: 8 additions & 4 deletions packages/microservices/server/server-tcp.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import { Type } from '@nestjs/common';
import { isString, isUndefined } from '@nestjs/common/utils/shared.utils';
import * as net from 'net';
import { Server as NetSocket, Socket } from 'net';
Expand All @@ -13,7 +14,7 @@ import {
} from '../constants';
import { TcpContext } from '../ctx-host/tcp.context';
import { Transport } from '../enums';
import { JsonSocket } from '../helpers/json-socket';
import { JsonSocket, TcpSocket } from '../helpers';
import {
CustomTransportStrategy,
IncomingRequest,
Expand All @@ -29,6 +30,7 @@ export class ServerTCP extends Server implements CustomTransportStrategy {

private readonly port: number;
private readonly host: string;
private readonly socketClass: Type<TcpSocket>;
private server: NetSocket;
private isExplicitlyTerminated = false;
private retryAttemptsCount = 0;
Expand All @@ -37,6 +39,8 @@ export class ServerTCP extends Server implements CustomTransportStrategy {
super();
this.port = this.getOptionsProp(options, 'port') || TCP_DEFAULT_PORT;
this.host = this.getOptionsProp(options, 'host') || TCP_DEFAULT_HOST;
this.socketClass =
this.getOptionsProp(options, 'socketClass') || JsonSocket;

this.init();
this.initializeSerializer(options);
Expand Down Expand Up @@ -68,7 +72,7 @@ export class ServerTCP extends Server implements CustomTransportStrategy {
readSocket.on(ERROR_EVENT, this.handleError.bind(this));
}

public async handleMessage(socket: JsonSocket, rawMessage: unknown) {
public async handleMessage(socket: TcpSocket, rawMessage: unknown) {
const packet = await this.deserializer.deserialize(rawMessage);
const pattern = !isString(packet.pattern)
? JSON.stringify(packet.pattern)
Expand Down Expand Up @@ -124,7 +128,7 @@ export class ServerTCP extends Server implements CustomTransportStrategy {
this.server.on(CLOSE_EVENT, this.handleClose.bind(this));
}

private getSocketInstance(socket: Socket): JsonSocket {
return new JsonSocket(socket);
private getSocketInstance(socket: Socket): TcpSocket {
return new this.socketClass(socket);
}
}

0 comments on commit 0a90cbe

Please sign in to comment.