Skip to content

Commit

Permalink
fix: fix consumer group commands
Browse files Browse the repository at this point in the history
  • Loading branch information
T1B0 committed Feb 2, 2024
1 parent 2e691d6 commit bb9ff9e
Show file tree
Hide file tree
Showing 4 changed files with 21 additions and 9 deletions.
6 changes: 5 additions & 1 deletion src/wire/consumer-group/create-group.command.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,13 @@ export const CREATE_GROUP = {

serialize: ({streamId, topicId, groupId, name}:CreateGroup) => {
const bName = Buffer.from(name);

if (bName.length < 1 || bName.length > 255)
throw new Error('Consumer group name should be between 1 and 255 bytes');

const b = Buffer.allocUnsafe(5);
b.writeUInt32LE(groupId);
b.writeUInt8(bName.length);
b.writeUInt8(bName.length, 4);

return Buffer.concat([
serializeIdentifier(streamId),
Expand Down
3 changes: 1 addition & 2 deletions src/wire/consumer-group/get-group.command.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,7 @@ import type { CommandResponse } from '../../client/client.type.js';
import { type Id } from '../identifier.utils.js';
import { wrapCommand } from '../command.utils.js';
import {
serializeTargetGroup,
deserializeConsumerGroup,
serializeTargetGroup, deserializeConsumerGroup,
type ConsumerGroup
} from './group.utils.js';

Expand Down
11 changes: 10 additions & 1 deletion src/wire/consumer-group/get-groups.command.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,19 @@

import type { CommandResponse } from '../../client/client.type.js';
import type { ConsumerGroup } from './group.utils.js';
import { serializeIdentifier, type Id } from '../identifier.utils.js';
import { deserializeConsumerGroups } from './group.utils.js';
import { wrapCommand } from '../command.utils.js';

export type GetGroups = {
streamId: Id,
topicId: Id
};

export const GET_GROUPS = {
code: 601,

serialize: (streamId: Id, topicId: Id) => {
serialize: ({ streamId, topicId }: GetGroups) => {
return Buffer.concat([
serializeIdentifier(streamId),
serializeIdentifier(topicId),
Expand All @@ -17,3 +24,5 @@ export const GET_GROUPS = {
return deserializeConsumerGroups(r.data);
}
};

export const getGroups = wrapCommand<GetGroups, ConsumerGroup[]>(GET_GROUPS);
10 changes: 5 additions & 5 deletions src/wire/consumer-group/group.utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,12 @@ export type ConsumerGroup = {
name: string,
membersCount: number,
partitionsCount: number,
}
};

type ConsumerGroupDeserialized = {
bytesRead: number,
data: ConsumerGroup
}
};

export const serializeTargetGroup = (streamId: Id, topicId: Id, groupId: Id) => {
return Buffer.concat([
Expand All @@ -24,8 +24,8 @@ export const serializeTargetGroup = (streamId: Id, topicId: Id, groupId: Id) =>

export const deserializeConsumerGroup = (r: Buffer, pos = 0): ConsumerGroupDeserialized => {
const id = r.readUInt32LE(pos);
const membersCount = r.readUInt32LE(pos + 4);
const partitionsCount = r.readUInt32LE(pos + 8);
const partitionsCount = r.readUInt32LE(pos + 4);
const membersCount = r.readUInt32LE(pos + 8);
const nameLength = r.readUInt8(pos + 12);
const name = r.subarray(pos + 13, pos + 13 + nameLength).toString();

Expand All @@ -34,8 +34,8 @@ export const deserializeConsumerGroup = (r: Buffer, pos = 0): ConsumerGroupDeser
data: {
id,
name,
membersCount,
partitionsCount,
membersCount,
}
}
};
Expand Down

0 comments on commit bb9ff9e

Please sign in to comment.