Skip to content

Commit

Permalink
feat: add updateTopic command
Browse files Browse the repository at this point in the history
  • Loading branch information
T1B0 committed Jan 25, 2024
1 parent 5d5d851 commit 761a9d0
Show file tree
Hide file tree
Showing 4 changed files with 70 additions and 11 deletions.
19 changes: 13 additions & 6 deletions src/tcp.e2e.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import { GET_STREAM } from './wire/stream/get-stream.command.js';
import { GET_STREAMS } from './wire/stream/get-streams.command.js';
import { DELETE_STREAM } from './wire/stream/delete-stream.command.js';
import { CREATE_TOPIC } from './wire/topic/create-topic.command.js';
import { UPDATE_TOPIC } from './wire/topic/update-topic.command.js';
import { GET_TOPIC } from './wire/topic/get-topic.command.js';
import { GET_TOPICS } from './wire/topic/get-topics.command.js';
import { DELETE_TOPIC } from './wire/topic/delete-topic.command.js';
Expand Down Expand Up @@ -68,31 +69,37 @@ try {
// GET_TOPIC
const gtp = GET_TOPIC.serialize(streamId, 'test-topic-44');
const r_getTopic = await sendCommandWithResponse(s)(GET_TOPIC.code, gtp);
console.log('RESPONSE_getTopic', GET_TOPIC.deserialize(r_getTopic));
const t2 = GET_TOPIC.deserialize(r_getTopic);
console.log('RESPONSE_getTopic', t2);

// UPDATE_TOPIC
const utp = UPDATE_TOPIC.serialize(
streamId, 44, 'test-topic-44', 42
);
const r_updateTopic = await sendCommandWithResponse(s)(UPDATE_TOPIC.code, utp);
console.log('RESPONSE_updateTopic', UPDATE_TOPIC.deserialize(r_updateTopic));

// CREATE_PARTITION
const cpa = CREATE_PARTITION.serialize(streamId, 'test-topic-44', 22);
const cpa = CREATE_PARTITION.serialize(streamId, t2.id, 22);
const r_createPartition = await sendCommandWithResponse(s)(CREATE_PARTITION.code, cpa);
console.log('RESPONSE_createPartition', CREATE_PARTITION.deserialize(r_createPartition));

// DELETE_PARTITION
const dpa = DELETE_PARTITION.serialize(streamId, 'test-topic-44', 12);
const dpa = DELETE_PARTITION.serialize(streamId, t2.id, 19);
const r_deletePartition = await sendCommandWithResponse(s)(DELETE_PARTITION.code, dpa);
console.log('RESPONSE_deletePartition', DELETE_PARTITION.deserialize(r_deletePartition));

// GET_TOPIC AGAIN
const r_getTopic2 = await sendCommandWithResponse(s)(GET_TOPIC.code, gtp);
console.log('RESPONSE_getTopic2', GET_TOPIC.deserialize(r_getTopic2));


// GET_TOPICS
const gtps = GET_TOPICS.serialize(streamId);
const r_getTopics = await sendCommandWithResponse(s)(GET_TOPICS.code, gtps);
console.log('RESPONSE_getTopics', GET_TOPICS.deserialize(r_getTopics));


// DELETE TOPIC
const dtp = DELETE_TOPIC.serialize(streamId, 'test-topic-44', 3);
const dtp = DELETE_TOPIC.serialize(streamId, t2.id, 3);
const r_deleteTopic = await sendCommandWithResponse(s)(DELETE_TOPIC.code, dtp);
console.log('RESPONSE_deleteTopic', DELETE_TOPIC.deserialize(r_deleteTopic));

Expand Down
2 changes: 1 addition & 1 deletion src/wire/command.code.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ export const COMMAND_CODE: CodeMap = {
GetTopics: '301',
CreateTopic: '302',
DeleteTopic: '303',
UpdateTopic: '304', // @TODO
UpdateTopic: '304',
CreatePartitions: '402',
DeletePartitions: '403',
GetGroup: '600',
Expand Down
11 changes: 7 additions & 4 deletions src/wire/topic/create-topic.command.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,17 @@ import { serializeIdentifier, type Id } from '../identifier.utils.js';
// messageExpiry: number
// };


export const CREATE_TOPIC = {
code: 302,

serialize: (
streamId: Id, topicId: number, name: string,
partitionCount: number, messageExpiry = 0,
maxTopicSize = 0, replicationFactor = 1
streamId: Id,
topicId: number,
name: string,
partitionCount: number,
messageExpiry = 0,
maxTopicSize = 0,
replicationFactor = 1
) => {
const streamIdentifier = serializeIdentifier(streamId);
const bName = Buffer.from(name)
Expand Down
49 changes: 49 additions & 0 deletions src/wire/topic/update-topic.command.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@

import type { CommandResponse } from '../../tcp.client.js';
import { serializeIdentifier, type Id } from '../identifier.utils.js';

// export type CreateTopic = {
// streamId: number | string,
// topicId: number,
// name: string,
// partitionCount: number,
// messageExpiry: number
// };

export const UPDATE_TOPIC = {
code: 304,

serialize: (
streamId: Id,
topicId: Id,
name: string,
messageExpiry = 0,
maxTopicSize = 0,
replicationFactor = 1,

) => {
const streamIdentifier = serializeIdentifier(streamId);
const topicIdentifier = serializeIdentifier(topicId);
const bName = Buffer.from(name)

if (bName.length < 1 || bName.length > 255)
throw new Error('Topic name should be between 1 and 255 bytes');

const b = Buffer.allocUnsafe(4 + 8 + 1 + 1);
b.writeUInt32LE(messageExpiry, 0); // 0 is unlimited ???
b.writeBigUInt64LE(BigInt(maxTopicSize), 4); // optional, 0 is null
b.writeUInt8(replicationFactor, 12); // must be > 0
b.writeUInt8(bName.length, 13);

return Buffer.concat([
streamIdentifier,
topicIdentifier,
b,
bName,
]);
},

deserialize: (r: CommandResponse) => {
return r.status === 0 && r.data.length === 0;
}
};

0 comments on commit 761a9d0

Please sign in to comment.