Skip to content

Commit

Permalink
fix: fix Partitioning.MessageKey type, fix indent
Browse files Browse the repository at this point in the history
  • Loading branch information
T1B0 committed Mar 15, 2024
1 parent 79280c7 commit f2e6ef7
Show file tree
Hide file tree
Showing 3 changed files with 20 additions and 15 deletions.
27 changes: 16 additions & 11 deletions src/client/client.utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ export const handleResponseTransform = () => new Transform({
console.log('resp:::', r)
return cb(null, r.data);
} catch (err: unknown) {
return cb(new Error('handleResponseTransform error', {cause: err}), null);
return cb(new Error('handleResponseTransform error', { cause: err }), null);
}
}
});
Expand Down Expand Up @@ -88,7 +88,7 @@ export class CommandResponseStream extends Duplex {
public busy: boolean;
isAuthenticated: boolean;
userId?: number;


constructor(socket: Socket) {
super();
Expand All @@ -97,10 +97,11 @@ export class CommandResponseStream extends Duplex {
this.busy = false;
this._execQueue = [];
this.isAuthenticated = false;
}
};

_read(size: number): void {
this._readPaused = false;
console.log('_read', size);
setImmediate(this._onReadable.bind(this));
}

Expand Down Expand Up @@ -140,9 +141,9 @@ export class CommandResponseStream extends Duplex {
async _authWithToken(creds: TokenCredentials) {
const pl = LOGIN_WITH_TOKEN.serialize(creds);
const logr = await this.sendCommand(LOGIN_WITH_TOKEN.code, pl);
return LOGIN_WITH_TOKEN.deserialize(logr);
return LOGIN_WITH_TOKEN.deserialize(logr);
}

async _processQueue(handleResponse = true): Promise<void> {
if (this.busy)
return;
Expand Down Expand Up @@ -172,7 +173,7 @@ export class CommandResponseStream extends Duplex {
this.once('error', errCb);
this.once('data', (resp) => {
this.removeListener('error', errCb);
if(!handleResp) return resolve(resp);
if (!handleResp) return resolve(resp);
const r = handleResponse(resp);
if (r.status !== 0) {
return reject(responseError(command, r.status));
Expand Down Expand Up @@ -206,7 +207,7 @@ export class CommandResponseStream extends Duplex {
}

_onReadable() {
// while (!this._readPaused) {
while (!this._readPaused) {
const head = this._socket.read(8);
if (!head || head.length === 0) return;
if (head.length < 8) {
Expand All @@ -222,7 +223,11 @@ export class CommandResponseStream extends Duplex {
}

const payload = this._socket.read(responseSize);
if (!payload) this._socket.unshift(head);
console.log('payload', payload, responseSize, head.readUInt32LE(0));
if (!payload) {
this._socket.unshift(head);
return;
}
/** payload is incomplete, unshift until next read */
if (payload.length < responseSize) {
this._socket.unshift(Buffer.concat([head, payload]));
Expand All @@ -231,9 +236,9 @@ export class CommandResponseStream extends Duplex {

const pushOk = this.push(Buffer.concat([head, payload]));
/** consumer is slower than producer */
// if (!pushOk)
// this._readPaused = true;
// }
if (!pushOk)
this._readPaused = true;
}
}


Expand Down
2 changes: 1 addition & 1 deletion src/client/tcp.client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,4 @@ export const createTcpSocket =
export type TcpOption = TcpSocketConnectOpts;

export const TcpClient = ({ host, port, keepAlive = true }: TcpOption): Promise<RawClient> =>
createTcpSocket({ host, port, keepAlive });
createTcpSocket({ host, port, keepAlive });
6 changes: 3 additions & 3 deletions src/wire/message/partitioning.utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,9 @@ const PartitionId = (id: number): PartitionId => ({
value: id
});

const MessageKey = (id: number): MessageKey => ({
const MessageKey = (key: MessageKeyValue): MessageKey => ({
kind: PartitionKind.MessageKey,
value: id
value: key
});

// Helper
Expand All @@ -61,7 +61,7 @@ export const serializeMessageKey = (v: MessageKeyValue) => {
if ('string' === typeof v) return Buffer.from(v);
if ('number' === typeof v) return uint32ToBuf(v);
if ('bigint' === typeof v) return uint64ToBuf(v);
throw new Error(`cannot serialize messageKey ${v}`);
throw new Error(`cannot serialize messageKey ${v}, ${typeof v}`);
};

export const serializePartitioningValue = (part: Partitioning): Buffer => {
Expand Down

0 comments on commit f2e6ef7

Please sign in to comment.