Skip to content

Commit

Permalink
Add #injectWS (#276)
Browse files Browse the repository at this point in the history
* Add #injectWS 


Now is possible to invoke an websocket handler without listening

* Up-to-date documentation

Added testing paragraph

* Removed unused dependency

* remove ws.terminate() in the plugin

The user must manually close the ws in the test

* add upgradeContext parameter in injectWS

It allows to enhance the request made for upgrading the socket

* Rejects if websocket upgrade failed

* remove useless line in docs

* rejects with 'Unexpected server response: <statusCode>'

Implementation as close as possibile to ws connectiong error

* Fix test

* Fix types

---------

Signed-off-by: Daniele Fedeli <[email protected]>
Co-authored-by: Daniele Fedeli <[email protected]>
  • Loading branch information
DanieleFedeli and Daniele Fedeli authored Feb 19, 2024
1 parent a5ef710 commit cb3ce0d
Show file tree
Hide file tree
Showing 5 changed files with 270 additions and 6 deletions.
72 changes: 72 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -256,6 +256,78 @@ fastify.register(require('@fastify/websocket'), {
})
```

### Testing

Testing the ws handler can be quite tricky, luckily `fastify-websocket` decorates fastify instance with `injectWS`.
It allows to test easily a websocket endpoint.

The signature of injectWS is the following: `([path], [upgradeContext])`.

#### App.js

```js
'use strict'

const Fastify = require('fastify')
const FastifyWebSocket = require('@fastify/websocket')

const App = Fastify()

App.register(FastifyWebSocket);

App.register(async function(fastify) {
fastify.addHook('preValidation', async (request, reply) => {
if (request.headers['api-key'] !== 'some-random-key') {
return reply.code(401).send()
}
})

fastify.get('/', { websocket: true }, (connection) => {
connection.socket.on('message', message => {
connection.socket.send('hi from server')
})
})
})

module.exports = App
```

#### App.test.js

```js
'use strict'

const { test } = require('tap')
const Fastify = require('fastify')
const App = require('./app.js')

