Skip to content

Commit

Permalink
feat: wraps command to higher level api, starts client
Browse files Browse the repository at this point in the history
  • Loading branch information
T1B0 committed Jan 30, 2024
1 parent 9ed0794 commit 2ae16e2
Show file tree
Hide file tree
Showing 65 changed files with 1,006 additions and 700 deletions.
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
"description": "nodejs iggy.rs binary client",
"main": "index.js",
"scripts": {
"test": "node --test dist/",
"test": "node --test --experimental-test-coverage dist/**/**/*.test.js",
"clean": "rm -Rf dist/",
"build": "tsc -p tsconfig.json",
"start": "node dist/index.js"
Expand Down
10 changes: 10 additions & 0 deletions src/client/client.type.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@

export type CommandResponse = {
status: number,
length: number,
data: Buffer
};

export type Client = {
sendCommand: (code: number, payload: Buffer) => Promise<CommandResponse>
}
52 changes: 52 additions & 0 deletions src/client/client.utils.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@

import type { CommandResponse } from './client.type.js';
import { translateCommandCode } from '../wire/command.code.js';


export const handleResponse = (r: Buffer) => {
const status = r.readUint32LE(0);
const length = r.readUint32LE(4);
console.log('<== handleResponse', { status, length });
return {
status, length, data: r.subarray(8)
}};

export const deserializeVoidResponse =
(r: CommandResponse): Boolean => r.status === 0 && r.data.length === 0;

const COMMAND_LENGTH = 4;

export const serializeCommand = (command: number, payload: Buffer) => {
const payloadSize = payload.length + COMMAND_LENGTH;
const head = Buffer.allocUnsafe(8);

head.writeUint32LE(payloadSize, 0);
head.writeUint32LE(command, 4);

console.log(
'==> CMD', command,
translateCommandCode(command),
head.subarray(4, 8).toString('hex'),
'LENGTH', payloadSize,
head.subarray(0, 4).toString('hex')
);

return Buffer.concat([head, payload]);
}


// enum TransportType {
// TCP = 'tcp',
// // TLS = 'tls',
// // QUIC = 'quic'
// }

// type ClientConfig = {
// transport: TransportType
// host: string,
// port: number
// }

// export const transportClient = (config: ClientConfig): Client => {
// const {transport} = config;
// };
52 changes: 22 additions & 30 deletions src/tcp.client.ts → src/client/tcp.client.ts
Original file line number Diff line number Diff line change
@@ -1,20 +1,9 @@

import { createConnection, Socket } from 'node:net';
import { responseError } from './wire/error.utils.js';
import { translateCommandCode } from './wire/command.code.js';
import { serializeCommand } from './client.utils.js';
import type { CommandResponse, Client } from './client.type.js';
import { responseError } from '../wire/error.utils.js';

// interface IggyClient {
// socket: Socket
// }

const COMMAND_LENGTH = 4;
// const REQUEST_INITIAL_BYTES_LENGTH = 4;

export type CommandResponse = {
status: number,
length: number,
data: Buffer
};

export const createClient = (
host: string, port: number, keepAlive = true
Expand All @@ -36,25 +25,10 @@ export const createClient = (
});
};


export const sendCommandWithResponse = (s: Socket) =>
(command: number, payload: Buffer): Promise<CommandResponse> => {

const payloadSize = payload.length + COMMAND_LENGTH;
const head = Buffer.alloc(8);

head.writeUint32LE(payloadSize, 0);
head.writeUint32LE(command, 4);

console.log(
'==> CMD', command,
translateCommandCode(command),
head.subarray(4, 8).toString('hex'),
'LENGTH', payloadSize,
head.subarray(0, 4).toString('hex')
);

const cmd = Buffer.concat([head, payload]);
const cmd = serializeCommand(command, payload);
console.log(
'==> sending cmd', command, cmd.toString('hex')
);
Expand Down Expand Up @@ -87,3 +61,21 @@ export const handleResponse = (r: Buffer) => {
status, length, data: r.subarray(8)
}
};

export type TcpOption = {
host: string,
port: number,
keepAlive?: boolean
};


