Skip to content

Commit

Permalink
feat: add CommandResponseStream to wrap tcp socket, add parallel call…
Browse files Browse the repository at this point in the history
… safetiness
  • Loading branch information
T1B0 committed Feb 8, 2024
1 parent bb9ff9e commit 32dbd7e
Show file tree
Hide file tree
Showing 15 changed files with 460 additions and 74 deletions.
9 changes: 7 additions & 2 deletions src/client/client.type.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,11 @@ export type CommandResponse = {
data: Buffer
};

export type Client = {
sendCommand: (code: number, payload: Buffer) => Promise<CommandResponse>
// export type ClientState = {
// isAuthenticated: boolean,
// token?: string
// };

export type RawClient = {
sendCommand: (code: number, payload: Buffer) => Promise<CommandResponse>,
}
189 changes: 149 additions & 40 deletions src/client/client.utils.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@

import { Socket } from 'node:net';
import { Duplex } from 'node:stream';
import type { CommandResponse } from './client.type.js';
import { translateCommandCode } from '../wire/command.code.js';
import { responseError } from '../wire/error.utils.js';
Expand All @@ -11,9 +12,10 @@ export const handleResponse = (r: Buffer) => {
console.log('<== handleResponse', { status, length });
return {
status, length, data: r.subarray(8)
}};
}
};

export const deserializeVoidResponse =
export const deserializeVoidResponse =
(r: CommandResponse): Boolean => r.status === 0 && r.data.length === 0;

const COMMAND_LENGTH = 4;
Expand All @@ -37,61 +39,168 @@ export const serializeCommand = (command: number, payload: Buffer) => {
}


