From 019cf9ffdc327193f4c2e065e46a238ac76bdf1f Mon Sep 17 00:00:00 2001 From: Cameron Garnham Date: Mon, 8 Jul 2024 16:21:37 +0200 Subject: [PATCH] udp: processor for requests --- src/servers/udp/handlers.rs | 6 +-- src/servers/udp/server/bound_socket.rs | 7 +-- src/servers/udp/server/launcher.rs | 65 ++----------------------- src/servers/udp/server/mod.rs | 1 + src/servers/udp/server/processor.rs | 66 ++++++++++++++++++++++++++ src/servers/udp/server/receiver.rs | 8 ++-- src/shared/bit_torrent/common.rs | 12 ----- 7 files changed, 81 insertions(+), 84 deletions(-) create mode 100644 src/servers/udp/server/processor.rs diff --git a/src/servers/udp/handlers.rs b/src/servers/udp/handlers.rs index f1f61ee6b..c6b2458e5 100644 --- a/src/servers/udp/handlers.rs +++ b/src/servers/udp/handlers.rs @@ -33,7 +33,7 @@ use crate::shared::bit_torrent::common::MAX_SCRAPE_TORRENTS; /// - Delegating the request to the correct handler depending on the request type. /// /// It will return an `Error` response if the request is invalid. -pub(crate) async fn handle_packet(udp_request: RawRequest, tracker: &Arc, addr: SocketAddr) -> Response { +pub(crate) async fn handle_packet(udp_request: RawRequest, tracker: &Tracker, local_addr: SocketAddr) -> Response { debug!("Handling Packets: {udp_request:?}"); let start_time = Instant::now(); @@ -47,7 +47,7 @@ pub(crate) async fn handle_packet(udp_request: RawRequest, tracker: &Arc { - log_request(&request, &request_id, &addr); + log_request(&request, &request_id, &local_addr); let transaction_id = match &request { Request::Connect(connect_request) => connect_request.transaction_id, @@ -62,7 +62,7 @@ pub(crate) async fn handle_packet(udp_request: RawRequest, tracker: &Arc, + socket: tokio::net::UdpSocket, } impl BoundSocket { @@ -30,9 +29,7 @@ impl BoundSocket { let local_addr = format!("udp://{addr}"); tracing::debug!(target: UDP_TRACKER_LOG_TARGET, local_addr, "UdpSocket::new (bound)"); - Ok(Self { - socket: Arc::new(socket), - }) + Ok(Self { socket }) } /// # Panics diff --git a/src/servers/udp/server/launcher.rs b/src/servers/udp/server/launcher.rs index bb7c7d70f..7b40f6604 100644 --- a/src/servers/udp/server/launcher.rs +++ b/src/servers/udp/server/launcher.rs @@ -1,26 +1,23 @@ -use std::io::Cursor; use std::net::SocketAddr; use std::sync::Arc; use std::time::Duration; -use aquatic_udp_protocol::Response; use derive_more::Constructor; use futures_util::StreamExt; use tokio::select; use tokio::sync::oneshot; use super::request_buffer::ActiveRequests; -use super::RawRequest; use crate::bootstrap::jobs::Started; use crate::core::Tracker; use crate::servers::logging::STARTED_ON; use crate::servers::registar::ServiceHealthCheckJob; use crate::servers::signals::{shutdown_signal_with_message, Halted}; use crate::servers::udp::server::bound_socket::BoundSocket; +use crate::servers::udp::server::processor::Processor; use crate::servers::udp::server::receiver::Receiver; -use crate::servers::udp::{handlers, UDP_TRACKER_LOG_TARGET}; +use crate::servers::udp::UDP_TRACKER_LOG_TARGET; use crate::shared::bit_torrent::tracker::udp::client::check; -use crate::shared::bit_torrent::tracker::udp::MAX_PACKET_SIZE; /// A UDP server instance launcher. #[derive(Constructor)] @@ -109,6 +106,8 @@ impl Launcher { let local_addr = format!("udp://{addr}"); loop { + let processor = Processor::new(receiver.socket.clone(), tracker.clone()); + if let Some(req) = { tracing::trace!(target: UDP_TRACKER_LOG_TARGET, local_addr, "Udp::run_udp_server (wait for request)"); receiver.next().await @@ -138,9 +137,7 @@ impl Launcher { // are only adding and removing tasks without given them the // chance to finish. However, the buffer is yielding before // aborting one tasks, giving it the chance to finish. - let abort_handle: tokio::task::AbortHandle = - tokio::task::spawn(Launcher::process_request(req, tracker.clone(), receiver.bound_socket.clone())) - .abort_handle(); + let abort_handle: tokio::task::AbortHandle = tokio::task::spawn(processor.process_request(req)).abort_handle(); if abort_handle.is_finished() { continue; @@ -156,56 +153,4 @@ impl Launcher { } } } - - async fn process_request(request: RawRequest, tracker: Arc, socket: Arc) { - tracing::trace!(target: UDP_TRACKER_LOG_TARGET, request = %request.from, "Udp::process_request (receiving)"); - Self::process_valid_request(tracker, socket, request).await; - } - - async fn process_valid_request(tracker: Arc, socket: Arc, udp_request: RawRequest) { - tracing::trace!(target: UDP_TRACKER_LOG_TARGET, "Udp::process_valid_request. Making Response to {udp_request:?}"); - let from = udp_request.from; - let response = handlers::handle_packet(udp_request, &tracker.clone(), socket.address()).await; - Self::send_response(&socket.clone(), from, response).await; - } - - async fn send_response(bound_socket: &Arc, to: SocketAddr, response: Response) { - let response_type = match &response { - Response::Connect(_) => "Connect".to_string(), - Response::AnnounceIpv4(_) => "AnnounceIpv4".to_string(), - Response::AnnounceIpv6(_) => "AnnounceIpv6".to_string(), - Response::Scrape(_) => "Scrape".to_string(), - Response::Error(e) => format!("Error: {e:?}"), - }; - - tracing::debug!(target: UDP_TRACKER_LOG_TARGET, target = ?to, response_type, "Udp::send_response (sending)"); - - let buffer = vec![0u8; MAX_PACKET_SIZE]; - let mut cursor = Cursor::new(buffer); - - match response.write_bytes(&mut cursor) { - Ok(()) => { - #[allow(clippy::cast_possible_truncation)] - let position = cursor.position() as usize; - let inner = cursor.get_ref(); - - tracing::debug!(target: UDP_TRACKER_LOG_TARGET, ?to, bytes_count = &inner[..position].len(), "Udp::send_response (sending...)" ); - tracing::trace!(target: UDP_TRACKER_LOG_TARGET, ?to, bytes_count = &inner[..position].len(), payload = ?&inner[..position], "Udp::send_response (sending...)"); - - Self::send_packet(bound_socket, &to, &inner[..position]).await; - - tracing::trace!(target:UDP_TRACKER_LOG_TARGET, ?to, bytes_count = &inner[..position].len(), "Udp::send_response (sent)"); - } - Err(e) => { - tracing::error!(target: UDP_TRACKER_LOG_TARGET, ?to, response_type, err = %e, "Udp::send_response (error)"); - } - } - } - - async fn send_packet(bound_socket: &Arc, remote_addr: &SocketAddr, payload: &[u8]) { - tracing::trace!(target: UDP_TRACKER_LOG_TARGET, to = %remote_addr, ?payload, "Udp::send_response (sending)"); - - // doesn't matter if it reaches or not - drop(bound_socket.send_to(payload, remote_addr).await); - } } diff --git a/src/servers/udp/server/mod.rs b/src/servers/udp/server/mod.rs index e3321f157..16133e21b 100644 --- a/src/servers/udp/server/mod.rs +++ b/src/servers/udp/server/mod.rs @@ -5,6 +5,7 @@ use super::RawRequest; pub mod bound_socket; pub mod launcher; +pub mod processor; pub mod receiver; pub mod request_buffer; pub mod spawner; diff --git a/src/servers/udp/server/processor.rs b/src/servers/udp/server/processor.rs new file mode 100644 index 000000000..e633a2358 --- /dev/null +++ b/src/servers/udp/server/processor.rs @@ -0,0 +1,66 @@ +use std::io::Cursor; +use std::net::SocketAddr; +use std::sync::Arc; + +use aquatic_udp_protocol::Response; + +use super::bound_socket::BoundSocket; +use crate::core::Tracker; +use crate::servers::udp::{handlers, RawRequest, UDP_TRACKER_LOG_TARGET}; + +pub struct Processor { + socket: Arc, + tracker: Arc, +} + +impl Processor { + pub fn new(socket: Arc, tracker: Arc) -> Self { + Self { socket, tracker } + } + + pub async fn process_request(self, request: RawRequest) { + tracing::trace!(target: UDP_TRACKER_LOG_TARGET, request = %request.from, "Udp::process_request (receiving)"); + + let from = request.from; + let response = handlers::handle_packet(request, &self.tracker, self.socket.address()).await; + self.send_response(from, response).await; + } + + async fn send_response(self, to: SocketAddr, response: Response) { + let response_type = match &response { + Response::Connect(_) => "Connect".to_string(), + Response::AnnounceIpv4(_) => "AnnounceIpv4".to_string(), + Response::AnnounceIpv6(_) => "AnnounceIpv6".to_string(), + Response::Scrape(_) => "Scrape".to_string(), + Response::Error(e) => format!("Error: {e:?}"), + }; + + tracing::debug!(target: UDP_TRACKER_LOG_TARGET, target = ?to, response_type, "Udp::send_response (sending)"); + + let mut writer = Cursor::new(Vec::with_capacity(200)); + + match response.write_bytes(&mut writer) { + Ok(()) => { + let bytes_count = writer.get_ref().len(); + let payload = writer.get_ref(); + + tracing::debug!(target: UDP_TRACKER_LOG_TARGET, ?to, bytes_count, "Udp::send_response (sending...)" ); + tracing::trace!(target: UDP_TRACKER_LOG_TARGET, ?to, bytes_count, ?payload, "Udp::send_response (sending...)"); + + self.send_packet(&to, payload).await; + + tracing::trace!(target:UDP_TRACKER_LOG_TARGET, ?to, bytes_count, "Udp::send_response (sent)"); + } + Err(e) => { + tracing::error!(target: UDP_TRACKER_LOG_TARGET, ?to, response_type, err = %e, "Udp::send_response (error)"); + } + } + } + + async fn send_packet(&self, remote_addr: &SocketAddr, payload: &[u8]) { + tracing::trace!(target: UDP_TRACKER_LOG_TARGET, to = %remote_addr, ?payload, "Udp::send_response (sending)"); + + // doesn't matter if it reaches or not + drop(self.socket.send_to(payload, remote_addr).await); + } +} diff --git a/src/servers/udp/server/receiver.rs b/src/servers/udp/server/receiver.rs index 020ab7324..0176930a4 100644 --- a/src/servers/udp/server/receiver.rs +++ b/src/servers/udp/server/receiver.rs @@ -11,7 +11,7 @@ use super::RawRequest; use crate::shared::bit_torrent::tracker::udp::MAX_PACKET_SIZE; pub struct Receiver { - pub bound_socket: Arc, + pub socket: Arc, data: RefCell<[u8; MAX_PACKET_SIZE]>, } @@ -19,13 +19,13 @@ impl Receiver { #[must_use] pub fn new(bound_socket: Arc) -> Self { Receiver { - bound_socket, + socket: bound_socket, data: RefCell::new([0; MAX_PACKET_SIZE]), } } pub fn bound_socket_address(&self) -> SocketAddr { - self.bound_socket.address() + self.socket.address() } } @@ -36,7 +36,7 @@ impl Stream for Receiver { let mut buf = *self.data.borrow_mut(); let mut buf = tokio::io::ReadBuf::new(&mut buf); - let Poll::Ready(ready) = self.bound_socket.poll_recv_from(cx, &mut buf) else { + let Poll::Ready(ready) = self.socket.poll_recv_from(cx, &mut buf) else { return Poll::Pending; }; diff --git a/src/shared/bit_torrent/common.rs b/src/shared/bit_torrent/common.rs index 9625b88e7..3dd059a6a 100644 --- a/src/shared/bit_torrent/common.rs +++ b/src/shared/bit_torrent/common.rs @@ -1,7 +1,6 @@ //! `BitTorrent` protocol primitive types //! //! [BEP 3. The `BitTorrent` Protocol Specification](https://www.bittorrent.org/beps/bep_0003.html) -use serde::{Deserialize, Serialize}; /// The maximum number of torrents that can be returned in an `scrape` response. /// @@ -21,14 +20,3 @@ pub const MAX_SCRAPE_TORRENTS: u8 = 74; /// See function to [`generate`](crate::core::auth::generate) the /// [`ExpiringKeys`](crate::core::auth::ExpiringKey) for more information. pub const AUTH_KEY_LENGTH: usize = 32; - -#[repr(u32)] -#[derive(Serialize, Deserialize, Debug, PartialEq, Eq, Clone)] -enum Actions { - // todo: it seems this enum is not used anywhere. Values match the ones in - // aquatic_udp_protocol::request::Request::from_bytes. - Connect = 0, - Announce = 1, - Scrape = 2, - Error = 3, -}