Skip to content

Commit

Permalink
Refactor transformers to support both directions
Browse files Browse the repository at this point in the history
  • Loading branch information
ntkme committed Oct 25, 2024
1 parent 6d60eea commit f6e2e26
Show file tree
Hide file tree
Showing 7 changed files with 113 additions and 77 deletions.
6 changes: 3 additions & 3 deletions lib/src/compiler/async.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import {compilerCommand} from '../compiler-path';
import {activeDeprecationOptions} from '../deprecations';
import {FunctionRegistry} from '../function-registry';
import {ImporterRegistry} from '../importer-registry';
import {MessageTransformer} from '../message-transformer';
import {HostMessageTransformer as MessageTransformer} from '../message-transformer';
import {PacketTransformer} from '../packet-transformer';
import * as utils from '../utils';
import * as proto from '../vendor/embedded_sass_pb';
Expand Down Expand Up @@ -156,8 +156,8 @@ export class AsyncCompiler {
this.writeStdin(buffer);
});
this.messageTransformer = new MessageTransformer(
packetTransformer.outboundProtobufs$,
packet => packetTransformer.writeInboundProtobuf(packet)
packetTransformer.protobufs$,
packet => packetTransformer.writeProtobuf(packet)
);
}

Expand Down
6 changes: 3 additions & 3 deletions lib/src/compiler/sync.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import {activeDeprecationOptions} from '../deprecations';
import {Dispatcher} from '../dispatcher';
import {FunctionRegistry} from '../function-registry';
import {ImporterRegistry} from '../importer-registry';
import {MessageTransformer} from '../message-transformer';
import {HostMessageTransformer as MessageTransformer} from '../message-transformer';
import {PacketTransformer} from '../packet-transformer';
import {SyncProcess} from '../sync-process';
import * as utils from '../utils';
Expand Down Expand Up @@ -176,8 +176,8 @@ export class Compiler {
this.writeStdin(buffer);
});
this.messageTransformer = new MessageTransformer(
packetTransformer.outboundProtobufs$,
packet => packetTransformer.writeInboundProtobuf(packet)
packetTransformer.protobufs$,
packet => packetTransformer.writeProtobuf(packet)
);
}

Expand Down
6 changes: 3 additions & 3 deletions lib/src/compiler/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import {
removeLegacyImporterFromSpan,
} from '../legacy/utils';
import {Logger} from '../logger';
import {MessageTransformer} from '../message-transformer';
import {HostMessageTransformer as MessageTransformer} from '../message-transformer';
import * as utils from '../utils';
import * as proto from '../vendor/embedded_sass_pb';
import {SourceSpan} from '../vendor/sass';
Expand Down Expand Up @@ -55,8 +55,8 @@ export function createDispatcher<sync extends 'sync' | 'async'>(
): Dispatcher<sync> {
return new Dispatcher<sync>(
compilationId,
messageTransformer.outboundMessages$,
message => messageTransformer.writeInboundMessage(message),
messageTransformer.messages$,
message => messageTransformer.writeMessage(message),
handlers
);
}
Expand Down
8 changes: 4 additions & 4 deletions lib/src/message-transformer.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import * as varint from 'varint';
import {create, toBinary} from '@bufbuild/protobuf';

import {expectObservableToError} from '../../test/utils';
import {MessageTransformer} from './message-transformer';
import {HostMessageTransformer as MessageTransformer} from './message-transformer';
import * as proto from './vendor/embedded_sass_pb';