export const wrapSocket = (socket: Socket) => new Promise<Socket>((resolve, reject) => {
socket.on('error', (err: unknown) => {
console.error('SOCKET ERROR', err)
reject(err);
});
socket.once('connect', () => {
console.log('socket.connect event !');
resolve(socket);
export const wrapSocket = (socket: Socket) =>
new Promise<CommandResponseStream>((resolve, reject) => {
const responseStream = new CommandResponseStream(socket);

socket.on('error', (err: unknown) => {
console.error('RESPONSESTREAM ERROR', err)
reject(err);
});
socket.once('connect', () => {
console.log('responseStream.connect event !');
resolve(responseStream);
});
socket.on('close', () => { console.error('#CLOSE'); reject(); });
socket.on('end', () => { console.error('#END'); reject(); });
});
socket.on('close', (e) => { console.error('#CLOSE', e); reject(e); });
socket.on('end', () => { console.error('#END'); reject(); });
});


export const sendCommandWithResponse = (s: Socket) =>
(command: number, payload: Buffer): Promise<CommandResponse> => {
type WriteCb = ((error: Error | null | undefined) => void) | undefined

type Job = {
command: number,
payload: Buffer,
resolve: (v: any) => void,
reject: (e: any) => void
};

export class CommandResponseStream extends Duplex {
private _socket: Socket;
private _readPaused: boolean;
private _execQueue: Job[];
public busy: boolean;

constructor(socket: Socket) {
super();
this._socket = this._wrapSocket(socket);
this._readPaused = false;
this.busy = false;
this._execQueue = [];
}

_read(size: number): void {
this._readPaused = false;
setImmediate(this._onReadable.bind(this));
}

_write(chunk: Buffer, encoding: BufferEncoding | undefined, cb?: WriteCb) {
return this._socket.write(chunk, encoding, cb);
};

writeCommand(command: number, payload: Buffer): boolean {
const cmd = serializeCommand(command, payload);
console.log(
'==> sending cmd', command, cmd.toString('hex')
);
console.log('==> socket write', s.write(cmd));
return this._socket.write(cmd);
}

sendCommand(command: number, payload: Buffer): Promise<CommandResponse> {
return new Promise((resolve, reject) => {
const dataCb = (d: Buffer, l: number) => {
console.log('<== #DATA', d, l);
const r = handleResponse(d);
s.removeListener('error', errCb);
this._execQueue.push({ command, payload, resolve, reject });
this._processQueue();
});
}

async _processQueue(): Promise<void> {
if (this.busy)
return;
this.busy = true;
while (this._execQueue.length > 0) {
const next = this._execQueue.shift();
if (!next) break;
const { command, payload, resolve, reject } = next;
try {
resolve(await this._processNext(command, payload));
} catch (err) {
reject(err);
}
}
this.busy = false;
}

_processNext(command: number, payload: Buffer): Promise<CommandResponse> {
console.log('==> write', this.writeCommand(command, payload));
return new Promise((resolve, reject) => {
const errCb = (err: unknown) => reject(err);
this.once('error', errCb);
this.once('data', (resp) => {
this.removeListener('error', errCb);
const r = handleResponse(resp);
if (r.status !== 0) {
return reject(responseError(command, r.status));
}
return resolve(r);
};
const errCb = (err: unknown) => {
s.removeListener('data', dataCb);
reject(err);
};
s.once('data', dataCb)
s.once('error', errCb);
});
});
};


// enum TransportType {
// TCP = 'tcp',
// // TLS = 'tls',
// // QUIC = 'quic'
}

_wrapSocket(socket: Socket) {
// pass through
socket.on('close', hadError => this.emit('close', hadError));
socket.on('connect', () => this.emit('connect'));
socket.on('drain', () => this.emit('drain'));
socket.on('end', () => this.emit('end'));
socket.on('error', err => this.emit('error', err));
socket.on(
'lookup',
(err, address, family, host) => this.emit('lookup', err, address, family, host)
);
socket.on('ready', () => this.emit('ready'));
socket.on('timeout', () => this.emit('timeout'));

// customize data events
socket.on('readable', () => this._onReadable());
return socket;
}

_onReadable() {
while (!this._readPaused) {
const head = this._socket.read(8);
if (!head || head.length === 0) return;
if (head.length < 8) {
this._socket.unshift(head);
return;
}
/** first chunk[4:8] hold response length */
const responseSize = head.readUInt32LE(4);
/** response has no payload (create/update/delete ops...) */
if (responseSize === 0) {
this.push(head);
return;
}

const payload = this._socket.read(responseSize);
if (!payload) this._socket.unshift(head);
/** payload is incomplete, unshift until next read */
if (payload.length < responseSize) {
this._socket.unshift(Buffer.concat([head, payload]));
return;
}

const pushOk = this.push(Buffer.concat([head, payload]));
/** consumer is slower than producer */
if (!pushOk)
this._readPaused = true;
}
}


};

// const Transports = ['TCP', 'TLS', 'QUIC'] as const;
// type TransportType = typeof Transports[number];
// type TransportOption = {};

// type TransportConfig = {
// type: TransportType,
// options: TransportOption;
// }

// type ClientConfig = {
// transport: TransportType
// host: string,
// port: number
// transport: TransportConfig
// }

// export const transportClient = (config: ClientConfig): Client => {
// const {transport} = config;
// };


32 changes: 11 additions & 21 deletions src/client/tcp.client.ts
Original file line number Diff line number Diff line change
@@ -1,35 +1,25 @@

import { createConnection, Socket, TcpSocketConnectOpts } from 'node:net';
import type { CommandResponse, Client } from './client.type.js';
import { wrapSocket, sendCommandWithResponse } from './client.utils.js';
import { createConnection, TcpSocketConnectOpts } from 'node:net';
import type { CommandResponse, RawClient } from './client.type.js';
import { wrapSocket, CommandResponseStream } from './client.utils.js';


export const createTcpSocket = (options: TcpSocketConnectOpts): Promise<Socket> => {
const socket = createConnection(options);
return wrapSocket(socket);
};
export const createTcpSocket =
(options: TcpSocketConnectOpts): Promise<CommandResponseStream> => {
const socket = createConnection(options);
return wrapSocket(socket);
};


export type TcpOption = TcpSocketConnectOpts;

export const TcpClient = ({ host, port, keepAlive = true }: TcpOption): Client => {
let socket: Socket;
export const TcpClient = ({ host, port, keepAlive = true }: TcpOption): RawClient => {
let socket: CommandResponseStream;
return {
sendCommand: async (code: number, payload: Buffer):Promise<CommandResponse> => {
if (!socket)
socket = await createTcpSocket({host, port, keepAlive});
return sendCommandWithResponse(socket)(code, payload);
return socket.sendCommand(code, payload);
}
}
};

// export const createTransport = (options: TcpOption): Client => {
// let socket: Socket;
// return {
// sendCommand: async (code: number, payload: Buffer):Promise<CommandResponse> => {
// if (!socket)
// socket = await createTcpSocket(options);
// return sendCommandWithResponse(socket)(code, payload);
// }
// }
// };
41 changes: 41 additions & 0 deletions src/panic.parallel.e2e.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@

import { TcpClient } from './client/tcp.client.js';
import { login } from './wire/session/login.command.js';
import { logout } from './wire/session/logout.command.js';
import { getStreams } from './wire/stream/get-streams.command.js';
import { getUsers } from './wire/user/get-users.command.js';

try {
// create socket
const s = TcpClient({ host: '127.0.0.1', port: 8090 });

// LOGIN
const r = await login(s)({ username: 'iggy', password: 'iggy' });
console.log('RESPONSE_login', r);

const resp = await Promise.all([
getUsers(s)(),
getStreams(s)(),
getUsers(s)(),
getStreams(s)(),
getUsers(s)(),
getStreams(s)(),
getUsers(s)(),
getStreams(s)(),
])

console.log('RESP', resp);

console.log('GETUSERS', await getUsers(s)());
console.log('GETSTREAM', await getStreams(s)());

// LOGOUT
const rOut = await logout(s)();
console.log('RESPONSE LOGOUT', rOut);


} catch (err) {
console.error('FAILED!', err);
}

process.exit(0);
37 changes: 37 additions & 0 deletions src/panic.serial.e2e.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@

import { createTcpSocket } from './client/tcp.client.js';
import { handleResponse } from './client/client.utils.js';
import { LOGIN } from './wire/session/login.command.js';
import { logout } from './wire/session/logout.command.js';
import { GET_STREAMS } from './wire/stream/get-streams.command.js';
import { GET_USERS } from './wire/user/get-users.command.js';

try {
// create socket
const s = await createTcpSocket({ host: '127.0.0.1', port: 8090 });

// LOGIN
const log = LOGIN.serialize({ username: 'iggy', password: 'iggy' });
const logr = await s.sendCommand(LOGIN.code, log);
const r = LOGIN.deserialize(logr);
console.log('RESPONSE_login', r);

s.on('data', (d) => console.log('=>>DATA!!', d, d.length, handleResponse(d)));
s.on('error', (err) => console.error('=>>SOCKET ERROR!!', err));

// TLDR: this is not officialy supported (but somehow works here)
console.log('==> socket write CMD1',
s.writeCommand(GET_USERS.code, GET_USERS.serialize()));
console.log('==> socket write CMD2',
s.writeCommand(GET_STREAMS.code, GET_STREAMS.serialize()));

// LOGOUT
const rOut = await logout(s)();
console.log('RESPONSE LOGOUT', rOut);


} catch (err) {
console.error('FAILED!', err);
}

process.exit(0);
Loading

0 comments on commit 32dbd7e

Please sign in to comment.