Skip to content

Commit

Permalink
fix: fix client auth quirks, add handleResponse & deserializePollMess…
Browse files Browse the repository at this point in the history
…age as transform stream
  • Loading branch information
T1B0 committed Feb 21, 2024
1 parent e3d8c28 commit af46059
Show file tree
Hide file tree
Showing 7 changed files with 62 additions and 52 deletions.
7 changes: 4 additions & 3 deletions src/client/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import { TcpClient } from './tcp.client.js';
import { TlsClient } from './tls.client.js';


const rawClientGetter = (config: ClientConfig): Promise<RawClient> => {
export const rawClientGetter = (config: ClientConfig): Promise<RawClient> => {
const {transport, options} = config;
switch (transport) {
case 'TCP': return TcpClient(options);
Expand All @@ -33,7 +33,7 @@ export class Client extends CommandAPI {
const getFromPool = async () => {
const c = await pool.acquire();
if(!c.isAuthenticated)
c.authenticate(config.credentials);
await c.authenticate(config.credentials);
console.log('ACQUIRED ! POOL SIZE::', pool.size);
c.once('finishQueue', () => {
pool.release(c)
Expand All @@ -45,11 +45,12 @@ export class Client extends CommandAPI {
this._config = config;
this._pool = pool;
};

}

export class SingleClient extends CommandAPI {
constructor(config: ClientConfig) {
super(() => Promise.resolve(rawClientGetter(config)));
super(() => rawClientGetter(config));
}
};

Expand Down
4 changes: 3 additions & 1 deletion src/client/client.type.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@

import { Readable } from 'stream';
import type { TcpOption } from './tcp.client.js';
import type { TlsOption } from './tls.client.js';

Expand All @@ -9,12 +10,13 @@ export type CommandResponse = {
};

export type RawClient = {
sendCommand: (code: number, payload: Buffer) => Promise<CommandResponse>,
sendCommand: (code: number, payload: Buffer, handleResponse?: boolean) => Promise<CommandResponse>,
destroy: () => void,
isAuthenticated: boolean
authenticate: (c: ClientCredentials) => Promise<boolean>
on: (ev: string, cb: () => void) => void
once: (ev: string, cb: () => void) => void
getReadStream: () => Readable
}

export type ClientProvider = () => Promise<RawClient>;
Expand Down
66 changes: 36 additions & 30 deletions src/client/client.utils.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@

import { Socket } from 'node:net';
import { Duplex } from 'node:stream';
import type { ClientCredentials, CommandResponse, PasswordCredentials, TokenCredentials } from './client.type.js';
import { Duplex, PassThrough, Transform, TransformCallback } from 'node:stream';
import type {
ClientCredentials, CommandResponse, PasswordCredentials, TokenCredentials
} from './client.type.js';
import { translateCommandCode } from '../wire/command.code.js';
import { responseError } from '../wire/error.utils.js';
import { LOGIN } from '../wire/session/login.command.js';
Expand All @@ -17,6 +19,18 @@ export const handleResponse = (r: Buffer) => {
}
};

export const handleResponseTransform = () => new Transform({
transform(chunk: Buffer, encoding: BufferEncoding, cb: TransformCallback) {
try {
const r = handleResponse(chunk);
console.log('resp:::', r)
return cb(null, r.data);
} catch (err: unknown) {
return cb(new Error('handleResponseTransform error', {cause: err}), null);
}
}
});

export const deserializeVoidResponse =
(r: CommandResponse) => r.status === 0 && r.data.length === 0;

Expand Down Expand Up @@ -99,10 +113,12 @@ export class CommandResponseStream extends Duplex {
return this._socket.write(cmd);
}

sendCommand(command: number, payload: Buffer): Promise<CommandResponse> {
sendCommand(
command: number, payload: Buffer, handleResponse = true
): Promise<CommandResponse> {
return new Promise((resolve, reject) => {
this._execQueue.push({ command, payload, resolve, reject });
this._processQueue();
this._processQueue(handleResponse);
});
}

Expand All @@ -127,7 +143,7 @@ export class CommandResponseStream extends Duplex {
return LOGIN_WITH_TOKEN.deserialize(logr);
}

async _processQueue(): Promise<void> {
async _processQueue(handleResponse = true): Promise<void> {
if (this.busy)
return;
this.busy = true;
Expand All @@ -136,7 +152,7 @@ export class CommandResponseStream extends Duplex {
if (!next) break;
const { command, payload, resolve, reject } = next;
try {
resolve(await this._processNext(command, payload));
resolve(await this._processNext(command, payload, handleResponse));
} catch (err) {
reject(err);
}
Expand All @@ -145,13 +161,18 @@ export class CommandResponseStream extends Duplex {
this.emit('finishQueue');
}

_processNext(command: number, payload: Buffer): Promise<CommandResponse> {
_processNext(
command: number,
payload: Buffer,
handleResp = true
): Promise<CommandResponse> {
console.log('==> write', this.writeCommand(command, payload));
return new Promise((resolve, reject) => {
const errCb = (err: unknown) => reject(err);
this.once('error', errCb);
this.once('data', (resp) => {
this.removeListener('error', errCb);
if(!handleResp) return resolve(resp);
const r = handleResponse(resp);
if (r.status !== 0) {
return reject(responseError(command, r.status));
Expand All @@ -161,6 +182,10 @@ export class CommandResponseStream extends Duplex {
});
}

getReadStream() {
return this;//.pipe(new PassThrough());
}

_wrapSocket(socket: Socket) {
// pass through
socket.on('close', hadError => this.emit('close', hadError));
Expand All @@ -181,7 +206,7 @@ export class CommandResponseStream extends Duplex {
}

_onReadable() {
while (!this._readPaused) {
// while (!this._readPaused) {
const head = this._socket.read(8);
if (!head || head.length === 0) return;
if (head.length < 8) {
Expand All @@ -206,29 +231,10 @@ export class CommandResponseStream extends Duplex {

const pushOk = this.push(Buffer.concat([head, payload]));
/** consumer is slower than producer */
if (!pushOk)
this._readPaused = true;
}
// if (!pushOk)
// this._readPaused = true;
// }
}


};

// const Transports = ['TCP', 'TLS', 'QUIC'] as const;
// type TransportType = typeof Transports[number];
// type TransportOption = {};

// type TransportConfig = {
// type: TransportType,
// options: TransportOption;
// }

// type ClientConfig = {
// transport: TransportConfig
// }

// export const transportClient = (config: ClientConfig): Client => {
// const {transport} = config;
// };


12 changes: 1 addition & 11 deletions src/client/tcp.client.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@

import { createConnection, TcpSocketConnectOpts } from 'node:net';
import type { CommandResponse, RawClient } from './client.type.js';
import type { RawClient } from './client.type.js';
import { wrapSocket, CommandResponseStream } from './client.utils.js';

export const createTcpSocket =
Expand All @@ -14,13 +14,3 @@ export type TcpOption = TcpSocketConnectOpts;

export const TcpClient = ({ host, port, keepAlive = true }: TcpOption): Promise<RawClient> =>
createTcpSocket({ host, port, keepAlive });
// return {
// sendCommand: async (code: number, payload: Buffer): Promise<CommandResponse> => {
// return socket.sendCommand(code, payload);
// },
// destroy: () => socket.destroy(),
// isAuthenticated: socket.isAuthenticated,
// authenticate: socket.authenticate
// }
// };

2 changes: 1 addition & 1 deletion src/client/tls.client.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@

import { connect, ConnectionOptions } from 'node:tls';
import type { CommandResponse, RawClient } from './client.type.js';
import type { RawClient } from './client.type.js';
import { wrapSocket, CommandResponseStream } from './client.utils.js';

export const createTlsSocket = (
Expand Down
13 changes: 13 additions & 0 deletions src/wire/message/poll.utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import { ValueOf, reverseRecord } from '../../type.utils.js';
import { deserializeUUID, toDate } from '../serialize.utils.js';
import { serializeGetOffset, type Consumer } from '../offset/offset.utils.js';
import { deserializeHeaders, type HeadersMap } from './header.utils.js';
import { Transform, TransformCallback } from 'node:stream';

export const PollingStrategyKind = {
Offset: 1,
Expand Down Expand Up @@ -190,3 +191,15 @@ export const deserializePollMessages = (r: Buffer, pos = 0) => {
messages
}
};

export const deserializePollMessagesTransform = () => new Transform({
objectMode: true,
transform(chunk: Buffer, encoding: BufferEncoding, cb: TransformCallback) {
console.log('chunk', typeof chunk, Buffer.isBuffer(chunk));
try {
return cb(null, deserializePollMessages(chunk));
} catch (err: unknown) {
cb(new Error('deserializePollMessage::transform error', {cause: err}), null);
}
}
})
10 changes: 4 additions & 6 deletions src/wire/offset/offset.utils.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@

import { ValueOf, reverseRecord } from '../../type.utils.js';
import { ValueOf } from '../../type.utils.js';
import { serializeIdentifier, type Id } from '../identifier.utils.js';
import { uint32ToBuf, uint8ToBuf } from '../number.utils.js';

export const ConsumerKind = {
Single: 1,
Expand Down Expand Up @@ -37,11 +38,8 @@ export const serializeGetOffset = (
const topicIdentifier = serializeIdentifier(topicId);
const consumerIdentifier = serializeIdentifier(consumer.id);

const b1 = Buffer.allocUnsafe(1);
b1.writeUInt8(consumer.kind);

const b2 = Buffer.allocUnsafe(4);
b2.writeUInt32LE(partitionId || 0);
const b1 = uint8ToBuf(consumer.kind);
const b2 = uint32ToBuf(partitionId || 0);

return Buffer.concat([
b1,
Expand Down

0 comments on commit af46059

Please sign in to comment.