Skip to content

Commit

Permalink
Merge pull request #91 from mlegenhausen/pipeable
Browse files Browse the repository at this point in the history
  • Loading branch information
DenisFrezzato authored Sep 6, 2023
2 parents 8894629 + 3ccebea commit e294ef1
Show file tree
Hide file tree
Showing 12 changed files with 87 additions and 34 deletions.
13 changes: 8 additions & 5 deletions docs/modules/Middleware.ts.md
Original file line number Diff line number Diff line change
Expand Up @@ -473,8 +473,8 @@ Derivable from `Chain`.
**Signature**

```ts
export declare const chainFirst: <A, R, E, B>(
f: (a: A) => Middleware<R, R, E, B>
export declare const chainFirst: <A, R, E, _>(
f: (a: A) => Middleware<R, R, E, _>
) => (first: Middleware<R, R, E, A>) => Middleware<R, R, E, A>
```

Expand Down Expand Up @@ -1156,7 +1156,10 @@ Returns a middleware that pipes a stream to the response object.
**Signature**

```ts
export declare function pipeStream<E>(stream: NodeJS.ReadableStream): Middleware<BodyOpen, ResponseEnded, E, void>
export declare function pipeStream<E>(
stream: NodeJS.ReadableStream,
onError: (err: unknown) => IO<void>
): Middleware<BodyOpen, ResponseEnded, E, void>
```

Added in v0.7.0
Expand Down Expand Up @@ -1451,7 +1454,7 @@ Added in v0.7.0
**Signature**

```ts
export declare const fromIO: <R, E, A>(fa: IO<A>) => Middleware<R, R, E, A>
export declare const fromIO: <A, R, E>(fa: IO<A>) => Middleware<R, R, E, A>
```

Added in v0.7.0
Expand Down Expand Up @@ -1481,7 +1484,7 @@ Added in v0.7.0
**Signature**

```ts
export declare const fromTask: <R, E, A>(fa: T.Task<A>) => Middleware<R, R, E, A>
export declare const fromTask: <A, R, E>(fa: T.Task<A>) => Middleware<R, R, E, A>
```

Added in v0.7.0
Expand Down
7 changes: 4 additions & 3 deletions docs/modules/ReaderMiddleware.ts.md
Original file line number Diff line number Diff line change
Expand Up @@ -1499,7 +1499,8 @@ Returns a `ReaderMiddleware` that pipes a stream to the response object.

```ts
export declare function pipeStream<R, E>(
stream: NodeJS.ReadableStream
stream: NodeJS.ReadableStream,
onError: (reason: unknown) => ReaderIO<R, void>
): ReaderMiddleware<R, H.BodyOpen, H.ResponseEnded, E, void>
```

Expand Down Expand Up @@ -1806,7 +1807,7 @@ Added in v0.7.0
**Signature**

```ts
export declare const fromIO: <S, R, E, A>(fa: IO<A>) => ReaderMiddleware<S, R, R, E, A>
export declare const fromIO: <A, S, R, E>(fa: IO<A>) => ReaderMiddleware<S, R, R, E, A>
```

Added in v0.7.0
Expand Down Expand Up @@ -1862,7 +1863,7 @@ Added in v0.6.3
**Signature**

```ts
export declare const fromTask: <S, R, E, A>(fa: Task<A>) => ReaderMiddleware<S, R, R, E, A>
export declare const fromTask: <A, S, R, E>(fa: Task<A>) => ReaderMiddleware<S, R, R, E, A>
```

Added in v0.7.0
Expand Down
2 changes: 1 addition & 1 deletion docs/modules/express.ts.md
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ Added in v0.5.0
**Signature**

```ts
pipeStream(stream: NodeJS.ReadableStream): ExpressConnection<ResponseEnded>
pipeStream(stream: NodeJS.ReadableStream, onError: (e: unknown) => IO.IO<void>): ExpressConnection<ResponseEnded>
```

