Skip to content

Commit

Permalink
refactor: extract socket wrapping to its own files
Browse files Browse the repository at this point in the history
  • Loading branch information
T1B0 committed Jul 25, 2024
1 parent c77fabf commit 596367b
Show file tree
Hide file tree
Showing 6 changed files with 11 additions and 204 deletions.
4 changes: 4 additions & 0 deletions src/client/client.debug.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@

import Debug from 'debug';

export const debug = Debug('iggy:client');
4 changes: 1 addition & 3 deletions src/client/client.ts
Original file line number Diff line number Diff line change
@@ -1,14 +1,12 @@

import Debug from 'debug';
import type { RawClient, ClientConfig } from "./client.type.js"
import { createPool, type Pool } from 'generic-pool';
import { CommandAPI } from '../wire/command-set.js';
import { TcpClient } from './tcp.client.js';
import { TlsClient } from './tls.client.js';
import { debug } from './client.debug.js';


const debug = Debug('iggy:client');

export const rawClientGetter = (config: ClientConfig): Promise<RawClient> => {
const { transport, options } = config;
switch (transport) {
Expand Down
202 changes: 3 additions & 199 deletions src/client/client.utils.ts
Original file line number Diff line number Diff line change
@@ -1,18 +1,10 @@

import { Socket } from 'node:net';
import { Duplex, Transform, TransformCallback } from 'node:stream';
import Debug from 'debug';
import type {
ClientCredentials, CommandResponse, PasswordCredentials, TokenCredentials
} from './client.type.js';
import { Transform, TransformCallback } from 'node:stream';
import type { CommandResponse } from './client.type.js';
import { translateCommandCode } from '../wire/command.code.js';
import { responseError } from '../wire/error.utils.js';
import { LOGIN } from '../wire/session/login.command.js';
import { LOGIN_WITH_TOKEN } from '../wire/session/login-with-token.command.js';
import { debug } from './client.debug.js';


const debug = Debug('iggy:client');

export const handleResponse = (r: Buffer) => {
const status = r.readUint32LE(0);
const length = r.readUint32LE(4);
Expand Down Expand Up @@ -58,191 +50,3 @@ export const serializeCommand = (command: number, payload: Buffer) => {
}


export const wrapSocket = (socket: Socket) =>
new Promise<CommandResponseStream>((resolve, reject) => {
const responseStream = new CommandResponseStream(socket);

socket.on('error', (err: unknown) => {
console.error('RESPONSESTREAM ERROR', err)
reject(err);
});
socket.once('connect', () => {
debug('responseStream.connect event');
resolve(responseStream);
});
socket.on('close', () => { console.error('socket#close'); reject(); });
socket.on('end', () => { console.error('socket#end'); reject(); });
});


type WriteCb = ((error: Error | null | undefined) => void) | undefined

type Job = {
command: number,
payload: Buffer,
resolve: (v: any) => void,
reject: (e: any) => void
};

export class CommandResponseStream extends Duplex {
private _socket: Socket;
private _readPaused: boolean;
private _execQueue: Job[];
public busy: boolean;
isAuthenticated: boolean;
userId?: number;


constructor(socket: Socket) {
super();
this._socket = this._wrapSocket(socket);
this._readPaused = false;
this.busy = false;
this._execQueue = [];
this.isAuthenticated = false;
};

_read(size: number): void {
this._readPaused = false;
debug('stream#_read', size);
setImmediate(this._onReadable.bind(this));
}

_write(chunk: Buffer, encoding: BufferEncoding | undefined, cb?: WriteCb) {
return this._socket.write(chunk, encoding, cb);
};

writeCommand(command: number, payload: Buffer): boolean {
const cmd = serializeCommand(command, payload);
return this._socket.write(cmd);
}

sendCommand(
command: number, payload: Buffer, handleResponse = true
): Promise<CommandResponse> {
return new Promise((resolve, reject) => {
this._execQueue.push({ command, payload, resolve, reject });
this._processQueue(handleResponse);
});
}

async authenticate(creds: ClientCredentials) {
const r = ('token' in creds) ?
await this._authWithToken(creds) :
await this._authWithPassword(creds);
this.isAuthenticated = true;
this.userId = r.userId;
return this.isAuthenticated;
}

async _authWithPassword(creds: PasswordCredentials) {
const pl = LOGIN.serialize(creds);
const logr = await this.sendCommand(LOGIN.code, pl);
return LOGIN.deserialize(logr);
}

async _authWithToken(creds: TokenCredentials) {
const pl = LOGIN_WITH_TOKEN.serialize(creds);
const logr = await this.sendCommand(LOGIN_WITH_TOKEN.code, pl);
return LOGIN_WITH_TOKEN.deserialize(logr);
}

async _processQueue(handleResponse = true): Promise<void> {
if (this.busy)
return;
this.busy = true;
while (this._execQueue.length > 0) {
const next = this._execQueue.shift();
if (!next) break;
const { command, payload, resolve, reject } = next;
try {
resolve(await this._processNext(command, payload, handleResponse));
} catch (err) {
reject(err);
}
}
this.busy = false;
this.emit('finishQueue');
}

_processNext(
command: number,
payload: Buffer,
handleResp = true
): Promise<CommandResponse> {
debug('==> writeCommand', this.writeCommand(command, payload));
return new Promise((resolve, reject) => {
const errCb = (err: unknown) => reject(err);
this.once('error', errCb);
this.once('data', (resp) => {
this.removeListener('error', errCb);
if (!handleResp) return resolve(resp);
const r = handleResponse(resp);
if (r.status !== 0) {
return reject(responseError(command, r.status));
}
return resolve(r);
});
});
}

getReadStream() {
return this;//.pipe(new PassThrough());
}

_wrapSocket(socket: Socket) {
// pass through
socket.on('close', hadError => this.emit('close', hadError));
socket.on('connect', () => this.emit('connect'));
socket.on('drain', () => this.emit('drain'));
socket.on('end', () => this.emit('end'));
socket.on('error', err => this.emit('error', err));
socket.on(
'lookup',
(err, address, family, host) => this.emit('lookup', err, address, family, host)
);
socket.on('ready', () => this.emit('ready'));
socket.on('timeout', () => this.emit('timeout'));

// customize data events
socket.on('readable', () => this._onReadable());
return socket;
}

_onReadable() {
while (!this._readPaused) {
const head = this._socket.read(8);
if (!head || head.length === 0) return;
if (head.length < 8) {
this._socket.unshift(head);
return;
}
/** first chunk[4:8] hold response length */
const responseSize = head.readUInt32LE(4);
/** response has no payload (create/update/delete ops...) */
if (responseSize === 0) {
this.push(head);
return;
}

const payload = this._socket.read(responseSize);
debug('payload', payload, responseSize, head.readUInt32LE(0));
if (!payload) {
this._socket.unshift(head);
return;
}
/** payload is incomplete, unshift until next read */
if (payload.length < responseSize) {
this._socket.unshift(Buffer.concat([head, payload]));
return;
}

const pushOk = this.push(Buffer.concat([head, payload]));
/** consumer is slower than producer */
if (!pushOk)
this._readPaused = true;
}
}


};
1 change: 1 addition & 0 deletions src/client/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,4 @@ export { Client, SimpleClient, SingleClient } from './client.js'
export { TcpClient, type TcpOption } from './tcp.client.js';
export { TlsClient, type TlsOption } from './tls.client.js';
export * from './client.utils.js';
export * from './client.socket.js';
2 changes: 1 addition & 1 deletion src/client/tcp.client.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@

import { createConnection, TcpSocketConnectOpts } from 'node:net';
import type { RawClient } from './client.type.js';
import { wrapSocket, CommandResponseStream } from './client.utils.js';
import { wrapSocket, CommandResponseStream } from './client.socket.js';

export const createTcpSocket =
(options: TcpSocketConnectOpts): Promise<CommandResponseStream> => {
Expand Down
2 changes: 1 addition & 1 deletion src/client/tls.client.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@

import { connect, ConnectionOptions } from 'node:tls';
import type { RawClient } from './client.type.js';
import { wrapSocket, CommandResponseStream } from './client.utils.js';
import { wrapSocket, CommandResponseStream } from './client.socket.js';

export const createTlsSocket = (
port: number, options: ConnectionOptions
Expand Down

0 comments on commit 596367b

Please sign in to comment.