Skip to content

Commit

Permalink
docs: [torrust#918] add comments to the UDP server
Browse files Browse the repository at this point in the history
  • Loading branch information
josecelano committed Jun 28, 2024
1 parent bb8b2ad commit a897de4
Show file tree
Hide file tree
Showing 2 changed files with 112 additions and 79 deletions.
38 changes: 15 additions & 23 deletions src/servers/udp/server/launcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ impl Launcher {
}

async fn run_udp_server_main(mut receiver: Receiver, tracker: Arc<Tracker>) {
let reqs = &mut ActiveRequests::default();
let active_requests = &mut ActiveRequests::default();

let addr = receiver.bound_socket_address();
let local_addr = format!("udp://{addr}");
Expand All @@ -127,37 +127,29 @@ impl Launcher {
}
};

/* code-review:
Does it make sense to spawn a new request processor task when
the ActiveRequests buffer is full?
We could store the UDP request in a secondary buffer and wait
until active tasks are finished. When a active request is finished
we can move a new UDP request from the pending to process requests
buffer to the active requests buffer.
This forces us to define an explicit timeout for active requests.
In the current solution the timeout is dynamic, it depends on
the system load. With high load we can remove tasks without
giving them enough time to be processed. With low load we could
keep processing running longer than a reasonable time for
the client to receive the response.
*/

let abort_handle =
// We spawn the new task even if there active requests buffer is
// full. This could seem counterintuitive because we are accepting
// more request and consuming more memory even if the server is
// already busy. However, we "force_push" the new tasks in the
// buffer. That means, in the worst scenario we will abort a
// running task to make place for the new task.
//
// Once concern could be to reach an starvation point were we
// are only adding and removing tasks without given them the
// chance to finish. However, the buffer is yielding before
// aborting one tasks, giving it the chance to finish.
let abort_handle: tokio::task::AbortHandle =
tokio::task::spawn(Launcher::process_request(req, tracker.clone(), receiver.bound_socket.clone()))
.abort_handle();

if abort_handle.is_finished() {
continue;
}

reqs.force_push(abort_handle, &local_addr).await;
active_requests.force_push(abort_handle, &local_addr).await;
} else {
tokio::task::yield_now().await;

// the request iterator returned `None`.
tracing::error!(target: UDP_TRACKER_LOG_TARGET, local_addr, "Udp::run_udp_server breaking: (ran dry, should not happen in production!)");
break;
Expand Down
153 changes: 97 additions & 56 deletions src/servers/udp/server/request_buffer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,15 @@ use tokio::task::AbortHandle;

use crate::servers::udp::UDP_TRACKER_LOG_TARGET;

/// Ring-Buffer of Active Requests
/// A ring buffer for managing active UDP request abort handles.
///
/// The `ActiveRequests` struct maintains a fixed-size ring buffer of abort
/// handles for UDP request processor tasks. It ensures that at most 50 requests
/// are handled concurrently, and provides mechanisms to handle buffer overflow
/// by removing finished or oldest unfinished tasks.
#[derive(Default)]
pub struct ActiveRequests {
rb: StaticRb<AbortHandle, 50>, // the number of requests we handle at the same time.
rb: StaticRb<AbortHandle, 50>, // The number of requests handled simultaneously.
}

impl std::fmt::Debug for ActiveRequests {
Expand All @@ -29,67 +34,103 @@ impl Drop for ActiveRequests {
}

impl ActiveRequests {
/// It inserts the abort handle for the UDP request processor tasks.
/// Inserts an abort handle for a UDP request processor task.
///
/// If there is no room for the new task, it tries to make place:
/// If the buffer is full, this method attempts to make space by:
///
/// - Firstly, removing finished tasks.
/// - Secondly, removing the oldest unfinished tasks.
/// 1. Removing finished tasks.
/// 2. Removing the oldest unfinished task if no finished tasks are found.
///
/// # Panics
///
/// Will panics if it can't make space for the new handle.
/// This method will panic if it cannot make space for adding a new handle.
///
/// # Arguments
///
/// * `abort_handle` - The `AbortHandle` for the UDP request processor task.
/// * `local_addr` - A string slice representing the local address for logging.
pub async fn force_push(&mut self, abort_handle: AbortHandle, local_addr: &str) {
// fill buffer with requests
let Err(abort_handle) = self.rb.try_push(abort_handle) else {
return;
};

let mut finished: u64 = 0;
let mut unfinished_task = None;

// buffer is full.. lets make some space.
for h in self.rb.pop_iter() {
// remove some finished tasks
if h.is_finished() {
finished += 1;
continue;
// Attempt to add the new handle to the buffer.
match self.rb.try_push(abort_handle) {
Ok(()) => {
// Successfully added the task, no further action needed.
}

// task is unfinished.. give it another chance.
tokio::task::yield_now().await;

// if now finished, we continue.
if h.is_finished() {
finished += 1;
continue;
Err(abort_handle) => {
// Buffer is full, attempt to make space.

let mut finished: u64 = 0;
let mut unfinished_task = None;

for removed_abort_handle in self.rb.pop_iter() {
// We found a finished tasks ... increase the counter and
// continue searching for more and ...
if removed_abort_handle.is_finished() {
finished += 1;
continue;
}

// The current removed tasks is not finished.

// Give it a second chance to finish.
tokio::task::yield_now().await;

// Recheck if it finished ... increase the counter and
// continue searching for more and ...
if removed_abort_handle.is_finished() {
finished += 1;
continue;
}

// At this point we found a "definitive" unfinished task.

// Log unfinished task.
tracing::debug!(
target: UDP_TRACKER_LOG_TARGET,
local_addr,
removed_count = finished,
"Udp::run_udp_server::loop (got unfinished task)"
);

// If no finished tasks were found, abort the current
// unfinished task.
if finished == 0 {
// We make place aborting this task.
removed_abort_handle.abort();

tracing::warn!(
target: UDP_TRACKER_LOG_TARGET,
local_addr,
"Udp::run_udp_server::loop aborting request: (no finished tasks)"
);

break;
}

// At this point we found at least one finished task, but the
// current one is not finished and it was removed from the
// buffer, so we need to re-insert in in the buffer.

// Save the unfinished task for re-entry.
unfinished_task = Some(removed_abort_handle);
}

// After this point there can't be a race condition because only
// one thread owns the active buffer. There is no way for the
// buffer to be full again. That means the "expects" should
// never happen.

// Reinsert the unfinished task if any.
if let Some(h) = unfinished_task {
self.rb.try_push(h).expect("it was previously inserted");
}

// Insert the new task, ensuring there's space.
if !abort_handle.is_finished() {
self.rb
.try_push(abort_handle)
.expect("it should remove at least one element.");
}
}

tracing::debug!(target: UDP_TRACKER_LOG_TARGET, local_addr, removed_count = finished, "Udp::run_udp_server::loop (got unfinished task)");

if finished == 0 {
// we have _no_ finished tasks.. will abort the unfinished task to make space...
h.abort();

tracing::warn!(target: UDP_TRACKER_LOG_TARGET, local_addr, "Udp::run_udp_server::loop aborting request: (no finished tasks)");

break;
}

// we have space, return unfinished task for re-entry.
unfinished_task = Some(h);
}

// re-insert the previous unfinished task.
if let Some(h) = unfinished_task {
self.rb.try_push(h).expect("it was previously inserted");
}

// insert the new task.
if !abort_handle.is_finished() {
self.rb
.try_push(abort_handle)
.expect("it should remove at least one element.");
}
};
}
}

0 comments on commit a897de4

Please sign in to comment.