Skip to content

Commit

Permalink
fix(client): cleanup dropped pending Checkouts from Pool
Browse files Browse the repository at this point in the history
Closes #1315
  • Loading branch information
seanmonstar committed Sep 18, 2017
1 parent 971864c commit 3b91fc6
Show file tree
Hide file tree
Showing 3 changed files with 77 additions and 5 deletions.
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ language-tags = "0.2"
log = "0.3"
mime = "0.3.2"
percent-encoding = "1.0"
relay = "0.1"
time = "0.1"
tokio-core = "0.1.6"
tokio-proto = "0.1"
Expand Down
80 changes: 75 additions & 5 deletions src/client/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use std::rc::Rc;
use std::time::{Duration, Instant};

use futures::{Future, Async, Poll};
use futures::unsync::oneshot;
use relay;

use http::{KeepAlive, KA};

Expand All @@ -17,8 +17,19 @@ pub struct Pool<T> {

struct PoolInner<T> {
enabled: bool,
// These are internal Conns sitting in the event loop in the KeepAlive
// state, waiting to receive a new Request to send on the socket.
idle: HashMap<Rc<String>, Vec<Entry<T>>>,
parked: HashMap<Rc<String>, VecDeque<oneshot::Sender<Entry<T>>>>,
// These are outstanding Checkouts that are waiting for a socket to be
// able to send a Request one. This is used when "racing" for a new
// connection.
//
// The Client starts 2 tasks, 1 to connect a new socket, and 1 to wait
// for the Pool to receive an idle Conn. When a Conn becomes idle,
// this list is checked for any parked Checkouts, and tries to notify
// them that the Conn could be used instead of waiting for a brand new
// connection.
parked: HashMap<Rc<String>, VecDeque<relay::Sender<Entry<T>>>>,
timeout: Option<Duration>,
}

Expand Down Expand Up @@ -50,13 +61,20 @@ impl<T: Clone> Pool<T> {
let mut entry = Some(entry);
if let Some(parked) = inner.parked.get_mut(&key) {
while let Some(tx) = parked.pop_front() {
if tx.is_canceled() {
trace!("Pool::put removing canceled parked {:?}", key);
} else {
tx.complete(entry.take().unwrap());
}
/*
match tx.send(entry.take().unwrap()) {
Ok(()) => break,
Err(e) => {
trace!("Pool::put removing canceled parked {:?}", key);
entry = Some(e);
}
}
*/
}
remove_parked = parked.is_empty();
}
Expand All @@ -74,6 +92,7 @@ impl<T: Clone> Pool<T> {
}
}


pub fn pooled(&self, key: Rc<String>, value: T) -> Pooled<T> {
trace!("Pool::pooled {:?}", key);
Pooled {
Expand Down Expand Up @@ -102,7 +121,7 @@ impl<T: Clone> Pool<T> {
}
}

fn park(&mut self, key: Rc<String>, tx: oneshot::Sender<Entry<T>>) {
fn park(&mut self, key: Rc<String>, tx: relay::Sender<Entry<T>>) {
trace!("Pool::park {:?}", key);
self.inner.borrow_mut()
.parked.entry(key)
Expand All @@ -111,6 +130,24 @@ impl<T: Clone> Pool<T> {
}
}

impl<T> Pool<T> {
fn clean_parked(&mut self, key: &Rc<String>) {
trace!("Pool::clean_parked {:?}", key);
let mut inner = self.inner.borrow_mut();

let mut remove_parked = false;
if let Some(parked) = inner.parked.get_mut(key) {
parked.retain(|tx| {
!tx.is_canceled()
});
remove_parked = parked.is_empty();
}
if remove_parked {
inner.parked.remove(key);
}
}
}

impl<T> Clone for Pool<T> {
fn clone(&self) -> Pool<T> {
Pool {
Expand Down Expand Up @@ -204,7 +241,7 @@ enum TimedKA {
pub struct Checkout<T> {
key: Rc<String>,
pool: Pool<T>,
parked: Option<oneshot::Receiver<Entry<T>>>,
parked: Option<relay::Receiver<Entry<T>>>,
}

impl<T: Clone> Future for Checkout<T> {
Expand Down Expand Up @@ -260,7 +297,7 @@ impl<T: Clone> Future for Checkout<T> {
Some(entry) => Ok(Async::Ready(self.pool.reuse(self.key.clone(), entry))),
None => {
if self.parked.is_none() {
let (tx, mut rx) = oneshot::channel();
let (tx, mut rx) = relay::channel();
let _ = rx.poll(); // park this task
self.pool.park(self.key.clone(), tx);
self.parked = Some(rx);
Expand All @@ -271,6 +308,13 @@ impl<T: Clone> Future for Checkout<T> {
}
}

impl<T> Drop for Checkout<T> {
fn drop(&mut self) {
self.parked.take();
self.pool.clean_parked(&self.key);
}
}

struct Expiration(Option<Duration>);

impl Expiration {
Expand Down Expand Up @@ -364,4 +408,30 @@ mod tests {
})).map(|(entry, _)| entry);
assert_eq!(*checkout.wait().unwrap(), *pooled1);
}

#[test]
fn test_pool_checkout_drop_cleans_up_parked() {
future::lazy(|| {
let pool = Pool::new(true, Some(Duration::from_secs(10)));
let key = Rc::new("localhost:12345".to_string());
let _pooled1 = pool.pooled(key.clone(), 41);
let mut checkout1 = pool.checkout(&key);
let mut checkout2 = pool.checkout(&key);

// first poll needed to get into Pool's parked
checkout1.poll().unwrap();
assert_eq!(pool.inner.borrow().parked.get(&key).unwrap().len(), 1);
checkout2.poll().unwrap();
assert_eq!(pool.inner.borrow().parked.get(&key).unwrap().len(), 2);

// on drop, clean up Pool
drop(checkout1);
assert_eq!(pool.inner.borrow().parked.get(&key).unwrap().len(), 1);

drop(checkout2);
assert!(pool.inner.borrow().parked.get(&key).is_none());

::futures::future::ok::<(), ()>(())
}).wait().unwrap();
}
}
1 change: 1 addition & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ extern crate language_tags;
#[macro_use] extern crate log;
pub extern crate mime;
#[macro_use] extern crate percent_encoding;
extern crate relay;
extern crate time;
extern crate tokio_core as tokio;
#[macro_use] extern crate tokio_io;
Expand Down

0 comments on commit 3b91fc6

Please sign in to comment.