test('connect to /', async (t) => {
t.plan(1)

const fastify = Fastify()
fastify.register(App)
t.teardown(fastify.close.bind(fastify))

const ws = await fastify.injectWS('/', {headers: { "api-key" : "some-random-key" }})
let resolve;
const promise = new Promise(r => { resolve = r })

ws.on('message', (data) => {
resolve(data.toString());
})
ws.send('hi from client')

t.assert(await promise, 'hi from server')
// Remember to close the ws at the end
ws.terminate()
})
```

#### Things to know
- Websocket need to be closed manually at the end of each test.
- `fastify.ready()` needs to be awaited to ensure that fastify has been decorated.
- You need to register the event listener before sending the message if you need to process server response.

## Options

`@fastify/websocket` accept these options for [`ws`](https://github.com/websockets/ws/blob/master/doc/ws.md#new-websocketserveroptions-callback) :
Expand Down
60 changes: 59 additions & 1 deletion index.js
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
'use strict'

const { ServerResponse } = require('node:http')
const { PassThrough } = require('node:stream')
const { randomBytes } = require('node:crypto')
const fp = require('fastify-plugin')
const WebSocket = require('ws')
const Duplexify = require('duplexify')

const kWs = Symbol('ws-socket')
const kWsHead = Symbol('ws-head')
Expand Down Expand Up @@ -47,6 +50,60 @@ function fastifyWebsocket (fastify, opts, next) {
const wss = new WebSocket.Server(wssOptions)
fastify.decorate('websocketServer', wss)

async function injectWS (path = '/', upgradeContext = {}) {
const server2Client = new PassThrough()
const client2Server = new PassThrough()

const serverStream = new Duplexify(server2Client, client2Server)
const clientStream = new Duplexify(client2Server, server2Client)

const ws = new WebSocket(null, undefined, { isServer: false })
const head = Buffer.from([])

let resolve, reject
const promise = new Promise((_resolve, _reject) => { resolve = _resolve; reject = _reject })

ws.on('open', () => {
clientStream.removeListener('data', onData)
resolve(ws)
})

const onData = (chunk) => {
if (chunk.toString().includes('HTTP/1.1 101 Switching Protocols')) {
ws._isServer = false
ws.setSocket(clientStream, head, { maxPayload: 0 })
} else {
clientStream.removeListener('data', onData)
const statusCode = Number(chunk.toString().match(/HTTP\/1.1 (\d+)/)[1])
reject(new Error('Unexpected server response: ' + statusCode))
}
}

clientStream.on('data', onData)

const req = {
...upgradeContext,
method: 'GET',
headers: {
...upgradeContext.headers,
connection: 'upgrade',
upgrade: 'websocket',
'sec-websocket-version': 13,
'sec-websocket-key': randomBytes(16).toString('base64')
},
httpVersion: '1.1',
url: path,
[kWs]: serverStream,
[kWsHead]: head
}

websocketListenServer.emit('upgrade', req, req[kWs], req[kWsHead])

return promise
}

fastify.decorate('injectWS', injectWS)

function onUpgrade (rawRequest, socket, head) {
// Save a reference to the socket and then dispatch the request through the normal fastify router so that it will invoke hooks and then eventually a route handler that might upgrade the socket.
rawRequest[kWs] = socket
Expand Down Expand Up @@ -164,6 +221,7 @@ function fastifyWebsocket (fastify, opts, next) {
client.close()
}
}

fastify.server.removeListener('upgrade', onUpgrade)

server.close(done)
Expand All @@ -181,7 +239,7 @@ function fastifyWebsocket (fastify, opts, next) {
// Since we already handled the error, adding this listener prevents the ws
// library from emitting the error and causing an uncaughtException
// Reference: https://github.com/websockets/ws/blob/master/lib/stream.js#L35
conn.on('error', _ => {})
conn.on('error', _ => { })
request.log.error(error)
conn.destroy(error)
}
Expand Down
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
"tsd": "^0.30.1"
},
"dependencies": {
"duplexify": "^4.1.2",
"fastify-plugin": "^4.0.0",
"ws": "^8.0.0"
},
Expand Down
134 changes: 134 additions & 0 deletions test/inject.test.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
'use strict'

const { test } = require('tap')
const Fastify = require('fastify')
const fastifyWebsocket = require('..')

function buildFastify (t) {
const fastify = Fastify()
t.teardown(() => { fastify.close() })
fastify.register(fastifyWebsocket)
return fastify
}

test('routes correctly the message', async (t) => {
const fastify = buildFastify(t)
const message = 'hi from client'

let _resolve
const promise = new Promise((resolve) => { _resolve = resolve })

fastify.register(
async function (instance) {
instance.get('/ws', { websocket: true }, function (conn) {
conn.once('data', chunk => {
_resolve(chunk.toString())
})
})
})

await fastify.ready()
const ws = await fastify.injectWS('/ws')
ws.send(message)
t.same(await promise, message)
ws.terminate()
})

test('redirect on / if no path specified', async (t) => {
const fastify = buildFastify(t)
const message = 'hi from client'

let _resolve
const promise = new Promise((resolve) => { _resolve = resolve })

fastify.register(
async function (instance) {
instance.get('/', { websocket: true }, function (conn) {
conn.once('data', chunk => {
_resolve(chunk.toString())
})
})
})

await fastify.ready()
const ws = await fastify.injectWS()
ws.send(message)
t.same(await promise, message)
ws.terminate()
})

test('routes correctly the message between two routes', async (t) => {
const fastify = buildFastify(t)
const message = 'hi from client'

let _resolve
let _reject
const promise = new Promise((resolve, reject) => { _resolve = resolve; _reject = reject })

fastify.register(
async function (instance) {
instance.get('/ws', { websocket: true }, function (conn) {
conn.once('data', () => {
_reject('wrong-route')
})
})

instance.get('/ws-2', { websocket: true }, function (conn) {
conn.once('data', chunk => {
_resolve(chunk.toString())
})
})
})

await fastify.ready()
const ws = await fastify.injectWS('/ws-2')
ws.send(message)
t.same(await promise, message)
ws.terminate()
})

test('use the upgrade context to upgrade if there is some hook', async (t) => {
const fastify = buildFastify(t)
const message = 'hi from client'

let _resolve
const promise = new Promise((resolve) => { _resolve = resolve })

fastify.register(
async function (instance) {
instance.addHook('preValidation', async (request, reply) => {
if (request.headers['api-key'] !== 'some-random-key') {
return reply.code(401).send()
}
})

instance.get('/', { websocket: true }, function (conn) {
conn.once('data', chunk => {
_resolve(chunk.toString())
})
})
})

await fastify.ready()
const ws = await fastify.injectWS('/', { headers: { 'api-key': 'some-random-key' } })
ws.send(message)
t.same(await promise, message)
ws.terminate()
})

test('rejects if the websocket is not upgraded', async (t) => {
const fastify = buildFastify(t)

fastify.register(
async function (instance) {
instance.addHook('preValidation', async (request, reply) => {
return reply.code(401).send()
})

instance.get('/', { websocket: true }, function (conn) {
})
})

await fastify.ready()
t.rejects(fastify.injectWS('/'), 'Unexpected server response: 401')
})
9 changes: 4 additions & 5 deletions types/index.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,12 @@ declare module 'fastify' {
websocket?: boolean;
}

type InjectWSFn<RawRequest> =
((path?: string, upgradeContext?: Partial<RawRequest>) => Promise<WebSocket>)

interface FastifyInstance<RawServer, RawRequest, RawReply, Logger, TypeProvider> {
get: RouteShorthandMethod<RawServer, RawRequest, RawReply, TypeProvider, Logger>,
websocketServer: WebSocket.Server,
injectWS: InjectWSFn<RawRequest>
}

interface FastifyRequest {
Expand Down Expand Up @@ -67,7 +70,6 @@ type FastifyWebsocket = FastifyPluginCallback<fastifyWebsocket.WebsocketPluginOp
declare namespace fastifyWebsocket {

interface WebSocketServerOptions extends Omit<WebSocket.ServerOptions, "path"> { }

export type WebsocketHandler<
RawServer extends RawServerBase = RawServerDefault,
RawRequest extends RawRequestDefaultExpression<RawServer> = RawRequestDefaultExpression<RawServer>,
Expand All @@ -81,18 +83,15 @@ declare namespace fastifyWebsocket {
connection: SocketStream,
request: FastifyRequest<RequestGeneric, RawServer, RawRequest, SchemaCompiler, TypeProvider, ContextConfig, Logger>
) => void | Promise<any>;

export interface SocketStream extends Duplex {
socket: WebSocket;
}

export interface WebsocketPluginOptions {
errorHandler?: (this: FastifyInstance, error: Error, connection: SocketStream, request: FastifyRequest, reply: FastifyReply) => void;
options?: WebSocketServerOptions;
connectionOptions?: DuplexOptions;
preClose?: preCloseHookHandler | preCloseAsyncHookHandler;
}

export interface RouteOptions<
RawServer extends RawServerBase = RawServerDefault,
RawRequest extends RawRequestDefaultExpression<RawServer> = RawRequestDefaultExpression<RawServer>,
Expand Down

0 comments on commit cb3ce0d

Please sign in to comment.