Skip to content

Commit

Permalink
feat: add create, delete, join & leave consumer-group command
Browse files Browse the repository at this point in the history
  • Loading branch information
T1B0 committed Jan 22, 2024
1 parent dd5e276 commit 237f0e7
Show file tree
Hide file tree
Showing 6 changed files with 92 additions and 7 deletions.
30 changes: 30 additions & 0 deletions src/wire/consumer-group/create-group.command.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@

import type { CommandResponse } from '../../tcp.client.js';
import { serializeIdentifier, type Id } from '../identifier.utils.js';

export const CREATE_GROUP = {
code: 602,

serialize: (
streamId: Id,
topicId: Id,
groupId: number,
name: string,
) => {
const bName = Buffer.from(name);
const b = Buffer.allocUnsafe(5);
b.writeUInt32LE(groupId);
b.writeUInt8(bName.length);

return Buffer.concat([
serializeIdentifier(streamId),
serializeIdentifier(topicId),
b,
bName
]);
},

deserialize: (r: CommandResponse) => {
return r.status === 0 && r.length === 0;
}
};
16 changes: 16 additions & 0 deletions src/wire/consumer-group/delete-group.command.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@

import type { CommandResponse } from '../../tcp.client.js';
import { type Id } from '../identifier.utils.js';
import { serializeTargetGroup } from './group.utils.js';

export const DELETE_GROUP = {
code: 603,

serialize: (streamId: Id, topicId: Id, groupId: Id) => {
return serializeTargetGroup(streamId, topicId, groupId);
},

deserialize: (r: CommandResponse) => {
return r.status === 0 && r.length === 0;
}
};
10 changes: 3 additions & 7 deletions src/wire/consumer-group/get-group.command.ts
Original file line number Diff line number Diff line change
@@ -1,17 +1,13 @@

import type { CommandResponse } from '../../tcp.client.js';
import { serializeIdentifier, type Id } from '../identifier.utils.js';
import { deserializeConsumerGroup } from './group.utils.js';
import { type Id } from '../identifier.utils.js';
import { serializeTargetGroup, deserializeConsumerGroup } from './group.utils.js';

export const GET_GROUP = {
code: 600,

serialize: (streamId: Id, topicId: Id, groupId: Id) => {
return Buffer.concat([
serializeIdentifier(streamId),
serializeIdentifier(topicId),
serializeIdentifier(groupId)
]);
return serializeTargetGroup(streamId, topicId, groupId);
},

deserialize: (r: CommandResponse) => {
Expand Down
11 changes: 11 additions & 0 deletions src/wire/consumer-group/group.utils.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@

import { serializeIdentifier, type Id } from '../identifier.utils.js';

export type ConsumerGroup = {
id: number,
name: string,
Expand All @@ -11,6 +13,15 @@ type ConsumerGroupDeserialized = {
data: ConsumerGroup
}

export const serializeTargetGroup = (streamId: Id, topicId: Id, groupId: Id) => {
return Buffer.concat([
serializeIdentifier(streamId),
serializeIdentifier(topicId),
serializeIdentifier(groupId)
]);
};


export const deserializeConsumerGroup = (r: Buffer, pos = 0): ConsumerGroupDeserialized => {
const id = r.readUInt32LE(pos);
const membersCount = r.readUInt32LE(pos + 4);
Expand Down
16 changes: 16 additions & 0 deletions src/wire/consumer-group/join-group.command.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@

import type { CommandResponse } from '../../tcp.client.js';
import { type Id } from '../identifier.utils.js';
import { serializeTargetGroup } from './group.utils.js';

export const JOIN_GROUP = {
code: 604,

serialize: (streamId: Id, topicId: Id, groupId: Id) => {
return serializeTargetGroup(streamId, topicId, groupId);
},

deserialize: (r: CommandResponse) => {
return r.status === 0 && r.length === 0;
}
};
16 changes: 16 additions & 0 deletions src/wire/consumer-group/leave-group.command.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@

import type { CommandResponse } from '../../tcp.client.js';
import { type Id } from '../identifier.utils.js';
import { serializeTargetGroup } from './group.utils.js';

export const LEAVE_GROUP = {
code: 605,

serialize: (streamId: Id, topicId: Id, groupId: Id) => {
return serializeTargetGroup(streamId, topicId, groupId);
},

deserialize: (r: CommandResponse) => {
return r.status === 0 && r.length === 0;
}
};

0 comments on commit 237f0e7

Please sign in to comment.