From f6e2e260c5436fd5104342299078dc029208fee6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E3=81=AA=E3=81=A4=E3=81=8D?= Date: Thu, 24 Oct 2024 16:22:18 -0700 Subject: [PATCH] Refactor transformers to support both directions --- lib/src/compiler/async.ts | 6 +- lib/src/compiler/sync.ts | 6 +- lib/src/compiler/utils.ts | 6 +- lib/src/message-transformer.test.ts | 8 +- lib/src/message-transformer.ts | 130 ++++++++++++++++++---------- lib/src/packet-transformer.test.ts | 14 +-- lib/src/packet-transformer.ts | 20 ++--- 7 files changed, 113 insertions(+), 77 deletions(-) diff --git a/lib/src/compiler/async.ts b/lib/src/compiler/async.ts index ad577db2..ca6cb612 100644 --- a/lib/src/compiler/async.ts +++ b/lib/src/compiler/async.ts @@ -20,7 +20,7 @@ import {compilerCommand} from '../compiler-path'; import {activeDeprecationOptions} from '../deprecations'; import {FunctionRegistry} from '../function-registry'; import {ImporterRegistry} from '../importer-registry'; -import {MessageTransformer} from '../message-transformer'; +import {HostMessageTransformer as MessageTransformer} from '../message-transformer'; import {PacketTransformer} from '../packet-transformer'; import * as utils from '../utils'; import * as proto from '../vendor/embedded_sass_pb'; @@ -156,8 +156,8 @@ export class AsyncCompiler { this.writeStdin(buffer); }); this.messageTransformer = new MessageTransformer( - packetTransformer.outboundProtobufs$, - packet => packetTransformer.writeInboundProtobuf(packet) + packetTransformer.protobufs$, + packet => packetTransformer.writeProtobuf(packet) ); } diff --git a/lib/src/compiler/sync.ts b/lib/src/compiler/sync.ts index 936ca903..c88f3cd8 100644 --- a/lib/src/compiler/sync.ts +++ b/lib/src/compiler/sync.ts @@ -18,7 +18,7 @@ import {activeDeprecationOptions} from '../deprecations'; import {Dispatcher} from '../dispatcher'; import {FunctionRegistry} from '../function-registry'; import {ImporterRegistry} from '../importer-registry'; -import {MessageTransformer} from '../message-transformer'; +import {HostMessageTransformer as MessageTransformer} from '../message-transformer'; import {PacketTransformer} from '../packet-transformer'; import {SyncProcess} from '../sync-process'; import * as utils from '../utils'; @@ -176,8 +176,8 @@ export class Compiler { this.writeStdin(buffer); }); this.messageTransformer = new MessageTransformer( - packetTransformer.outboundProtobufs$, - packet => packetTransformer.writeInboundProtobuf(packet) + packetTransformer.protobufs$, + packet => packetTransformer.writeProtobuf(packet) ); } diff --git a/lib/src/compiler/utils.ts b/lib/src/compiler/utils.ts index f04f03fc..7cdca153 100644 --- a/lib/src/compiler/utils.ts +++ b/lib/src/compiler/utils.ts @@ -17,7 +17,7 @@ import { removeLegacyImporterFromSpan, } from '../legacy/utils'; import {Logger} from '../logger'; -import {MessageTransformer} from '../message-transformer'; +import {HostMessageTransformer as MessageTransformer} from '../message-transformer'; import * as utils from '../utils'; import * as proto from '../vendor/embedded_sass_pb'; import {SourceSpan} from '../vendor/sass'; @@ -55,8 +55,8 @@ export function createDispatcher( ): Dispatcher { return new Dispatcher( compilationId, - messageTransformer.outboundMessages$, - message => messageTransformer.writeInboundMessage(message), + messageTransformer.messages$, + message => messageTransformer.writeMessage(message), handlers ); } diff --git a/lib/src/message-transformer.test.ts b/lib/src/message-transformer.test.ts index 7053139b..85e47ed0 100644 --- a/lib/src/message-transformer.test.ts +++ b/lib/src/message-transformer.test.ts @@ -7,7 +7,7 @@ import * as varint from 'varint'; import {create, toBinary} from '@bufbuild/protobuf'; import {expectObservableToError} from '../../test/utils'; -import {MessageTransformer} from './message-transformer'; +import {HostMessageTransformer as MessageTransformer} from './message-transformer'; import * as proto from './vendor/embedded_sass_pb'; describe('message transformer', () => { @@ -41,7 +41,7 @@ describe('message transformer', () => { it('encodes an InboundMessage to buffer', () => { const message = validInboundMessage('a {b: c}'); - messages.writeInboundMessage([1234, message]); + messages.writeMessage([1234, message]); expect(encodedProtobufs).toEqual([ Uint8Array.from([ ...varint.encode(1234), @@ -62,7 +62,7 @@ describe('message transformer', () => { }); it('decodes buffer to OutboundMessage', done => { - messages.outboundMessages$.subscribe({ + messages.messages$.subscribe({ next: message => decodedMessages.push(message), complete: () => { expect(decodedMessages.length).toBe(1); @@ -97,7 +97,7 @@ describe('message transformer', () => { describe('protocol error', () => { it('fails on invalid buffer', done => { expectObservableToError( - messages.outboundMessages$, + messages.messages$, 'Compiler caused error: Invalid compilation ID varint: RangeError: ' + 'Could not decode varint.', done diff --git a/lib/src/message-transformer.ts b/lib/src/message-transformer.ts index dd508396..e1950d9f 100644 --- a/lib/src/message-transformer.ts +++ b/lib/src/message-transformer.ts @@ -5,84 +5,120 @@ import {Observable, Subject} from 'rxjs'; import {map} from 'rxjs/operators'; import {fromBinary, toBinary} from '@bufbuild/protobuf'; +import type {Message} from '@bufbuild/protobuf'; +import type {GenMessage} from '@bufbuild/protobuf/codegenv1'; import * as varint from 'varint'; import {compilerError} from './utils'; -import { - InboundMessage, - InboundMessageSchema, - OutboundMessage, - OutboundMessageSchema, -} from './vendor/embedded_sass_pb'; +import * as proto from './vendor/embedded_sass_pb'; /** - * Encodes InboundMessages into protocol buffers and decodes protocol buffers - * into OutboundMessages. + * Encodes OutType into protocol buffers and decodes protocol buffers + * into InType. */ -export class MessageTransformer { +class MessageTransformer { // The decoded messages are written to this Subject. It is publicly exposed // as a readonly Observable. - private readonly outboundMessagesInternal$ = new Subject< - [number, OutboundMessage] - >(); + private readonly messagesInternal$ = new Subject<[number, InType]>(); /** - * The OutboundMessages, decoded from protocol buffers. If this fails to + * The InType messages, decoded from protocol buffers. If this fails to * decode a message, it will emit an error. */ - readonly outboundMessages$ = this.outboundMessagesInternal$.pipe(); + readonly messages$ = this.messagesInternal$.pipe(); constructor( - private readonly outboundProtobufs$: Observable, - private readonly writeInboundProtobuf: (buffer: Uint8Array) => void + private readonly InTypeSchema: GenMessage, + private readonly OutTypeSchema: GenMessage, + private readonly protobufs$: Observable, + private readonly writeProtobuf: (buffer: Uint8Array) => void ) { - this.outboundProtobufs$ - .pipe(map(decode)) - .subscribe(this.outboundMessagesInternal$); + this.protobufs$ + .pipe(map(buffer => this.decode(buffer))) + .subscribe(this.messagesInternal$); } /** - * Converts the inbound `compilationId` and `message` to a protocol buffer. + * Converts the `compilationId` and OutType `message` to a protocol buffer. */ - writeInboundMessage([compilationId, message]: [ - number, - InboundMessage, - ]): void { + writeMessage([compilationId, message]: [number, OutType]): void { const compilationIdLength = varint.encodingLength(compilationId); - const encodedMessage = toBinary(InboundMessageSchema, message); + const encodedMessage = toBinary(this.OutTypeSchema, message); const buffer = new Uint8Array(compilationIdLength + encodedMessage.length); varint.encode(compilationId, buffer); buffer.set(encodedMessage, compilationIdLength); try { - this.writeInboundProtobuf(buffer); + this.writeProtobuf(buffer); } catch (error) { - this.outboundMessagesInternal$.error(error); + this.messagesInternal$.error(error); + } + } + + // Decodes a protobuf `buffer` into a compilation ID and an InType message, + // ensuring that all mandatory message fields are populated. Throws if `buffer` + // cannot be decoded into a valid message, or if the message itself contains a + // Protocol Error. + private decode(buffer: Uint8Array): [number, InType] { + let compilationId: number; + try { + compilationId = varint.decode(buffer); + } catch (error) { + throw compilerError(`Invalid compilation ID varint: ${error}`); + } + + try { + return [ + compilationId, + fromBinary( + this.InTypeSchema, + new Uint8Array(buffer.buffer, varint.decode.bytes) + ), + ]; + } catch (error) { + throw compilerError(`Invalid protobuf: ${error}`); } } } -// Decodes a protobuf `buffer` into a compilation ID and an OutboundMessage, -// ensuring that all mandatory message fields are populated. Throws if `buffer` -// cannot be decoded into a valid message, or if the message itself contains a -// Protocol Error. -function decode(buffer: Uint8Array): [number, OutboundMessage] { - let compilationId: number; - try { - compilationId = varint.decode(buffer); - } catch (error) { - throw compilerError(`Invalid compilation ID varint: ${error}`); +/** + * Encodes InboundMessage into protocol buffers and decodes protocol buffers + * into OutboundMessage. + */ +export class HostMessageTransformer extends MessageTransformer< + proto.OutboundMessage, + proto.InboundMessage +> { + constructor( + protobufs$: Observable, + writeProtobuf: (buffer: Uint8Array) => void + ) { + super( + proto.OutboundMessageSchema, + proto.InboundMessageSchema, + protobufs$, + writeProtobuf + ); } +} - try { - return [ - compilationId, - fromBinary( - OutboundMessageSchema, - new Uint8Array(buffer.buffer, varint.decode.bytes) - ), - ]; - } catch (error) { - throw compilerError(`Invalid protobuf: ${error}`); +/** + * Encodes OutboundMessage into protocol buffers and decodes protocol buffers + * into InboundMessage. + */ +export class CompilerMessageTransformer extends MessageTransformer< + proto.InboundMessage, + proto.OutboundMessage +> { + constructor( + protobufs$: Observable, + writeProtobuf: (buffer: Uint8Array) => void + ) { + super( + proto.InboundMessageSchema, + proto.OutboundMessageSchema, + protobufs$, + writeProtobuf + ); } } diff --git a/lib/src/packet-transformer.test.ts b/lib/src/packet-transformer.test.ts index 78297883..165045cd 100644 --- a/lib/src/packet-transformer.test.ts +++ b/lib/src/packet-transformer.test.ts @@ -20,19 +20,19 @@ describe('packet transformer', () => { }); it('encodes an empty message', () => { - packets.writeInboundProtobuf(Buffer.from([])); + packets.writeProtobuf(Buffer.from([])); expect(encodedBuffers).toEqual([Buffer.from([0])]); }); it('encodes a message of length 1', () => { - packets.writeInboundProtobuf(Buffer.from([123])); + packets.writeProtobuf(Buffer.from([123])); expect(encodedBuffers).toEqual([Buffer.from([1, 123])]); }); it('encodes a message of length greater than 256', () => { - packets.writeInboundProtobuf(Buffer.alloc(300, 1)); + packets.writeProtobuf(Buffer.alloc(300, 1)); expect(encodedBuffers).toEqual([ Buffer.from([172, 2, ...new Array(300).fill(1)]), @@ -40,9 +40,9 @@ describe('packet transformer', () => { }); it('encodes multiple messages', () => { - packets.writeInboundProtobuf(Buffer.from([10])); - packets.writeInboundProtobuf(Buffer.from([20, 30])); - packets.writeInboundProtobuf(Buffer.from([40, 50, 60])); + packets.writeProtobuf(Buffer.from([10])); + packets.writeProtobuf(Buffer.from([20, 30])); + packets.writeProtobuf(Buffer.from([40, 50, 60])); expect(encodedBuffers).toEqual([ Buffer.from([1, 10]), @@ -57,7 +57,7 @@ describe('packet transformer', () => { function expectDecoding(expected: Buffer[], done: () => void): void { const actual: Buffer[] = []; - packets.outboundProtobufs$.subscribe({ + packets.protobufs$.subscribe({ next: protobuf => actual.push(protobuf), error: () => fail('expected correct decoding'), complete: () => { diff --git a/lib/src/packet-transformer.ts b/lib/src/packet-transformer.ts index 0322216b..b8205973 100644 --- a/lib/src/packet-transformer.ts +++ b/lib/src/packet-transformer.ts @@ -26,32 +26,32 @@ export class PacketTransformer { // The decoded protobufs are written to this Subject. It is publicly exposed // as a readonly Observable. - private readonly outboundProtobufsInternal$ = new Subject(); + private readonly protobufsInternal$ = new Subject(); /** * The fully-decoded, outbound protobufs. If any errors are encountered * during encoding/decoding, this Observable will error out. */ - readonly outboundProtobufs$ = this.outboundProtobufsInternal$.pipe(); + readonly protobufs$ = this.protobufsInternal$.pipe(); constructor( - private readonly outboundBuffers$: Observable, - private readonly writeInboundBuffer: (buffer: Buffer) => void + private readonly buffers$: Observable, + private readonly writeBuffer: (buffer: Buffer) => void ) { - this.outboundBuffers$ + this.buffers$ .pipe(mergeMap(buffer => this.decode(buffer))) - .subscribe(this.outboundProtobufsInternal$); + .subscribe(this.protobufsInternal$); } /** * Encodes a packet by pre-fixing `protobuf` with a header that describes its * length. */ - writeInboundProtobuf(protobuf: Uint8Array): void { + writeProtobuf(protobuf: Uint8Array): void { try { let length = protobuf.length; if (length === 0) { - this.writeInboundBuffer(Buffer.alloc(1)); + this.writeBuffer(Buffer.alloc(1)); return; } @@ -69,9 +69,9 @@ export class PacketTransformer { const packet = Buffer.alloc(header.length + protobuf.length); header.copy(packet); packet.set(protobuf, header.length); - this.writeInboundBuffer(packet); + this.writeBuffer(packet); } catch (error) { - this.outboundProtobufsInternal$.error(error); + this.protobufsInternal$.error(error); } }