diff --git a/package.json b/package.json index 97c73f4..8ae101e 100644 --- a/package.json +++ b/package.json @@ -5,7 +5,7 @@ "description": "nodejs iggy.rs binary client", "main": "index.js", "scripts": { - "test": "node --test dist/", + "test": "node --test --experimental-test-coverage dist/**/**/*.test.js", "clean": "rm -Rf dist/", "build": "tsc -p tsconfig.json", "start": "node dist/index.js" diff --git a/src/client/client.type.ts b/src/client/client.type.ts new file mode 100644 index 0000000..d5e042d --- /dev/null +++ b/src/client/client.type.ts @@ -0,0 +1,10 @@ + +export type CommandResponse = { + status: number, + length: number, + data: Buffer +}; + +export type Client = { + sendCommand: (code: number, payload: Buffer) => Promise +} diff --git a/src/client/client.utils.ts b/src/client/client.utils.ts new file mode 100644 index 0000000..0074c43 --- /dev/null +++ b/src/client/client.utils.ts @@ -0,0 +1,52 @@ + +import type { CommandResponse } from './client.type.js'; +import { translateCommandCode } from '../wire/command.code.js'; + + +export const handleResponse = (r: Buffer) => { + const status = r.readUint32LE(0); + const length = r.readUint32LE(4); + console.log('<== handleResponse', { status, length }); + return { + status, length, data: r.subarray(8) + }}; + +export const deserializeVoidResponse = + (r: CommandResponse): Boolean => r.status === 0 && r.data.length === 0; + +const COMMAND_LENGTH = 4; + +export const serializeCommand = (command: number, payload: Buffer) => { + const payloadSize = payload.length + COMMAND_LENGTH; + const head = Buffer.allocUnsafe(8); + + head.writeUint32LE(payloadSize, 0); + head.writeUint32LE(command, 4); + + console.log( + '==> CMD', command, + translateCommandCode(command), + head.subarray(4, 8).toString('hex'), + 'LENGTH', payloadSize, + head.subarray(0, 4).toString('hex') + ); + + return Buffer.concat([head, payload]); +} + + +// enum TransportType { +// TCP = 'tcp', +// // TLS = 'tls', +// // QUIC = 'quic' +// } + +// type ClientConfig = { +// transport: TransportType +// host: string, +// port: number +// } + +// export const transportClient = (config: ClientConfig): Client => { +// const {transport} = config; +// }; diff --git a/src/tcp.client.ts b/src/client/tcp.client.ts similarity index 69% rename from src/tcp.client.ts rename to src/client/tcp.client.ts index 6f39d8a..ac896a1 100644 --- a/src/tcp.client.ts +++ b/src/client/tcp.client.ts @@ -1,20 +1,9 @@ import { createConnection, Socket } from 'node:net'; -import { responseError } from './wire/error.utils.js'; -import { translateCommandCode } from './wire/command.code.js'; +import { serializeCommand } from './client.utils.js'; +import type { CommandResponse, Client } from './client.type.js'; +import { responseError } from '../wire/error.utils.js'; -// interface IggyClient { -// socket: Socket -// } - -const COMMAND_LENGTH = 4; -// const REQUEST_INITIAL_BYTES_LENGTH = 4; - -export type CommandResponse = { - status: number, - length: number, - data: Buffer -}; export const createClient = ( host: string, port: number, keepAlive = true @@ -36,25 +25,10 @@ export const createClient = ( }); }; - export const sendCommandWithResponse = (s: Socket) => (command: number, payload: Buffer): Promise => { - const payloadSize = payload.length + COMMAND_LENGTH; - const head = Buffer.alloc(8); - - head.writeUint32LE(payloadSize, 0); - head.writeUint32LE(command, 4); - - console.log( - '==> CMD', command, - translateCommandCode(command), - head.subarray(4, 8).toString('hex'), - 'LENGTH', payloadSize, - head.subarray(0, 4).toString('hex') - ); - - const cmd = Buffer.concat([head, payload]); + const cmd = serializeCommand(command, payload); console.log( '==> sending cmd', command, cmd.toString('hex') ); @@ -87,3 +61,21 @@ export const handleResponse = (r: Buffer) => { status, length, data: r.subarray(8) } }; + +export type TcpOption = { + host: string, + port: number, + keepAlive?: boolean +}; + + +export const TcpClient = ({ host, port, keepAlive = true }: TcpOption): Client => { + let socket: Socket; + return { + sendCommand: async (code: number, payload: Buffer):Promise => { + if (!socket) + socket = await createClient(host, port, keepAlive); + return sendCommandWithResponse(socket)(code, payload); + } + } +}; diff --git a/src/tcp.client.e2e.ts b/src/tcp.client.e2e.ts index d5c63f7..0f87267 100644 --- a/src/tcp.client.e2e.ts +++ b/src/tcp.client.e2e.ts @@ -1,41 +1,34 @@ -import { createClient, sendCommandWithResponse } from './tcp.client.js'; - -import { LOGIN } from './wire/session/login.command.js'; -import { GET_ME } from './wire/client/get-me.command.js'; -import { GET_CLIENTS } from './wire/client/get-clients.command.js'; -import { GET_CLIENT } from './wire/client/get-client.command.js'; -import { LOGOUT } from './wire/session/logout.command.js'; +import { TcpClient } from './client/tcp.client.js'; +import { login } from './wire/session/login.command.js'; +import { getMe } from './wire/client/get-me.command.js'; +import { getClients } from './wire/client/get-clients.command.js'; +import { getClient } from './wire/client/get-client.command.js'; +import { logout } from './wire/session/logout.command.js'; try { // create socket - const s = await createClient('127.0.0.1', 8090); - console.log('CLI', s.readyState); + const s = TcpClient({ host: '127.0.0.1', port: 8090 }); // LOGIN - const loginCmd = LOGIN.serialize('iggy', 'iggy'); - console.log('LOGIN', loginCmd.toString()); - const r = await sendCommandWithResponse(s)(LOGIN.code, loginCmd); - console.log('RESPONSE_login', r, r.toString(), LOGIN.deserialize(r)); + const r = await login(s)({ username: 'iggy', password: 'iggy' }); + console.log('RESPONSE_login', r); // GET_ME - const r_getMe = await sendCommandWithResponse(s)(GET_ME.code, GET_ME.serialize()); - console.log('RESPONSE_getMe', r_getMe.toString(), GET_ME.deserialize(r_getMe)); + const r_getMe = await getMe(s)(); + console.log('RESPONSE_getMe', r_getMe); // GET_CLIENTS - const r4 = await sendCommandWithResponse(s)(GET_CLIENTS.code, GET_CLIENTS.serialize()); - const ls = GET_CLIENTS.deserialize(r4) // used after; - console.log('RESPONSE4', r4.toString(), ls); + const ls = await getClients(s)(); + console.log('RESPONSE4', ls); // GET_CLIENT #ID - const rCli = await sendCommandWithResponse(s)( - GET_CLIENT.code, GET_CLIENT.serialize(ls[0].clientId) - ); - console.log('RESPONSECli', rCli.toString(), GET_CLIENTS.deserialize(rCli)); + const rCli = await getClient(s)({ clientId: ls[0].clientId }); + console.log('RESPONSECli', rCli); // LOGOUT - const rOut = await sendCommandWithResponse(s)(LOGOUT.code, LOGOUT.serialize()); - console.log('RESPONSE LOGOUT', LOGOUT.deserialize(rOut)); + const rOut = await logout(s)(); + console.log('RESPONSE LOGOUT', rOut); } catch (err) { diff --git a/src/tcp.e2e.ts b/src/tcp.e2e.ts index 38be9c2..3bdc7e4 100644 --- a/src/tcp.e2e.ts +++ b/src/tcp.e2e.ts @@ -1,116 +1,115 @@ -import { createClient, sendCommandWithResponse } from './tcp.client.js'; - -import { LOGIN } from './wire/session/login.command.js'; -import { LOGOUT } from './wire/session/logout.command.js'; -import { CREATE_STREAM } from './wire/stream/create-stream.command.js'; -import { UPDATE_STREAM } from './wire/stream/update-stream.command.js'; -import { GET_STREAM } from './wire/stream/get-stream.command.js'; -import { GET_STREAMS } from './wire/stream/get-streams.command.js'; -import { DELETE_STREAM } from './wire/stream/delete-stream.command.js'; -import { CREATE_TOPIC } from './wire/topic/create-topic.command.js'; -import { UPDATE_TOPIC } from './wire/topic/update-topic.command.js'; -import { GET_TOPIC } from './wire/topic/get-topic.command.js'; -import { GET_TOPICS } from './wire/topic/get-topics.command.js'; -import { DELETE_TOPIC } from './wire/topic/delete-topic.command.js'; -import { CREATE_PARTITION } from './wire/partition/create-partition.command.js'; -import { DELETE_PARTITION } from './wire/partition/delete-partition.command.js'; +// import { createClient, sendCommandWithResponse } from './tcp.client.js'; + +import { TcpClient } from './client/tcp.client.js'; +import { login } from './wire/session/login.command.js'; +import { logout } from './wire/session/logout.command.js'; +import { createStream } from './wire/stream/create-stream.command.js'; +import { updateStream } from './wire/stream/update-stream.command.js'; +import { getStream } from './wire/stream/get-stream.command.js'; +import { getStreams } from './wire/stream/get-streams.command.js'; +import { deleteStream } from './wire/stream/delete-stream.command.js'; +import { createTopic } from './wire/topic/create-topic.command.js'; +import { updateTopic } from './wire/topic/update-topic.command.js'; +import { getTopic } from './wire/topic/get-topic.command.js'; +import { getTopics } from './wire/topic/get-topics.command.js'; +import { deleteTopic } from './wire/topic/delete-topic.command.js'; +import { createPartition } from './wire/partition/create-partition.command.js'; +import { deletePartition } from './wire/partition/delete-partition.command.js'; try { // create socket - const s = await createClient('127.0.0.1', 8090); - console.log('CLI', s.readyState); + const s = TcpClient({ host: '127.0.0.1', port: 8090 }); // LOGIN - const loginCmd = LOGIN.serialize('iggy', 'iggy'); - console.log('LOGIN', loginCmd.toString()); - const r = await sendCommandWithResponse(s)(LOGIN.code, loginCmd); - console.log('RESPONSE_login', r, r.toString(), LOGIN.deserialize(r)); + const r = await login(s)({ username: 'iggy', password: 'iggy' }); + console.log('RESPONSE_login', r); - const streamName = 'test-stream'; - const streamId = 1; + const stream = { + name: 'test-stream', + streamId: 1 + }; // CREATE_STREAM - const createStreamCmd = CREATE_STREAM.serialize(streamId, streamName); - const r_createStream = await sendCommandWithResponse(s)( - CREATE_STREAM.code, createStreamCmd - ); - console.log('RESPONSE_createStream', CREATE_STREAM.deserialize(r_createStream)); + const r_createStream = await createStream(s)(stream); + console.log('RESPONSE_createStream', r_createStream); // GET_STREAM #ID - const getStreamCmd = GET_STREAM.serialize(streamId); - const r7 = await sendCommandWithResponse(s)(GET_STREAM.code, getStreamCmd); - console.log('RESPONSE7', GET_STREAM.deserialize(r7)); + const r7 = await getStream(s)({ streamId: stream.streamId }); + console.log('RESPONSE7', r7); // GET_STREAM #NAME - const getStreamCmd2 = GET_STREAM.serialize(streamName); - const r8 = await sendCommandWithResponse(s)(GET_STREAM.code, getStreamCmd2); - console.log('RESPONSE8', GET_STREAM.deserialize(r8)); + const r8 = await getStream(s)({ streamId: stream.name }); + console.log('RESPONSE8', r8); // UPDATE_STREAM - const updateStreamCmd = UPDATE_STREAM.serialize(streamId, 'updatedStreamName'); - const r_updateStream = await sendCommandWithResponse(s)( - UPDATE_STREAM.code, updateStreamCmd - ); - console.log('RESPONSE_updateStream', UPDATE_STREAM.deserialize(r_updateStream)); + const r_updateStream = await updateStream(s)({ + streamId: stream.streamId, name: 'updatedStreamName' + }); + console.log('RESPONSE_updateStream', r_updateStream); // GET_STREAMS - const r9 = await sendCommandWithResponse(s)(GET_STREAMS.code, GET_STREAMS.serialize()); - console.log('RESPONSE9', GET_STREAMS.deserialize(r9)); + const r9 = await getStreams(s)(); + console.log('RESPONSE9', r9); + + const topic1 = { + streamId: stream.streamId, + topicId: 44, + name: 'topic-name-44', + partitionCount: 3, + messageExpiry: 0, + maxTopicSize: 0, + replicationFactor: 1 + }; // CREATE_TOPIC - const ctp = CREATE_TOPIC.serialize( - streamId, 44, 'test-topic-44', 3, 0, 0, 1 - ); - const r_createTopic = await sendCommandWithResponse(s)(CREATE_TOPIC.code, ctp); - console.log('RESPONSE_createTopic', CREATE_TOPIC.deserialize(r_createTopic)); + const r_createTopic = await createTopic(s)(topic1); + console.log('RESPONSE_createTopic', r_createTopic); // GET_TOPIC - const gtp = GET_TOPIC.serialize(streamId, 'test-topic-44'); - const r_getTopic = await sendCommandWithResponse(s)(GET_TOPIC.code, gtp); - const t2 = GET_TOPIC.deserialize(r_getTopic); + const t2 = await getTopic(s)({ streamId: topic1.streamId, topicId: topic1.name }); console.log('RESPONSE_getTopic', t2); // UPDATE_TOPIC - const utp = UPDATE_TOPIC.serialize( - streamId, 44, 'test-topic-44', 42 - ); - const r_updateTopic = await sendCommandWithResponse(s)(UPDATE_TOPIC.code, utp); - console.log('RESPONSE_updateTopic', UPDATE_TOPIC.deserialize(r_updateTopic)); + const r_updateTopic = await updateTopic(s)({ + streamId: topic1.streamId, topicId: topic1.topicId, name: topic1.name, messageExpiry: 42 + }); + console.log('RESPONSE_updateTopic', r_updateTopic); // CREATE_PARTITION - const cpa = CREATE_PARTITION.serialize(streamId, t2.id, 22); - const r_createPartition = await sendCommandWithResponse(s)(CREATE_PARTITION.code, cpa); - console.log('RESPONSE_createPartition', CREATE_PARTITION.deserialize(r_createPartition)); + const r_createPartition = await createPartition(s)({ + streamId: topic1.streamId, topicId: t2.id, partitionCount: 22 + }); + console.log('RESPONSE_createPartition', r_createPartition); // DELETE_PARTITION - const dpa = DELETE_PARTITION.serialize(streamId, t2.id, 19); - const r_deletePartition = await sendCommandWithResponse(s)(DELETE_PARTITION.code, dpa); - console.log('RESPONSE_deletePartition', DELETE_PARTITION.deserialize(r_deletePartition)); + const r_deletePartition = await deletePartition(s)({ + streamId: topic1.streamId, topicId: t2.id, partitionCount: 19 + }); + console.log('RESPONSE_deletePartition', r_deletePartition); - // GET_TOPIC AGAIN - const r_getTopic2 = await sendCommandWithResponse(s)(GET_TOPIC.code, gtp); - console.log('RESPONSE_getTopic2', GET_TOPIC.deserialize(r_getTopic2)); + // GET_TOPIC AGAIN + const r_getTopic2 = await getTopic(s)({ streamId: topic1.streamId, topicId: topic1.name }); + console.log('RESPONSE_getTopic2', r_getTopic2); - // GET_TOPICS - const gtps = GET_TOPICS.serialize(streamId); - const r_getTopics = await sendCommandWithResponse(s)(GET_TOPICS.code, gtps); - console.log('RESPONSE_getTopics', GET_TOPICS.deserialize(r_getTopics)); + // GET_TOPICS + const r_getTopics = await getTopics(s)({ streamId: topic1.streamId }); + console.log('RESPONSE_getTopics', r_getTopics); // DELETE TOPIC - const dtp = DELETE_TOPIC.serialize(streamId, t2.id, 3); - const r_deleteTopic = await sendCommandWithResponse(s)(DELETE_TOPIC.code, dtp); - console.log('RESPONSE_deleteTopic', DELETE_TOPIC.deserialize(r_deleteTopic)); + const r_deleteTopic = await deleteTopic(s)({ + streamId: topic1.streamId, topicId: t2.id, partitionsCount: 3 + }); + console.log('RESPONSE_deleteTopic', r_deleteTopic); // DELETE STREAM - const dst = DELETE_STREAM.serialize(streamId); - const rDelS = await sendCommandWithResponse(s)(DELETE_STREAM.code, dst); - console.log('RESPONSEDelS', DELETE_STREAM.deserialize(rDelS)); + const rDelS = await deleteStream(s)({ streamId: stream.streamId }); + console.log('RESPONSEDelS', rDelS); // LOGOUT - const rOut = await sendCommandWithResponse(s)(LOGOUT.code, LOGOUT.serialize()); - console.log('RESPONSE LOGOUT', LOGOUT.deserialize(rOut)); + const rOut = await logout(s)(); + console.log('RESPONSE LOGOUT', rOut); } catch (err) { diff --git a/src/tcp.send-message.e2e.ts b/src/tcp.send-message.e2e.ts index 9eff66f..8630882 100644 --- a/src/tcp.send-message.e2e.ts +++ b/src/tcp.send-message.e2e.ts @@ -1,92 +1,114 @@ -import { createClient, sendCommandWithResponse } from './tcp.client.js'; import { v7 } from './wire/uuid.utils.js'; - -import { LOGIN } from './wire/session/login.command.js'; -import { SEND_MESSAGES } from './wire/message/send-messages.command.js'; -import { POLL_MESSAGES } from './wire/message/poll-messages.command.js'; -import { CREATE_TOPIC } from './wire/topic/create-topic.command.js'; -import { PURGE_TOPIC } from './wire/topic/purge-topic.command.js'; -import { CREATE_STREAM } from './wire/stream/create-stream.command.js'; -import { PURGE_STREAM } from './wire/stream/purge-stream.command.js'; -import { GET_OFFSET } from './wire/offset/get-offset.command.js'; -import { STORE_OFFSET } from './wire/offset/store-offset.command.js'; -import { LOGOUT } from './wire/session/logout.command.js'; +import { TcpClient } from './client/tcp.client.js'; + +import { login } from './wire/session/login.command.js'; +import { logout } from './wire/session/logout.command.js'; +import { sendMessages } from './wire/message/send-messages.command.js'; +import { pollMessages } from './wire/message/poll-messages.command.js'; +import { createTopic } from './wire/topic/create-topic.command.js'; +import { deleteTopic } from './wire/topic/delete-topic.command.js'; +import { purgeTopic } from './wire/topic/purge-topic.command.js'; +import { createStream } from './wire/stream/create-stream.command.js'; +import { deleteStream } from './wire/stream/delete-stream.command.js'; +import { purgeStream } from './wire/stream/purge-stream.command.js'; +import { getOffset } from './wire/offset/get-offset.command.js'; +import { storeOffset } from './wire/offset/store-offset.command.js'; try { // create socket - const s = await createClient('127.0.0.1', 8090); - console.log('CLI', s.readyState); + const s = TcpClient({ host: '127.0.0.1', port: 8090 }); // LOGIN - const loginCmd = LOGIN.serialize('iggy', 'iggy'); - console.log('LOGIN', loginCmd.toString()); - const r = await sendCommandWithResponse(s)(LOGIN.code, loginCmd); - console.log('RESPONSE_login', r, r.toString(), LOGIN.deserialize(r)); - + const r = await login(s)({ username: 'iggy', password: 'iggy' }); + console.log('RESPONSE_login', r); const streamId = 101; const topicId = 'test-topic-sm'; const partitionId = 1; - // // CREATE_STREAM - // const createStreamCmd = CREATE_STREAM.serialize(streamId, 'test-send-message'); - // const r_createStream = await sendCommandWithResponse(s)( - // CREATE_STREAM.code, createStreamCmd - // ); - // console.log('RESPONSE_createStream', CREATE_STREAM.deserialize(r_createStream)); - - // // CREATE_TOPIC - // const ctp = CREATE_TOPIC.serialize( - // streamId, 1, topicId, 3, 0, 0, 1 - // ); - // const r_createTopic = await sendCommandWithResponse(s)(CREATE_TOPIC.code, ctp); - // console.log('RESPONSE_createTopic', CREATE_TOPIC.deserialize(r_createTopic)); + const stream = { + name: 'test-stream', + streamId + }; + + // CREATE_STREAM + const r_createStream = await createStream(s)(stream); + console.log('RESPONSE_createStream', r_createStream); + + const topic1 = { + streamId, + topicId: 44, + name: topicId, + partitionCount: 3, + messageExpiry: 0, + maxTopicSize: 0, + replicationFactor: 1 + }; + + // CREATE_TOPIC + const r_createTopic = await createTopic(s)(topic1); + console.log('RESPONSE_createTopic', r_createTopic); + + const msg = { + streamId, + topicId, + messages: [{ id: v7(), payload: 'yolo msg' }] + }; // SEND MESSAGES - const cmdSm = SEND_MESSAGES.serialize( - streamId, topicId, - [{ id: v7(), payload: 'yolo msg' }] - ); - const rSend = await sendCommandWithResponse(s)(SEND_MESSAGES.code, cmdSm); - console.log('RESPONSE SEND_MESSAGE', SEND_MESSAGES.deserialize(rSend)); + const rSend = await sendMessages(s)(msg); + console.log('RESPONSE SEND_MESSAGE', rSend); // POLL MESSAGE const pollStrat = { kind: 5, value: 0n }; - const cmdPol = POLL_MESSAGES.serialize( - streamId, topicId, { kind: 1, id: 1 }, 1, pollStrat, 2, false - ); - const rPoll = await sendCommandWithResponse(s)(POLL_MESSAGES.code, cmdPol); - console.log('RESPONSE POLL_MESSAGE', POLL_MESSAGES.deserialize(rPoll)); - - // // GET OFFSET - // const gof = GET_OFFSET.serialize(streamId, topicId, { kind: 1, id: 1 }, partitionId); - // const rOff = await sendCommandWithResponse(s)(GET_OFFSET.code, gof); - // console.log('RESPONSE GET_OFFSET', GET_OFFSET.deserialize(rOff)); + const pollReq = { + streamId, + topicId, + consumer: { kind: 1, id: 1 }, + partitionId, + pollingStrategy: pollStrat, + count: 2, + autocommit: false + }; + + const rPoll = await pollMessages(s)(pollReq); + console.log('RESPONSE POLL_MESSAGE', rPoll); + + // GET OFFSET + const rOff = await getOffset(s)({ + streamId, topicId, consumer: { kind: 1, id: 1 }, partitionId + }); + console.log('RESPONSE GET_OFFSET', rOff); // // STORE OFFSET - // const sof = STORE_OFFSET.serialize( - // streamId, topicId, { kind: 1, id: 1 }, partitionId, 1n - // ); - // const rsOff = await sendCommandWithResponse(s)(STORE_OFFSET.code, sof); - // console.log('RESPONSE STORE_OFFSET', STORE_OFFSET.deserialize(rsOff)); + // const rsOff = await storeOffset(s)({ + // streamId, topicId, consumer: { kind: 1, id: 1 }, partitionId, offset: 1n + // }); + // console.log('RESPONSE STORE_OFFSET', rsOff); // PURGE TOPIC - const ptp = PURGE_TOPIC.serialize(streamId, topicId); - const r_purgeTopic = await sendCommandWithResponse(s)(PURGE_TOPIC.code, ptp); - console.log('RESPONSE_purgeTopic', PURGE_TOPIC.deserialize(r_purgeTopic)); + const r_purgeTopic = await purgeTopic(s)({ streamId, topicId }); + console.log('RESPONSE_purgeTopic', r_purgeTopic); // PURGE STREAM - const pst = PURGE_STREAM.serialize(streamId); - const r_purgeStream = await sendCommandWithResponse(s)(PURGE_STREAM.code, pst); - console.log('RESPONSE_purgeStream', PURGE_STREAM.deserialize(r_purgeStream)); + const r_purgeStream = await purgeStream(s)({ streamId }); + console.log('RESPONSE_purgeStream', r_purgeStream); + // DELETE TOPIC + const r_deleteTopic = await deleteTopic(s)({ + streamId, topicId, partitionsCount: topic1.partitionCount + }); + console.log('RESPONSE_deleteTopic', r_deleteTopic); - // LOGOUT - const rOut = await sendCommandWithResponse(s)(LOGOUT.code, LOGOUT.serialize()); - console.log('RESPONSE LOGOUT', LOGOUT.deserialize(rOut)); + // DELETE STREAM + const rDelS = await deleteStream(s)({ streamId: stream.streamId }); + console.log('RESPONSEDelS', rDelS); + // LOGOUT + const rOut = await logout(s)(); + console.log('RESPONSE LOGOUT', rOut); } catch (err) { console.error('FAILED!', err); diff --git a/src/tcp.system.e2e.ts b/src/tcp.system.e2e.ts index 9eb5c91..ac42f86 100644 --- a/src/tcp.system.e2e.ts +++ b/src/tcp.system.e2e.ts @@ -1,35 +1,29 @@ -import { createClient, sendCommandWithResponse } from './tcp.client.js'; - -import { PING } from './wire/system/ping.command.js'; -import { LOGIN } from './wire/session/login.command.js'; -import { LOGOUT } from './wire/session/logout.command.js'; -import { GET_STATS } from './wire/system/get-stats.command.js'; +import { TcpClient } from './client/tcp.client.js'; +import { login } from './wire/session/login.command.js'; +import { logout } from './wire/session/logout.command.js'; +import { getStats } from './wire/system/get-stats.command.js'; +import { ping } from './wire/system/ping.command.js'; try { // create socket - const s = await createClient('127.0.0.1', 8090); - console.log('CLI', s.readyState); + const s = TcpClient({ host: '127.0.0.1', port: 8090 }); // PING - const r2 = await sendCommandWithResponse(s)(PING.code, PING.serialize()); - console.log('RESPONSE PING', r2, PING.deserialize(r2)); + const r2 = await ping(s)() + console.log('RESPONSE PING', r2); // LOGIN - const loginCmd = LOGIN.serialize('iggy', 'iggy'); - console.log('LOGIN', loginCmd.toString()); - const r = await sendCommandWithResponse(s)(LOGIN.code, loginCmd); - console.log('RESPONSE_login', r, r.toString(), LOGIN.deserialize(r)); + const r = await login(s)({ username: 'iggy', password: 'iggy' }); + console.log('RESPONSE_login', r); // GET_STATS - const r_stats = await sendCommandWithResponse(s)(GET_STATS.code, GET_STATS.serialize()); - console.log('RESPONSE_stats', r_stats.toString(), GET_STATS.deserialize(r_stats)); - + const r_stats = await getStats(s)(); + console.log('RESPONSE_stats', r_stats); // LOGOUT - const rOut = await sendCommandWithResponse(s)(LOGOUT.code, LOGOUT.serialize()); - console.log('RESPONSE LOGOUT', LOGOUT.deserialize(rOut)); - + const rOut = await logout(s)(); + console.log('RESPONSE LOGOUT', rOut); } catch (err) { console.error('FAILED!', err); diff --git a/src/tcp.token.e2e.ts b/src/tcp.token.e2e.ts index 6bbf8b2..aab3f0f 100644 --- a/src/tcp.token.e2e.ts +++ b/src/tcp.token.e2e.ts @@ -1,41 +1,36 @@ -import { createClient, sendCommandWithResponse } from './tcp.client.js'; - -import { LOGIN } from './wire/session/login.command.js'; +import { TcpClient } from './client/tcp.client.js'; +import { login } from './wire/session/login.command.js'; +import { logout } from './wire/session/logout.command.js'; // import { LOGIN_WITH_TOKEN } from './wire/session/login-with-token.command.js'; -import { GET_TOKENS } from './wire/token/get-tokens.command.js'; -import { CREATE_TOKEN } from './wire/token/create-token.command.js'; -import { DELETE_TOKEN } from './wire/token/delete-token.command.js'; -import { LOGOUT } from './wire/session/logout.command.js'; +import { getTokens } from './wire/token/get-tokens.command.js'; +import { createToken } from './wire/token/create-token.command.js'; +import { deleteToken } from './wire/token/delete-token.command.js'; try { // create socket - const s = await createClient('127.0.0.1', 8090); - console.log('CLI', s.readyState); + const s = TcpClient({ host: '127.0.0.1', port: 8090 }); // LOGIN - const loginCmd = LOGIN.serialize('iggy', 'iggy'); - console.log('LOGIN', loginCmd.toString()); - const r = await sendCommandWithResponse(s)(LOGIN.code, loginCmd); - console.log('RESPONSE_login', r, r.toString(), LOGIN.deserialize(r)); + const r = await login(s)({ username: 'iggy', password: 'iggy' }); + console.log('RESPONSE_login', r); // CREATE_TOKEN - const ptk = CREATE_TOKEN.serialize('yolo-token-test', 1800); - const r_createToken = await sendCommandWithResponse(s)(CREATE_TOKEN.code, ptk); - console.log('RESPONSE_createToken', CREATE_TOKEN.deserialize(r_createToken)); + const r_createToken = await createToken(s)({ name: 'yolo-token-test', expiry: 1800 }); + console.log('RESPONSE_createToken', r_createToken); // GET_TOKENS - const r14 = await sendCommandWithResponse(s)(GET_TOKENS.code, GET_TOKENS.serialize()); - console.log('RESPONSE14', GET_TOKENS.deserialize(r14)); + const r14 = await getTokens(s)(); + // const r14 = await sendCommandWithResponse(s)(GET_TOKENS.code, GET_TOKENS.serialize()); + console.log('RESPONSE14', r14); // DELETE TOKEN - const dtk = DELETE_TOKEN.serialize('yolo-token-test'); - const r_deleteToken = await sendCommandWithResponse(s)(DELETE_TOKEN.code, dtk); - console.log('RESPONSE_deleteToken', DELETE_TOKEN.deserialize(r_deleteToken)); + const r_deleteToken = await deleteToken(s)({ name: 'yolo-token-test' }); + console.log('RESPONSE_deleteToken', r_deleteToken); // LOGOUT - const rOut = await sendCommandWithResponse(s)(LOGOUT.code, LOGOUT.serialize()); - console.log('RESPONSE LOGOUT', LOGOUT.deserialize(rOut)); + const rOut = await logout(s)(); + console.log('RESPONSE LOGOUT', rOut); } catch (err) { diff --git a/src/tcp.topic.e2e.ts b/src/tcp.topic.e2e.ts new file mode 100644 index 0000000..0965d2a --- /dev/null +++ b/src/tcp.topic.e2e.ts @@ -0,0 +1,44 @@ + +import { TcpClient } from './client/tcp.client.js'; +import { login } from './wire/session/login.command.js'; +import { logout } from './wire/session/logout.command.js'; +import { createStream } from './wire/stream/create-stream.command.js'; +import { createTopic } from './wire/topic/create-topic.command.js'; +import { deleteStream } from './wire/stream/delete-stream.command.js'; + + +try { + + const s = TcpClient({ host: '127.0.0.1', port: 8090, }); + const streamId = 1; + const topicId = 3; + + // LOGIN + const r = await login(s)({ username: 'iggy', password: 'iggy' }); + console.log('RESPONSE_login', r); + + // CREATE_STREAM + const r_createStream = await createStream(s)({ streamId, name: 'test-topic-12' }); + console.log('RESPONSE_createStream', r_createStream); + + await createTopic(s)({ + streamId, + topicId, + name: 'test-topic-fuu', + partitionCount: 0, + messageExpiry: 0, + maxTopicSize: 0, + replicationFactor: 1 + }); + + // DELETE STREAM + const rDelS = await deleteStream(s)({ streamId }); + console.log('RESPONSEDelS', rDelS); + + // LOGOUT + const rOut = await logout(s)(); + console.log('RESPONSE LOGOUT', rOut); + +} catch (err) { + console.error('FAILED!', err); +} diff --git a/src/tcp.user.e2e.ts b/src/tcp.user.e2e.ts index f93d0a2..2abeb60 100644 --- a/src/tcp.user.e2e.ts +++ b/src/tcp.user.e2e.ts @@ -1,30 +1,22 @@ -import { createClient, sendCommandWithResponse } from './tcp.client.js'; - -import { LOGIN } from './wire/session/login.command.js'; -import { GET_USER } from './wire/user/get-user.command.js'; -import { CREATE_USER } from './wire/user/create-user.command.js'; -import { CHANGE_PASSWORD } from './wire/user/change-password.command.js'; -import { UPDATE_USER } from './wire/user/update-user.command.js'; -import { UPDATE_PERMISSIONS } from './wire/user/update-permissions.command.js'; -import { DELETE_USER } from './wire/user/delete-user.command.js'; -import { GET_USERS } from './wire/user/get-users.command.js'; -import { LOGOUT } from './wire/session/logout.command.js'; - -import { - serializePermissions, deserializePermissions -} from './wire/user/permissions.utils.js'; +import { TcpClient } from './client/tcp.client.js'; +import { login } from './wire/session/login.command.js'; +import { logout } from './wire/session/logout.command.js'; +import { getUser } from './wire/user/get-user.command.js'; +import { createUser } from './wire/user/create-user.command.js'; +import { changePassword } from './wire/user/change-password.command.js'; +import { updateUser } from './wire/user/update-user.command.js'; +import { updatePermissions } from './wire/user/update-permissions.command.js'; +import { deleteUser } from './wire/user/delete-user.command.js'; +import { getUsers } from './wire/user/get-users.command.js'; try { // create socket - const s = await createClient('127.0.0.1', 8090); - console.log('CLI', s.readyState); + const s = TcpClient({ host: '127.0.0.1', port: 8090 }); // LOGIN - const loginCmd = LOGIN.serialize('iggy', 'iggy'); - console.log('LOGIN', loginCmd.toString()); - const r = await sendCommandWithResponse(s)(LOGIN.code, loginCmd); - console.log('RESPONSE_login', r, r.toString(), LOGIN.deserialize(r)); + const r = await login(s)({ username: 'iggy', password: 'iggy' }); + console.log('RESPONSE_login', r); const username = 'test-user'; const password = 'test-pwd123$!'; @@ -45,64 +37,55 @@ try { streams: [] }; - // const dsp = serializePermissions(permissions); - // const rsp = deserializePermissions(dsp); - // console.log(JSON.stringify(rsp, null, 2)); + const cUser = { username, password, status, permissions }; // CREATE_USER - const createUserCmd = CREATE_USER.serialize(username, password, status, permissions); - const rCreateUser = await sendCommandWithResponse(s)(CREATE_USER.code, createUserCmd); - console.log('RESPONSE_createUser', CREATE_USER.deserialize(rCreateUser)); + const rCreateUser = await createUser(s)(cUser); + console.log('RESPONSE_createUser', rCreateUser); // GET_USERS - const r12 = await sendCommandWithResponse(s)(GET_USERS.code, GET_USERS.serialize()); - console.log('RESPONSE12', GET_USERS.deserialize(r12)); + const r12 = await getUsers(s)(); + console.log('RESPONSE getUsers', r12); // GET_USER #NAME - const guCmd = GET_USER.serialize(username); - const rGUsr = await sendCommandWithResponse(s)(GET_USER.code, guCmd); - const uGUsr = GET_USER.deserialize(rGUsr); + const uGUsr = await getUser(s)({ userId: username }); console.log('RESPONSE GetUser/name', uGUsr); // GET_USER #ID - const r11 = await sendCommandWithResponse(s)(GET_USER.code, GET_USER.serialize(uGUsr.id)); - console.log('RESPONSE GetUser/id', GET_USER.deserialize(r11)); + const r11 = await getUser(s)({ userId: uGUsr.id }); + console.log('RESPONSE GetUser/id', r11); // UPDATE_USER - const updateUserCmd = UPDATE_USER.serialize(uGUsr.id, 'usernameUpdated', 2); - const rUpdateUser = await sendCommandWithResponse(s)(UPDATE_USER.code, updateUserCmd); - console.log('RESPONSE_updateUser', UPDATE_USER.deserialize(rUpdateUser)); + const rUpdateUser = await updateUser(s)({ + userId: uGUsr.id, username: 'usernameUpdated', status: 2 + }); + console.log('RESPONSE_updateUser', rUpdateUser); // CHANGE_PASSWORD - const changePasswordCmd = CHANGE_PASSWORD.serialize(uGUsr.id, password, 'h4x0r42'); - const rChangePassword = await sendCommandWithResponse(s)( - CHANGE_PASSWORD.code, changePasswordCmd - ); - console.log('RESPONSE_changePassword', CHANGE_PASSWORD.deserialize(rChangePassword)); + const rChangePassword = await changePassword(s)({ + userId: uGUsr.id, currentPassword: password, newPassword: 'h4x0r42' + }); + console.log('RESPONSE_changePassword', rChangePassword); // UPDATE_PERMISSIONS const perms2 = { ...permissions }; perms2.global.ManageServers = true; - const updatePermissionsCmd = UPDATE_PERMISSIONS.serialize(uGUsr.id, perms2); - const rUpdatePermissions = await sendCommandWithResponse(s)( - UPDATE_PERMISSIONS.code, updatePermissionsCmd - ); - console.log('RESPONSE_updatePerms', UPDATE_PERMISSIONS.deserialize(rUpdatePermissions)); + const rUpdatePermissions = await updatePermissions(s)({ + userId: uGUsr.id, permissions: perms2 + }); + console.log('RESPONSE_updatePerms', rUpdatePermissions); // GET_USER #ID 2 - const rgu = await sendCommandWithResponse(s)(GET_USER.code, GET_USER.serialize(uGUsr.id)); - console.log('RESPONSE GetUser/id', GET_USER.deserialize(rgu)); + const rgu = await getUser(s)({ userId: uGUsr.id }); + console.log('RESPONSE GetUser/id', rgu); // DELETE_USER #ID - const r13 = await sendCommandWithResponse(s)( - DELETE_USER.code, DELETE_USER.serialize(uGUsr.id) - ); - console.log('RESPONSE deleteUser/id', DELETE_USER.deserialize(r13)); + const r13 = await deleteUser(s)({ userId: uGUsr.id }) + console.log('RESPONSE deleteUser/id', r13); // LOGOUT - const rOut = await sendCommandWithResponse(s)(LOGOUT.code, LOGOUT.serialize()); - console.log('RESPONSE LOGOUT', LOGOUT.deserialize(rOut)); - + const rOut = await logout(s)(); + console.log('RESPONSE LOGOUT', rOut); } catch (err) { console.error('FAILED!', err); diff --git a/src/wire/client/get-client.command.ts b/src/wire/client/get-client.command.ts index 07caa0d..4b0a1c1 100644 --- a/src/wire/client/get-client.command.ts +++ b/src/wire/client/get-client.command.ts @@ -1,14 +1,21 @@ -import type { CommandResponse } from '../../tcp.client.js'; -import { deserializeClient } from './client.utils.js'; +import type { CommandResponse } from '../../client/client.type.js'; +import { deserializeClient, type Client } from './client.utils.js'; +import { wrapCommand } from '../command.utils.js'; + +export type GetClient = { + clientId: number +}; // GET CLIENT by id export const GET_CLIENT = { code: 21, - serialize: (id: number): Buffer => { + serialize: ({ clientId }: GetClient): Buffer => { const b = Buffer.alloc(4); - b.writeUInt32LE(id); + b.writeUInt32LE(clientId); return b; }, deserialize: (r: CommandResponse) => deserializeClient(r.data).data }; + +export const getClient = wrapCommand(GET_CLIENT); diff --git a/src/wire/client/get-clients.command.ts b/src/wire/client/get-clients.command.ts index bc2c7b0..666d0df 100644 --- a/src/wire/client/get-clients.command.ts +++ b/src/wire/client/get-clients.command.ts @@ -1,6 +1,7 @@ -import type { CommandResponse } from '../../tcp.client.js'; -import { deserializeClient } from './client.utils.js'; +import type { CommandResponse } from '../../client/client.type.js'; +import { deserializeClient, type Client } from './client.utils.js'; +import { wrapCommand } from '../command.utils.js'; export const GET_CLIENTS = { code: 22, @@ -16,4 +17,7 @@ export const GET_CLIENTS = { } return clients; } -}; \ No newline at end of file +}; + + +export const getClients = wrapCommand(GET_CLIENTS); diff --git a/src/wire/client/get-me.command.ts b/src/wire/client/get-me.command.ts index a86c2de..c4356a2 100644 --- a/src/wire/client/get-me.command.ts +++ b/src/wire/client/get-me.command.ts @@ -1,11 +1,16 @@ -import type { CommandResponse } from '../../tcp.client.js'; -import { deserializeClient } from './client.utils.js'; +import type { CommandResponse } from '../../client/client.type.js'; +import { deserializeClient, type Client } from './client.utils.js'; +import { wrapCommand } from '../command.utils.js'; // GET ME export const GET_ME = { code: 20, + serialize: () => Buffer.alloc(0), + deserialize: (r: CommandResponse) => deserializeClient(r.data).data }; + +export const getMe = wrapCommand(GET_ME); diff --git a/src/wire/command.utils.ts b/src/wire/command.utils.ts new file mode 100644 index 0000000..4881ef6 --- /dev/null +++ b/src/wire/command.utils.ts @@ -0,0 +1,17 @@ + +import type { CommandResponse, Client } from '../client/client.type.js'; + +export type ArgTypes = F extends (...args: infer A) => any ? A : never; + +export type Command = { + code: number, + serialize: (args: I) => Buffer, + deserialize: (r: CommandResponse) => O +} + +export function wrapCommand(cmd: Command) { + return (client: Client) => + async (arg: I) => cmd.deserialize( + await client.sendCommand(cmd.code, cmd.serialize(arg)) + ); +}; diff --git a/src/wire/consumer-group/create-group.command.ts b/src/wire/consumer-group/create-group.command.ts index 8158f81..8c92538 100644 --- a/src/wire/consumer-group/create-group.command.ts +++ b/src/wire/consumer-group/create-group.command.ts @@ -1,16 +1,19 @@ -import type { CommandResponse } from '../../tcp.client.js'; import { serializeIdentifier, type Id } from '../identifier.utils.js'; +import { deserializeVoidResponse } from '../../client/client.utils.js'; +import { wrapCommand } from '../command.utils.js'; + +export type CreateGroup = { + streamId: Id, + topicId: Id, + groupId: number, + name: string, +}; export const CREATE_GROUP = { code: 602, - serialize: ( - streamId: Id, - topicId: Id, - groupId: number, - name: string, - ) => { + serialize: ({streamId, topicId, groupId, name}:CreateGroup) => { const bName = Buffer.from(name); const b = Buffer.allocUnsafe(5); b.writeUInt32LE(groupId); @@ -24,7 +27,7 @@ export const CREATE_GROUP = { ]); }, - deserialize: (r: CommandResponse) => { - return r.status === 0 && r.length === 0; - } + deserialize: deserializeVoidResponse }; + +export const createGroup = wrapCommand(CREATE_GROUP); diff --git a/src/wire/consumer-group/delete-group.command.ts b/src/wire/consumer-group/delete-group.command.ts index 7fe13e1..8dc13c2 100644 --- a/src/wire/consumer-group/delete-group.command.ts +++ b/src/wire/consumer-group/delete-group.command.ts @@ -1,16 +1,23 @@ -import type { CommandResponse } from '../../tcp.client.js'; import { type Id } from '../identifier.utils.js'; import { serializeTargetGroup } from './group.utils.js'; +import { deserializeVoidResponse } from '../../client/client.utils.js'; +import { wrapCommand } from '../command.utils.js'; + +export type DeleteGroup = { + streamId: Id, + topicId: Id, + groupId: Id +}; export const DELETE_GROUP = { code: 603, - serialize: (streamId: Id, topicId: Id, groupId: Id) => { + serialize: ({streamId, topicId, groupId}: DeleteGroup) => { return serializeTargetGroup(streamId, topicId, groupId); }, - deserialize: (r: CommandResponse) => { - return r.status === 0 && r.length === 0; - } + deserialize: deserializeVoidResponse }; + +export const deleteGroup = wrapCommand(DELETE_GROUP); diff --git a/src/wire/consumer-group/get-group.command.ts b/src/wire/consumer-group/get-group.command.ts index c437b8e..ec26b50 100644 --- a/src/wire/consumer-group/get-group.command.ts +++ b/src/wire/consumer-group/get-group.command.ts @@ -1,16 +1,29 @@ -import type { CommandResponse } from '../../tcp.client.js'; +import type { CommandResponse } from '../../client/client.type.js'; import { type Id } from '../identifier.utils.js'; -import { serializeTargetGroup, deserializeConsumerGroup } from './group.utils.js'; +import { wrapCommand } from '../command.utils.js'; +import { + serializeTargetGroup, + deserializeConsumerGroup, + type ConsumerGroup +} from './group.utils.js'; + +export type GetGroup = { + streamId: Id, + topicId: Id, + groupId: Id +}; export const GET_GROUP = { code: 600, - serialize: (streamId: Id, topicId: Id, groupId: Id) => { + serialize: ({streamId, topicId, groupId}: GetGroup) => { return serializeTargetGroup(streamId, topicId, groupId); }, deserialize: (r: CommandResponse) => { - return deserializeConsumerGroup(r.data); + return deserializeConsumerGroup(r.data).data; } }; + +export const getGroup = wrapCommand(GET_GROUP); diff --git a/src/wire/consumer-group/get-groups.command.ts b/src/wire/consumer-group/get-groups.command.ts index 68d4d32..283e558 100644 --- a/src/wire/consumer-group/get-groups.command.ts +++ b/src/wire/consumer-group/get-groups.command.ts @@ -1,5 +1,5 @@ -import type { CommandResponse } from '../../tcp.client.js'; +import type { CommandResponse } from '../../client/client.type.js'; import { serializeIdentifier, type Id } from '../identifier.utils.js'; import { deserializeConsumerGroups } from './group.utils.js'; diff --git a/src/wire/consumer-group/join-group.command.ts b/src/wire/consumer-group/join-group.command.ts index b70bc60..f6e0047 100644 --- a/src/wire/consumer-group/join-group.command.ts +++ b/src/wire/consumer-group/join-group.command.ts @@ -1,16 +1,23 @@ -import type { CommandResponse } from '../../tcp.client.js'; import { type Id } from '../identifier.utils.js'; import { serializeTargetGroup } from './group.utils.js'; +import { deserializeVoidResponse } from '../../client/client.utils.js'; +import { wrapCommand } from '../command.utils.js'; + +export type JoinGroup = { + streamId: Id, + topicId: Id, + groupId: Id +}; export const JOIN_GROUP = { code: 604, - serialize: (streamId: Id, topicId: Id, groupId: Id) => { + serialize: ({ streamId, topicId, groupId }: JoinGroup) => { return serializeTargetGroup(streamId, topicId, groupId); }, - deserialize: (r: CommandResponse) => { - return r.status === 0 && r.length === 0; - } + deserialize: deserializeVoidResponse }; + +export const joinGroup = wrapCommand(JOIN_GROUP); diff --git a/src/wire/consumer-group/leave-group.command.ts b/src/wire/consumer-group/leave-group.command.ts index ec46c49..18bc5fb 100644 --- a/src/wire/consumer-group/leave-group.command.ts +++ b/src/wire/consumer-group/leave-group.command.ts @@ -1,16 +1,23 @@ -import type { CommandResponse } from '../../tcp.client.js'; import { type Id } from '../identifier.utils.js'; import { serializeTargetGroup } from './group.utils.js'; +import { deserializeVoidResponse } from '../../client/client.utils.js'; +import { wrapCommand } from '../command.utils.js'; + +export type LeaveGroup = { + streamId: Id, + topicId: Id, + groupId: Id +}; export const LEAVE_GROUP = { code: 605, - serialize: (streamId: Id, topicId: Id, groupId: Id) => { + serialize: ({streamId, topicId, groupId}: LeaveGroup) => { return serializeTargetGroup(streamId, topicId, groupId); }, - deserialize: (r: CommandResponse) => { - return r.status === 0 && r.length === 0; - } + deserialize: deserializeVoidResponse }; + +export const leaveGroup = wrapCommand(LEAVE_GROUP); diff --git a/src/wire/message/poll-messages.command.ts b/src/wire/message/poll-messages.command.ts index 2fede9f..bc04ea2 100644 --- a/src/wire/message/poll-messages.command.ts +++ b/src/wire/message/poll-messages.command.ts @@ -1,26 +1,30 @@ import type { Id } from '../identifier.utils.js'; -import type { CommandResponse } from '../../tcp.client.js'; +import type { CommandResponse } from '../../client/client.type.js'; import type { Consumer } from '../offset/offset.utils.js'; +import { wrapCommand } from '../command.utils.js'; import { - serializePollMessages, - deserializePollMessages, - type PollingStrategy + serializePollMessages, deserializePollMessages, + type PollingStrategy, type PollMessagesResponse } from './poll.utils.js'; +export type PollMessages = { + streamId: Id, + topicId: Id, + consumer: Consumer, + partitionId: number, + pollingStrategy: PollingStrategy, + count: number, + autocommit: boolean +}; + export const POLL_MESSAGES = { code: 100, - serialize: ( - streamId: Id, - topicId: Id, - consumer: Consumer, - partitionId: number, - pollingStrategy: PollingStrategy, - count: number, - autocommit: boolean, - ) => { + serialize: ({ + streamId, topicId, consumer, partitionId, pollingStrategy, count, autocommit + }: PollMessages) => { return serializePollMessages( streamId, topicId, consumer, partitionId, pollingStrategy, count, autocommit ); @@ -30,3 +34,5 @@ export const POLL_MESSAGES = { return deserializePollMessages(r.data); } }; + +export const pollMessages = wrapCommand(POLL_MESSAGES); diff --git a/src/wire/message/poll.utils.ts b/src/wire/message/poll.utils.ts index cac4b18..17d82cf 100644 --- a/src/wire/message/poll.utils.ts +++ b/src/wire/message/poll.utils.ts @@ -80,7 +80,7 @@ export const mapMessageState = (s: number): string => { return MessageState[s]; } -export type PollMessage = { +export type Message = { id: string, state: string, timestamp: Date, @@ -90,13 +90,20 @@ export type PollMessage = { checksum: number, }; +export type PollMessagesResponse = { + partitionId: number, + currentOffset: bigint, + messageCount: number, + messages: Message[] +}; + export const deserializePollMessages = (r: Buffer, pos = 0) => { const len = r.length; const partitionId = r.readUInt32LE(pos); const currentOffset = r.readBigUInt64LE(pos + 4); const messageCount = r.readUInt32LE(pos + 12); - const messages: PollMessage[] = []; + const messages: Message[] = []; pos += 16; if (pos >= len) { diff --git a/src/wire/message/send-messages.command.ts b/src/wire/message/send-messages.command.ts index ffa97e9..ff3ec20 100644 --- a/src/wire/message/send-messages.command.ts +++ b/src/wire/message/send-messages.command.ts @@ -1,23 +1,25 @@ -import type { CommandResponse } from '../../tcp.client.js'; import { type Id } from '../identifier.utils.js'; import { serializeSendMessages, type CreateMessage } from './message.utils.js'; import type { Partitioning } from './partitioning.utils.js'; +import { deserializeVoidResponse } from '../../client/client.utils.js'; +import { wrapCommand } from '../command.utils.js'; +export type SendMessages = { + streamId: Id, + topicId: Id, + messages: CreateMessage[], + partition?: Partitioning, +}; export const SEND_MESSAGES = { code: 101, - serialize: ( - streamId: Id, - topicId: Id, - messages: CreateMessage[], - partition?: Partitioning, - ) => { + serialize: ({ streamId, topicId, messages, partition }: SendMessages) => { return serializeSendMessages(streamId, topicId, messages, partition); }, - deserialize: (r: CommandResponse) => { - return r.status === 0 && r.data.length === 0; - } + deserialize: deserializeVoidResponse }; + +export const sendMessages = wrapCommand(SEND_MESSAGES); diff --git a/src/wire/offset/get-offset.command.ts b/src/wire/offset/get-offset.command.ts index 7082369..7e89026 100644 --- a/src/wire/offset/get-offset.command.ts +++ b/src/wire/offset/get-offset.command.ts @@ -1,17 +1,21 @@ -import type { CommandResponse } from '../../tcp.client.js'; +import type { CommandResponse } from '../../client/client.type.js'; import type { Id } from '../identifier.utils.js'; +import { wrapCommand } from '../command.utils.js'; import { serializeGetOffset, type Consumer, type OffsetResponse } from './offset.utils.js'; +export type GetOffset = { + streamId: Id, + topicId: Id, + consumer: Consumer, + partitionId?: number +}; + + export const GET_OFFSET = { code: 120, - serialize: ( - streamId: Id, - topicId: Id, - consumer: Consumer, - partitionId?: number - ) => { + serialize: ({streamId, topicId, consumer, partitionId = 1}: GetOffset) => { return serializeGetOffset(streamId, topicId, consumer, partitionId); }, @@ -27,3 +31,5 @@ export const GET_OFFSET = { } } }; + +export const getOffset = wrapCommand(GET_OFFSET); diff --git a/src/wire/offset/store-offset.command.ts b/src/wire/offset/store-offset.command.ts index 59ba001..9c8e680 100644 --- a/src/wire/offset/store-offset.command.ts +++ b/src/wire/offset/store-offset.command.ts @@ -1,24 +1,24 @@ -import type { CommandResponse } from '../../tcp.client.js'; +import { deserializeVoidResponse } from '../../client/client.utils.js'; +import { wrapCommand } from '../command.utils.js'; import { type Id } from '../identifier.utils.js'; import { serializeStoreOffset, type Consumer } from './offset.utils.js'; +export type StoreOffset = { + streamId: Id, + topicId: Id, + consumer: Consumer, + partitionId: number, + offset: bigint +}; + export const STORE_OFFSET = { code: 121, - serialize: ( - streamId: Id, - topicId: Id, - consumer: Consumer, - partitionId: number, - offset: bigint - ) => { - return serializeStoreOffset( - streamId, topicId, consumer, partitionId, offset - ); - }, + serialize: ({streamId, topicId, consumer, partitionId, offset}: StoreOffset) => + serializeStoreOffset(streamId, topicId, consumer, partitionId, offset), - deserialize: (r: CommandResponse) => { - return r.status === 0 && r.length === 0; - } + deserialize: deserializeVoidResponse }; + +export const storeOffset = wrapCommand(STORE_OFFSET); diff --git a/src/wire/partition/create-partition.command.ts b/src/wire/partition/create-partition.command.ts index 6c44cd2..0b5905d 100644 --- a/src/wire/partition/create-partition.command.ts +++ b/src/wire/partition/create-partition.command.ts @@ -1,14 +1,21 @@ -import type { CommandResponse } from '../../tcp.client.js'; +import { wrapCommand } from '../command.utils.js'; +import { deserializeVoidResponse } from '../../client/client.utils.js'; import type { Id } from '../identifier.utils.js'; import { serializePartitionParams } from './partition.utils.js'; +export type CreatePartition = { + streamId: Id, + topicId: Id, + partitionCount?: number +}; + export const CREATE_PARTITION = { code: 402, - serialize: (streamId: Id, topicId: Id, partitionCount = 1) => { + serialize: ({ streamId, topicId, partitionCount = 1 }: CreatePartition) => { return serializePartitionParams(streamId, topicId, partitionCount); }, - deserialize: (r: CommandResponse) => { - return r.status === 0 && r.data.length === 0; - } + deserialize: deserializeVoidResponse }; + +export const createPartition = wrapCommand(CREATE_PARTITION); diff --git a/src/wire/partition/delete-partition.command.ts b/src/wire/partition/delete-partition.command.ts index bd0360b..096f799 100644 --- a/src/wire/partition/delete-partition.command.ts +++ b/src/wire/partition/delete-partition.command.ts @@ -1,19 +1,23 @@ -import type { CommandResponse } from '../../tcp.client.js'; +import { deserializeVoidResponse } from '../../client/client.utils.js'; +import { wrapCommand } from '../command.utils.js'; import type { Id } from '../identifier.utils.js'; import { serializePartitionParams } from './partition.utils.js'; - +export type DeletePartition = { + streamId: Id, + topicId: Id, + partitionCount: number +}; + export const DELETE_PARTITION = { code: 403, - serialize: ( - streamId: Id, - topicId: Id, - partitionCount: number, - ) => { + + serialize: ({ streamId, topicId, partitionCount }: DeletePartition) => { return serializePartitionParams(streamId, topicId, partitionCount); }, - deserialize: (r: CommandResponse) => { - return r.status === 0 && r.data.length === 0; - } + + deserialize: deserializeVoidResponse }; + +export const deletePartition = wrapCommand(DELETE_PARTITION); diff --git a/src/wire/session/login-with-token.command.test.ts b/src/wire/session/login-with-token.command.test.ts index 76c7244..ab19c5a 100644 --- a/src/wire/session/login-with-token.command.test.ts +++ b/src/wire/session/login-with-token.command.test.ts @@ -7,30 +7,30 @@ import { LOGIN_WITH_TOKEN } from './login-with-token.command.js'; describe("Login with token Command", () => { // @warn use ascii char to keep char.length === byteLength - const token = 'thisIsBigSecretToken123'; + const t1 = { token: 'thisIsBigSecretToken123' }; it("serialize token into a buffer", () => { assert.deepEqual( - LOGIN_WITH_TOKEN.serialize(token).length, - 1 + token.length + LOGIN_WITH_TOKEN.serialize(t1).length, + 1 + t1.token.length ); }); it("throw on empty token", () => { assert.throws( - () => LOGIN_WITH_TOKEN.serialize("") + () => LOGIN_WITH_TOKEN.serialize({token: ""}) ); }); it("throw on token > 255 bytes", () => { assert.throws( - () => LOGIN_WITH_TOKEN.serialize("YoLo".repeat(65)) + () => LOGIN_WITH_TOKEN.serialize({token: "YoLo".repeat(65)}) ); }); it("throw on login > 255 bytes - utf8 version", () => { assert.throws( - () => LOGIN_WITH_TOKEN.serialize("¥Ø£Ø".repeat(33)) + () => LOGIN_WITH_TOKEN.serialize({token: "¥Ø£Ø".repeat(33)}) ); }); diff --git a/src/wire/session/login-with-token.command.ts b/src/wire/session/login-with-token.command.ts index f5ceaa4..8e61b4d 100644 --- a/src/wire/session/login-with-token.command.ts +++ b/src/wire/session/login-with-token.command.ts @@ -1,10 +1,19 @@ -import type { CommandResponse } from '../../tcp.client.js'; +import type { CommandResponse } from '../../client/client.type.js'; +import { wrapCommand } from '../command.utils.js'; + +export type LoginWithToken = { + token: string +}; + +export type LoginResponse = { + userId: number +} export const LOGIN_WITH_TOKEN = { code: 44, - serialize: (token: string) => { + serialize: ({token}: LoginWithToken) => { const bToken = Buffer.from(token); if (bToken.length < 1 || bToken.length > 255) throw new Error('Token length should be between 1 and 255 bytes'); @@ -20,3 +29,5 @@ export const LOGIN_WITH_TOKEN = { userId: r.data.readUInt32LE(0) }) }; + +export const login = wrapCommand(LOGIN_WITH_TOKEN); diff --git a/src/wire/session/login.command.test.ts b/src/wire/session/login.command.test.ts index 4f56e97..ae2a85b 100644 --- a/src/wire/session/login.command.test.ts +++ b/src/wire/session/login.command.test.ts @@ -6,43 +6,51 @@ import { LOGIN } from './login.command.js'; describe("Login Command", () => { // @warn use ascii char to keep char.length === byteLength - const login = 'iggyYolo'; - const password = 'unitTestSeCret'; + + const l1 = { + username: 'iggyYolo', + password: 'unitTestSeCret' + }; it("serialize credentials into a buffer", () => { assert.deepEqual( - LOGIN.serialize(login, password).length, - 2 + login.length + password.length + LOGIN.serialize(l1).length, + 2 + l1.username.length + l1.password.length ); }); it("throw on empty login", () => { + const l2 = {...l1, username: ''}; assert.throws( - () => LOGIN.serialize("", password) + () => LOGIN.serialize(l2) ); }); it("throw on empty password", () => { + const l2 = {...l1, password: ''}; assert.throws( - () => LOGIN.serialize(login, "") + () => LOGIN.serialize(l2) ); }); it("throw on login > 255 bytes", () => { + const l2 = { ...l1, username: "YoLo".repeat(65)}; assert.throws( - () => LOGIN.serialize("YoLo".repeat(65), password) + () => LOGIN.serialize(l2) ); }); it("throw on login > 255 bytes - utf8 version", () => { + const l2 = { ...l1, username: "¥Ø£Ø".repeat(33)}; assert.throws( - () => LOGIN.serialize("¥Ø£Ø".repeat(33), password) + () => LOGIN.serialize(l2) ); }); it("throw on password > 255 bytes - utf8 version", () => { + const l2 = { ...l1, password: "¥Ø£Ø".repeat(33)}; assert.throws( - () => LOGIN.serialize(login, "¥Ø£Ø".repeat(33)) + () => LOGIN.serialize(l2) ); }); diff --git a/src/wire/session/login.command.ts b/src/wire/session/login.command.ts index dbd99d2..97ad0a9 100644 --- a/src/wire/session/login.command.ts +++ b/src/wire/session/login.command.ts @@ -1,11 +1,21 @@ -import type { CommandResponse } from '../../tcp.client.js'; +import type { CommandResponse } from '../../client/client.type.js'; +import { wrapCommand } from '../command.utils.js'; + +export type LoginCredentials = { + username: string, + password: string +} + +export type LoginResponse = { + userId: number +} // LOGIN export const LOGIN = { code: 38, - serialize: (username: string, password: string) => { + serialize: ({username, password}: LoginCredentials) => { const bUsername = Buffer.from(username); const bPassword = Buffer.from(password); @@ -32,3 +42,5 @@ export const LOGIN = { }) }; + +export const login = wrapCommand(LOGIN); diff --git a/src/wire/session/logout.command.ts b/src/wire/session/logout.command.ts index 7f929e7..200750f 100644 --- a/src/wire/session/logout.command.ts +++ b/src/wire/session/logout.command.ts @@ -1,5 +1,6 @@ -import type { CommandResponse } from '../../tcp.client.js'; +import { deserializeVoidResponse } from '../../client/client.utils.js'; +import { wrapCommand } from '../command.utils.js'; // LOGOUT export const LOGOUT = { @@ -8,8 +9,8 @@ export const LOGOUT = { return Buffer.alloc(0); }, - deserialize: (r: CommandResponse) => { - return r.status === 0 && r.length === 0; - } - + deserialize: deserializeVoidResponse }; + + +export const logout = wrapCommand(LOGOUT); diff --git a/src/wire/stream/create-stream.command.test.ts b/src/wire/stream/create-stream.command.test.ts index 7102a4d..6748831 100644 --- a/src/wire/stream/create-stream.command.test.ts +++ b/src/wire/stream/create-stream.command.test.ts @@ -7,29 +7,36 @@ describe('CreateStream', () => { describe('serialize', () => { + const s1 = { + streamId: 1, + name: 'test-stream' + }; + it('serialize 1 numeric id & 1 name into buffer', () => { - const name = 'test-stream'; assert.deepEqual( - CREATE_STREAM.serialize(1, name).length, - 4 + 1 + name.length + CREATE_STREAM.serialize(s1).length, + 4 + 1 + s1.name.length ); }); it('throw on name < 1', () => { + const s2 = {...s1, name: ''}; assert.throws( - () => CREATE_STREAM.serialize(1, '') + () => CREATE_STREAM.serialize(s2) ); }); it("throw on name > 255 bytes", () => { + const s2 = { ...s1, name: "YoLo".repeat(65)}; assert.throws( - () => CREATE_STREAM.serialize(1, "YoLo".repeat(65)) + () => CREATE_STREAM.serialize(s2) ); }); it("throw on name > 255 bytes - utf8 version", () => { + const s2 = { ...s1, name: "¥Ø£Ø".repeat(33)}; assert.throws( - () => CREATE_STREAM.serialize(3, "¥Ø£Ø".repeat(33)) + () => CREATE_STREAM.serialize(s2) ); }); diff --git a/src/wire/stream/create-stream.command.ts b/src/wire/stream/create-stream.command.ts index e8d8660..f1f9b3c 100644 --- a/src/wire/stream/create-stream.command.ts +++ b/src/wire/stream/create-stream.command.ts @@ -1,17 +1,23 @@ -import type { CommandResponse } from '../../tcp.client.js'; +import { deserializeVoidResponse } from '../../client/client.utils.js'; +import { wrapCommand } from '../command.utils.js'; + +export type CreateStream = { + streamId: number, + name: string +}; export const CREATE_STREAM = { code: 202, - serialize: (id: number, name: string) => { + serialize: ({streamId, name}: CreateStream) => { const bName = Buffer.from(name); if (bName.length < 1 || bName.length > 255) throw new Error('Stream name should be between 1 and 255 bytes'); const b = Buffer.allocUnsafe(4 + 1); - b.writeUInt32LE(id, 0); + b.writeUInt32LE(streamId, 0); b.writeUInt8(bName.length, 4); return Buffer.concat([ @@ -20,7 +26,7 @@ export const CREATE_STREAM = { ]); }, - deserialize: (r: CommandResponse) => { - return r.status === 0 && r.data.length === 0; - } + deserialize: deserializeVoidResponse }; + +export const createStream = wrapCommand(CREATE_STREAM); diff --git a/src/wire/stream/delete-stream.command.ts b/src/wire/stream/delete-stream.command.ts index e08c053..2778f03 100644 --- a/src/wire/stream/delete-stream.command.ts +++ b/src/wire/stream/delete-stream.command.ts @@ -1,15 +1,20 @@ -import type { CommandResponse } from '../../tcp.client.js'; +import { deserializeVoidResponse } from '../../client/client.utils.js'; +import { wrapCommand } from '../command.utils.js'; import { serializeIdentifier, type Id } from '../identifier.utils.js'; +export type DeleteStream = { + streamId: Id +}; + export const DELETE_STREAM = { code: 203, - serialize: (streamId: Id) => { + serialize: ({streamId}: DeleteStream) => { return serializeIdentifier(streamId); }, - deserialize: (r: CommandResponse) => { - return r.status === 0 && r.data.length === 0; - } + deserialize: deserializeVoidResponse }; + +export const deleteStream = wrapCommand(DELETE_STREAM); diff --git a/src/wire/stream/get-stream.command.ts b/src/wire/stream/get-stream.command.ts index 7852b9a..b05efcb 100644 --- a/src/wire/stream/get-stream.command.ts +++ b/src/wire/stream/get-stream.command.ts @@ -1,17 +1,23 @@ -import type { CommandResponse } from '../../tcp.client.js'; +import type { CommandResponse } from '../../client/client.type.js'; +import { wrapCommand } from '../command.utils.js'; import { serializeIdentifier, type Id } from '../identifier.utils.js'; -import { deserializeToStream } from './stream.utils.js'; +import { deserializeToStream, type Stream } from './stream.utils.js'; +export type GetStream = { + streamId: Id +}; export const GET_STREAM = { code: 200, - serialize: (id: Id) => { - return serializeIdentifier(id); + serialize: ({ streamId }: GetStream) => { + return serializeIdentifier(streamId); }, deserialize: (r: CommandResponse) => { return deserializeToStream(r.data, 0).data } } + +export const getStream = wrapCommand(GET_STREAM); diff --git a/src/wire/stream/get-streams.command.ts b/src/wire/stream/get-streams.command.ts index 8df1df6..2e99152 100644 --- a/src/wire/stream/get-streams.command.ts +++ b/src/wire/stream/get-streams.command.ts @@ -1,6 +1,7 @@ -import type { CommandResponse } from '../../tcp.client.js'; -import { deserializeToStream } from './stream.utils.js'; +import type { CommandResponse } from '../../client/client.type.js'; +import { wrapCommand } from '../command.utils.js'; +import { deserializeToStream, type Stream } from './stream.utils.js'; export const GET_STREAMS = { code: 201, @@ -17,3 +18,5 @@ export const GET_STREAMS = { return streams; } }; + +export const getStreams = wrapCommand(GET_STREAMS); diff --git a/src/wire/stream/purge-stream.command.ts b/src/wire/stream/purge-stream.command.ts index 50ffcbd..8164bd1 100644 --- a/src/wire/stream/purge-stream.command.ts +++ b/src/wire/stream/purge-stream.command.ts @@ -1,15 +1,20 @@ -import type { CommandResponse } from '../../tcp.client.js'; +import { wrapCommand } from '../command.utils.js'; +import { deserializeVoidResponse } from '../../client/client.utils.js'; import { serializeIdentifier, type Id } from '../identifier.utils.js'; +export type PurgeStream = { + streamId: Id +}; + export const PURGE_STREAM = { code: 205, - serialize: (streamId: Id) => { + serialize: ({streamId}: PurgeStream) => { return serializeIdentifier(streamId); }, - deserialize: (r: CommandResponse) => { - return r.status === 0 && r.data.length === 0; - } + deserialize: deserializeVoidResponse }; + +export const purgeStream = wrapCommand(PURGE_STREAM); diff --git a/src/wire/stream/update-stream.command.ts b/src/wire/stream/update-stream.command.ts index 34a249a..42c9747 100644 --- a/src/wire/stream/update-stream.command.ts +++ b/src/wire/stream/update-stream.command.ts @@ -1,19 +1,24 @@ -import type { CommandResponse } from '../../tcp.client.js'; +import { deserializeVoidResponse } from '../../client/client.utils.js'; +import { wrapCommand } from '../command.utils.js'; import { serializeIdentifier, type Id } from '../identifier.utils.js'; import { uint8ToBuf } from '../number.utils.js'; +export type UpdateStream = { + streamId: Id, + name: string +} + export const UPDATE_STREAM = { code: 204, - serialize: (id: Id, name: string) => { - const bId = serializeIdentifier(id); + serialize: ({streamId, name}: UpdateStream) => { + const bId = serializeIdentifier(streamId); const bName = Buffer.from(name); if (bName.length < 1 || bName.length > 255) throw new Error('Stream name should be between 1 and 255 bytes'); - return Buffer.concat([ bId, uint8ToBuf(bName.length), @@ -21,7 +26,7 @@ export const UPDATE_STREAM = { ]); }, - deserialize: (r: CommandResponse) => { - return r.status === 0 && r.data.length === 0; - } + deserialize: deserializeVoidResponse }; + +export const updateStream = wrapCommand(UPDATE_STREAM); diff --git a/src/wire/system/get-stats.command.ts b/src/wire/system/get-stats.command.ts index 776e7d4..9ed7d01 100644 --- a/src/wire/system/get-stats.command.ts +++ b/src/wire/system/get-stats.command.ts @@ -1,5 +1,6 @@ -import type { CommandResponse } from '../../tcp.client.js'; +import type { CommandResponse } from '../../client/client.type.js'; +import { wrapCommand } from '../command.utils.js'; export type Stats = { processId: number, @@ -14,59 +15,42 @@ export type Stats = { messagesSizeBytes: bigint, streamsCount: number, topicsCount: number, - partitionCount: number, + partitionsCount: number, + segmentsCount: number, messagesCount: bigint, clientsCount: number, - consumerGroupsCount: number, + consumersGroupsCount: number, hostname: string, osName: string, osVersion: string, kernelVersion: string } -const statsPos = { - processIDPos: 0, - cpuUsagePos: 4, - memoryUsagePos: 8, - totalMemoryPos: 16, - availableMemoryPos: 24, - runTimePos: 32, - startTimePos: 40, - readBytesPos: 48, - writtenBytesPos: 56, - messagesSizeBytesPos: 64, - streamsCountPos: 72, - topicsCountPos: 76, - partitionsCountPos: 80, - segmentsCountPos: 84, - messagesCountPos: 88, - clientsCountPos: 96, - consumerGroupsCountPos: 100 -}; export const GET_STATS = { code: 10, serialize: () => Buffer.alloc(0), deserialize: (r: CommandResponse): Stats => { - const processId = r.data.readUInt32LE(statsPos.processIDPos); - const cpuUsage = r.data.readFloatLE(statsPos.cpuUsagePos); - const memoryUsage = r.data.readBigUInt64LE(statsPos.memoryUsagePos); - const totalMemory = r.data.readBigUInt64LE(statsPos.totalMemoryPos); - const availableMemory = r.data.readBigUInt64LE(statsPos.availableMemoryPos); - const runTime = r.data.readBigUInt64LE(statsPos.runTimePos); - const startTime = r.data.readBigUInt64LE(statsPos.startTimePos); - const readBytes = r.data.readBigUInt64LE(statsPos.readBytesPos); - const writtenBytes = r.data.readBigUInt64LE(statsPos.writtenBytesPos); - const messagesSizeBytes = r.data.readBigUInt64LE(statsPos.messagesSizeBytesPos); - const streamsCount = r.data.readUInt32LE(statsPos.streamsCountPos); - const topicsCount = r.data.readUInt32LE(statsPos.topicsCountPos); - const partitionCount = r.data.readUInt32LE(statsPos.partitionsCountPos); - const messagesCount = r.data.readBigUInt64LE(statsPos.messagesCountPos); - const clientsCount = r.data.readUInt32LE(statsPos.clientsCountPos); - const consumerGroupsCount = r.data.readUInt32LE(statsPos.consumerGroupsCountPos); + const processId = r.data.readUInt32LE(0); + const cpuUsage = r.data.readFloatLE(4); + const memoryUsage = r.data.readBigUInt64LE(8); + const totalMemory = r.data.readBigUInt64LE(16); + const availableMemory = r.data.readBigUInt64LE(24); + const runTime = r.data.readBigUInt64LE(32); + const startTime = r.data.readBigUInt64LE(40); + const readBytes = r.data.readBigUInt64LE(48); + const writtenBytes = r.data.readBigUInt64LE(56); + const messagesSizeBytes = r.data.readBigUInt64LE(64); + const streamsCount = r.data.readUInt32LE(72); + const topicsCount = r.data.readUInt32LE(76); + const partitionsCount = r.data.readUInt32LE(80); + const segmentsCount = r.data.readUInt32LE(84); + const messagesCount = r.data.readBigUInt64LE(88); + const clientsCount = r.data.readUInt32LE(96); + const consumersGroupsCount = r.data.readUInt32LE(100); - let position = statsPos.consumerGroupsCountPos + 4; + let position = 100 + 4; const hostnameLength = r.data.readUInt32LE(position); const hostname = r.data.subarray( position + 4, @@ -107,10 +91,11 @@ export const GET_STATS = { messagesSizeBytes, streamsCount, topicsCount, - partitionCount, + partitionsCount, + segmentsCount, messagesCount, clientsCount, - consumerGroupsCount, + consumersGroupsCount, hostname, osName, osVersion, @@ -118,3 +103,5 @@ export const GET_STATS = { }; } }; + +export const getStats = wrapCommand(GET_STATS); diff --git a/src/wire/system/ping.command.ts b/src/wire/system/ping.command.ts index 11d62a8..4b94763 100644 --- a/src/wire/system/ping.command.ts +++ b/src/wire/system/ping.command.ts @@ -1,5 +1,6 @@ -import type { CommandResponse } from '../../tcp.client.js'; +import { deserializeVoidResponse } from '../../client/client.utils.js'; +import { wrapCommand } from '../command.utils.js'; // PING export const PING = { @@ -8,8 +9,8 @@ export const PING = { return Buffer.alloc(0); }, - deserialize: (r: CommandResponse) => { - return r.status === 0 && r.length === 0; - } + deserialize: deserializeVoidResponse }; + +export const ping = wrapCommand(PING); diff --git a/src/wire/token/create-token.command.test.ts b/src/wire/token/create-token.command.test.ts index 264c532..de722cf 100644 --- a/src/wire/token/create-token.command.test.ts +++ b/src/wire/token/create-token.command.test.ts @@ -7,32 +7,37 @@ describe('CreateToken', () => { describe('serialize', () => { - const name = 'test-token'; - const expiry = 1234; + const t1 = { + name: 'test-token', + expiry: 1234 + }; it('serialize 1 name & 1 uint32 into buffer', () => { assert.deepEqual( - CREATE_TOKEN.serialize(name, expiry).length, - 4 + 1 + name.length + CREATE_TOKEN.serialize(t1).length, + 4 + 1 + t1.name.length ); }); it('throw on name < 1', () => { + const t2 = { ...t1, name: ''}; assert.throws( - () => CREATE_TOKEN.serialize('', expiry) + () => CREATE_TOKEN.serialize(t2) ); }); it("throw on name > 255 bytes", () => { + const t2 = { ...t1, name: "YoLo".repeat(65)}; assert.throws( - () => CREATE_TOKEN.serialize("YoLo".repeat(65), expiry) + () => CREATE_TOKEN.serialize(t2) ); }); it("throw on name > 255 bytes - utf8 version", () => { + const t2 = { ...t1, name: "¥Ø£Ø".repeat(33)}; assert.throws( - () => CREATE_TOKEN.serialize("¥Ø£Ø".repeat(33), expiry) + () => CREATE_TOKEN.serialize(t2) ); }); diff --git a/src/wire/token/create-token.command.ts b/src/wire/token/create-token.command.ts index 8dba408..7950b6b 100644 --- a/src/wire/token/create-token.command.ts +++ b/src/wire/token/create-token.command.ts @@ -1,11 +1,18 @@ -import type { CommandResponse } from '../../tcp.client.js'; -import { deserializeCreateToken } from './token.utils.js'; +import type { CommandResponse } from '../../client/client.type.js'; +import { deserializeCreateToken, type CreateTokenResponse } from './token.utils.js'; +import { wrapCommand } from '../command.utils.js'; + +export type CreateToken = { + name: string, + expiry?: number +} + export const CREATE_TOKEN = { code: 42, - serialize: (name: string, expiry = 600): Buffer => { + serialize: ({name, expiry = 600}: CreateToken): Buffer => { const bName = Buffer.from(name); if (bName.length < 1 || bName.length > 255) throw new Error('Token name should be between 1 and 255 bytes'); @@ -23,3 +30,4 @@ export const CREATE_TOKEN = { deserialize: (r: CommandResponse) => deserializeCreateToken(r.data).data }; +export const createToken = wrapCommand(CREATE_TOKEN); diff --git a/src/wire/token/delete-token.command.test.ts b/src/wire/token/delete-token.command.test.ts index d5bdfad..1dd881a 100644 --- a/src/wire/token/delete-token.command.test.ts +++ b/src/wire/token/delete-token.command.test.ts @@ -7,31 +7,34 @@ describe('DeleteToken', () => { describe('serialize', () => { - const name = 'test-token'; + const t1 = {name : 'test-token'}; it('serialize 1 name into buffer', () => { assert.deepEqual( - DELETE_TOKEN.serialize(name).length, - 1 + name.length + DELETE_TOKEN.serialize(t1).length, + 1 + t1.name.length ); }); it('throw on name < 1', () => { + const t2 = { ...t1, name: ''}; assert.throws( - () => DELETE_TOKEN.serialize('') + () => DELETE_TOKEN.serialize(t2) ); }); it("throw on name > 255 bytes", () => { + const t2 = { ...t1, name: "YoLo".repeat(65)}; assert.throws( - () => DELETE_TOKEN.serialize("YoLo".repeat(65)) + () => DELETE_TOKEN.serialize(t2) ); }); it("throw on name > 255 bytes - utf8 version", () => { + const t2 = { ...t1, name: "¥Ø£Ø".repeat(33)}; assert.throws( - () => DELETE_TOKEN.serialize("¥Ø£Ø".repeat(33)) + () => DELETE_TOKEN.serialize(t2) ); }); diff --git a/src/wire/token/delete-token.command.ts b/src/wire/token/delete-token.command.ts index 96ac754..bb1fcc4 100644 --- a/src/wire/token/delete-token.command.ts +++ b/src/wire/token/delete-token.command.ts @@ -1,10 +1,15 @@ -import type { CommandResponse } from '../../tcp.client.js'; +import { deserializeVoidResponse } from '../../client/client.utils.js'; +import { wrapCommand } from '../command.utils.js'; + +export type DeleteToken = { + name: string +}; export const DELETE_TOKEN = { code: 43, - serialize: (name: string): Buffer => { + serialize: ({name}: DeleteToken): Buffer => { const bName = Buffer.from(name); if (bName.length < 1 || bName.length > 255) throw new Error('Token name should be between 1 and 255 bytes'); @@ -16,7 +21,7 @@ export const DELETE_TOKEN = { ]); }, - deserialize: (r: CommandResponse) => { - return r.status === 0 && r.data.length === 0; - } + deserialize: deserializeVoidResponse }; + +export const deleteToken = wrapCommand(DELETE_TOKEN); diff --git a/src/wire/token/get-tokens.command.ts b/src/wire/token/get-tokens.command.ts index b682949..ac509f0 100644 --- a/src/wire/token/get-tokens.command.ts +++ b/src/wire/token/get-tokens.command.ts @@ -1,7 +1,8 @@ +import type { CommandResponse } from '../../client/client.type.js'; +import { wrapCommand } from '../command.utils.js'; +import { deserializeTokens, type Token } from './token.utils.js'; -import type { CommandResponse } from '../../tcp.client.js'; -import { deserializeTokens } from './token.utils.js'; export const GET_TOKENS = { code: 41, @@ -10,3 +11,5 @@ export const GET_TOKENS = { deserialize: (r: CommandResponse) => deserializeTokens(r.data) }; + +export const getTokens = wrapCommand(GET_TOKENS); diff --git a/src/wire/token/token.utils.ts b/src/wire/token/token.utils.ts index ca87939..5f4f2cd 100644 --- a/src/wire/token/token.utils.ts +++ b/src/wire/token/token.utils.ts @@ -1,13 +1,13 @@ import { toDate } from '../serialize.utils.js'; -export type CreateToken = { +export type CreateTokenResponse = { token: string }; type TokenDeserialized = { bytesRead: number, - data: CreateToken + data: CreateTokenResponse }; export type Token = { diff --git a/src/wire/topic/create-topic.command.test.ts b/src/wire/topic/create-topic.command.test.ts index f24de42..1fc32e4 100644 --- a/src/wire/topic/create-topic.command.test.ts +++ b/src/wire/topic/create-topic.command.test.ts @@ -6,52 +6,56 @@ import { CREATE_TOPIC } from './create-topic.command.js'; describe('CreateTopic', () => { describe('serialize', () => { - // serialize: ( - // streamId: Id, - // topicId: number, - // name: string, - // partitionCount: number, - // messageExpiry = 0, - // maxTopicSize = 0, - // replicationFactor = 1 - // ) => { ... } - - const name = 'test-topic'; + + const t1 = { + streamId: 1, + topicId: 2, + name: 'test-topic', + partitionCount: 1, + messageExpiry: 0, + maxTopicSize: 0, + replicationFactor: 1 + }; it('serialize 1 numeric id & 1 name into buffer', () => { assert.deepEqual( - CREATE_TOPIC.serialize(1, 2, name, 1, 0, 0, 1).length, - 6 + 4 + 4 + 4 + 8 + 1 + 1 + name.length + CREATE_TOPIC.serialize(t1).length, + 6 + 4 + 4 + 4 + 8 + 1 + 1 + t1.name.length ); }); it('throw on name < 1', () => { + const t = { ...t1, name: '' }; assert.throws( - () => CREATE_TOPIC.serialize(1, 2, '', 1) + () => CREATE_TOPIC.serialize(t) ); }); it("throw on name > 255 bytes", () => { + const t = { ...t1, name: "YoLo".repeat(65)}; assert.throws( - () => CREATE_TOPIC.serialize(1, 2, "YoLo".repeat(65), 1) + () => CREATE_TOPIC.serialize(t) ); }); it("throw on name > 255 bytes - utf8 version", () => { + const t = { ...t1, name: "¥Ø£Ø".repeat(33) }; assert.throws( - () => CREATE_TOPIC.serialize(1, 3, "¥Ø£Ø".repeat(33), 2) + () => CREATE_TOPIC.serialize(t) ); }); it('throw on replication_factor < 1', () => { + const t = { ...t1, replicationFactor: 0 }; assert.throws( - () => CREATE_TOPIC.serialize(1, 2, name, 1, 0, 0, 0), + () => CREATE_TOPIC.serialize(t), ); }); it('throw on replication_factor > 255', () => { + const t = { ...t1, replicationFactor: 257 }; assert.throws( - () => CREATE_TOPIC.serialize(1, 2, name, 1, 0, 0, 256), + () => CREATE_TOPIC.serialize(t), ); }); diff --git a/src/wire/topic/create-topic.command.ts b/src/wire/topic/create-topic.command.ts index 81a4376..fcbbd8f 100644 --- a/src/wire/topic/create-topic.command.ts +++ b/src/wire/topic/create-topic.command.ts @@ -1,51 +1,59 @@ -import type { CommandResponse } from '../../tcp.client.js'; import { serializeIdentifier, type Id } from '../identifier.utils.js'; - -// export type CreateTopic = { -// streamId: number | string, -// topicId: number, -// name: string, -// partitionCount: number, -// messageExpiry: number -// }; +import { deserializeVoidResponse } from '../../client/client.utils.js'; +import { wrapCommand } from '../command.utils.js'; + +export type CreateTopic = { + streamId: Id, + topicId: number, + name: string, + partitionCount: number, + messageExpiry?: number, + maxTopicSize?: number, + replicationFactor?: number +}; export const CREATE_TOPIC = { code: 302, - - serialize: ( - streamId: Id, - topicId: number, - name: string, - partitionCount: number, + serialize: ({ + streamId, + topicId, + name, + partitionCount, messageExpiry = 0, maxTopicSize = 0, replicationFactor = 1 + }: CreateTopic ) => { const streamIdentifier = serializeIdentifier(streamId); const bName = Buffer.from(name) - + if (replicationFactor < 1 || replicationFactor > 255) throw new Error('Topic replication factor should be between 1 and 255'); if (bName.length < 1 || bName.length > 255) throw new Error('Topic name should be between 1 and 255 bytes'); - + const b = Buffer.allocUnsafe(4 + 4 + 4 + 8 + 1 + 1); b.writeUInt32LE(topicId, 0); b.writeUInt32LE(partitionCount, 4); - b.writeUInt32LE(messageExpiry, 8); // 0 is unlimited ??? + b.writeUInt32LE(messageExpiry, 8); // 0 is unlimited b.writeBigUInt64LE(BigInt(maxTopicSize), 12); // optional, 0 is null b.writeUInt8(replicationFactor, 20); // must be > 0 b.writeUInt8(bName.length, 21); - + return Buffer.concat([ streamIdentifier, b, bName, ]); }, - - deserialize: (r: CommandResponse) => { - return r.status === 0 && r.data.length === 0; - } + deserialize: deserializeVoidResponse }; + +export const createTopic = wrapCommand(CREATE_TOPIC); + +// export const createTopic = (cli: Client) => async (arg: CreateTopic) => { +// return CREATE_TOPIC.deserialize( +// await cli.sendCommand(CREATE_TOPIC.code, CREATE_TOPIC.serialize(arg)) +// ); +// } diff --git a/src/wire/topic/delete-topic.command.ts b/src/wire/topic/delete-topic.command.ts index de6e4e1..2b45a2c 100644 --- a/src/wire/topic/delete-topic.command.ts +++ b/src/wire/topic/delete-topic.command.ts @@ -1,12 +1,19 @@ -import type { CommandResponse } from '../../tcp.client.js'; +import { deserializeVoidResponse } from '../../client/client.utils.js'; +import { wrapCommand } from '../command.utils.js'; import { serializeIdentifier, type Id } from '../identifier.utils.js'; import { uint32ToBuf } from '../number.utils.js'; +type DeleteTopic = { + streamId: Id, + topicId: Id, + partitionsCount: number +} + export const DELETE_TOPIC = { code: 303, - serialize: (streamId: Id, topicId: Id, partitionsCount: number) => { + serialize: ({streamId, topicId, partitionsCount}: DeleteTopic) => { return Buffer.concat([ serializeIdentifier(streamId), serializeIdentifier(topicId), @@ -14,7 +21,7 @@ export const DELETE_TOPIC = { ]); }, - deserialize: (r: CommandResponse) => { - return r.status === 0 && r.data.length === 0; - } + deserialize: deserializeVoidResponse }; + +export const deleteTopic = wrapCommand(DELETE_TOPIC); diff --git a/src/wire/topic/get-topic.command.ts b/src/wire/topic/get-topic.command.ts index 7041a56..7935e69 100644 --- a/src/wire/topic/get-topic.command.ts +++ b/src/wire/topic/get-topic.command.ts @@ -1,13 +1,18 @@ -import type { CommandResponse } from '../../tcp.client.js'; +import type { CommandResponse } from '../../client/client.type.js'; +import { wrapCommand } from '../command.utils.js'; import { serializeIdentifier, type Id } from '../identifier.utils.js'; -import { deserializeTopic } from './topic.utils.js'; +import { deserializeTopic, type Topic } from './topic.utils.js'; +type GetTopic = { + streamId: Id, + topicId: Id +} export const GET_TOPIC = { code: 300, - serialize: (streamId: Id, topicId: Id) => { + serialize: ({streamId, topicId}: GetTopic) => { return Buffer.concat([ serializeIdentifier(streamId), serializeIdentifier(topicId) @@ -18,3 +23,5 @@ export const GET_TOPIC = { return deserializeTopic(r.data).data; } }; + +export const getTopic = wrapCommand(GET_TOPIC); diff --git a/src/wire/topic/get-topics.command.ts b/src/wire/topic/get-topics.command.ts index 4d1cf3b..63e604d 100644 --- a/src/wire/topic/get-topics.command.ts +++ b/src/wire/topic/get-topics.command.ts @@ -1,12 +1,17 @@ -import type { CommandResponse } from '../../tcp.client.js'; +import type { CommandResponse } from '../../client/client.type.js'; +import { wrapCommand } from '../command.utils.js'; import { serializeIdentifier, type Id } from '../identifier.utils.js'; -import { deserializeTopics } from './topic.utils.js'; +import { deserializeTopics, type Topic } from './topic.utils.js'; + +export type GetTopics = { + streamId: Id +}; export const GET_TOPICS = { code: 301, - serialize: (streamId: Id) => { + serialize: ({streamId}: GetTopics) => { return serializeIdentifier(streamId); }, @@ -14,3 +19,5 @@ export const GET_TOPICS = { return deserializeTopics(r.data); } }; + +export const getTopics = wrapCommand(GET_TOPICS); diff --git a/src/wire/topic/purge-topic.command.ts b/src/wire/topic/purge-topic.command.ts index 98eb722..875f1f4 100644 --- a/src/wire/topic/purge-topic.command.ts +++ b/src/wire/topic/purge-topic.command.ts @@ -1,18 +1,25 @@ -import type { CommandResponse } from '../../tcp.client.js'; import { serializeIdentifier, type Id } from '../identifier.utils.js'; +import { deserializeVoidResponse } from '../../client/client.utils.js'; +import { wrapCommand } from '../command.utils.js'; + +export type PurgeTopic = { + streamId: Id, + topicId: Id +}; export const PURGE_TOPIC = { code: 305, - serialize: (streamId: Id, topicId: Id) => { + serialize: ({ streamId, topicId }: PurgeTopic) => { return Buffer.concat([ serializeIdentifier(streamId), serializeIdentifier(topicId), ]); }, - deserialize: (r: CommandResponse) => { - return r.status === 0 && r.data.length === 0; - } + deserialize: deserializeVoidResponse }; + +export const purgeTopic = wrapCommand(PURGE_TOPIC); + diff --git a/src/wire/topic/topic.utils.ts b/src/wire/topic/topic.utils.ts index 07ab304..099ce11 100644 --- a/src/wire/topic/topic.utils.ts +++ b/src/wire/topic/topic.utils.ts @@ -46,13 +46,6 @@ export const deserializeBaseTopic = (p: Buffer, pos = 0): BaseTopicSerialized => const nameLength = p.readUInt8(pos + 45); const name = p.subarray(pos + 46, pos + 46 + nameLength).toString(); - // @WTF ? - // nameEnd := position + 37 + nameLength - // if nameEnd > len(payload) { - // return TopicResponse{}, 0, json.Unmarshal([]byte(`{}`), &topic) - // } - // topic.Name = string(bytes.Trim(payload[position+37:nameEnd], "\x00")) - return { bytesRead: 4 + 8 + 4 + 4 + 8 + 1 + 8 + 8 + 1 + nameLength, data: { diff --git a/src/wire/topic/update-topic.command.ts b/src/wire/topic/update-topic.command.ts index b62ebbd..08b7227 100644 --- a/src/wire/topic/update-topic.command.ts +++ b/src/wire/topic/update-topic.command.ts @@ -1,27 +1,29 @@ -import type { CommandResponse } from '../../tcp.client.js'; import { serializeIdentifier, type Id } from '../identifier.utils.js'; +import { deserializeVoidResponse } from '../../client/client.utils.js'; +import { wrapCommand } from '../command.utils.js'; -// export type CreateTopic = { -// streamId: number | string, -// topicId: number, -// name: string, -// partitionCount: number, -// messageExpiry: number -// }; + +export type UpdateTopic = { + streamId: Id, + topicId: Id, + name: string, + messageExpiry?: number, + maxTopicSize?: number, + replicationFactor?: number, +}; export const UPDATE_TOPIC = { code: 304, - serialize: ( - streamId: Id, - topicId: Id, - name: string, + serialize: ({ + streamId, + topicId, + name, messageExpiry = 0, maxTopicSize = 0, replicationFactor = 1, - - ) => { + }: UpdateTopic) => { const streamIdentifier = serializeIdentifier(streamId); const topicIdentifier = serializeIdentifier(topicId); const bName = Buffer.from(name) @@ -43,7 +45,7 @@ export const UPDATE_TOPIC = { ]); }, - deserialize: (r: CommandResponse) => { - return r.status === 0 && r.data.length === 0; - } + deserialize: deserializeVoidResponse }; + +export const updateTopic = wrapCommand(UPDATE_TOPIC); diff --git a/src/wire/user/change-password.command.ts b/src/wire/user/change-password.command.ts index 8bff169..b16ff3f 100644 --- a/src/wire/user/change-password.command.ts +++ b/src/wire/user/change-password.command.ts @@ -1,25 +1,23 @@ -import type { CommandResponse } from '../../tcp.client.js'; import { uint8ToBuf } from '../number.utils.js'; import { serializeIdentifier, type Id } from '../identifier.utils.js'; +import { deserializeVoidResponse } from '../../client/client.utils.js'; +import { wrapCommand } from '../command.utils.js'; -// export type ChangePassword = { -// id: number, -// currentPassword: string, -// newPassword: string, -// }; + +export type ChangePassword = { + userId: number, + currentPassword: string, + newPassword: string +}; export const CHANGE_PASSWORD = { code: 37, - serialize: ( - id: Id, - currentPassword: string, - newPassword: string, - ) => { + serialize: ({ userId, currentPassword, newPassword }: ChangePassword) => { - const bId = serializeIdentifier(id); + const bId = serializeIdentifier(userId); const bCur = Buffer.from(currentPassword); const bNew = Buffer.from(newPassword); @@ -38,7 +36,7 @@ export const CHANGE_PASSWORD = { ]); }, - deserialize: (r: CommandResponse) => { - return r.status === 0 && r.data.length === 0; - } + deserialize: deserializeVoidResponse }; + +export const changePassword = wrapCommand(CHANGE_PASSWORD); diff --git a/src/wire/user/create-user.command.test.ts b/src/wire/user/create-user.command.test.ts index badf9d8..fc40c0e 100644 --- a/src/wire/user/create-user.command.test.ts +++ b/src/wire/user/create-user.command.test.ts @@ -6,58 +6,61 @@ import { CREATE_USER } from './create-user.command.js'; describe('CreateUser', () => { describe('serialize', () => { - // serialize: ( - // userusername: string, - // password: string, - // status: UserStatus, - // permissions?: UserPermissions - // ) => { - const username = 'test-user'; - const password = 'test-pwd'; - const status = 1; // Active; - const perms = undefined; // @TODO + const u1 = { + id: 1, + username: 'test-user', + password: 'test-pwd', + status: 1, // Active, + // perms: undefined // @TODO + }; it('serialize username, password, status, permissions into buffer', () => { assert.deepEqual( - CREATE_USER.serialize(username, password, status, perms).length, - 1 + username.length + 1 + password.length + 1 + 1 + 4 + 1 + CREATE_USER.serialize(u1).length, + 1 + u1.username.length + 1 + u1.password.length + 1 + 1 + 4 + 1 ); }); it('throw on username < 1', () => { + const u2 = { ...u1, username: '' }; assert.throws( - () => CREATE_USER.serialize('', password, status, perms) + () => CREATE_USER.serialize(u2) ); }); it('throw on username > 255 bytes', () => { + const u2 = { ...u1, username: "YoLo".repeat(65) }; assert.throws( - () => CREATE_USER.serialize('YoLo'.repeat(65), password, status, perms) + () => CREATE_USER.serialize(u2) ); }); it('throw on username > 255 bytes - utf8 version', () => { + const u2 = { ...u1, username: "¥Ø£Ø".repeat(33) }; assert.throws( - () => CREATE_USER.serialize('¥Ø£Ø'.repeat(33), password, status, perms) + () => CREATE_USER.serialize(u2) ); }); it('throw on password < 1', () => { + const u2 = { ...u1, password: '' }; assert.throws( - () => CREATE_USER.serialize('', password, status, perms) + () => CREATE_USER.serialize(u2) ); }); it('throw on password > 255 bytes', () => { + const u2 = { ...u1, password: "yolo".repeat(65) }; assert.throws( - () => CREATE_USER.serialize(username, 'YoLo'.repeat(65), status, perms) + () => CREATE_USER.serialize(u2) ); }); it('throw on password > 255 bytes - utf8 version', () => { + const u2 = { ...u1, password: "¥Ø£Ø".repeat(33) }; assert.throws( - () => CREATE_USER.serialize(username, '¥Ø£Ø'.repeat(33), status, perms) + () => CREATE_USER.serialize(u2) ); }); diff --git a/src/wire/user/create-user.command.ts b/src/wire/user/create-user.command.ts index 20b20ec..86702c4 100644 --- a/src/wire/user/create-user.command.ts +++ b/src/wire/user/create-user.command.ts @@ -1,26 +1,21 @@ -import type { CommandResponse } from '../../tcp.client.js'; +import { deserializeVoidResponse } from '../../client/client.utils.js'; +import { wrapCommand } from '../command.utils.js'; import type { UserStatus } from './user.utils.js'; import { uint8ToBuf, uint32ToBuf, boolToBuf } from '../number.utils.js'; import { serializePermissions, type UserPermissions } from './permissions.utils.js'; -// export type CreateUser = { -// id: number, -// username: string, -// password: string, -// status: UserStatus -// permissions: UserPermissions -// }; +export type CreateUser = { + username: string, + password: string, + status: UserStatus + permissions?: UserPermissions +}; export const CREATE_USER = { code: 33, - serialize: ( - username: string, - password: string, - status: UserStatus, - permissions?: UserPermissions - ) => { + serialize: ({ username, password, status, permissions }: CreateUser) => { const bUsername = Buffer.from(username); const bPassword = Buffer.from(password); @@ -46,7 +41,7 @@ export const CREATE_USER = { ]); }, - deserialize: (r: CommandResponse) => { - return r.status === 0 && r.data.length === 0; - } + deserialize: deserializeVoidResponse }; + +export const createUser = wrapCommand(CREATE_USER); diff --git a/src/wire/user/delete-user.command.ts b/src/wire/user/delete-user.command.ts index f7aed4b..f364150 100644 --- a/src/wire/user/delete-user.command.ts +++ b/src/wire/user/delete-user.command.ts @@ -1,15 +1,20 @@ -import type { CommandResponse } from '../../tcp.client.js'; +import { deserializeVoidResponse } from '../../client/client.utils.js'; +import { wrapCommand } from '../command.utils.js'; import { serializeIdentifier, type Id } from '../identifier.utils.js'; +export type DeleteUser = { + userId: Id +} + export const DELETE_USER = { code: 34, - serialize: (userId: Id) => { + serialize: ({userId}: DeleteUser) => { return serializeIdentifier(userId); }, - deserialize: (r: CommandResponse) => { - return r.status === 0 && r.data.length === 0; - } + deserialize: deserializeVoidResponse }; + +export const deleteUser = wrapCommand(DELETE_USER); diff --git a/src/wire/user/get-user.command.ts b/src/wire/user/get-user.command.ts index 3b60fc1..8b87204 100644 --- a/src/wire/user/get-user.command.ts +++ b/src/wire/user/get-user.command.ts @@ -1,15 +1,23 @@ -import { serializeIdentifier } from '../identifier.utils.js'; -import type { CommandResponse } from '../../tcp.client.js'; -import { deserializeUser } from './user.utils.js'; +import type { CommandResponse } from '../../client/client.type.js'; +import { wrapCommand } from '../command.utils.js'; +import { serializeIdentifier, type Id } from '../identifier.utils.js'; +import { deserializeUser, type User } from './user.utils.js'; + + +export type GetUser = { + userId: Id +}; // GET USER by id export const GET_USER = { code: 31, - serialize: (id: string | number) => { - return serializeIdentifier(id); + serialize: ({userId}: GetUser) => { + return serializeIdentifier(userId); }, deserialize: (r: CommandResponse) => deserializeUser(r.data) }; + +export const getUser = wrapCommand(GET_USER); diff --git a/src/wire/user/get-users.command.ts b/src/wire/user/get-users.command.ts index 06fc7a2..4203df4 100644 --- a/src/wire/user/get-users.command.ts +++ b/src/wire/user/get-users.command.ts @@ -1,15 +1,16 @@ -import type { CommandResponse } from '../../tcp.client.js'; -import { deserializeUsers } from './user.utils.js'; +import type { CommandResponse } from '../../client/client.type.js'; +import { wrapCommand } from '../command.utils.js'; +import { deserializeUsers, type BaseUser } from './user.utils.js'; // GET USERS export const GET_USERS = { code: 32, - serialize: () => { - return Buffer.alloc(0); - // return serializeIdentifier(id); - }, - deserialize: (r: CommandResponse) => deserializeUsers(r.data) + serialize: () => Buffer.alloc(0), + + deserialize: (r: CommandResponse) => deserializeUsers(r.data) }; + +export const getUsers = wrapCommand(GET_USERS); diff --git a/src/wire/user/permissions.utils.test.ts b/src/wire/user/permissions.utils.test.ts index 1f26f26..f3f555a 100644 --- a/src/wire/user/permissions.utils.test.ts +++ b/src/wire/user/permissions.utils.test.ts @@ -1,5 +1,4 @@ - import { describe, it } from 'node:test'; import assert from 'node:assert/strict'; import { serializePermissions, deserializePermissions } from './permissions.utils.js'; diff --git a/src/wire/user/update-permissions.command.ts b/src/wire/user/update-permissions.command.ts index acc2df5..e9057da 100644 --- a/src/wire/user/update-permissions.command.ts +++ b/src/wire/user/update-permissions.command.ts @@ -1,33 +1,31 @@ -import type { CommandResponse } from '../../tcp.client.js'; -import { uint8ToBuf, uint32ToBuf, boolToBuf } from '../number.utils.js'; +import { wrapCommand } from '../command.utils.js'; +import { deserializeVoidResponse } from '../../client/client.utils.js'; +import { uint32ToBuf, boolToBuf } from '../number.utils.js'; import { serializeIdentifier, type Id } from '../identifier.utils.js'; import { serializePermissions, type UserPermissions } from './permissions.utils.js'; -// export type UpdatePermissions = { -// id: number, -// permissions: UserPermissions -// }; +export type UpdatePermissions = { + userId: Id, + permissions: UserPermissions +}; export const UPDATE_PERMISSIONS = { code: 36, - serialize: ( - id: Id, - permissions?: UserPermissions - ) => { + serialize: ({ userId, permissions}: UpdatePermissions ) => { const bPermissions = serializePermissions(permissions); return Buffer.concat([ - serializeIdentifier(id), + serializeIdentifier(userId), boolToBuf(!!permissions), uint32ToBuf(bPermissions.length), bPermissions ]); }, - deserialize: (r: CommandResponse) => { - return r.status === 0 && r.data.length === 0; - } + deserialize: deserializeVoidResponse }; + +export const updatePermissions = wrapCommand(UPDATE_PERMISSIONS); diff --git a/src/wire/user/update-user.command.ts b/src/wire/user/update-user.command.ts index 619fd37..b7265c7 100644 --- a/src/wire/user/update-user.command.ts +++ b/src/wire/user/update-user.command.ts @@ -1,26 +1,23 @@ -import type { CommandResponse } from '../../tcp.client.js'; +import { wrapCommand } from '../command.utils.js'; +import { deserializeVoidResponse } from '../../client/client.utils.js'; import type { UserStatus } from './user.utils.js'; import { serializeIdentifier, type Id } from '../identifier.utils.js'; import { uint8ToBuf } from '../number.utils.js'; -// export type UpdateUser = { -// id: number, -// username?: string, -// status?: UserStatus -// }; +export type UpdateUser = { + userId: Id, + username?: string, + status?: UserStatus +}; export const UPDATE_USER = { code: 35, - serialize: ( - id: Id, - username?: string, - status?: UserStatus, - ) => { + serialize: ({userId, username, status}: UpdateUser) => { - const bId = serializeIdentifier(id); + const bId = serializeIdentifier(userId); let bUsername, bStatus; if (username) { @@ -50,7 +47,7 @@ export const UPDATE_USER = { ]); }, - deserialize: (r: CommandResponse) => { - return r.status === 0 && r.data.length === 0; - } + deserialize: deserializeVoidResponse }; + +export const updateUser = wrapCommand(UPDATE_USER);