Skip to content

Commit bc969b7

Browse files
authored
fix: provide websocket instead of stream to avoid potential backpressure issues (#289) (#290)
* fix: provide websocket instead of stream to avoid potential backpressure issues (#289) * chore: update documentation and tests * chore: update documentation to include an example of using createWebSocketStream * chore: update failing unit test
1 parent 25d37d2 commit bc969b7

File tree

8 files changed

+299
-362
lines changed

8 files changed

+299
-362
lines changed

README.md

Lines changed: 58 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -33,10 +33,10 @@ After registering this plugin, you can choose on which routes the WS server will
3333
const fastify = require('fastify')()
3434
fastify.register(require('@fastify/websocket'))
3535
fastify.register(async function (fastify) {
36-
fastify.get('/', { websocket: true }, (connection /* SocketStream */, req /* FastifyRequest */) => {
37-
connection.socket.on('message', message => {
36+
fastify.get('/', { websocket: true }, (socket /* WebSocket */, req /* FastifyRequest */) => {
37+
socket.on('message', message => {
3838
// message.toString() === 'hi from client'
39-
connection.socket.send('hi from server')
39+
socket.send('hi from server')
4040
})
4141
})
4242
})
@@ -63,17 +63,17 @@ fastify.register(require('@fastify/websocket'), {
6363
})
6464

6565
fastify.register(async function (fastify) {
66-
fastify.get('/*', { websocket: true }, (connection /* SocketStream */, req /* FastifyRequest */) => {
67-
connection.socket.on('message', message => {
66+
fastify.get('/*', { websocket: true }, (socket /* WebSocket */, req /* FastifyRequest */) => {
67+
socket.on('message', message => {
6868
// message.toString() === 'hi from client'
69-
connection.socket.send('hi from wildcard route')
69+
socket.send('hi from wildcard route')
7070
})
7171
})
7272

73-
fastify.get('/', { websocket: true }, (connection /* SocketStream */, req /* FastifyRequest */) => {
74-
connection.socket.on('message', message => {
73+
fastify.get('/', { websocket: true }, (socket /* WebSocket */, req /* FastifyRequest */) => {
74+
socket.on('message', message => {
7575
// message.toString() === 'hi from client'
76-
connection.socket.send('hi from server')
76+
socket.send('hi from server')
7777
})
7878
})
7979
})
@@ -93,10 +93,10 @@ It is important that websocket route handlers attach event handlers synchronousl
9393
Here is an example of how to attach message handlers synchronously while still accessing asynchronous resources. We store a promise for the async thing in a local variable, attach the message handler synchronously, and then make the message handler itself asynchronous to grab the async data and do some processing:
9494

9595
```javascript
96-
fastify.get('/*', { websocket: true }, (connection, request) => {
96+
fastify.get('/*', { websocket: true }, (socket, request) => {
9797
const sessionPromise = request.getSession() // example async session getter, called synchronously to return a promise
9898

99-
connection.socket.on('message', async (message) => {
99+
socket.on('message', async (message) => {
100100
const session = await sessionPromise()
101101
// do something with the message and session
102102
})
@@ -113,9 +113,9 @@ fastify.addHook('preValidation', async (request, reply) => {
113113
await reply.code(401).send("not authenticated");
114114
}
115115
})
116-
fastify.get('/', { websocket: true }, (connection, req) => {
116+
fastify.get('/', { websocket: true }, (socket, req) => {
117117
// the connection will only be opened for authenticated incoming requests
118-
connection.socket.on('message', message => {
118+
socket.on('message', message => {
119119
// ...
120120
})
121121
})
@@ -134,13 +134,13 @@ import websocket from '@fastify/websocket'
134134
const fastify = Fastify()
135135
await fastify.register(websocket)
136136

137-
fastify.get('/', { websocket: true }, function wsHandler (connection, req) {
137+
fastify.get('/', { websocket: true }, function wsHandler (socket, req) {
138138
// bound to fastify server
139139
this.myDecoration.someFunc()
140140

141-
connection.socket.on('message', message => {
141+
socket.on('message', message => {
142142
// message.toString() === 'hi from client'
143-
connection.socket.send('hi from server')
143+
socket.send('hi from server')
144144
})
145145
})
146146

@@ -154,8 +154,8 @@ If you need to handle both HTTP requests and incoming socket connections on the
154154

155155
const fastify = require('fastify')()
156156

157-
function handle (conn, req) {
158-
conn.pipe(conn) // creates an echo server
157+
function handle (socket, req) {
158+
socket.on('message', (data) => socket.send(data)) // creates an echo server
159159
}
160160

161161
fastify.register(require('@fastify/websocket'), {
@@ -171,13 +171,12 @@ fastify.register(async function () {
171171
// this will handle http requests
172172
reply.send({ hello: 'world' })
173173
},
174-
wsHandler: (conn, req) => {
174+
wsHandler: (socket, req) => {
175175
// this will handle websockets connections
176-
conn.setEncoding('utf8')
177-
conn.write('hello client')
176+
socket.send('hello client')
178177

179-
conn.once('data', chunk => {
180-
conn.end()
178+
socket.once('message', chunk => {
179+
socket.close()
181180
})
182181
}
183182
})
@@ -201,10 +200,10 @@ Neither the `errorHandler` passed to this plugin or fastify's `onError` hook wil
201200
const fastify = require('fastify')()
202201

203202
fastify.register(require('@fastify/websocket'), {
204-
errorHandler: function (error, conn /* SocketStream */, req /* FastifyRequest */, reply /* FastifyReply */) {
203+
errorHandler: function (error, socket /* WebSocket */, req /* FastifyRequest */, reply /* FastifyReply */) {
205204
// Do stuff
206205
// destroy/close connection
207-
conn.destroy(error)
206+
socket.terminate()
208207
},
209208
options: {
210209
maxPayload: 1048576, // we set the maximum allowed messages size to 1 MiB (1024 bytes * 1024 bytes)
@@ -217,10 +216,10 @@ fastify.register(require('@fastify/websocket'), {
217216
}
218217
})
219218

220-
fastify.get('/', { websocket: true }, (connection /* SocketStream */, req /* FastifyRequest */) => {
221-
connection.socket.on('message', message => {
219+
fastify.get('/', { websocket: true }, (socket /* WebSocket */, req /* FastifyRequest */) => {
220+
socket.on('message', message => {
222221
// message.toString() === 'hi from client'
223-
connection.socket.send('hi from server')
222+
socket.send('hi from server')
224223
})
225224
})
226225

@@ -247,8 +246,8 @@ fastify.register(require('@fastify/websocket'), {
247246
preClose: (done) => { // Note: can also use async style, without done-callback
248247
const server = this.websocketServer
249248

250-
for (const connection of server.clients) {
251-
connection.close(1001, 'WS server is going offline in custom manner, sending a code + message')
249+
for (const socket of server.clients) {
250+
socket.close(1001, 'WS server is going offline in custom manner, sending a code + message')
252251
}
253252

254253
server.close(done)
@@ -263,6 +262,32 @@ It allows to test easily a websocket endpoint.
263262

264263
The signature of injectWS is the following: `([path], [upgradeContext])`.
265264

265+
266+
### Creating a stream from the WebSocket
267+
268+
```js
269+
const Fastify = require('fastify')
270+
const FastifyWebSocket = require('@fastify/websocket')
271+
const ws = require('ws')
272+
273+
const fastify = Fastify()
274+
await fastify.register(websocket)
275+
276+
fastify.get('/', { websocket: true }, (socket, req) => {
277+
const stream = ws.createWebSocketStream(socket, { /* options */ })
278+
stream.setEncoding('utf8')
279+
stream.write('hello client')
280+
281+
stream.on('data', function (data) {
282+
// Make sure to set up a data handler or read all the incoming
283+
// data in another way, otherwise stream backpressure will cause
284+
// the underlying WebSocket object to get paused.
285+
})
286+
})
287+
288+
await fastify.listen({ port: 3000 })
289+
```
290+
266291
#### App.js
267292

268293
```js
@@ -282,9 +307,9 @@ App.register(async function(fastify) {
282307
}
283308
})
284309

285-
fastify.get('/', { websocket: true }, (connection) => {
286-
connection.socket.on('message', message => {
287-
connection.socket.send('hi from server')
310+
fastify.get('/', { websocket: true }, (socket) => {
311+
socket.on('message', message => {
312+
socket.send('hi from server')
288313
})
289314
})
290315
})
@@ -350,15 +375,6 @@ _**NB** The `path` option from `ws` should not be provided since the routing is
350375

351376
_**NB** The `noServer` option from `ws` should not be provided since the point of @fastify/websocket is to listen on the fastify server. If you want a custom server, you can use the `server` option, and if you want more control, you can use the `ws` library directly_
352377

353-
You can also pass the following as `connectionOptions` for [createWebSocketStream](https://github.com/websockets/ws/blob/master/doc/ws.md#createwebsocketstreamwebsocket-options).
354-
355-
- `allowHalfOpen` <boolean> If set to false, then the stream will automatically end the writable side when the readable side ends. Default: true.
356-
- `readable` <boolean> Sets whether the Duplex should be readable. Default: true.
357-
- `writable` <boolean> Sets whether the Duplex should be writable. Default: true.
358-
- `readableObjectMode` <boolean> Sets objectMode for readable side of the stream. Has no effect if objectMode is true. Default: false.
359-
- `readableHighWaterMark` <number> Sets highWaterMark for the readable side of the stream.
360-
- `writableHighWaterMark` <number> Sets highWaterMark for the writable side of the stream.
361-
362378
[ws](https://github.com/websockets/ws) does not allow you to set `objectMode` or `writableObjectMode` to true
363379
## Acknowledgements
364380

index.js

Lines changed: 11 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -122,20 +122,11 @@ function fastifyWebsocket (fastify, opts, next) {
122122
wss.handleUpgrade(rawRequest, rawRequest[kWs], rawRequest[kWsHead], (socket) => {
123123
wss.emit('connection', socket, rawRequest)
124124

125-
const connection = WebSocket.createWebSocketStream(socket, opts.connectionOptions)
126-
connection.socket = socket
127-
128-
connection.on('error', (error) => {
125+
socket.on('error', (error) => {
129126
fastify.log.error(error)
130127
})
131128

132-
connection.socket.on('newListener', event => {
133-
if (event === 'message') {
134-
connection.resume()
135-
}
136-
})
137-
138-
callback(connection)
129+
callback(socket)
139130
})
140131
}
141132

@@ -187,20 +178,20 @@ function fastifyWebsocket (fastify, opts, next) {
187178
// within the route handler, we check if there has been a connection upgrade by looking at request.raw[kWs]. we need to dispatch the normal HTTP handler if not, and hijack to dispatch the websocket handler if so
188179
if (request.raw[kWs]) {
189180
reply.hijack()
190-
handleUpgrade(request.raw, connection => {
181+
handleUpgrade(request.raw, socket => {
191182
let result
192183
try {
193184
if (isWebsocketRoute) {
194-
result = wsHandler.call(this, connection, request)
185+
result = wsHandler.call(this, socket, request)
195186
} else {
196-
result = noHandle.call(this, connection, request)
187+
result = noHandle.call(this, socket, request)
197188
}
198189
} catch (err) {
199-
return errorHandler.call(this, err, connection, request, reply)
190+
return errorHandler.call(this, err, socket, request, reply)
200191
}
201192

202193
if (result && typeof result.catch === 'function') {
203-
result.catch(err => errorHandler.call(this, err, connection, request, reply))
194+
result.catch(err => errorHandler.call(this, err, socket, request, reply))
204195
}
205196
})
206197
} else {
@@ -229,19 +220,14 @@ function fastifyWebsocket (fastify, opts, next) {
229220
done()
230221
}
231222

232-
function noHandle (connection, rawRequest) {
223+
function noHandle (socket, rawRequest) {
233224
this.log.info({ path: rawRequest.url }, 'closed incoming websocket connection for path with no websocket handler')
234-
connection.socket.close()
225+
socket.close()
235226
}
236227

237-
function defaultErrorHandler (error, conn, request) {
238-
// Before destroying the connection, we attach an error listener.
239-
// Since we already handled the error, adding this listener prevents the ws
240-
// library from emitting the error and causing an uncaughtException
241-
// Reference: https://github.com/websockets/ws/blob/master/lib/stream.js#L35
242-
conn.on('error', _ => { })
228+
function defaultErrorHandler (error, socket, request) {
243229
request.log.error(error)
244-
conn.destroy(error)
230+
socket.terminate()
245231
}
246232

247233
next()

0 commit comments

Comments
 (0)