Skip to content

Commit

Permalink
refactor: move active requests logic to ActiveRequest type
Browse files Browse the repository at this point in the history
  • Loading branch information
josecelano committed Jun 25, 2024
1 parent 35b6c84 commit 61fb4b2
Showing 1 changed file with 80 additions and 45 deletions.
125 changes: 80 additions & 45 deletions src/servers/udp/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,67 @@ impl Drop for ActiveRequests {
}
}

impl ActiveRequests {
/// It inserts the abort handle for the UDP request processor tasks.
///
/// If there is no room for the new task, it tries to make place:
///
/// - Firstly, removing finished tasks.
/// - Secondly, removing the oldest unfinished tasks.
pub async fn force_push(&mut self, abort_handle: AbortHandle, local_addr: &str) {
// fill buffer with requests
let Err(abort_handle) = self.rb.try_push(abort_handle) else {
return;
};

let mut finished: u64 = 0;
let mut unfinished_task = None;

// buffer is full.. lets make some space.
for h in self.rb.pop_iter() {
// remove some finished tasks
if h.is_finished() {
finished += 1;
continue;
}

// task is unfinished.. give it another chance.
tokio::task::yield_now().await;

// if now finished, we continue.
if h.is_finished() {
finished += 1;
continue;
}

tracing::debug!(target: UDP_TRACKER_LOG_TARGET, local_addr, removed_count = finished, "Udp::run_udp_server::loop (got unfinished task)");

if finished == 0 {
// we have _no_ finished tasks.. will abort the unfinished task to make space...
h.abort();

tracing::warn!(target: UDP_TRACKER_LOG_TARGET, local_addr, "Udp::run_udp_server::loop aborting request: (no finished tasks)");
break;
}

// we have space, return unfinished task for re-entry.
unfinished_task = Some(h);
}

// re-insert the previous unfinished task.
if let Some(h) = unfinished_task {
self.rb.try_push(h).expect("it was previously inserted");
}

// insert the new task.
if !abort_handle.is_finished() {
self.rb
.try_push(abort_handle)
.expect("it should remove at least one element.");
}
}
}

/// Wrapper for Tokio [`UdpSocket`][`tokio::net::UdpSocket`] that is bound to a particular socket.
struct BoundSocket {
socket: Arc<tokio::net::UdpSocket>,
Expand Down Expand Up @@ -421,60 +482,34 @@ impl Udp {
}
};

let abort_handle =
tokio::task::spawn(Udp::process_request(req, tracker.clone(), receiver.bound_socket.clone())).abort_handle();

if abort_handle.is_finished() {
continue;
}

// fill buffer with requests
let Err(req) = reqs.rb.try_push(abort_handle) else {
continue;
};

let mut finished: u64 = 0;
let mut unfinished_task = None;
// buffer is full.. lets make some space.
for h in reqs.rb.pop_iter() {
// remove some finished tasks
if h.is_finished() {
finished += 1;
continue;
}
/* code-review:
// task is unfinished.. give it another chance.
tokio::task::yield_now().await;
Does it make sense to spawn a new request processor task when
the ActiveRequests buffer is full?
// if now finished, we continue.
if h.is_finished() {
finished += 1;
continue;
}
We could store the UDP request in a secondary buffer and wait
until active tasks are finished. When a active request is finished
we can move a new UDP request from the pending to process requests
buffer to the active requests buffer.
tracing::debug!(target: UDP_TRACKER_LOG_TARGET, local_addr, removed_count = finished, "Udp::run_udp_server::loop (got unfinished task)");
This forces us to define an explicit timeout for active requests.
if finished == 0 {
// we have _no_ finished tasks.. will abort the unfinished task to make space...
h.abort();
In the current solution the timeout is dynamic, it depends on
the system load. With high load we can remove tasks without
giving them enough time to be processed. With low load we could
keep processing running longer than a reasonable time for
the client to receive the response.
tracing::warn!(target: UDP_TRACKER_LOG_TARGET, local_addr, "Udp::run_udp_server::loop aborting request: (no finished tasks)");
break;
}
*/

// we have space, return unfinished task for re-entry.
unfinished_task = Some(h);
}
let abort_handle =
tokio::task::spawn(Udp::process_request(req, tracker.clone(), receiver.bound_socket.clone())).abort_handle();

// re-insert the previous unfinished task.
if let Some(h) = unfinished_task {
reqs.rb.try_push(h).expect("it was previously inserted");
if abort_handle.is_finished() {
continue;
}

// insert the new task.
if !req.is_finished() {
reqs.rb.try_push(req).expect("it should remove at least one element.");
}
reqs.force_push(abort_handle, &local_addr).await;
} else {
tokio::task::yield_now().await;
// the request iterator returned `None`.
Expand Down

0 comments on commit 61fb4b2

Please sign in to comment.