Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat!: Custom data converter #477

Merged
merged 34 commits into from
Mar 2, 2022
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
fd9978f
Revert "Revert "feat: Add Protobuf binary and JSON data converter and…
lorensr Jan 22, 2022
cc0d414
fix: Make node `assert` module available to webpack
lorensr Jan 22, 2022
158b3bb
test(worker): Add failing proto integration test
lorensr Dec 7, 2021
dcf79d4
feat: Use custom data converter in Workflow
lorensr Jan 24, 2022
629abe5
chore: Create workflow-common package, fixes #434
lorensr Jan 28, 2022
3e501b0
feat: Only include proto3-json-serializer when needed
lorensr Jan 29, 2022
fd30a39
feat: Use the new DataConverter API
lorensr Jan 24, 2022
e0152bb
chore: Remove unused data converter from otel interceptors
lorensr Feb 8, 2022
3430625
feat: Complete new DC API conversion
lorensr Feb 11, 2022
9720e70
docs: Update docs/data-converter.md; fix linting
lorensr Feb 11, 2022
9b474d1
Use fallback instead of empty module for __temporal_custom_payload_co…
lorensr Feb 14, 2022
717e096
docs: Add docstrings to exported functions
lorensr Feb 15, 2022
01090b7
chore: Use root .gitignore
lorensr Feb 15, 2022
bb62659
chore: Remove duplicate devDep
lorensr Feb 15, 2022
1d51cff
chore: Address review comments
lorensr Feb 15, 2022
b809ccb
fix: Don't mutate Failures
lorensr Feb 18, 2022
9bb131c
fix: Don't mutate Activation when decoding
lorensr Feb 18, 2022
c35ed2a
fix: Run errorToFailure inside vm
lorensr Feb 19, 2022
62c4885
chore: Don't export proto payload converters from workflow-common
lorensr Feb 23, 2022
6cb9540
fix: Update tryToContinueAfterCompletion
lorensr Feb 23, 2022
0bc2fda
chore: Move common packages
lorensr Feb 24, 2022
1b265a1
feat!: Add common package and update references
lorensr Feb 24, 2022
f37057c
fix: UndefinedPayloadConverter test
lorensr Feb 28, 2022
cd1a310
chore: Fix tests
lorensr Feb 28, 2022
663ffbb
chore: Fix integration test and add codec tests
lorensr Feb 28, 2022
a29118b
chore: Disable header encoding for now
lorensr Mar 1, 2022
4a931b7
chore: Fix rest of integration tests
lorensr Mar 1, 2022
29ac6e6
test: Run custom codec on integration tests
lorensr Mar 1, 2022
af61fff
chore: Merge main
lorensr Mar 1, 2022
a68bc36
Remove unnecessary useCustomPayloadConverter
lorensr Mar 1, 2022
c25e9ab
Guard against non-Record module
lorensr Mar 1, 2022
ffcef1c
Make PayloadConverter.toPayload return undefined instead of throwing
lorensr Mar 2, 2022
5b4ded8
Address review comments
lorensr Mar 2, 2022
243f934
Address comments
lorensr Mar 2, 2022
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 3 additions & 18 deletions docs/data-converter.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ function workflowInclusiveInstanceOf(instance: unknown, type: Function): boolean

## Decision

Given the possibility of switching or adding other isolation methods in future, we opted to convert to/from Payloads inside the vm (`PayloadConverter`). We also added another transformer layer called `PayloadCodec` that runs outside the vm, can use node modules and Promises, and operates on Payloads. A `DataConverter` is a `PayloadConverter` and a `PayloadCodec`:
Given the possibility of switching or adding other isolation methods in future, we opted to convert to/from Payloads inside the vm (`PayloadConverter`). We also added another transformer layer called `PayloadCodec` that runs outside the vm, can use node APIs and Promises, and operates on Payloads. A `DataConverter` is a `PayloadConverter` and a `PayloadCodec`:
lorensr marked this conversation as resolved.
Show resolved Hide resolved

