Skip to content

Commit

Permalink
fix(ext/websocket): Avoid write deadlock that requires read_frame to …
Browse files Browse the repository at this point in the history
…complete (#18705)

Fixes #18700

Timeline of the events that lead to the bug.

1. WebSocket handshake complete
2. Server on `read_frame` holding an AsyncRefCell borrow of the
WebSocket stream.
3. Client sends a TXT frame after a some time
4. Server recieves the frame and goes back to `read_frame`.
5. After some time, Server starts a `write_frame` but `read_frame` is
still holding a borrow!
^--- Locked. read_frame needs to complete so we can resume the write.

This commit changes all writes to directly borrow the
`fastwebsocket::WebSocket` resource under the assumption that it won't
affect ongoing reads.
  • Loading branch information
littledivy authored Apr 14, 2023
1 parent d01340d commit a411144
Show file tree
Hide file tree
Showing 2 changed files with 69 additions and 20 deletions.
46 changes: 46 additions & 0 deletions cli/tests/unit/websocket_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,3 +36,49 @@ Deno.test(async function websocketPingPong() {
await promise;
ws.close();
});

// https://github.com/denoland/deno/issues/18700
Deno.test(
{ sanitizeOps: false, sanitizeResources: false },
async function websocketWriteLock() {
const ac = new AbortController();
const listeningPromise = deferred();

const server = Deno.serve({
handler: (req) => {
const { socket, response } = Deno.upgradeWebSocket(req);
socket.onopen = function () {
setTimeout(() => socket.send("Hello"), 500);
};
socket.onmessage = function (e) {
assertEquals(e.data, "Hello");
ac.abort();
};
return response;
},
signal: ac.signal,
onListen: () => listeningPromise.resolve(),
hostname: "localhost",
port: 4246,
});

await listeningPromise;
const promise = deferred();
const ws = new WebSocket("ws://localhost:4246/");
assertEquals(ws.url, "ws://localhost:4246/");
ws.onerror = () => fail();
ws.onmessage = (e) => {
assertEquals(e.data, "Hello");
setTimeout(() => {
ws.send(e.data);
}, 1000);
promise.resolve();
};
ws.onclose = () => {
promise.resolve();
};

await Promise.all([promise, server]);
ws.close();
},
);
43 changes: 23 additions & 20 deletions ext/websocket/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,23 @@ pub struct ServerWebSocket {
ws: AsyncRefCell<FragmentCollector<Pin<Box<dyn Upgraded>>>>,
}

impl ServerWebSocket {
#[inline]
pub async fn write_frame(
self: Rc<Self>,
frame: Frame,
) -> Result<(), AnyError> {
// SAFETY: fastwebsockets only needs a mutable reference to the WebSocket
// to populate the write buffer. We encounter an await point when writing
// to the socket after the frame has already been written to the buffer.
let ws = unsafe { &mut *self.ws.as_ptr() };
ws.write_frame(frame)
.await
.map_err(|err| type_error(err.to_string()))?;
Ok(())
}
}

impl Resource for ServerWebSocket {
fn name(&self) -> Cow<str> {
"serverWebSocket".into()
Expand Down Expand Up @@ -61,12 +78,9 @@ pub async fn op_server_ws_send_binary(
.borrow_mut()
.resource_table
.get::<ServerWebSocket>(rid)?;

let mut ws = RcRef::map(&resource, |r| &r.ws).borrow_mut().await;
ws.write_frame(Frame::new(true, OpCode::Binary, None, data.to_vec()))
resource
.write_frame(Frame::new(true, OpCode::Binary, None, data.to_vec()))
.await
.map_err(|err| type_error(err.to_string()))?;
Ok(())
}

#[op]
Expand All @@ -79,11 +93,9 @@ pub async fn op_server_ws_send_text(
.borrow_mut()
.resource_table
.get::<ServerWebSocket>(rid)?;
let mut ws = RcRef::map(&resource, |r| &r.ws).borrow_mut().await;
ws.write_frame(Frame::new(true, OpCode::Text, None, data.into_bytes()))
resource
.write_frame(Frame::new(true, OpCode::Text, None, data.into_bytes()))
.await
.map_err(|err| type_error(err.to_string()))?;
Ok(())
}

#[op]
Expand All @@ -107,12 +119,7 @@ pub async fn op_server_ws_send(
.borrow_mut()
.resource_table
.get::<ServerWebSocket>(rid)?;
let mut ws = RcRef::map(&resource, |r| &r.ws).borrow_mut().await;

ws.write_frame(msg)
.await
.map_err(|err| type_error(err.to_string()))?;
Ok(())
resource.write_frame(msg).await
}

#[op(deferred)]
Expand All @@ -126,14 +133,10 @@ pub async fn op_server_ws_close(
.borrow_mut()
.resource_table
.get::<ServerWebSocket>(rid)?;
let mut ws = RcRef::map(&resource, |r| &r.ws).borrow_mut().await;
let frame = reason
.map(|reason| Frame::close(code.unwrap_or(1005), reason.as_bytes()))
.unwrap_or_else(|| Frame::close_raw(vec![]));
ws.write_frame(frame)
.await
.map_err(|err| type_error(err.to_string()))?;
Ok(())
resource.write_frame(frame).await
}

#[op(deferred)]
Expand Down

0 comments on commit a411144

Please sign in to comment.