describe('message transformer', () => {
Expand Down Expand Up @@ -41,7 +41,7 @@ describe('message transformer', () => {

it('encodes an InboundMessage to buffer', () => {
const message = validInboundMessage('a {b: c}');
messages.writeInboundMessage([1234, message]);
messages.writeMessage([1234, message]);
expect(encodedProtobufs).toEqual([
Uint8Array.from([
...varint.encode(1234),
Expand All @@ -62,7 +62,7 @@ describe('message transformer', () => {
});

it('decodes buffer to OutboundMessage', done => {
messages.outboundMessages$.subscribe({
messages.messages$.subscribe({
next: message => decodedMessages.push(message),
complete: () => {
expect(decodedMessages.length).toBe(1);
Expand Down Expand Up @@ -97,7 +97,7 @@ describe('message transformer', () => {
describe('protocol error', () => {
it('fails on invalid buffer', done => {
expectObservableToError(
messages.outboundMessages$,
messages.messages$,
'Compiler caused error: Invalid compilation ID varint: RangeError: ' +
'Could not decode varint.',
done
Expand Down
130 changes: 83 additions & 47 deletions lib/src/message-transformer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,84 +5,120 @@
import {Observable, Subject} from 'rxjs';
import {map} from 'rxjs/operators';
import {fromBinary, toBinary} from '@bufbuild/protobuf';
import type {Message} from '@bufbuild/protobuf';
import type {GenMessage} from '@bufbuild/protobuf/codegenv1';
import * as varint from 'varint';

import {compilerError} from './utils';
import {
InboundMessage,
InboundMessageSchema,
OutboundMessage,
OutboundMessageSchema,
} from './vendor/embedded_sass_pb';
import * as proto from './vendor/embedded_sass_pb';

/**
* Encodes InboundMessages into protocol buffers and decodes protocol buffers
* into OutboundMessages.
* Encodes OutType into protocol buffers and decodes protocol buffers
* into InType.
*/
export class MessageTransformer {
class MessageTransformer<InType extends Message, OutType extends Message> {
// The decoded messages are written to this Subject. It is publicly exposed
// as a readonly Observable.
private readonly outboundMessagesInternal$ = new Subject<
[number, OutboundMessage]
>();
private readonly messagesInternal$ = new Subject<[number, InType]>();

/**
* The OutboundMessages, decoded from protocol buffers. If this fails to
* The InType messages, decoded from protocol buffers. If this fails to
* decode a message, it will emit an error.
*/
readonly outboundMessages$ = this.outboundMessagesInternal$.pipe();
readonly messages$ = this.messagesInternal$.pipe();

constructor(
private readonly outboundProtobufs$: Observable<Uint8Array>,
private readonly writeInboundProtobuf: (buffer: Uint8Array) => void
private readonly InTypeSchema: GenMessage<InType>,
private readonly OutTypeSchema: GenMessage<OutType>,
private readonly protobufs$: Observable<Uint8Array>,
private readonly writeProtobuf: (buffer: Uint8Array) => void
) {
this.outboundProtobufs$
.pipe(map(decode))
.subscribe(this.outboundMessagesInternal$);
this.protobufs$
.pipe(map(buffer => this.decode(buffer)))
.subscribe(this.messagesInternal$);
}

/**
* Converts the inbound `compilationId` and `message` to a protocol buffer.
* Converts the `compilationId` and OutType `message` to a protocol buffer.
*/
writeInboundMessage([compilationId, message]: [
number,
InboundMessage,
]): void {
writeMessage([compilationId, message]: [number, OutType]): void {
const compilationIdLength = varint.encodingLength(compilationId);
const encodedMessage = toBinary(InboundMessageSchema, message);
const encodedMessage = toBinary(this.OutTypeSchema, message);
const buffer = new Uint8Array(compilationIdLength + encodedMessage.length);
varint.encode(compilationId, buffer);
buffer.set(encodedMessage, compilationIdLength);

try {
this.writeInboundProtobuf(buffer);
this.writeProtobuf(buffer);
} catch (error) {
this.outboundMessagesInternal$.error(error);
this.messagesInternal$.error(error);
}
}

// Decodes a protobuf `buffer` into a compilation ID and an InType message,
// ensuring that all mandatory message fields are populated. Throws if `buffer`
// cannot be decoded into a valid message, or if the message itself contains a
// Protocol Error.
private decode(buffer: Uint8Array): [number, InType] {
let compilationId: number;
try {
compilationId = varint.decode(buffer);
} catch (error) {
throw compilerError(`Invalid compilation ID varint: ${error}`);
}

try {
return [
compilationId,
fromBinary(
this.InTypeSchema,
new Uint8Array(buffer.buffer, varint.decode.bytes)
),
];
} catch (error) {
throw compilerError(`Invalid protobuf: ${error}`);
}
}
}

// Decodes a protobuf `buffer` into a compilation ID and an OutboundMessage,
// ensuring that all mandatory message fields are populated. Throws if `buffer`
// cannot be decoded into a valid message, or if the message itself contains a
// Protocol Error.
function decode(buffer: Uint8Array): [number, OutboundMessage] {
let compilationId: number;
try {
compilationId = varint.decode(buffer);
} catch (error) {
throw compilerError(`Invalid compilation ID varint: ${error}`);
/**
* Encodes InboundMessage into protocol buffers and decodes protocol buffers
* into OutboundMessage.
*/
export class HostMessageTransformer extends MessageTransformer<
proto.OutboundMessage,
proto.InboundMessage
> {
constructor(
protobufs$: Observable<Uint8Array>,
writeProtobuf: (buffer: Uint8Array) => void
) {
super(
proto.OutboundMessageSchema,
proto.InboundMessageSchema,
protobufs$,
writeProtobuf
);
}
}

try {
return [
compilationId,
fromBinary(
OutboundMessageSchema,
new Uint8Array(buffer.buffer, varint.decode.bytes)
),
];
} catch (error) {
throw compilerError(`Invalid protobuf: ${error}`);
/**
* Encodes OutboundMessage into protocol buffers and decodes protocol buffers
* into InboundMessage.
*/
export class CompilerMessageTransformer extends MessageTransformer<
proto.InboundMessage,
proto.OutboundMessage
> {
constructor(
protobufs$: Observable<Uint8Array>,
writeProtobuf: (buffer: Uint8Array) => void
) {
super(
proto.InboundMessageSchema,
proto.OutboundMessageSchema,
protobufs$,
writeProtobuf
);
}
}
14 changes: 7 additions & 7 deletions lib/src/packet-transformer.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,29 +20,29 @@ describe('packet transformer', () => {
});

it('encodes an empty message', () => {
packets.writeInboundProtobuf(Buffer.from([]));
packets.writeProtobuf(Buffer.from([]));

expect(encodedBuffers).toEqual([Buffer.from([0])]);
});

it('encodes a message of length 1', () => {
packets.writeInboundProtobuf(Buffer.from([123]));
packets.writeProtobuf(Buffer.from([123]));

expect(encodedBuffers).toEqual([Buffer.from([1, 123])]);
});

it('encodes a message of length greater than 256', () => {
packets.writeInboundProtobuf(Buffer.alloc(300, 1));
packets.writeProtobuf(Buffer.alloc(300, 1));

expect(encodedBuffers).toEqual([
Buffer.from([172, 2, ...new Array(300).fill(1)]),
]);
});

it('encodes multiple messages', () => {
packets.writeInboundProtobuf(Buffer.from([10]));
packets.writeInboundProtobuf(Buffer.from([20, 30]));
packets.writeInboundProtobuf(Buffer.from([40, 50, 60]));
packets.writeProtobuf(Buffer.from([10]));
packets.writeProtobuf(Buffer.from([20, 30]));
packets.writeProtobuf(Buffer.from([40, 50, 60]));

expect(encodedBuffers).toEqual([
Buffer.from([1, 10]),
Expand All @@ -57,7 +57,7 @@ describe('packet transformer', () => {

function expectDecoding(expected: Buffer[], done: () => void): void {
const actual: Buffer[] = [];
packets.outboundProtobufs$.subscribe({
packets.protobufs$.subscribe({
next: protobuf => actual.push(protobuf),
error: () => fail('expected correct decoding'),
complete: () => {
Expand Down
20 changes: 10 additions & 10 deletions lib/src/packet-transformer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,32 +26,32 @@ export class PacketTransformer {

// The decoded protobufs are written to this Subject. It is publicly exposed
// as a readonly Observable.
private readonly outboundProtobufsInternal$ = new Subject<Buffer>();
private readonly protobufsInternal$ = new Subject<Buffer>();

/**
* The fully-decoded, outbound protobufs. If any errors are encountered
* during encoding/decoding, this Observable will error out.
*/
readonly outboundProtobufs$ = this.outboundProtobufsInternal$.pipe();
readonly protobufs$ = this.protobufsInternal$.pipe();

constructor(
private readonly outboundBuffers$: Observable<Uint8Array>,
private readonly writeInboundBuffer: (buffer: Buffer) => void
private readonly buffers$: Observable<Uint8Array>,
private readonly writeBuffer: (buffer: Buffer) => void
) {
this.outboundBuffers$
this.buffers$
.pipe(mergeMap(buffer => this.decode(buffer)))
.subscribe(this.outboundProtobufsInternal$);
.subscribe(this.protobufsInternal$);
}

/**
* Encodes a packet by pre-fixing `protobuf` with a header that describes its
* length.
*/
writeInboundProtobuf(protobuf: Uint8Array): void {
writeProtobuf(protobuf: Uint8Array): void {
try {
let length = protobuf.length;
if (length === 0) {
this.writeInboundBuffer(Buffer.alloc(1));
this.writeBuffer(Buffer.alloc(1));
return;
}

Expand All @@ -69,9 +69,9 @@ export class PacketTransformer {
const packet = Buffer.alloc(header.length + protobuf.length);
header.copy(packet);
packet.set(protobuf, header.length);
this.writeInboundBuffer(packet);
this.writeBuffer(packet);
} catch (error) {
this.outboundProtobufsInternal$.error(error);
this.protobufsInternal$.error(error);
}
}

Expand Down

0 comments on commit f6e2e26

Please sign in to comment.