Skip to content

Commit

Permalink
fix: get rid of enums, add type helpers
Browse files Browse the repository at this point in the history
  • Loading branch information
T1B0 committed Feb 1, 2024
1 parent 656653d commit 2e691d6
Show file tree
Hide file tree
Showing 6 changed files with 221 additions and 58 deletions.
27 changes: 13 additions & 14 deletions src/tcp.send-message.e2e.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,10 @@ import { deleteStream } from './wire/stream/delete-stream.command.js';
import { purgeStream } from './wire/stream/purge-stream.command.js';
import { getOffset } from './wire/offset/get-offset.command.js';
import { storeOffset } from './wire/offset/store-offset.command.js';
import { uint8ToBuf } from './wire/number.utils.js';
import { Message } from './wire/message/poll.utils.js';
import { CreateMessage } from './wire/message/message.utils.js';
import { HeaderKind } from './wire/message/header.type.js';
import { HeaderValue } from './wire/message/header.type.js';
import { ConsumerKind } from './wire/offset/offset.utils.js';
import { Partitioning } from './wire/message/partitioning.utils.js';
import { PollingStrategy } from './wire/message/poll.utils.js';


try {
Expand Down Expand Up @@ -55,9 +55,9 @@ try {
// const r_createTopic = await createTopic(s)(topic1);
// console.log('RESPONSE_createTopic', r_createTopic);

const h0 = { 'foo': { kind: HeaderKind.Uint8, value: 1 } };
const h1 = { 'x-header-string-1': { kind: HeaderKind.String, value: 'incredible' } };
const h2 = { 'x-header-bool': { kind: HeaderKind.Bool, value: false } };
const h0 = { 'foo': HeaderValue.Int32(42), 'bar': HeaderValue.Uint8(123) };
const h1 = { 'x-header-string-1': HeaderValue.String('incredible') };
const h2 = { 'x-header-bool': HeaderValue.Bool(false) };

const msg = {
streamId,
Expand All @@ -67,30 +67,29 @@ try {
{ id: v7(), payload: 'content' },
{ id: v7(), payload: 'yolo msg' },
{ id: v7(), payload: 'yolo msg 2' },
{ id: v7(), payload: 'this is fuu', headers: h1},
{ id: v7(), payload: 'this is bar', headers: h2},
{ id: v7(), payload: 'this is fuu', headers: h1 },
{ id: v7(), payload: 'this is bar', headers: h2 },
],
partition: {kind: 2, value: 1}
partition: Partitioning.PartitionId(1)
};

// SEND MESSAGES
const rSend = await sendMessages(s)(msg);
console.log('RESPONSE SEND_MESSAGE', rSend);

// POLL MESSAGE
const pollStrat = { kind: 5, value: 0n };
const pollReq = {
streamId,
topicId,
consumer: { kind: 1, id: 1 },
consumer: { kind: ConsumerKind.Single, id: 1 },
partitionId,
pollingStrategy: pollStrat,
pollingStrategy: PollingStrategy.Next,
count: 10,
autocommit: false
};

const rPoll = await pollMessages(s)(pollReq);
const {messages, ...resp} = rPoll;
const { messages, ...resp } = rPoll;
const m = messages.map(
m => ({ id: m.id, headers: m.headers, payload: m.payload.toString() })
);
Expand Down
100 changes: 94 additions & 6 deletions src/wire/message/header.type.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,10 @@ export const HeaderKind = {
} as const;

export type HeaderKind = typeof HeaderKind;

export type HeaderKindId = keyof HeaderKind;
export type HeaderKindValue = ValueOf<HeaderKind>;

export const HeaderKindIds = Object.keys(HeaderKind) as HeaderKindId[];
export const HeaderKindValues = Object.values(HeaderKind) as HeaderKindValue[];

export const ReverseHeaderKind = reverseRecord(HeaderKind);


export type HeaderValueRaw = {
kind: HeaderKind['Raw'],
value: Buffer
Expand Down Expand Up @@ -123,3 +117,97 @@ export type HeaderValueDouble = {
// HeaderValueDouble;

// export type Headers = Record<string, HeaderValue>;


const Raw = (value: Buffer): HeaderValueRaw => ({
kind: HeaderKind.Raw,
value
});

const String = (value: string): HeaderValueString => ({
kind: HeaderKind.String,
value
});

const Bool = (value: boolean): HeaderValueBool => ({
kind: HeaderKind.Bool,
value
});

const Int8 = (value: number): HeaderValueInt8 => ({
kind: HeaderKind.Int8,
value
});

const Int16 = (value: number): HeaderValueInt16 => ({
kind: HeaderKind.Int16,
value
});

const Int32 = (value: number): HeaderValueInt32 => ({
kind: HeaderKind.Int32,
value
});

const Int64 = (value: bigint): HeaderValueInt64 => ({
kind: HeaderKind.Int64,
value
});

const Int128 = (value: Buffer): HeaderValueInt128 => ({
kind: HeaderKind.Int128,
value
});

const Uint8 = (value: number): HeaderValueUint8 => ({
kind: HeaderKind.Uint8,
value
});

const Uint16 = (value: number): HeaderValueUint16 => ({
kind: HeaderKind.Uint16,
value
});

const Uint32 = (value: number): HeaderValueUint32 => ({
kind: HeaderKind.Uint32,
value
});

const Uint64 = (value: bigint): HeaderValueUint64 => ({
kind: HeaderKind.Uint64,
value
});

const Uint128 = (value: Buffer): HeaderValueUint128 => ({
kind: HeaderKind.Uint128,
value
});

const Float = (value: number): HeaderValueFloat => ({
kind: HeaderKind.Float,
value
});

const Double = (value: number): HeaderValueDouble => ({
kind: HeaderKind.Double,
value
});

export const HeaderValue = {
Raw,
String,
Bool,
Int8,
Int16,
Int32,
Int64,
Int128,
Uint8,
Uint16,
Uint32,
Uint64,
Uint128,
Float,
Double
};
3 changes: 1 addition & 2 deletions src/wire/message/header.utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ import {
type HeaderValueFloat,
type HeaderValueDouble,
HeaderKind,
HeaderKindValues,
HeaderKindId,
HeaderKindValue,
ReverseHeaderKind
Expand Down Expand Up @@ -137,7 +136,7 @@ type ParsedHeaderDeserialized = {
}

export const mapHeaderKind = (k: number): HeaderKindId => {
if (!(HeaderKindValues.includes(k as HeaderKindValue)))
if (!ReverseHeaderKind[k as HeaderKindValue])
throw new Error(`unknow header kind: ${k}`);
return ReverseHeaderKind[k as HeaderKindValue];
}
Expand Down
44 changes: 36 additions & 8 deletions src/wire/message/partitioning.utils.ts
Original file line number Diff line number Diff line change
@@ -1,33 +1,61 @@

import { uint32ToBuf, uint64ToBuf } from '../number.utils.js';
import { ValueOf } from '../../type.utils.js';


export enum PartitionKind {
Balanced = 1,
PartitionId = 2,
MessageKey = 3
}
export const PartitionKind = {
Balanced : 1,
PartitionId : 2,
MessageKey : 3
} as const;


export type PartitionKind = typeof PartitionKind;
export type PartitionKindId = keyof PartitionKind;
export type PartitionKindValue = ValueOf<PartitionKind>

export type Balanced = {
kind: PartitionKind.Balanced,
kind: PartitionKind['Balanced'],
value: null
};

export type PartitionId = {
kind: PartitionKind.PartitionId,
kind: PartitionKind['PartitionId'],
value: number // uint32
};

// string | uint32/64/128
export type MessageKeyValue = string | number | bigint | Buffer;

export type MessageKey = {
kind: PartitionKind.MessageKey,
kind: PartitionKind['MessageKey'],
value: MessageKeyValue
};

export type Partitioning = Balanced | PartitionId | MessageKey;

const Balanced: Balanced = {
kind: PartitionKind.Balanced,
value: null
};

const PartitionId = (id: number): PartitionId => ({
kind: PartitionKind.PartitionId,
value: id
});

const MessageKey = (id: number): MessageKey => ({
kind: PartitionKind.MessageKey,
value: id
});

// Helper
export const Partitioning = {
Balanced,
PartitionId,
MessageKey
};

export const serializeMessageKey = (v: MessageKeyValue) => {
if (v instanceof Buffer) return v;
if ('string' === typeof v) return Buffer.from(v);
Expand Down
88 changes: 66 additions & 22 deletions src/wire/message/poll.utils.ts
Original file line number Diff line number Diff line change
@@ -1,39 +1,44 @@

import { type Id } from '../identifier.utils.js';
import { ValueOf, reverseRecord } from '../../type.utils.js';
import { deserializeUUID, toDate } from '../serialize.utils.js';
import { serializeGetOffset, type Consumer } from '../offset/offset.utils.js';
import { deserializeHeaders, type HeadersMap } from './header.utils.js';

export enum PollingStrategyKind {
Offset = 1,
Timestamp = 2,
First = 3,
Last = 4,
Next = 5
}
export const PollingStrategyKind = {
Offset: 1,
Timestamp: 2,
First: 3,
Last: 4,
Next: 5
} as const;

export type PollingStrategyKind = typeof PollingStrategyKind;
export type PollingStrategyKindId = keyof PollingStrategyKind;
export type PollingStrategyKindValue = ValueOf<PollingStrategyKind>

export type OffsetPollingStrategy = {
kind: PollingStrategyKind.Offset,
kind: PollingStrategyKind['Offset'],
value: bigint
}

export type TimestampPollingStrategy = {
kind: PollingStrategyKind.Timestamp,
kind: PollingStrategyKind['Timestamp'],
value: bigint
}

export type FirstPollingStrategy = {
kind: PollingStrategyKind.First,
kind: PollingStrategyKind['First'],
value: 0n
}

export type LastPollingStrategy = {
kind: PollingStrategyKind.Last,
kind: PollingStrategyKind['Last'],
value: 0n
}

export type NextPollingStrategy = {
kind: PollingStrategyKind.Next,
kind: PollingStrategyKind['Next'],
value: 0n
}

Expand All @@ -45,6 +50,41 @@ export type PollingStrategy =
NextPollingStrategy;


const Next: NextPollingStrategy = {
kind: PollingStrategyKind.Next,
value:0n
};

const First: FirstPollingStrategy = {
kind: PollingStrategyKind.First,
value:0n
};

const Last: LastPollingStrategy = {
kind: PollingStrategyKind.Last,
value:0n
};

const Offset = (n: bigint): OffsetPollingStrategy => ({
kind: PollingStrategyKind.Offset,
value: n
});

const Timestamp = (n: bigint): TimestampPollingStrategy => ({
kind: PollingStrategyKind.Timestamp,
value: n
});

// helper
export const PollingStrategy = {
Next,
First,
Last,
Offset,
Timestamp
};


export const serializePollMessages = (
streamId: Id,
topicId: Id,
Expand All @@ -66,18 +106,22 @@ export const serializePollMessages = (
]);
};

export enum MessageState {
Available = 1,
Unavailable = 10,
Poisoned = 20,
MarkedForDeletion = 30
};
export const MessageState = {
Available: 1,
Unavailable: 10,
Poisoned: 20,
MarkedForDeletion: 30
}

type MessageState = typeof MessageState;
type MessageStateId = keyof MessageState;
type MessageStateValue = ValueOf<MessageState>;
const ReverseMessageState = reverseRecord(MessageState);

export const mapMessageState = (s: number): string => {
if (!(s in MessageState))
throw new Error(`unknow MessageState: ${s}`);
return MessageState[s];
export const mapMessageState = (k: number): MessageStateId => {
if(!ReverseMessageState[k as MessageStateValue])
throw new Error(`unknow message state: ${k}`);
return ReverseMessageState[k as MessageStateValue];
}

export type Message = {
Expand Down
Loading

0 comments on commit 2e691d6

Please sign in to comment.