```ts
export interface DataConverter {
Expand All @@ -32,7 +32,7 @@ export interface DataConverter {
}

export interface PayloadConverter {
toPayload<T>(value: T): Payload;
toPayload<T>(value: T): Payload | undefined;
fromPayload<T>(payload: Payload): T;
}

Expand All @@ -46,29 +46,14 @@ export interface PayloadCodec {

`PayloadCodec` only runs in the main thread.

When `WorkerOptions.dataConverter.payloadConverterPath` is provided, the code at that location is loaded into the main thread, the worker threads, and the webpack Workflow bundle.
When `WorkerOptions.dataConverter.payloadConverterPath` is provided, the code at that location is loaded into the main thread and the webpack Workflow bundle.

`Worker.create`:
_main thread_

- imports and validates `options.dataConverter.payloadConverterPath`
- passes `payloadConverterPath` to `WorkflowCodeBundler`

Execution goes to either:

- `ThreadedVMWorkflowCreator.create`
_main thread_

- `VMWorkflowCreator.create`
_worker thread (unless in debug mode)_

And then to:

- `VMWorkflowCreator.createWorkflow`
_worker thread (unless in debug mode)_

And then to:

`worker-interface.ts#initRuntime`:
lorensr marked this conversation as resolved.
Show resolved Hide resolved
_workflow vm_

Expand Down
47 changes: 29 additions & 18 deletions packages/client/src/workflow-client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,9 @@ export interface WorkflowClientOptions {
}

export type WorkflowClientOptionsWithDefaults = Required<WorkflowClientOptions>;
export type LoadedWorkflowClientOptions = WorkflowClientOptionsWithDefaults & {
loadedDataConverter: LoadedDataConverter;
};

