Skip to content

Commit

Permalink
feat: [#698] refactor UDP logs
Browse files Browse the repository at this point in the history
Use `tracing` crate format:

```
2024-02-19T17:10:05.243973520+00:00 [UDP TRACKER][INFO] request; server_socket_addr=0.0.0.0:6969 action=CONNECT transaction_id=-888840697 request_id=03b92de0-c9f8-4c40-a808-5d706ce770f4
2024-02-19T17:10:05.244016141+00:00 [UDP TRACKER][INFO] response; server_socket_addr=0.0.0.0:6969 transaction_id=-888840697 request_id=03b92de0-c9f8-4c40-a808-5d706ce770f4
2024-02-19T17:10:05.244042841+00:00 [UDP TRACKER][INFO] request; server_socket_addr=0.0.0.0:6969 action=ANNOUNCE transaction_id=-888840697 request_id=2113eb8c-61f4-476b-b3d5-02892f0a2fdb connection_id=-7190270103145546231 info_hash=9c38422213e30bff212b30c360d26f9a02136422
2024-02-19T17:10:05.244052082+00:00 [UDP TRACKER][INFO] response; server_socket_addr=0.0.0.0:6969 transaction_id=-888840697 request_id=2113eb8c-61f4-476b-b3d5-02892f0a2fdb
```
  • Loading branch information
josecelano committed Feb 19, 2024
1 parent af2e0f9 commit 4a2d902
Show file tree
Hide file tree
Showing 6 changed files with 134 additions and 27 deletions.
6 changes: 3 additions & 3 deletions src/bootstrap/jobs/udp_tracker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,8 @@ pub async fn start_job(config: &UdpTracker, tracker: Arc<core::Tracker>, form: S
.expect("it should be able to start the udp tracker");

tokio::spawn(async move {
debug!(target: "UDP Tracker", "Wait for launcher (UDP service) to finish ...");
debug!(target: "UDP Tracker", "Is halt channel closed before waiting?: {}", server.state.halt_task.is_closed());
debug!(target: "UDP TRACKER", "Wait for launcher (UDP service) to finish ...");
debug!(target: "UDP TRACKER", "Is halt channel closed before waiting?: {}", server.state.halt_task.is_closed());

Check warning on line 42 in src/bootstrap/jobs/udp_tracker.rs

View check run for this annotation

Codecov / codecov/patch

src/bootstrap/jobs/udp_tracker.rs#L41-L42

Added lines #L41 - L42 were not covered by tests

assert!(
!server.state.halt_task.is_closed(),
Expand All @@ -52,6 +52,6 @@ pub async fn start_job(config: &UdpTracker, tracker: Arc<core::Tracker>, form: S
.await
.expect("it should be able to join to the udp tracker task");

debug!(target: "UDP Tracker", "Is halt channel closed after finishing the server?: {}", server.state.halt_task.is_closed());
debug!(target: "UDP TRACKER", "Is halt channel closed after finishing the server?: {}", server.state.halt_task.is_closed());

Check warning on line 55 in src/bootstrap/jobs/udp_tracker.rs

View check run for this annotation

Codecov / codecov/patch

src/bootstrap/jobs/udp_tracker.rs#L55

Added line #L55 was not covered by tests
})
}
6 changes: 3 additions & 3 deletions src/console/ci/e2e/logs_parser.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
//! Utilities to parse Torrust Tracker logs.
use serde::{Deserialize, Serialize};

const UDP_TRACKER_PATTERN: &str = "[UDP Tracker][INFO] Starting on: udp://";
const UDP_TRACKER_PATTERN: &str = "[UDP TRACKER][INFO] Starting on: udp://";
const HTTP_TRACKER_PATTERN: &str = "[HTTP TRACKER][INFO] Starting on: ";
const HEALTH_CHECK_PATTERN: &str = "[HEALTH CHECK API][INFO] Starting on: ";

Expand All @@ -20,7 +20,7 @@ impl RunningServices {
/// ```text
/// Loading default configuration file: `./share/default/config/tracker.development.sqlite3.toml` ...
/// 2024-01-24T16:36:14.614898789+00:00 [torrust_tracker::bootstrap::logging][INFO] logging initialized.
/// 2024-01-24T16:36:14.615586025+00:00 [UDP Tracker][INFO] Starting on: udp://0.0.0.0:6969
/// 2024-01-24T16:36:14.615586025+00:00 [UDP TRACKER][INFO] Starting on: udp://0.0.0.0:6969
/// 2024-01-24T16:36:14.615623705+00:00 [torrust_tracker::bootstrap::jobs][INFO] TLS not enabled
/// 2024-01-24T16:36:14.615694484+00:00 [HTTP TRACKER][INFO] Starting on: http://0.0.0.0:7070
/// 2024-01-24T16:36:14.615710534+00:00 [HTTP TRACKER][INFO] Started on: http://0.0.0.0:7070
Expand Down Expand Up @@ -86,7 +86,7 @@ mod tests {
#[test]
fn it_should_parse_from_logs_with_valid_logs() {
let logs = "\
[UDP Tracker][INFO] Starting on: udp://0.0.0.0:8080\n\
[UDP TRACKER][INFO] Starting on: udp://0.0.0.0:8080\n\
[HTTP TRACKER][INFO] Starting on: 0.0.0.0:9090\n\
[HEALTH CHECK API][INFO] Starting on: 0.0.0.0:10010";
let running_services = RunningServices::parse_from_logs(logs);
Expand Down
63 changes: 48 additions & 15 deletions src/servers/udp/handlers.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
//! Handlers for the UDP server.
use std::fmt;
use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr};
use std::panic::Location;
use std::sync::Arc;
Expand All @@ -7,13 +8,16 @@ use aquatic_udp_protocol::{
AnnounceInterval, AnnounceRequest, AnnounceResponse, ConnectRequest, ConnectResponse, ErrorResponse, NumberOfDownloads,
NumberOfPeers, Port, Request, Response, ResponsePeer, ScrapeRequest, ScrapeResponse, TorrentScrapeStatistics, TransactionId,
};
use log::{debug, info};
use log::debug;
use tokio::net::UdpSocket;
use torrust_tracker_located_error::DynError;
use uuid::Uuid;

use super::connection_cookie::{check, from_connection_id, into_connection_id, make};
use super::UdpRequest;
use crate::core::{statistics, ScrapeData, Tracker};
use crate::servers::udp::error::Error;
use crate::servers::udp::logging::{log_bad_request, log_error_response, log_request, log_response};
use crate::servers::udp::peer_builder;
use crate::servers::udp::request::AnnounceWrapper;
use crate::shared::bit_torrent::common::MAX_SCRAPE_TORRENTS;
Expand All @@ -28,33 +32,50 @@ use crate::shared::bit_torrent::info_hash::InfoHash;
/// type.
///
/// It will return an `Error` response if the request is invalid.
pub(crate) async fn handle_packet(udp_request: UdpRequest, tracker: &Arc<Tracker>) -> Response {
pub(crate) async fn handle_packet(udp_request: UdpRequest, tracker: &Arc<Tracker>, socket: Arc<UdpSocket>) -> Response {
debug!("Handling Packets: {udp_request:?}");

let request_id = RequestId::make(&udp_request);
let server_socket_addr = socket.local_addr().expect("Could not get local_addr for socket.");

match Request::from_bytes(&udp_request.payload[..udp_request.payload.len()], MAX_SCRAPE_TORRENTS).map_err(|e| {
Error::InternalServer {
message: format!("{e:?}"),
location: Location::caller(),
}
}) {
Ok(request) => {
log_request(&request, &request_id, &server_socket_addr);

let transaction_id = match &request {
Request::Connect(connect_request) => connect_request.transaction_id,
Request::Announce(announce_request) => announce_request.transaction_id,
Request::Scrape(scrape_request) => scrape_request.transaction_id,
};

match handle_request(request, udp_request.from, tracker).await {
let response = match handle_request(request, udp_request.from, tracker).await {
Ok(response) => response,
Err(e) => handle_error(&e, transaction_id),
}
};

log_response(&response, &transaction_id, &request_id, &server_socket_addr);

response
}
Err(e) => {
log_bad_request(&request_id);

let response = handle_error(
&Error::BadRequest {
source: (Arc::new(e) as DynError).into(),
},
TransactionId(0),
);

log_error_response(&request_id);

response
}
// bad request
Err(e) => handle_error(
&Error::BadRequest {
source: (Arc::new(e) as DynError).into(),
},
TransactionId(0),
),
}
}

Expand All @@ -80,7 +101,6 @@ pub async fn handle_request(request: Request, remote_addr: SocketAddr, tracker:
///
/// This function does not ever return an error.
pub async fn handle_connect(remote_addr: SocketAddr, request: &ConnectRequest, tracker: &Tracker) -> Result<Response, Error> {
info!(target: "UDP", "\"CONNECT TxID {}\"", request.transaction_id.0);
debug!("udp connect request: {:#?}", request);

let connection_cookie = make(&remote_addr);
Expand Down Expand Up @@ -138,8 +158,6 @@ pub async fn handle_announce(
source: (Arc::new(e) as Arc<dyn std::error::Error + Send + Sync>).into(),
})?;

info!(target: "UDP", "\"ANNOUNCE TxID {} IH {}\"", announce_request.transaction_id.0, info_hash.to_hex_string());

let mut peer = peer_builder::from_request(&wrapped_announce_request, &remote_client_ip);

let response = tracker.announce(&info_hash, &mut peer, &remote_client_ip).await;
Expand Down Expand Up @@ -214,7 +232,6 @@ pub async fn handle_announce(
///
/// This function does not ever return an error.
pub async fn handle_scrape(remote_addr: SocketAddr, request: &ScrapeRequest, tracker: &Tracker) -> Result<Response, Error> {
info!(target: "UDP", "\"SCRAPE TxID {}\"", request.transaction_id.0);
debug!("udp scrape request: {:#?}", request);

// Convert from aquatic infohashes
Expand Down Expand Up @@ -274,6 +291,22 @@ fn handle_error(e: &Error, transaction_id: TransactionId) -> Response {
})
}

/// An identifier for a request.
#[derive(Debug, Clone)]
pub struct RequestId(Uuid);

Check warning on line 296 in src/servers/udp/handlers.rs

View check run for this annotation

Codecov / codecov/patch

src/servers/udp/handlers.rs#L295-L296

Added lines #L295 - L296 were not covered by tests

impl RequestId {
fn make(_request: &UdpRequest) -> RequestId {
RequestId(Uuid::new_v4())
}
}

impl fmt::Display for RequestId {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "{}", self.0)
}

Check warning on line 307 in src/servers/udp/handlers.rs

View check run for this annotation

Codecov / codecov/patch

src/servers/udp/handlers.rs#L305-L307

Added lines #L305 - L307 were not covered by tests
}

#[cfg(test)]
mod tests {

Expand Down
73 changes: 73 additions & 0 deletions src/servers/udp/logging.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
//! Logging for UDP Tracker requests and responses.

use std::net::SocketAddr;

use aquatic_udp_protocol::{Request, Response, TransactionId};

use super::handlers::RequestId;
use crate::shared::bit_torrent::info_hash::InfoHash;

pub fn log_request(request: &Request, request_id: &RequestId, server_socket_addr: &SocketAddr) {
let action = map_action_name(request);

match &request {
Request::Connect(connect_request) => {
let transaction_id = connect_request.transaction_id;
let transaction_id_str = transaction_id.0.to_string();

tracing::span!(
target: "UDP TRACKER",
tracing::Level::INFO, "request", server_socket_addr = %server_socket_addr, action = %action, transaction_id = %transaction_id_str, request_id = %request_id);
}
Request::Announce(announce_request) => {
let transaction_id = announce_request.transaction_id;
let transaction_id_str = transaction_id.0.to_string();
let connection_id_str = announce_request.connection_id.0.to_string();
let info_hash_str = InfoHash::from_bytes(&announce_request.info_hash.0).to_hex_string();

tracing::span!(
target: "UDP TRACKER",
tracing::Level::INFO, "request", server_socket_addr = %server_socket_addr, action = %action, transaction_id = %transaction_id_str, request_id = %request_id, connection_id = %connection_id_str, info_hash = %info_hash_str);
}
Request::Scrape(scrape_request) => {
let transaction_id = scrape_request.transaction_id;
let transaction_id_str = transaction_id.0.to_string();
let connection_id_str = scrape_request.connection_id.0.to_string();

tracing::span!(
target: "UDP TRACKER",
tracing::Level::INFO, "request", server_socket_addr = %server_socket_addr, action = %action, transaction_id = %transaction_id_str, request_id = %request_id, connection_id = %connection_id_str);
}
};
}

fn map_action_name(udp_request: &Request) -> String {
match udp_request {
Request::Connect(_connect_request) => "CONNECT".to_owned(),
Request::Announce(_announce_request) => "ANNOUNCE".to_owned(),
Request::Scrape(_scrape_request) => "SCRAPE".to_owned(),
}
}

pub fn log_response(
_response: &Response,
transaction_id: &TransactionId,
request_id: &RequestId,
server_socket_addr: &SocketAddr,
) {
tracing::span!(
target: "UDP TRACKER",
tracing::Level::INFO, "response", server_socket_addr = %server_socket_addr, transaction_id = %transaction_id.0.to_string(), request_id = %request_id);
}

pub fn log_bad_request(request_id: &RequestId) {
tracing::span!(
target: "UDP TRACKER",
tracing::Level::INFO, "bad request", request_id = %request_id);
}

pub fn log_error_response(request_id: &RequestId) {
tracing::span!(
target: "UDP TRACKER",
tracing::Level::INFO, "response", request_id = %request_id);
}
1 change: 1 addition & 0 deletions src/servers/udp/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -644,6 +644,7 @@ use std::net::SocketAddr;
pub mod connection_cookie;
pub mod error;
pub mod handlers;
pub mod logging;
pub mod peer_builder;
pub mod request;
pub mod server;
Expand Down
12 changes: 6 additions & 6 deletions src/servers/udp/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -247,10 +247,10 @@ impl Udp {
let address = socket.local_addr().expect("Could not get local_addr from {binding}.");
let halt = shutdown_signal_with_message(rx_halt, format!("Halting Http Service Bound to Socket: {address}"));

info!(target: "UDP Tracker", "Starting on: udp://{}", address);
info!(target: "UDP TRACKER", "Starting on: udp://{}", address);

let running = tokio::task::spawn(async move {
debug!(target: "UDP Tracker", "Started: Waiting for packets on socket address: udp://{address} ...");
debug!(target: "UDP TRACKER", "Started: Waiting for packets on socket address: udp://{address} ...");

let tracker = tracker.clone();
let socket = socket.clone();
Expand All @@ -275,13 +275,13 @@ impl Udp {
.send(Started { address })
.expect("the UDP Tracker service should not be dropped");

debug!(target: "UDP Tracker", "Started on: udp://{}", address);
debug!(target: "UDP TRACKER", "Started on: udp://{}", address);

let stop = running.abort_handle();

select! {
_ = running => { debug!(target: "UDP Tracker", "Socket listener stopped on address: udp://{address}"); },
() = halt => { debug!(target: "UDP Tracker", "Halt signal spawned task stopped on address: udp://{address}"); }
_ = running => { debug!(target: "UDP TRACKER", "Socket listener stopped on address: udp://{address}"); },
() = halt => { debug!(target: "UDP TRACKER", "Halt signal spawned task stopped on address: udp://{address}"); }
}
stop.abort();

Expand Down Expand Up @@ -327,7 +327,7 @@ impl Udp {
async fn make_response(tracker: Arc<Tracker>, socket: Arc<UdpSocket>, udp_request: UdpRequest) {
trace!("Making Response to {udp_request:?}");
let from = udp_request.from;
let response = handlers::handle_packet(udp_request, &tracker.clone()).await;
let response = handlers::handle_packet(udp_request, &tracker.clone(), socket.clone()).await;
Self::send_response(&socket.clone(), from, response).await;
}

Expand Down

0 comments on commit 4a2d902

Please sign in to comment.