Skip to content

Commit

Permalink
feat: add getOffset and storeOffset command, fix typos
Browse files Browse the repository at this point in the history
  • Loading branch information
T1B0 committed Jan 22, 2024
1 parent 8f4e71a commit a642468
Show file tree
Hide file tree
Showing 13 changed files with 152 additions and 29 deletions.
2 changes: 1 addition & 1 deletion src/tcp.client.e2e.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ try {

// LOGOUT
const rOut = await sendCommandWithResponse(s)(LOGOUT.code, LOGOUT.serialize());
console.log('RESPONSE LOGOUT', LOGOUT.desserialize(rOut));
console.log('RESPONSE LOGOUT', LOGOUT.deserialize(rOut));


} catch (err) {
Expand Down
2 changes: 1 addition & 1 deletion src/tcp.e2e.ts
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ try {

// LOGOUT
const rOut = await sendCommandWithResponse(s)(LOGOUT.code, LOGOUT.serialize());
console.log('RESPONSE LOGOUT', LOGOUT.desserialize(rOut));
console.log('RESPONSE LOGOUT', LOGOUT.deserialize(rOut));


} catch (err) {
Expand Down
28 changes: 23 additions & 5 deletions src/tcp.send-message.e2e.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import { LOGIN } from './wire/session/login.command.js';
import { SEND_MESSAGE } from './wire/message/send-message.command.js';
import { CREATE_TOPIC } from './wire/topic/create-topic.command.js';
import { CREATE_STREAM } from './wire/stream/create-stream.command.js';
import { GET_OFFSET } from './wire/offset/get-offset.command.js';
import { STORE_OFFSET } from './wire/offset/store-offset.command.js';
import { LOGOUT } from './wire/session/logout.command.js';


Expand All @@ -20,33 +22,49 @@ try {
const r = await sendCommandWithResponse(s)(LOGIN.code, loginCmd);
console.log('RESPONSE_login', r, r.toString(), LOGIN.deserialize(r));


const streamId = 101;
const topicId = 'test-topic-sm';
const partitionId = 1;

// // CREATE_STREAM
// const createStreamCmd = CREATE_STREAM.serialize(101, 'test-send-message');
// const createStreamCmd = CREATE_STREAM.serialize(streamId, 'test-send-message');
// const r_createStream = await sendCommandWithResponse(s)(
// CREATE_STREAM.code, createStreamCmd
// );
// console.log('RESPONSE_createStream', CREATE_STREAM.deserialize(r_createStream));

// // CREATE_TOPIC
// const ctp = CREATE_TOPIC.serialize(
// 101, 1, 'test-topic-sm', 3, 0, 0, 1
// streamId, 1, topicId, 3, 0, 0, 1
// );
// const r_createTopic = await sendCommandWithResponse(s)(CREATE_TOPIC.code, ctp);
// console.log('RESPONSE_createTopic', CREATE_TOPIC.deserialize(r_createTopic));

// SEND MESSAGE
const cmdSm = SEND_MESSAGE.serialize(
101, 'test-topic-sm',
streamId, topicId,
[{ id: v7(), payload: 'yolo msg' }]
);

const r1 = await sendCommandWithResponse(s)(SEND_MESSAGE.code, cmdSm);
console.log('RESPONSE SEND_MESSAGE', SEND_MESSAGE.deserialize(r1));

// GET OFFSET
const gof = GET_OFFSET.serialize(streamId, topicId, { kind: 1, id: 1 }, partitionId);
const rOff = await sendCommandWithResponse(s)(GET_OFFSET.code, gof);
console.log('RESPONSE GET_OFFSET', GET_OFFSET.deserialize(rOff));

// STORE OFFSET
const sof = STORE_OFFSET.serialize(
streamId, topicId, { kind: 1, id: 1 }, partitionId, 1n
);
const rsOff = await sendCommandWithResponse(s)(STORE_OFFSET.code, sof);
console.log('RESPONSE STORE_OFFSET', STORE_OFFSET.deserialize(rsOff));


// LOGOUT
const rOut = await sendCommandWithResponse(s)(LOGOUT.code, LOGOUT.serialize());
console.log('RESPONSE LOGOUT', LOGOUT.desserialize(rOut));
console.log('RESPONSE LOGOUT', LOGOUT.deserialize(rOut));


} catch (err) {
Expand Down
4 changes: 2 additions & 2 deletions src/tcp.system.e2e.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ try {

// PING
const r2 = await sendCommandWithResponse(s)(PING.code, PING.serialize());
console.log('RESPONSE PING', r2, PING.desserialize(r2));
console.log('RESPONSE PING', r2, PING.deserialize(r2));

// LOGIN
const loginCmd = LOGIN.serialize('iggy', 'iggy');
Expand All @@ -28,7 +28,7 @@ try {

// LOGOUT
const rOut = await sendCommandWithResponse(s)(LOGOUT.code, LOGOUT.serialize());
console.log('RESPONSE LOGOUT', LOGOUT.desserialize(rOut));
console.log('RESPONSE LOGOUT', LOGOUT.deserialize(rOut));


} catch (err) {
Expand Down
2 changes: 1 addition & 1 deletion src/tcp.token.e2e.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ try {

// LOGOUT
const rOut = await sendCommandWithResponse(s)(LOGOUT.code, LOGOUT.serialize());
console.log('RESPONSE LOGOUT', LOGOUT.desserialize(rOut));
console.log('RESPONSE LOGOUT', LOGOUT.deserialize(rOut));


} catch (err) {
Expand Down
2 changes: 1 addition & 1 deletion src/tcp.user.e2e.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ try {

// LOGOUT
const rOut = await sendCommandWithResponse(s)(LOGOUT.code, LOGOUT.serialize());
console.log('RESPONSE LOGOUT', LOGOUT.desserialize(rOut));
console.log('RESPONSE LOGOUT', LOGOUT.deserialize(rOut));


} catch (err) {
Expand Down
15 changes: 2 additions & 13 deletions src/wire/message/message.utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,19 +11,6 @@ export type CreateMessage = {
payload: string | Buffer
};

// type Message = {
// id: string, // uuid
// headers: MessageHeaders
// payload: string | Buffer,
// }

// type SendMessage = {
// streamId: Id,
// topicId: Id,
// partitioning: Partitioning,
// messages: CreateMessage[]
// }

export const serializeUUID = (id: string) => Buffer.from(id.replaceAll('-', ''), 'hex');

export const serializeMessage = (msg: CreateMessage) => {
Expand Down Expand Up @@ -60,6 +47,8 @@ export const serializeSendMessages = (
const bPartitioning = serializePartitioning(partitioning);
const bMessages = serializeMessages(messages);

console.log('SM::streamId', streamId, streamIdentifier.toString('hex'));
console.log('SM::topicId', topicId, topicIdentifier.toString('hex'));
console.log('SM::partitioning', partitioning, bPartitioning.toString('hex'));

return Buffer.concat([
Expand Down
29 changes: 29 additions & 0 deletions src/wire/offset/get-offset.command.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@

import type { CommandResponse } from '../../tcp.client.js';
import type { Id } from '../identifier.utils.js';
import { serializeGetOffset, type Consumer, type OffsetResponse } from './offset.utils.js';

export const GET_OFFSET = {
code: 120,

serialize: (
streamId: Id,
topicId: Id,
consumer: Consumer,
partitionId?: number
) => {
return serializeGetOffset(streamId, topicId, consumer, partitionId);
},

deserialize: (r: CommandResponse): OffsetResponse => {
const partitionId = r.data.readUInt32LE(0);
const currentOffset = r.data.readBigUInt64LE(4);
const storedOffset = r.data.readBigUInt64LE(12);

return {
partitionId,
currentOffset,
storedOffset
}
}
};
64 changes: 64 additions & 0 deletions src/wire/offset/offset.utils.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@

import { serializeIdentifier, type Id } from '../identifier.utils.js';

export enum ConsumerKind {
Single = 1,
Group = 2
}

export type Consumer = {
kind: ConsumerKind,
id: Id
}

export type OffsetResponse = {
partitionId: number,
currentOffset: bigint,
storedOffset: bigint
};

export const serializeGetOffset = (
streamId: Id,
topicId: Id,
consumer: Consumer,
partitionId?: number
) => {

if (consumer.kind === ConsumerKind.Single && (!partitionId || partitionId < 1))
throw new Error('getOffset error: partitionId must be > 0 for single consumer kind');

const streamIdentifier = serializeIdentifier(streamId);
const topicIdentifier = serializeIdentifier(topicId);
const consumerIdentifier = serializeIdentifier(consumer.id);

const b1 = Buffer.alloc(1);
b1.writeUInt8(consumer.kind);

const b2 = Buffer.alloc(4);
b2.writeUInt32LE(partitionId || 0);

return Buffer.concat([
b1,
consumerIdentifier,
streamIdentifier,
topicIdentifier,
b2
]);
};

export const serializeStoreOffset = (
streamId: Id,
topicId: Id,
consumer: Consumer,
partitionId: number,
offset: bigint
) => {
const b = Buffer.alloc(8);
b.writeBigUInt64LE(offset, 0);

return Buffer.concat([
serializeGetOffset(streamId, topicId, consumer, partitionId),
b
]);
}

24 changes: 24 additions & 0 deletions src/wire/offset/store-offset.command.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@

import type { CommandResponse } from '../../tcp.client.js';
import { type Id } from '../identifier.utils.js';
import { serializeStoreOffset, type Consumer } from './offset.utils.js';

export const STORE_OFFSET = {
code: 121,

serialize: (
streamId: Id,
topicId: Id,
consumer: Consumer,
partitionId: number,
offset: bigint
) => {
return serializeStoreOffset(
streamId, topicId, consumer, partitionId, offset
);
},

deserialize: (r: CommandResponse) => {
return r.status === 0 && r.length === 0;
}
};
2 changes: 1 addition & 1 deletion src/wire/session/logout.command.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ export const LOGOUT = {
return Buffer.alloc(0);
},

desserialize: (r: CommandResponse) => {
deserialize: (r: CommandResponse) => {
return r.status === 0 && r.length === 0;
}

Expand Down
2 changes: 1 addition & 1 deletion src/wire/system/ping.command.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ export const PING = {
return Buffer.alloc(0);
},

desserialize: (r: CommandResponse) => {
deserialize: (r: CommandResponse) => {
return r.status === 0 && r.length === 0;
}

Expand Down
5 changes: 2 additions & 3 deletions src/wire/topic/get-topic.command.ts
Original file line number Diff line number Diff line change
@@ -1,14 +1,13 @@

import type { CommandResponse } from '../../tcp.client.js';
import { serializeIdentifier } from '../identifier.utils.js';
import { serializeIdentifier, type Id } from '../identifier.utils.js';
import { deserializeTopic } from './topic.utils.js';

type id = number | string;

export const GET_TOPIC = {
code: 300,

serialize: (streamId: id, topicId: id) => {
serialize: (streamId: Id, topicId: Id) => {
return Buffer.concat([
serializeIdentifier(streamId),
serializeIdentifier(topicId)
Expand Down

0 comments on commit a642468

Please sign in to comment.