Added in v0.6.2
Expand Down
6 changes: 5 additions & 1 deletion docs/modules/index.ts.md
Original file line number Diff line number Diff line change
Expand Up @@ -685,7 +685,11 @@ export interface Connection<S> {
readonly setHeader: (this: Connection<HeadersOpen>, name: string, value: string) => Connection<HeadersOpen>
readonly setStatus: (this: Connection<StatusOpen>, status: Status) => Connection<HeadersOpen>
readonly setBody: (this: Connection<BodyOpen>, body: string | Buffer) => Connection<ResponseEnded>
readonly pipeStream: (this: Connection<BodyOpen>, stream: NodeJS.ReadableStream) => Connection<ResponseEnded>
readonly pipeStream: (
this: Connection<BodyOpen>,
stream: NodeJS.ReadableStream,
onError: (e: unknown) => IO<void>
) => Connection<ResponseEnded>
readonly endResponse: (this: Connection<BodyOpen>) => Connection<ResponseEnded>
}
```
Expand Down
14 changes: 7 additions & 7 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@
"docs-ts": "^0.6.10",
"dtslint": "github:gcanti/dtslint",
"express": "^4.17.1",
"fp-ts": "^2.10.0",
"fp-ts": "^2.13.2",
"fp-ts-contrib": "^0.1.26",
"fp-ts-routing": "^0.5.4",
"glob": "^7.2.0",
Expand Down
11 changes: 8 additions & 3 deletions src/Middleware.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import { Alt3 } from 'fp-ts/Alt'
import { apFirst as apFirst_, apSecond as apSecond_, Apply3, apS as apS_ } from 'fp-ts/Apply'
import { bind as bind_, Chain3, chainFirst as chainFirst_ } from 'fp-ts/Chain'
import { Bifunctor3 } from 'fp-ts/Bifunctor'
import { identity, Lazy, pipe, Predicate, Refinement } from 'fp-ts/function'
import { identity, Lazy, pipe } from 'fp-ts/function'
import { Functor3, bindTo as bindTo_ } from 'fp-ts/Functor'
import { Monad3 } from 'fp-ts/Monad'
import { BodyOpen, Connection, CookieOptions, HeadersOpen, MediaType, ResponseEnded, Status, StatusOpen } from '.'
Expand Down Expand Up @@ -40,6 +40,8 @@ import {
chainTaskK as chainTaskK_,
chainFirstTaskK as chainFirstTaskK_,
} from 'fp-ts/FromTask'
import { Refinement } from 'fp-ts/Refinement'
import { Predicate } from 'fp-ts/Predicate'

declare module 'fp-ts/HKT' {
interface URItoKind3<R, E, A> {
Expand Down Expand Up @@ -649,8 +651,11 @@ export function redirect<E = never>(uri: string | { href: string }): Middleware<
* @category constructors
* @since 0.7.0
*/
export function pipeStream<E>(stream: NodeJS.ReadableStream): Middleware<BodyOpen, ResponseEnded, E, void> {
return modifyConnection((c) => c.pipeStream(stream))
export function pipeStream<E>(
stream: NodeJS.ReadableStream,
onError: (err: unknown) => IO<void>
): Middleware<BodyOpen, ResponseEnded, E, void> {
return modifyConnection((c) => c.pipeStream(stream, onError))
}

const isUnknownRecord = (u: unknown): u is Record<string, unknown> => u !== null && typeof u === 'object'
Expand Down
13 changes: 10 additions & 3 deletions src/ReaderMiddleware.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
/**
* @since 0.6.3
*/
import { flow, identity, Lazy, pipe, Predicate, Refinement } from 'fp-ts/function'
import { flow, identity, Lazy, pipe } from 'fp-ts/function'
import { bind as bind_, chainFirst as chainFirst_, Chain4 } from 'fp-ts/Chain'
import { ReaderTask } from 'fp-ts/ReaderTask'
import { Task } from 'fp-ts/Task'
Expand Down Expand Up @@ -37,6 +37,9 @@ import {
chainTaskK as chainTaskK_,
chainFirstTaskK as chainFirstTaskK_,
} from 'fp-ts/FromTask'
import { ReaderIO } from 'fp-ts/ReaderIO'
import { Refinement } from 'fp-ts/Refinement'
import { Predicate } from 'fp-ts/Predicate'

/**
* @category instances
Expand Down Expand Up @@ -452,9 +455,13 @@ export function redirect<R, E = never>(
* @since 0.7.3
*/
export function pipeStream<R, E>(
stream: NodeJS.ReadableStream
stream: NodeJS.ReadableStream,
onError: (reason: unknown) => ReaderIO<R, void>
): ReaderMiddleware<R, H.BodyOpen, H.ResponseEnded, E, void> {
return modifyConnection((c) => c.pipeStream(stream))
return pipe(
ask<R, H.BodyOpen, E>(),
ichain((r) => modifyConnection((c) => c.pipeStream(stream, (err) => onError(err)(r))))
)
}

/**
Expand Down
13 changes: 9 additions & 4 deletions src/express.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@ import { Middleware, execMiddleware } from './Middleware'
import * as E from 'fp-ts/Either'
import { pipe } from 'fp-ts/function'
import * as L from 'fp-ts-contrib/List'
import { pipeline } from 'stream'
import * as IO from 'fp-ts/IO'
import * as O from 'fp-ts/Option'

/**
* @internal
Expand All @@ -19,7 +22,7 @@ export type Action =
| { type: 'setHeader'; name: string; value: string }
| { type: 'clearCookie'; name: string; options: CookieOptions }
| { type: 'setCookie'; name: string; value: string; options: CookieOptions }
| { type: 'pipeStream'; stream: NodeJS.ReadableStream }
| { type: 'pipeStream'; stream: NodeJS.ReadableStream; onError: (e: unknown) => IO.IO<void> }

const endResponse: Action = { type: 'endResponse' }

Expand Down Expand Up @@ -119,8 +122,8 @@ export class ExpressConnection<S> implements Connection<S> {
/**
* @since 0.6.2
*/
pipeStream(stream: NodeJS.ReadableStream): ExpressConnection<ResponseEnded> {
return this.chain({ type: 'pipeStream', stream }, true)
pipeStream(stream: NodeJS.ReadableStream, onError: (e: unknown) => IO.IO<void>): ExpressConnection<ResponseEnded> {
return this.chain({ type: 'pipeStream', stream, onError }, true)
}
/**
* @since 0.5.0
Expand All @@ -147,7 +150,9 @@ function run(res: Response, action: Action): Response {
case 'setStatus':
return res.status(action.status)
case 'pipeStream':
return action.stream.pipe(res)
return pipeline(action.stream, res, (err) =>
pipe(err, O.fromNullable, O.traverse(IO.Applicative)(action.onError))()
)
}
}

Expand Down
7 changes: 6 additions & 1 deletion src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import { MonadTask3 } from 'fp-ts/MonadTask'
import { MonadThrow3 } from 'fp-ts/MonadThrow'
import { IncomingMessage } from 'http'
import * as M from './Middleware'
import { IO } from 'fp-ts/IO'

/**
* Adapted from https://github.com/purescript-contrib/purescript-media-types
Expand Down Expand Up @@ -189,7 +190,11 @@ export interface Connection<S> {
readonly setHeader: (this: Connection<HeadersOpen>, name: string, value: string) => Connection<HeadersOpen>
readonly setStatus: (this: Connection<StatusOpen>, status: Status) => Connection<HeadersOpen>
readonly setBody: (this: Connection<BodyOpen>, body: string | Buffer) => Connection<ResponseEnded>
readonly pipeStream: (this: Connection<BodyOpen>, stream: NodeJS.ReadableStream) => Connection<ResponseEnded>
readonly pipeStream: (
this: Connection<BodyOpen>,
stream: NodeJS.ReadableStream,
onError: (e: unknown) => IO<void>
) => Connection<ResponseEnded>
readonly endResponse: (this: Connection<BodyOpen>) => Connection<ResponseEnded>
}

Expand Down
9 changes: 5 additions & 4 deletions test/Middleware.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import { MockConnection, MockRequest } from './_helpers'
import { Readable } from 'stream'
import * as _ from '../src/Middleware'
import * as L from 'fp-ts-contrib/List'
import * as C from 'fp-ts/Console'

function assertSuccess<I, O, A>(m: _.Middleware<I, O, any, A>, cin: MockConnection<I>, a: A, actions: Array<Action>) {
return m(cin)().then((e) => {
Expand Down Expand Up @@ -223,9 +224,9 @@ describe('Middleware', () => {
}
const stream = someStream()
const c = new MockConnection<H.BodyOpen>(new MockRequest())
const m = _.pipeStream(stream)
const m = _.pipeStream(stream, C.error)

return assertSuccess(m, c, undefined, [{ type: 'pipeStream', stream }])
return assertSuccess(m, c, undefined, [{ type: 'pipeStream', stream, onError: C.error }])
})

it('should pipe a stream and handle the failure', () => {
Expand All @@ -238,9 +239,9 @@ describe('Middleware', () => {
}
const stream = someStream()
const c = new MockConnection<H.BodyOpen>(new MockRequest())
const m = _.pipeStream(stream)
const m = _.pipeStream(stream, C.error)

return assertSuccess(m, c, undefined, [{ type: 'pipeStream', stream }])
return assertSuccess(m, c, undefined, [{ type: 'pipeStream', stream, onError: C.error }])
})
})

Expand Down
24 changes: 23 additions & 1 deletion test/express.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import express from 'express'
import supertest from 'supertest'
import * as t from 'io-ts'
import * as E from 'fp-ts/Either'
import * as C from 'fp-ts/Console'

describe('express', () => {
it('should call `next` with an error', () => {
Expand Down Expand Up @@ -120,11 +121,32 @@ describe('express', () => {
const m = pipe(
M.status(H.Status.OK),
M.ichain(() => M.closeHeaders()),
M.ichain(() => M.pipeStream(stream))
M.ichain(() => M.pipeStream(stream, C.error))
)
server.use(toRequestHandler(m))

return supertest(server).get('/').expect(200, 'a')
})

it('should handle piped stream error', () => {
const server = express()
const someStream = (): Readable => {
const stream = new Readable()
stream._read = () => {
stream.emit('error', new Error('abort'))
}
return stream
}

const stream = someStream()
const m = pipe(
M.status(H.Status.OK),
M.ichain(() => M.closeHeaders()),
M.ichain(() => M.pipeStream(stream, C.error))
)
server.use(toRequestHandler(m))

return expect(supertest(server).get('/')).rejects.toThrowError('socket hang up')
})
})
})

0 comments on commit e294ef1

Please sign in to comment.