-
-
Notifications
You must be signed in to change notification settings - Fork 201
/
main.rs
152 lines (130 loc) · 4.65 KB
/
main.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
// Copyright (c) 2018-2020 Sean McArthur
// Licensed under the MIT license http://opensource.org/licenses/MIT
//
// port from https://github.com/seanmonstar/warp/blob/master/examples/websocket_chat.rs
use std::collections::HashMap;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::LazyLock;
use futures_util::{FutureExt, StreamExt};
use tokio::sync::{mpsc, RwLock};
use tokio_stream::wrappers::UnboundedReceiverStream;
use salvo::prelude::*;
use salvo::websocket::{Message, WebSocket, WebSocketUpgrade};
type Users = RwLock<HashMap<usize, mpsc::UnboundedSender<Result<Message, salvo::Error>>>>;
static NEXT_USER_ID: AtomicUsize = AtomicUsize::new(1);
static ONLINE_USERS: LazyLock<Users> = LazyLock::new(Users::default);
#[tokio::main]
async fn main() {
tracing_subscriber::fmt().init();
let router = Router::new()
.goal(index)
.push(Router::with_path("chat").goal(user_connected));
let acceptor = TcpListener::new("0.0.0.0:5800").bind().await;
Server::new(acceptor).serve(router).await;
}
#[handler]
async fn user_connected(req: &mut Request, res: &mut Response) -> Result<(), StatusError> {
WebSocketUpgrade::new()
.upgrade(req, res, handle_socket)
.await
}
async fn handle_socket(ws: WebSocket) {
// Use a counter to assign a new unique ID for this user.
let my_id = NEXT_USER_ID.fetch_add(1, Ordering::Relaxed);
tracing::info!("new chat user: {}", my_id);
// Split the socket into a sender and receive of messages.
let (user_ws_tx, mut user_ws_rx) = ws.split();
// Use an unbounded channel to handle buffering and flushing of messages
// to the websocket...
let (tx, rx) = mpsc::unbounded_channel();
let rx = UnboundedReceiverStream::new(rx);
let fut = rx.forward(user_ws_tx).map(|result| {
if let Err(e) = result {
tracing::error!(error = ?e, "websocket send error");
}
});
tokio::task::spawn(fut);
let fut = async move {
ONLINE_USERS.write().await.insert(my_id, tx);
while let Some(result) = user_ws_rx.next().await {
let msg = match result {
Ok(msg) => msg,
Err(e) => {
eprintln!("websocket error(uid={my_id}): {e}");
break;
}
};
user_message(my_id, msg).await;
}
user_disconnected(my_id).await;
};
tokio::task::spawn(fut);
}
async fn user_message(my_id: usize, msg: Message) {
let msg = if let Ok(s) = msg.to_str() {
s
} else {
return;
};
let new_msg = format!("<User#{my_id}>: {msg}");
// New message from this user, send it to everyone else (except same uid)...
for (&uid, tx) in ONLINE_USERS.read().await.iter() {
if my_id != uid {
if let Err(_disconnected) = tx.send(Ok(Message::text(new_msg.clone()))) {
// The tx is disconnected, our `user_disconnected` code
// should be happening in another task, nothing more to
// do here.
}
}
}
}
async fn user_disconnected(my_id: usize) {
eprintln!("good bye user: {my_id}");
// Stream closed up, so remove from the user list
ONLINE_USERS.write().await.remove(&my_id);
}
#[handler]
async fn index(res: &mut Response) {
res.render(Text::Html(INDEX_HTML));
}
static INDEX_HTML: &str = r#"<!DOCTYPE html>
<html>
<head>
<title>WS Chat</title>
</head>
<body>
<h1>WS Chat</h1>
<div id="chat">
<p><em>Connecting...</em></p>
</div>
<input type="text" id="text" />
<button type="button" id="submit">Submit</button>
<script>
const chat = document.getElementById('chat');
const msg = document.getElementById('msg');
const submit = document.getElementById('submit');
const ws = new WebSocket(`ws://${location.host}/chat`);
ws.onopen = function() {
chat.innerHTML = '<p><em>Connected!</em></p>';
};
ws.onmessage = function(msg) {
showMessage(msg.data);
};
ws.onclose = function() {
chat.getElementsByTagName('em')[0].innerText = 'Disconnected!';
};
submit.onclick = function() {
const msg = text.value;
ws.send(msg);
text.value = '';
showMessage('<You>: ' + msg);
};
function showMessage(data) {
const line = document.createElement('p');
line.innerText = data;
chat.appendChild(line);
}
</script>
</body>
</html>
"#;