Skip to content

Commit

Permalink
[Doc] pipelines
Browse files Browse the repository at this point in the history
  • Loading branch information
aricart committed Apr 18, 2024
1 parent b9a16aa commit f64d072
Showing 1 changed file with 38 additions and 62 deletions.
100 changes: 38 additions & 62 deletions messagepipeline/README.md
Original file line number Diff line number Diff line change
@@ -1,31 +1,17 @@
# MessagePipeline

The MessagePipeline utility allows you to associate a number of transformations
on received messages. In HTTP requests are typically processed by a router
which dispatches a path within the URL to a handler function. Routers typically
use Middleware which allows pre-processing of the requests in order to perform
an initial validation or transformation of the request.

NATS subscriptions are effectively handlers for specific routes designated by
Subjects. These are similar in scope but because dispatching is typically
performed by the server (a message received by the client indicates the
subscription ID that is to receive the message) no general mechanism of
intercepting inbound messages is available.

A Message Pipeline provides an abstraction that allows the handler for a message
to offer similar handling convenience to a NATS application.

Note that in NATS unlike HTTP where a handler must be resolved while processing
the request in NATS it is possible to create transforms on a particular subject
and publish a transformed message that a different service picks up. The
response can be processed by a subsequent service (which may be N levels down).
This allows to integrate and intercept messages differently. This ability is
standard core NATS interaction patterns.

In this utility, the intention is to create a middleware chain that enables a
client handler to transform a message, and allow such transformations to be
easily reused. Transformations are bound to the original message arriving in a
subscription, and processed in-line.
The MessagePipeline utility allows you to compose a set of one or more
transformations that you can easily reuse across message handlers. If you are
thinking middleware for NATS, you are on the right track.

While NATS already provides a message-based vocabulary to implement
transformations, code that you may be on-boarding to NATS may rely on a series
of middleware transformations that you apply to input messages. If that is the
case, this utility will probably be very useful to you.

You can use a MessagePipeline to validate, reformat, and transform messages. For
example could check the schema of an input, and generate a different but
equivalent input, or annotate the message with additional information.

## Installing

Expand All @@ -40,11 +26,12 @@ deno add @synadiaorbit/messagepipeline

### Pipeline Functions

There are two variants for a pipeline
There are two variants for a pipeline. One is for `Sync` pipelines and the other
is for `Async`.

The base functionality for a pipeline is a function `PipelineSyncFn` or
`PipelineFn` that takes a `Msg` and returns a `Msg` or a `Promise<Msg>` in
return:
`PipelineFn` (async) that takes a `Msg` and returns a `Msg` or a `Promise<Msg>`
in return:

```typescript
export type SyncPipelineFn = (msg: Msg) => Msg;
Expand All @@ -57,48 +44,34 @@ Here's an example:
import { MutableMsg } from "./mod";

function reverse(m: Msg): Msg {
try {
const mm = MutableMsg.fromMsg(m);
mm.data = new TextEncoder().encode(m.string().split("").reverse().join(""));
return mm;
} catch (err) {
// typically the Pipeline function will not catch errors,
// as the pipeline itself will. In some cases if you follow
// a specific convention, you can introduce handling in it
// as follows, however your code may be more clear if
// the errors are handled by the message handler instead.
const h = headers();
h.set("Error", err.message);
m.respond(Empty, { headers: h });
// the throws will be caught by the pipeline, which can then
// choose to ignore the message
throw err;
}
const mm = MutableMsg.fromMsg(m);
mm.data = new TextEncoder().encode(m.string().split("").reverse().join(""));
return mm;
}
```

The above is a simple example that expects a source message to have a textual
payload, and returns a message with all it's text reversed. From the example,
the _convention_ used by this pipeline is that if there's an error, the handler
will respond an error message (which is simply a blank message with an "Error"
header set). Note that is by convention only.
The above example is simply a function that takes an input message, and then
creates a message that can be mutated from it. By using the source message,
message properties like subjects, reply subjects, headers and data are all
initialized to match the source message. Then additional transformations can be
applied, in the case above, the message text is just reversed.

### MutableMsg

Messages in the Javascript clients are immutable. For a pipeline, you'll need a
way of crafting a message, that is where `MutableMsg` comes in. Looks like a
standard message, but you are able to _set_ values.
standard message, but you are able to _set_ values on the available properties.

Note that if you use `MutableMsg.fromMsg()` with a message that originated from
a subscription, you'll effectively clone the message. If you use the
constructor, you are responsible to initialize all the fields, including a
special one called `publisher` - this is effectively a `NatsConnection`.
special one called `publisher` that enables `respond()` functionality - this is
effectively a reference to the `NatsConnection`.

### Pipelines

A MessagePipeline is a simply a collection of `PipelineFn` or `PipelineSyncFn`
executed in order. The `Pipelines` and `SyncPipelines` interfaces defines a
pipeline:
A Pipelines are simply a collection of `PipelineFn` or `PipelineSyncFn` executed
in order. The `Pipelines` and `SyncPipelines` interfaces defines a pipeline:

```typescript
export interface Pipelines {
Expand All @@ -110,10 +83,10 @@ export interface SyncPipelines {
}
```

The async version is fairly standard - if the pipeline fails, the `Promise`
rejects. For the sync version, the type is a `Result<Msg>`, you can test a
result to see if it is an error by checking its `isError` property. If `false`,
the `Result` will have a `value` of type `Msg`:
The async version is fairly standard - if the pipeline fails (one of its
functions throws), the `Promise` rejects. For the sync version, the type is a
`Result<Msg>`, you can test a result to see if it is an error by checking its
`isError` property. If `false`, the `Result` will have a `value` of type `Msg`:

```typescript
const r = pipeline.transform(m);
Expand All @@ -126,8 +99,11 @@ if (r.isError) {
}
```

The intention of result is have an efficient pattern for testing if the pipeline
succeed or failed without having unchecked errors.
The intention of `Result` is have an efficient pattern for testing if the pipeline
succeed or failed and preventing errors.

As you can see, using a Pipeline is very straight forward. It allows you to compose
repetitive code info a flow that could lead to a simpler handler.

### Full Example

Expand Down

0 comments on commit f64d072

Please sign in to comment.