Skip to content

Commit

Permalink
fix: fix message headers serialization bug
Browse files Browse the repository at this point in the history
  • Loading branch information
T1B0 committed Jan 31, 2024
1 parent 5fa2965 commit fab07bd
Show file tree
Hide file tree
Showing 3 changed files with 42 additions and 25 deletions.
54 changes: 36 additions & 18 deletions src/tcp.send-message.e2e.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import { TcpClient } from './client/tcp.client.js';

import { login } from './wire/session/login.command.js';
import { logout } from './wire/session/logout.command.js';
import { sendMessages } from './wire/message/send-messages.command.js';
import { sendMessages, type SendMessages } from './wire/message/send-messages.command.js';
import { pollMessages } from './wire/message/poll-messages.command.js';
import { createTopic } from './wire/topic/create-topic.command.js';
import { deleteTopic } from './wire/topic/delete-topic.command.js';
Expand All @@ -14,6 +14,7 @@ import { deleteStream } from './wire/stream/delete-stream.command.js';
import { purgeStream } from './wire/stream/purge-stream.command.js';
import { getOffset } from './wire/offset/get-offset.command.js';
import { storeOffset } from './wire/offset/store-offset.command.js';
import { uint8ToBuf } from './wire/number.utils.js';


try {
Expand All @@ -34,8 +35,8 @@ try {
};

// CREATE_STREAM
const r_createStream = await createStream(s)(stream);
console.log('RESPONSE_createStream', r_createStream);
// const r_createStream = await createStream(s)(stream);
// console.log('RESPONSE_createStream', r_createStream);

const topic1 = {
streamId,
Expand All @@ -48,13 +49,25 @@ try {
};

// CREATE_TOPIC
const r_createTopic = await createTopic(s)(topic1);
console.log('RESPONSE_createTopic', r_createTopic);
// const r_createTopic = await createTopic(s)(topic1);
// console.log('RESPONSE_createTopic', r_createTopic);

const msg = {
const h0 = { 'foo': { kind: 9, value: 1 } };
const h1 = { 'x-header-string-1': { kind: 2, value: 'incredible' } };
const h2 = { 'x-header-bool': { kind: 3, value: false } };

const msg: SendMessages = {
streamId,
topicId,
messages: [{ id: v7(), payload: 'yolo msg' }]
messages: [
{ id: v7(), payload: 'content', headers: h0 },
{ id: v7(), payload: 'content' },
{ id: v7(), payload: 'yolo msg' },
{ id: v7(), payload: 'yolo msg 2' },
{ id: v7(), payload: 'this is fuu', headers: h1},
{ id: v7(), payload: 'this is bar', headers: h2},
],
partition: {kind: 2, value: 1}
};

// SEND MESSAGES
Expand All @@ -69,12 +82,17 @@ try {
consumer: { kind: 1, id: 1 },
partitionId,
pollingStrategy: pollStrat,
count: 2,
count: 10,
autocommit: false
};

const rPoll = await pollMessages(s)(pollReq);
const {messages, ...resp} = rPoll;
const m = messages.map(
m => ({ id: m.id, headers: m.headers, payload: m.payload.toString() })
);
console.log('RESPONSE POLL_MESSAGE', rPoll);
console.log('RESPONSE POLL_MESSAGE', resp, JSON.stringify(m, null, 2));

// GET OFFSET
const rOff = await getOffset(s)({
Expand All @@ -92,19 +110,19 @@ try {
const r_purgeTopic = await purgeTopic(s)({ streamId, topicId });
console.log('RESPONSE_purgeTopic', r_purgeTopic);

// PURGE STREAM
const r_purgeStream = await purgeStream(s)({ streamId });
console.log('RESPONSE_purgeStream', r_purgeStream);
// // PURGE STREAM
// const r_purgeStream = await purgeStream(s)({ streamId });
// console.log('RESPONSE_purgeStream', r_purgeStream);

// DELETE TOPIC
const r_deleteTopic = await deleteTopic(s)({
streamId, topicId, partitionsCount: topic1.partitionCount
});
console.log('RESPONSE_deleteTopic', r_deleteTopic);
// // DELETE TOPIC
// const r_deleteTopic = await deleteTopic(s)({
// streamId, topicId, partitionsCount: topic1.partitionCount
// });
// console.log('RESPONSE_deleteTopic', r_deleteTopic);

// DELETE STREAM
const rDelS = await deleteStream(s)({ streamId: stream.streamId });
console.log('RESPONSEDelS', rDelS);
// const rDelS = await deleteStream(s)({ streamId: stream.streamId });
// console.log('RESPONSEDelS', rDelS);

// LOGOUT
const rOut = await logout(s)();
Expand Down
11 changes: 5 additions & 6 deletions src/wire/message/header.utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -82,14 +82,12 @@ export const serializeHeaderValue = (header: HeaderValue) => {

export const serializeHeader = (key: string, v: BinaryHeaderValue) => {
const bKey = Buffer.from(key)

const b1 = Buffer.alloc(4);
b1.writeUInt32LE(bKey.length);
const b1 = uint32ToBuf(bKey.length);

const b2 = Buffer.alloc(5);
b2.writeUInt8(v.kind);
b2.writeUInt32LE(v.value.length);

b2.writeUInt32LE(v.value.length, 1);
return Buffer.concat([
b1,
bKey,
Expand All @@ -108,11 +106,12 @@ const createHeaderValue = (header: HeaderValue): BinaryHeaderValue => ({
export const serializeHeaders = (headers?: Headers) => {
if (!headers)
return EMPTY_HEADERS;
return Object.keys(headers).reduce(
const b = Object.keys(headers).reduce(
(ac: Buffer, c: string) => Buffer.concat([
ac, serializeHeader(c, createHeaderValue(headers[c]))]),
Buffer.alloc(0)
);
return Buffer.concat([uint32ToBuf(b.length), b]);
};

// deserialize ...
Expand Down
2 changes: 1 addition & 1 deletion src/wire/message/message.utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ export const serializeMessage = (msg: CreateMessage) => {

return Buffer.concat([
bId,
bHeaders,
bHeaders, // size included
uint32ToBuf(bPayload.length),
bPayload
]);
Expand Down

0 comments on commit fab07bd

Please sign in to comment.