Skip to content

Commit

Permalink
fix: [#591] panicking after starting UDP server due to close halt cha…
Browse files Browse the repository at this point in the history
…nnel
  • Loading branch information
josecelano committed Jan 11, 2024
1 parent 49c961c commit 0c1f389
Show file tree
Hide file tree
Showing 5 changed files with 75 additions and 33 deletions.
5 changes: 3 additions & 2 deletions cSpell.json
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
"Containerfile",
"curr",
"Cyberneering",
"datagram",
"datetime",
"Dijke",
"distroless",
Expand Down Expand Up @@ -79,6 +80,7 @@
"nonroot",
"Norberg",
"numwant",
"nvCFlJCq7fz7Qx6KoKTDiMZvns8l5Kw7",
"oneshot",
"ostr",
"Pando",
Expand Down Expand Up @@ -129,8 +131,7 @@
"Xtorrent",
"Xunlei",
"xxxxxxxxxxxxxxxxxxxxd",
"yyyyyyyyyyyyyyyyyyyyd",
"nvCFlJCq7fz7Qx6KoKTDiMZvns8l5Kw7"
"yyyyyyyyyyyyyyyyyyyyd"
],
"enableFiletypes": [
"dockerfile",
Expand Down
11 changes: 11 additions & 0 deletions src/bootstrap/jobs/udp_tracker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
//! for the configuration options.
use std::sync::Arc;

use log::debug;
use tokio::task::JoinHandle;
use torrust_tracker_configuration::UdpTracker;

Expand Down Expand Up @@ -36,10 +37,20 @@ pub async fn start_job(config: &UdpTracker, tracker: Arc<core::Tracker>) -> Join
.expect("it should be able to start the udp tracker");

tokio::spawn(async move {
debug!(target: "UDP Tracker", "Wait for launcher (UDP service) to finish ...");
debug!(target: "UDP Tracker", "Is halt channel closed before waiting?: {}", server.state.halt_task.is_closed());

Check warning on line 41 in src/bootstrap/jobs/udp_tracker.rs

View check run for this annotation

Codecov / codecov/patch

src/bootstrap/jobs/udp_tracker.rs#L40-L41

Added lines #L40 - L41 were not covered by tests

assert!(
!server.state.halt_task.is_closed(),

Check warning on line 44 in src/bootstrap/jobs/udp_tracker.rs

View check run for this annotation

Codecov / codecov/patch

src/bootstrap/jobs/udp_tracker.rs#L43-L44

Added lines #L43 - L44 were not covered by tests
"Halt channel for UDP tracker should be open"
);

server
.state
.task
.await
.expect("it should be able to join to the udp tracker task");

debug!(target: "UDP Tracker", "Is halt channel closed after finishing the server?: {}", server.state.halt_task.is_closed());

Check warning on line 54 in src/bootstrap/jobs/udp_tracker.rs

View check run for this annotation

Codecov / codecov/patch

src/bootstrap/jobs/udp_tracker.rs#L54

Added line #L54 was not covered by tests
})
}
4 changes: 3 additions & 1 deletion src/servers/apis/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,8 @@ impl Launcher {
let tls = self.tls.clone();
let protocol = if tls.is_some() { "https" } else { "http" };

info!(target: "API", "Starting on {protocol}://{}", address);

let running = Box::pin(async {
match tls {
Some(tls) => axum_server::from_tcp_rustls(socket, tls)
Expand All @@ -190,7 +192,7 @@ impl Launcher {
}
});

info!(target: "API", "API server started on {protocol}://{}", address);
info!(target: "API", "Started on {protocol}://{}", address);

tx_start
.send(Started { address })
Expand Down
2 changes: 1 addition & 1 deletion src/servers/http/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ impl Launcher {
tokio::task::spawn(graceful_shutdown(
handle.clone(),
rx_halt,
format!("Shutting down http server on socket address: {address}"),
format!("Shutting down HTTP server on socket address: {address}"),
));

let tls = self.tls.clone();
Expand Down
86 changes: 57 additions & 29 deletions src/servers/udp/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -120,23 +120,30 @@ impl UdpServer<Stopped> {
let (tx_start, rx_start) = tokio::sync::oneshot::channel::<Started>();
let (tx_halt, rx_halt) = tokio::sync::oneshot::channel::<Halted>();

assert!(!tx_halt.is_closed(), "Halt channel for UDP tracker should be open");

let launcher = self.state.launcher;

let task = tokio::spawn(async move {
launcher.start(tracker, tx_start, rx_halt).await;
debug!(target: "UDP Tracker", "Launcher starting ...");

let starting = launcher.start(tracker, tx_start, rx_halt).await;

starting.await.expect("UDP server should have started running");

launcher
});

let binding = rx_start.await.expect("unable to start service").address;

let running_udp_server: UdpServer<Running> = UdpServer {
state: Running {
binding: rx_start.await.expect("unable to start service").address,
binding,
halt_task: tx_halt,
task,
},
};

info!("Running UDP Tracker on Socket: {}", running_udp_server.state.binding);

Ok(running_udp_server)
}
}
Expand Down Expand Up @@ -202,41 +209,62 @@ impl Udp {
tx_start: Sender<Started>,
rx_halt: Receiver<Halted>,
) -> JoinHandle<()> {
let binding = Arc::new(UdpSocket::bind(bind_to).await.expect("Could not bind to {self.socket}."));
let address = binding.local_addr().expect("Could not get local_addr from {binding}.");
let socket = Arc::new(UdpSocket::bind(bind_to).await.expect("Could not bind to {self.socket}."));
let address = socket.local_addr().expect("Could not get local_addr from {binding}.");

info!(target: "UDP Tracker", "Starting on: udp://{}", address);

let running = tokio::task::spawn(async move {
let halt = async move {
shutdown_signal_with_message(rx_halt, format!("Halting Http Service Bound to Socket: {address}")).await;
let halt = tokio::task::spawn(async move {
debug!(target: "UDP Tracker", "Waiting for halt signal for socket address: udp://{address} ...");

shutdown_signal_with_message(
rx_halt,
format!("Shutting down UDP server on socket address: udp://{address}"),
)
.await;
});

let listen = async move {
debug!(target: "UDP Tracker", "Waiting for packets on socket address: udp://{address} ...");

loop {
let mut data = [0; MAX_PACKET_SIZE];
let socket_clone = socket.clone();

match socket_clone.recv_from(&mut data).await {
Ok((valid_bytes, remote_addr)) => {
let payload = data[..valid_bytes].to_vec();

Check warning on line 237 in src/servers/udp/server.rs

View check run for this annotation

Codecov / codecov/patch

src/servers/udp/server.rs#L236-L237

Added lines #L236 - L237 were not covered by tests

debug!(target: "UDP Tracker", "Received {} bytes", payload.len());
debug!(target: "UDP Tracker", "From: {}", &remote_addr);
debug!(target: "UDP Tracker", "Payload: {:?}", payload);

Check warning on line 241 in src/servers/udp/server.rs

View check run for this annotation

Codecov / codecov/patch

src/servers/udp/server.rs#L239-L241

Added lines #L239 - L241 were not covered by tests

let response = handle_packet(remote_addr, payload, &tracker).await;

Check warning on line 243 in src/servers/udp/server.rs

View check run for this annotation

Codecov / codecov/patch

src/servers/udp/server.rs#L243

Added line #L243 was not covered by tests

Udp::send_response(socket_clone, remote_addr, response).await;
}
Err(err) => {
error!("Error reading UDP datagram from socket. Error: {:?}", err);
}

Check warning on line 249 in src/servers/udp/server.rs

View check run for this annotation

Codecov / codecov/patch

src/servers/udp/server.rs#L245-L249

Added lines #L245 - L249 were not covered by tests
}
}

Check warning on line 251 in src/servers/udp/server.rs

View check run for this annotation

Codecov / codecov/patch

src/servers/udp/server.rs#L251

Added line #L251 was not covered by tests
};

pin_mut!(halt);
pin_mut!(listen);

loop {
let mut data = [0; MAX_PACKET_SIZE];
let binding = binding.clone();

tokio::select! {
() = & mut halt => {},

Ok((valid_bytes, remote_addr)) = binding.recv_from(&mut data) => {
let payload = data[..valid_bytes].to_vec();

debug!("Received {} bytes", payload.len());
debug!("From: {}", &remote_addr);
debug!("Payload: {:?}", payload);
tx_start
.send(Started { address })
.expect("the UDP Tracker service should not be dropped");

let response = handle_packet(remote_addr, payload, &tracker).await;

Udp::send_response(binding, remote_addr, response).await;
}
}
tokio::select! {
_ = & mut halt => { debug!(target: "UDP Tracker", "Halt signal spawned task stopped on address: udp://{address}"); },
() = & mut listen => { debug!(target: "UDP Tracker", "Socket listener stopped on address: udp://{address}"); },
}
});

tx_start
.send(Started { address })
.expect("the UDP Tracker service should not be dropped");
info!(target: "UDP Tracker", "Started on: udp://{}", address);

running
}
Expand Down

0 comments on commit 0c1f389

Please sign in to comment.