export function defaultWorkflowClientOptions(): WorkflowClientOptionsWithDefaults {
return {
Expand Down Expand Up @@ -243,12 +246,14 @@ export type WorkflowStartOptions<T extends Workflow = Workflow> = WithWorkflowAr
* Client for starting Workflow executions and creating Workflow handles
*/
export class WorkflowClient {
public readonly options: WorkflowClientOptionsWithDefaults;
protected readonly dataConverter: LoadedDataConverter;
public readonly options: LoadedWorkflowClientOptions;

constructor(public readonly service: WorkflowService = new Connection().service, options?: WorkflowClientOptions) {
this.dataConverter = loadDataConverter(options?.dataConverter);
this.options = { ...defaultWorkflowClientOptions(), ...options };
this.options = {
...defaultWorkflowClientOptions(),
...options,
loadedDataConverter: loadDataConverter(options?.dataConverter),
};
}

/**
Expand Down Expand Up @@ -429,7 +434,7 @@ export class WorkflowClient {
// Note that we can only return one value from our workflow function in JS.
// Ignore any other payloads in result
const [result] = await decodeArrayFromPayloads(
this.dataConverter,
this.options.loadedDataConverter,
ev.workflowExecutionCompletedEventAttributes.result?.payloads
);
return result as any;
Expand All @@ -442,14 +447,14 @@ export class WorkflowClient {
const { failure, retryState } = ev.workflowExecutionFailedEventAttributes;
throw new WorkflowFailedError(
'Workflow execution failed',
await decodeOptionalFailureToOptionalError(this.dataConverter, failure),
await decodeOptionalFailureToOptionalError(this.options.loadedDataConverter, failure),
retryState ?? RetryState.RETRY_STATE_UNSPECIFIED
);
} else if (ev.workflowExecutionCanceledEventAttributes) {
const failure = new CancelledFailure(
'Workflow canceled',
await decodeArrayFromPayloads(
this.dataConverter,
this.options.loadedDataConverter,
ev.workflowExecutionCanceledEventAttributes.details?.payloads
)
);
Expand Down Expand Up @@ -529,7 +534,7 @@ export class WorkflowClient {
execution: input.workflowExecution,
query: {
queryType: input.queryType,
queryArgs: { payloads: await encodeToPayloads(this.dataConverter, ...input.args) },
queryArgs: { payloads: await encodeToPayloads(this.options.loadedDataConverter, ...input.args) },
},
});
} catch (err) {
Expand All @@ -545,7 +550,7 @@ export class WorkflowClient {
throw new TypeError('Invalid response from server');
}
// We ignore anything but the first result
return await decodeFromPayloadsAtIndex(this.dataConverter, 0, response.queryResult?.payloads);
return await decodeFromPayloadsAtIndex(this.options.loadedDataConverter, 0, response.queryResult?.payloads);
}

/**
Expand All @@ -562,7 +567,7 @@ export class WorkflowClient {
requestId: uuid4(),
// control is unused,
signalName: input.signalName,
input: { payloads: await encodeToPayloads(this.dataConverter, ...input.args) },
input: { payloads: await encodeToPayloads(this.options.loadedDataConverter, ...input.args) },
});
} catch (err) {
this.rethrowGrpcError(err, input.workflowExecution, 'Failed to signal Workflow');
Expand All @@ -585,9 +590,9 @@ export class WorkflowClient {
workflowId: options.workflowId,
workflowIdReusePolicy: options.workflowIdReusePolicy,
workflowType: { name: workflowType },
input: { payloads: await encodeToPayloads(this.dataConverter, ...options.args) },
input: { payloads: await encodeToPayloads(this.options.loadedDataConverter, ...options.args) },
signalName,
signalInput: { payloads: await encodeToPayloads(this.dataConverter, ...signalArgs) },
signalInput: { payloads: await encodeToPayloads(this.options.loadedDataConverter, ...signalArgs) },
taskQueue: {
kind: temporal.api.enums.v1.TaskQueueKind.TASK_QUEUE_KIND_UNSPECIFIED,
name: options.taskQueue,
Expand All @@ -596,10 +601,12 @@ export class WorkflowClient {
workflowRunTimeout: options.workflowRunTimeout,
workflowTaskTimeout: options.workflowTaskTimeout,
retryPolicy: options.retry ? compileRetryPolicy(options.retry) : undefined,
memo: options.memo ? { fields: await encodeMapToPayloads(this.dataConverter, options.memo) } : undefined,
memo: options.memo
? { fields: await encodeMapToPayloads(this.options.loadedDataConverter, options.memo) }
: undefined,
searchAttributes: options.searchAttributes
? {
indexedFields: await encodeMapToPayloads(this.dataConverter, options.searchAttributes),
indexedFields: await encodeMapToPayloads(this.options.loadedDataConverter, options.searchAttributes),
}
: undefined,
cronSchedule: options.cronSchedule,
Expand All @@ -626,7 +633,7 @@ export class WorkflowClient {
workflowId: opts.workflowId,
workflowIdReusePolicy: opts.workflowIdReusePolicy,
workflowType: { name: workflowType },
input: { payloads: await encodeToPayloads(this.dataConverter, ...opts.args) },
input: { payloads: await encodeToPayloads(this.options.loadedDataConverter, ...opts.args) },
taskQueue: {
kind: temporal.api.enums.v1.TaskQueueKind.TASK_QUEUE_KIND_UNSPECIFIED,
name: opts.taskQueue,
Expand All @@ -635,10 +642,10 @@ export class WorkflowClient {
workflowRunTimeout: opts.workflowRunTimeout,
workflowTaskTimeout: opts.workflowTaskTimeout,
retryPolicy: opts.retry ? compileRetryPolicy(opts.retry) : undefined,
memo: opts.memo ? { fields: await encodeMapToPayloads(this.dataConverter, opts.memo) } : undefined,
memo: opts.memo ? { fields: await encodeMapToPayloads(this.options.loadedDataConverter, opts.memo) } : undefined,
searchAttributes: opts.searchAttributes
? {
indexedFields: await encodeMapToPayloads(this.dataConverter, opts.searchAttributes),
indexedFields: await encodeMapToPayloads(this.options.loadedDataConverter, opts.searchAttributes),
}
: undefined,
cronSchedule: opts.cronSchedule,
Expand Down Expand Up @@ -672,7 +679,11 @@ export class WorkflowClient {
namespace: this.options.namespace,
identity: this.options.identity,
...input,
details: { payloads: input.details ? await encodeToPayloads(this.dataConverter, ...input.details) : undefined },
details: {
payloads: input.details
? await encodeToPayloads(this.options.loadedDataConverter, ...input.details)
: undefined,
},
firstExecutionRunId: input.firstExecutionRunId,
});
} catch (err) {
Expand Down
23 changes: 15 additions & 8 deletions packages/common/src/converter/data-converter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,25 @@ import { PayloadCodec } from './payload-codec';
import { PayloadConverter } from './payload-converter';

/**
* When your data (arguments and return values) is sent over the wire and stored by Temporal Server,
* it is encoded in binary in a {@link Payload} Protobuf message.
* When your data (arguments and return values) is sent over the wire and stored by Temporal Server, it is encoded in
* binary in a {@link Payload} Protobuf message.
*
* The default `DataConverter` supports `Uint8Array`, and JSON serializables (so if [`JSON.stringify(yourArgOrRetval)`](https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/JSON/stringify#description) works, the default data converter will work).
* The default `DataConverter` supports `Uint8Array`, and JSON serializables (so if
* [`JSON.stringify(yourArgOrRetval)`](https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/JSON/stringify#description)
* works, the default data converter will work). Protobufs are supported via [this
* API](https://docs.temporal.io/docs/typescript/data-converters#protobufs).
*
* Use a custom `DataConverter` to control the contents of your {@link Payload}s.
* Common reasons for using a custom `DataConverter` are:
* - Converting values that are not supported by the default `DataConverter` (for example, `JSON.stringify()` doesn't handle `BigInt`s, so if you want to return `{ total: 1000n }` from a Workflow, Signal, or Activity, you need your own `DataConverter`).
* - Encrypting values that may contain private information that you don't want stored in plaintext in Temporal Server's database.
* Use a custom `DataConverter` to control the contents of your {@link Payload}s. Common reasons for using a custom
* `DataConverter` are:
* - Converting values that are not supported by the default `DataConverter` (for example, `JSON.stringify()` doesn't
* handle `BigInt`s, so if you want to return `{ total: 1000n }` from a Workflow, Signal, or Activity, you need your
* own `DataConverter`).
* - Encrypting values that may contain private information that you don't want stored in plaintext in Temporal Server's
* database.
* - Compressing values to reduce disk or network usage.
*
* To use your custom `DataConverter`, provide it to the {@link WorkflowClient}, {@link Worker}, and {@link bundleWorkflowCode} (if you use it):
* To use your custom `DataConverter`, provide it to the {@link WorkflowClient}, {@link Worker}, and
* {@link bundleWorkflowCode} (if you use it):
* - `new WorkflowClient({ ..., dataConverter })`
* - `Worker.create({ ..., dataConverter })`
* - `bundleWorkflowCode({ ..., payloadConverterPath })`
Expand Down
2 changes: 1 addition & 1 deletion packages/common/src/converter/payload-codec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import { Payload } from './types';
/**
* `PayloadCodec` is an optional step that happens between the wire and the {@link PayloadConverter}:
*
* Temporal Server ↔️ Wire ↔️ `PayloadCodec` ↔️ `PayloadConverter` ↔️ User code
* Temporal Server <--> Wire <--> `PayloadCodec` <--> `PayloadConverter` <--> User code
*
* Implement this to transform an array of {@link Payload}s to/from the format sent over the wire and stored by Temporal Server.
* Common transformations are encryption and compression.
Expand Down
62 changes: 39 additions & 23 deletions packages/common/src/converter/payload-converter.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { UnsupportedTypeError, ValueError } from '@temporalio/internal-workflow-common';
import { PayloadConverterError, ValueError } from '@temporalio/internal-workflow-common';
import {
BinaryPayloadConverter,
JsonPayloadConverter,
Expand All @@ -8,8 +8,7 @@ import {
import { METADATA_ENCODING_KEY, Payload, str } from './types';

/**
* Used by the framework to serialize/deserialize parameters and return values that need to be
* sent over the wire.
* Used by the framework to serialize/deserialize parameters and return values.
*
* This is called inside the [Workflow isolate](https://docs.temporal.io/docs/typescript/determinism).
* To write async code or use Node APIs (or use packages that use Node APIs), use a {@link PayloadCodec}.
Expand All @@ -19,7 +18,7 @@ export interface PayloadConverter {
* Converts a value to a {@link Payload}.
* @param value The value to convert. Example values include the Workflow args sent by the client and the values returned by a Workflow or Activity.
*/
toPayload<T>(value: T): Payload;
toPayload<T>(value: T): Payload | undefined;

/**
* Converts a {@link Payload} back to a value.
Expand All @@ -40,23 +39,24 @@ export class CompositePayloadConverter implements PayloadConverter {

/**
* Tries to run `.toPayload(value)` on each converter in the order provided at construction.
* Returns the first successful result.
* @throws {@link ValueError} if no converter can convert the value
* Returns the first successful result, or `undefined` if there is no converter that can handle the value.
*/
public toPayload<T>(value: T): Payload {
public toPayload<T>(value: T): Payload | undefined {
for (const converter of this.converters) {
try {
const result = converter.toPayload(value);
return result;
if (result !== undefined) {
return result;
}
} catch (e: unknown) {
if (e instanceof UnsupportedTypeError) {
if (e instanceof PayloadConverterError) {
lorensr marked this conversation as resolved.
Show resolved Hide resolved
continue;
} else {
throw e;
}
}
}
throw new ValueError(`Cannot serialize ${value} of type ${typeof value}`);
return undefined;
}

/**
Expand All @@ -75,20 +75,45 @@ export class CompositePayloadConverter implements PayloadConverter {
}
}

/**
* Tries to convert `value` to a {@link Payload}. Throws if conversion fails.
*
* @throws {@link PayloadConverterError}
*/
export function toPayload(converter: PayloadConverter, value: unknown): Payload {
const payload = converter.toPayload(value);
if (payload === undefined) {
throw new PayloadConverterError(`Failed to convert value: ${value}`);
lorensr marked this conversation as resolved.
Show resolved Hide resolved
}
return payload;
}

/**
* Implements conversion of a list of values.
*
* @param converter
* @param values JS values to convert to Payloads.
* @return converted value
* @param values JS values to convert to Payloads
* @return converted values
* @throws PayloadConverterError if conversion of the value passed as parameter failed for any
* reason.
*/
export function toPayloads(converter: PayloadConverter, ...values: unknown[]): Payload[] | undefined {
if (values.length === 0) {
return undefined;
}
return values.map((value) => converter.toPayload(value));

return values.map((value) => toPayload(converter, value));
}

/**
* Run {@link PayloadConverter.toPayload} on each value in the map.
*
* @throws {@link PayloadConverterError} if conversion of any value in the map fails
*/
export function mapToPayloads<K extends string>(converter: PayloadConverter, map: Record<K, any>): Record<K, Payload> {
return Object.fromEntries(
Object.entries(map).map(([k, v]): [K, Payload] => [k as K, toPayload(converter, v)])
) as Record<K, Payload>;
}

/**
Expand All @@ -99,7 +124,7 @@ export function toPayloads(converter: PayloadConverter, ...values: unknown[]): P
* @param index index of the value in the payloads
* @param payloads serialized value to convert to JS values.
* @return converted JS value
* @throws PayloadConverterError if conversion of the data passed as parameter failed for any
* @throws {@link PayloadConverterError} if conversion of the data passed as parameter failed for any
* reason.
*/
export function fromPayloadsAtIndex<T>(converter: PayloadConverter, index: number, payloads?: Payload[] | null): T {
Expand All @@ -120,15 +145,6 @@ export function arrayFromPayloads(converter: PayloadConverter, payloads?: Payloa
return payloads.map((payload: Payload) => converter.fromPayload(payload));
}

/**
* Run {@link PayloadConverter.toPayload} on each value in the map.
*/
export function mapToPayloads<K extends string>(converter: PayloadConverter, map: Record<K, any>): Record<K, Payload> {
return Object.fromEntries(
Object.entries(map).map(([k, v]): [K, Payload] => [k as K, converter.toPayload(v)])
) as Record<K, Payload>;
}

export class DefaultPayloadConverter extends CompositePayloadConverter {
// Match the order used in other SDKs, but exclude Protobuf converters so that the code, including
// `proto3-json-serializer`, doesn't take space in Workflow bundles that don't use Protobufs. To use Protobufs, use
Expand Down
Loading