export const TcpClient = ({ host, port, keepAlive = true }: TcpOption): Client => {
let socket: Socket;
return {
sendCommand: async (code: number, payload: Buffer):Promise<CommandResponse> => {
if (!socket)
socket = await createClient(host, port, keepAlive);
return sendCommandWithResponse(socket)(code, payload);
}
}
};
41 changes: 17 additions & 24 deletions src/tcp.client.e2e.ts
Original file line number Diff line number Diff line change
@@ -1,41 +1,34 @@

import { createClient, sendCommandWithResponse } from './tcp.client.js';

import { LOGIN } from './wire/session/login.command.js';
import { GET_ME } from './wire/client/get-me.command.js';
import { GET_CLIENTS } from './wire/client/get-clients.command.js';
import { GET_CLIENT } from './wire/client/get-client.command.js';
import { LOGOUT } from './wire/session/logout.command.js';
import { TcpClient } from './client/tcp.client.js';
import { login } from './wire/session/login.command.js';
import { getMe } from './wire/client/get-me.command.js';
import { getClients } from './wire/client/get-clients.command.js';
import { getClient } from './wire/client/get-client.command.js';
import { logout } from './wire/session/logout.command.js';

try {
// create socket
const s = await createClient('127.0.0.1', 8090);
console.log('CLI', s.readyState);
const s = TcpClient({ host: '127.0.0.1', port: 8090 });

// LOGIN
const loginCmd = LOGIN.serialize('iggy', 'iggy');
console.log('LOGIN', loginCmd.toString());
const r = await sendCommandWithResponse(s)(LOGIN.code, loginCmd);
console.log('RESPONSE_login', r, r.toString(), LOGIN.deserialize(r));
const r = await login(s)({ username: 'iggy', password: 'iggy' });
console.log('RESPONSE_login', r);

// GET_ME
const r_getMe = await sendCommandWithResponse(s)(GET_ME.code, GET_ME.serialize());
console.log('RESPONSE_getMe', r_getMe.toString(), GET_ME.deserialize(r_getMe));
const r_getMe = await getMe(s)();
console.log('RESPONSE_getMe', r_getMe);

// GET_CLIENTS
const r4 = await sendCommandWithResponse(s)(GET_CLIENTS.code, GET_CLIENTS.serialize());
const ls = GET_CLIENTS.deserialize(r4) // used after;
console.log('RESPONSE4', r4.toString(), ls);
const ls = await getClients(s)();
console.log('RESPONSE4', ls);

// GET_CLIENT #ID
const rCli = await sendCommandWithResponse(s)(
GET_CLIENT.code, GET_CLIENT.serialize(ls[0].clientId)
);
console.log('RESPONSECli', rCli.toString(), GET_CLIENTS.deserialize(rCli));
const rCli = await getClient(s)({ clientId: ls[0].clientId });
console.log('RESPONSECli', rCli);

// LOGOUT
const rOut = await sendCommandWithResponse(s)(LOGOUT.code, LOGOUT.serialize());
console.log('RESPONSE LOGOUT', LOGOUT.deserialize(rOut));
const rOut = await logout(s)();
console.log('RESPONSE LOGOUT', rOut);


} catch (err) {
Expand Down
151 changes: 75 additions & 76 deletions src/tcp.e2e.ts
Original file line number Diff line number Diff line change
@@ -1,116 +1,115 @@

import { createClient, sendCommandWithResponse } from './tcp.client.js';

import { LOGIN } from './wire/session/login.command.js';
import { LOGOUT } from './wire/session/logout.command.js';
import { CREATE_STREAM } from './wire/stream/create-stream.command.js';
import { UPDATE_STREAM } from './wire/stream/update-stream.command.js';
import { GET_STREAM } from './wire/stream/get-stream.command.js';
import { GET_STREAMS } from './wire/stream/get-streams.command.js';
import { DELETE_STREAM } from './wire/stream/delete-stream.command.js';
import { CREATE_TOPIC } from './wire/topic/create-topic.command.js';
import { UPDATE_TOPIC } from './wire/topic/update-topic.command.js';
import { GET_TOPIC } from './wire/topic/get-topic.command.js';
import { GET_TOPICS } from './wire/topic/get-topics.command.js';
import { DELETE_TOPIC } from './wire/topic/delete-topic.command.js';
import { CREATE_PARTITION } from './wire/partition/create-partition.command.js';
import { DELETE_PARTITION } from './wire/partition/delete-partition.command.js';
// import { createClient, sendCommandWithResponse } from './tcp.client.js';

import { TcpClient } from './client/tcp.client.js';
import { login } from './wire/session/login.command.js';
import { logout } from './wire/session/logout.command.js';
import { createStream } from './wire/stream/create-stream.command.js';
import { updateStream } from './wire/stream/update-stream.command.js';
import { getStream } from './wire/stream/get-stream.command.js';
import { getStreams } from './wire/stream/get-streams.command.js';
import { deleteStream } from './wire/stream/delete-stream.command.js';
import { createTopic } from './wire/topic/create-topic.command.js';
import { updateTopic } from './wire/topic/update-topic.command.js';
import { getTopic } from './wire/topic/get-topic.command.js';
import { getTopics } from './wire/topic/get-topics.command.js';
import { deleteTopic } from './wire/topic/delete-topic.command.js';
import { createPartition } from './wire/partition/create-partition.command.js';
import { deletePartition } from './wire/partition/delete-partition.command.js';


try {
// create socket
const s = await createClient('127.0.0.1', 8090);
console.log('CLI', s.readyState);
const s = TcpClient({ host: '127.0.0.1', port: 8090 });

// LOGIN
const loginCmd = LOGIN.serialize('iggy', 'iggy');
console.log('LOGIN', loginCmd.toString());
const r = await sendCommandWithResponse(s)(LOGIN.code, loginCmd);
console.log('RESPONSE_login', r, r.toString(), LOGIN.deserialize(r));
const r = await login(s)({ username: 'iggy', password: 'iggy' });
console.log('RESPONSE_login', r);

const streamName = 'test-stream';
const streamId = 1;
const stream = {
name: 'test-stream',
streamId: 1
};

// CREATE_STREAM
const createStreamCmd = CREATE_STREAM.serialize(streamId, streamName);
const r_createStream = await sendCommandWithResponse(s)(
CREATE_STREAM.code, createStreamCmd
);
console.log('RESPONSE_createStream', CREATE_STREAM.deserialize(r_createStream));
const r_createStream = await createStream(s)(stream);
console.log('RESPONSE_createStream', r_createStream);

// GET_STREAM #ID
const getStreamCmd = GET_STREAM.serialize(streamId);
const r7 = await sendCommandWithResponse(s)(GET_STREAM.code, getStreamCmd);
console.log('RESPONSE7', GET_STREAM.deserialize(r7));
const r7 = await getStream(s)({ streamId: stream.streamId });
console.log('RESPONSE7', r7);

// GET_STREAM #NAME
const getStreamCmd2 = GET_STREAM.serialize(streamName);
const r8 = await sendCommandWithResponse(s)(GET_STREAM.code, getStreamCmd2);
console.log('RESPONSE8', GET_STREAM.deserialize(r8));
const r8 = await getStream(s)({ streamId: stream.name });
console.log('RESPONSE8', r8);

// UPDATE_STREAM
const updateStreamCmd = UPDATE_STREAM.serialize(streamId, 'updatedStreamName');
const r_updateStream = await sendCommandWithResponse(s)(
UPDATE_STREAM.code, updateStreamCmd
);
console.log('RESPONSE_updateStream', UPDATE_STREAM.deserialize(r_updateStream));
const r_updateStream = await updateStream(s)({
streamId: stream.streamId, name: 'updatedStreamName'
});
console.log('RESPONSE_updateStream', r_updateStream);

// GET_STREAMS
const r9 = await sendCommandWithResponse(s)(GET_STREAMS.code, GET_STREAMS.serialize());
console.log('RESPONSE9', GET_STREAMS.deserialize(r9));
const r9 = await getStreams(s)();
console.log('RESPONSE9', r9);

const topic1 = {
streamId: stream.streamId,
topicId: 44,
name: 'topic-name-44',
partitionCount: 3,
messageExpiry: 0,
maxTopicSize: 0,
replicationFactor: 1
};

// CREATE_TOPIC
const ctp = CREATE_TOPIC.serialize(
streamId, 44, 'test-topic-44', 3, 0, 0, 1
);
const r_createTopic = await sendCommandWithResponse(s)(CREATE_TOPIC.code, ctp);
console.log('RESPONSE_createTopic', CREATE_TOPIC.deserialize(r_createTopic));
const r_createTopic = await createTopic(s)(topic1);
console.log('RESPONSE_createTopic', r_createTopic);

// GET_TOPIC
const gtp = GET_TOPIC.serialize(streamId, 'test-topic-44');
const r_getTopic = await sendCommandWithResponse(s)(GET_TOPIC.code, gtp);
const t2 = GET_TOPIC.deserialize(r_getTopic);
const t2 = await getTopic(s)({ streamId: topic1.streamId, topicId: topic1.name });
console.log('RESPONSE_getTopic', t2);

// UPDATE_TOPIC
const utp = UPDATE_TOPIC.serialize(
streamId, 44, 'test-topic-44', 42
);
const r_updateTopic = await sendCommandWithResponse(s)(UPDATE_TOPIC.code, utp);
console.log('RESPONSE_updateTopic', UPDATE_TOPIC.deserialize(r_updateTopic));
const r_updateTopic = await updateTopic(s)({
streamId: topic1.streamId, topicId: topic1.topicId, name: topic1.name, messageExpiry: 42
});
console.log('RESPONSE_updateTopic', r_updateTopic);

// CREATE_PARTITION
const cpa = CREATE_PARTITION.serialize(streamId, t2.id, 22);
const r_createPartition = await sendCommandWithResponse(s)(CREATE_PARTITION.code, cpa);
console.log('RESPONSE_createPartition', CREATE_PARTITION.deserialize(r_createPartition));
const r_createPartition = await createPartition(s)({
streamId: topic1.streamId, topicId: t2.id, partitionCount: 22
});
console.log('RESPONSE_createPartition', r_createPartition);

// DELETE_PARTITION
const dpa = DELETE_PARTITION.serialize(streamId, t2.id, 19);
const r_deletePartition = await sendCommandWithResponse(s)(DELETE_PARTITION.code, dpa);
console.log('RESPONSE_deletePartition', DELETE_PARTITION.deserialize(r_deletePartition));
const r_deletePartition = await deletePartition(s)({
streamId: topic1.streamId, topicId: t2.id, partitionCount: 19
});
console.log('RESPONSE_deletePartition', r_deletePartition);

// GET_TOPIC AGAIN
const r_getTopic2 = await sendCommandWithResponse(s)(GET_TOPIC.code, gtp);
console.log('RESPONSE_getTopic2', GET_TOPIC.deserialize(r_getTopic2));
// GET_TOPIC AGAIN
const r_getTopic2 = await getTopic(s)({ streamId: topic1.streamId, topicId: topic1.name });
console.log('RESPONSE_getTopic2', r_getTopic2);

// GET_TOPICS
const gtps = GET_TOPICS.serialize(streamId);
const r_getTopics = await sendCommandWithResponse(s)(GET_TOPICS.code, gtps);
console.log('RESPONSE_getTopics', GET_TOPICS.deserialize(r_getTopics));
// GET_TOPICS
const r_getTopics = await getTopics(s)({ streamId: topic1.streamId });
console.log('RESPONSE_getTopics', r_getTopics);

// DELETE TOPIC
const dtp = DELETE_TOPIC.serialize(streamId, t2.id, 3);
const r_deleteTopic = await sendCommandWithResponse(s)(DELETE_TOPIC.code, dtp);
console.log('RESPONSE_deleteTopic', DELETE_TOPIC.deserialize(r_deleteTopic));
const r_deleteTopic = await deleteTopic(s)({
streamId: topic1.streamId, topicId: t2.id, partitionsCount: 3
});
console.log('RESPONSE_deleteTopic', r_deleteTopic);

// DELETE STREAM
const dst = DELETE_STREAM.serialize(streamId);
const rDelS = await sendCommandWithResponse(s)(DELETE_STREAM.code, dst);
console.log('RESPONSEDelS', DELETE_STREAM.deserialize(rDelS));
const rDelS = await deleteStream(s)({ streamId: stream.streamId });
console.log('RESPONSEDelS', rDelS);

// LOGOUT
const rOut = await sendCommandWithResponse(s)(LOGOUT.code, LOGOUT.serialize());
console.log('RESPONSE LOGOUT', LOGOUT.deserialize(rOut));
const rOut = await logout(s)();
console.log('RESPONSE LOGOUT', rOut);


} catch (err) {
Expand Down
Loading

0 comments on commit 2ae16e2

Please sign in to comment.