Skip to content

Commit

Permalink
examples: update chat example (#3587)
Browse files Browse the repository at this point in the history
  • Loading branch information
Andrew Liu authored Mar 9, 2021
1 parent db1d904 commit 872bc09
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 70 deletions.
1 change: 0 additions & 1 deletion examples/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ tokio = { version = "1.0.0", features = ["full", "tracing"] }
tokio-util = { version = "0.6.3", features = ["full"] }
tokio-stream = { version = "0.1" }

async-stream = "0.3"
tracing = "0.1"
tracing-subscriber = { version = "0.2.7", default-features = false, features = ["fmt", "ansi", "env-filter", "chrono", "tracing-log"] }
bytes = "1.0.0"
Expand Down
100 changes: 31 additions & 69 deletions examples/chat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,18 +28,16 @@

use tokio::net::{TcpListener, TcpStream};
use tokio::sync::{mpsc, Mutex};
use tokio_stream::{Stream, StreamExt};
use tokio_util::codec::{Framed, LinesCodec, LinesCodecError};
use tokio_stream::StreamExt;
use tokio_util::codec::{Framed, LinesCodec};

use futures::SinkExt;
use std::collections::HashMap;
use std::env;
use std::error::Error;
use std::io;
use std::net::SocketAddr;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
Expand Down Expand Up @@ -101,6 +99,9 @@ async fn main() -> Result<(), Box<dyn Error>> {
/// Shorthand for the transmit half of the message channel.
type Tx = mpsc::UnboundedSender<String>;

/// Shorthand for the receive half of the message channel.
type Rx = mpsc::UnboundedReceiver<String>;

/// Data that is shared between all peers in the chat server.
///
/// This is the set of `Tx` handles for all connected clients. Whenever a
Expand All @@ -124,7 +125,7 @@ struct Peer {
///
/// This is used to receive messages from peers. When a message is received
/// off of this `Rx`, it will be written to the socket.
rx: Pin<Box<dyn Stream<Item = String> + Send>>,
rx: Rx,
}

impl Shared {
Expand Down Expand Up @@ -156,58 +157,15 @@ impl Peer {
let addr = lines.get_ref().peer_addr()?;

// Create a channel for this peer
let (tx, mut rx) = mpsc::unbounded_channel();
let (tx, rx) = mpsc::unbounded_channel();

// Add an entry for this `Peer` in the shared state map.
state.lock().await.peers.insert(addr, tx);

let rx = Box::pin(async_stream::stream! {
while let Some(item) = rx.recv().await {
yield item;
}
});

Ok(Peer { lines, rx })
}
}

#[derive(Debug)]
enum Message {
/// A message that should be broadcasted to others.
Broadcast(String),

/// A message that should be received by a client
Received(String),
}

// Peer implements `Stream` in a way that polls both the `Rx`, and `Framed` types.
// A message is produced whenever an event is ready until the `Framed` stream returns `None`.
impl Stream for Peer {
type Item = Result<Message, LinesCodecError>;

fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
// First poll the `UnboundedReceiver`.

if let Poll::Ready(Some(v)) = Pin::new(&mut self.rx).poll_next(cx) {
return Poll::Ready(Some(Ok(Message::Received(v))));
}

// Secondly poll the `Framed` stream.
let result: Option<_> = futures::ready!(Pin::new(&mut self.lines).poll_next(cx));

Poll::Ready(match result {
// We've received a message we should broadcast to others.
Some(Ok(message)) => Some(Ok(Message::Broadcast(message))),

// An error occurred.
Some(Err(e)) => Some(Err(e)),

// The stream has been exhausted.
None => None,
})
}
}

/// Process an individual chat client
async fn process(
state: Arc<Mutex<Shared>>,
Expand Down Expand Up @@ -241,28 +199,32 @@ async fn process(
}

// Process incoming messages until our stream is exhausted by a disconnect.
while let Some(result) = peer.next().await {
match result {
// A message was received from the current user, we should
// broadcast this message to the other users.
Ok(Message::Broadcast(msg)) => {
let mut state = state.lock().await;
let msg = format!("{}: {}", username, msg);

state.broadcast(addr, &msg).await;
}
// A message was received from a peer. Send it to the
// current user.
Ok(Message::Received(msg)) => {
loop {
tokio::select! {
// A message was received from a peer. Send it to the current user.
Some(msg) = peer.rx.recv() => {
peer.lines.send(&msg).await?;
}
Err(e) => {
tracing::error!(
"an error occurred while processing messages for {}; error = {:?}",
username,
e
);
}
result = peer.lines.next() => match result {
// A message was received from the current user, we should
// broadcast this message to the other users.
Some(Ok(msg)) => {
let mut state = state.lock().await;
let msg = format!("{}: {}", username, msg);

state.broadcast(addr, &msg).await;
}
// An error occurred.
Some(Err(e)) => {
tracing::error!(
"an error occurred while processing messages for {}; error = {:?}",
username,
e
);
}
// The stream has been exhausted.
None => break,
},
}
}

Expand Down

0 comments on commit 872bc09

Please sign in to comment.