Skip to content

Commit

Permalink
discover: Timeout stalled resolutions (#401)
Browse files Browse the repository at this point in the history
If a load balancer is not polled, e.g. because it is idle or has been
leaked, its resolution stream can fill up, causing backpressure on the
destination controller client.

This change implements a watchdog timeout for this scenario. When the
resolution buffer remains at capacity for a full idle timeout, the
background resolution task terminates, freeing its resources.
  • Loading branch information
olix0r authored Dec 18, 2019
1 parent de09017 commit 588609b
Show file tree
Hide file tree
Showing 3 changed files with 49 additions and 9 deletions.
3 changes: 2 additions & 1 deletion linkerd/app/outbound/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -203,11 +203,12 @@ impl<A: OrigDstAddr> Config<A> {

// Resolves the target via the control plane and balances requests
// over all endpoints returned from the destination service.
const DISCOVER_UPDATE_BUFFER_CAPACITY: usize = 2;
const DISCOVER_UPDATE_BUFFER_CAPACITY: usize = 10;
let balancer_layer = svc::layers()
.push_spawn_ready()
.push(discover::Layer::new(
DISCOVER_UPDATE_BUFFER_CAPACITY,
router_max_idle_age,
map_endpoint::Resolve::new(endpoint::FromMetadata, resolve.clone()),
))
.push(http::balance::layer(EWMA_DEFAULT_RTT, EWMA_DECAY));
Expand Down
48 changes: 42 additions & 6 deletions linkerd/proxy/discover/src/buffer.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
use futures::{try_ready, Async, Future, Poll, Stream};
use linkerd2_error::{Error, Never};
use std::fmt;
use std::time::{Duration, Instant};
use tokio::sync::{mpsc, oneshot};
use tokio::timer::Delay;
use tower::discover;
use tracing_futures::Instrument;

/// Configuration wrapper around an inner value `M`.
///
/// The `capacity` and `watchdog_timeout` stored here are copied into every
/// future this type returns, so each resolution gets the same limits.
#[derive(Clone, Debug)]
pub struct Buffer<M> {
/// Buffer capacity handed to each returned future.
/// NOTE(review): presumably the bound of the update channel -- confirm
/// where the channel is created.
capacity: usize,
/// Maximum time the send buffer may stay full before the background
/// resolution is dropped.
watchdog_timeout: Duration,
/// The wrapped value; `new` requires `Buffer<M>` to implement
/// `tower::Service<T>`.
inner: M,
}

Expand All @@ -20,24 +23,31 @@ pub struct Discover<K, S> {
/// Future that carries the buffer settings alongside the inner future `F`.
///
/// NOTE(review): appears to spawn a `Daemon` with these settings once the
/// inner future yields a `discover` -- confirm against the `Future` impl.
pub struct DiscoverFuture<F, D> {
/// The inner future being driven.
future: F,
/// Buffer capacity copied from the `Buffer` that created this future.
capacity: usize,
/// Watchdog timeout copied from the `Buffer` that created this future.
watchdog_timeout: Duration,
/// Records the type parameter `D` without storing a value of that type.
_marker: std::marker::PhantomData<fn() -> D>,
}

/// Background task state that forwards updates from `discover` into `tx`.
///
/// A watchdog bounds how long `tx` may stay full: if the sender is not ready
/// for a whole `watchdog_timeout`, the task ends with an error so that the
/// `discover` resources are released.
pub struct Daemon<D: discover::Discover> {
/// Source of endpoint changes, polled once `tx` has room.
discover: D,
/// Receiver of an uninhabited `Never` value, so it can never yield an item.
/// NOTE(review): presumably used to detect that the other half was
/// dropped -- confirm against the poll implementation.
disconnect_rx: oneshot::Receiver<Never>,
/// Bounded channel that carries changes to the consumer.
tx: mpsc::Sender<discover::Change<D::Key, D::Service>>,
/// Timer armed while `tx` is not ready; reset to `None` as soon as `tx`
/// reports readiness again.
watchdog: Option<Delay>,
/// Duration added to `Instant::now()` when the watchdog timer is armed.
watchdog_timeout: Duration,
}

/// Marker type with a private `()` field, so it cannot be constructed outside
/// this module.
///
/// NOTE(review): presumably the error surfaced when the background
/// resolution has been lost -- confirm against its uses.
#[derive(Clone, Debug)]
pub struct Lost(());

impl<M> Buffer<M> {
pub fn new<T>(capacity: usize, inner: M) -> Self
pub fn new<T>(capacity: usize, watchdog_timeout: Duration, inner: M) -> Self
where
Self: tower::Service<T>,
{
Self { capacity, inner }
Self {
capacity,
watchdog_timeout,
inner,
}
}
}

Expand All @@ -63,6 +73,7 @@ where
Self::Future {
future,
capacity: self.capacity,
watchdog_timeout: self.watchdog_timeout,
_marker: std::marker::PhantomData,
}
}
Expand All @@ -88,6 +99,8 @@ where
discover,
disconnect_rx,
tx,
watchdog_timeout: self.watchdog_timeout,
watchdog: None,
};
tokio::spawn(fut.in_current_span());

Expand All @@ -111,10 +124,33 @@ where
Ok(Async::Ready(n)) => match n {},
}

try_ready!(self
.tx
.poll_ready()
.map_err(|_| tracing::trace!("lost sender")));
// The watchdog bounds the amount of time that the send buffer stays
// full. This is designed to release the `discover` resources, i.e.
// if we expect that the receiver has leaked.
match self.tx.poll_ready() {
Ok(Async::Ready(())) => {
self.watchdog = None;
}
Err(_) => {
tracing::trace!("lost sender");
return Err(());
}
Ok(Async::NotReady) => {
let mut watchdog = self
.watchdog
.take()
.unwrap_or_else(|| Delay::new(Instant::now() + self.watchdog_timeout));
if watchdog.poll().expect("timer must not fail").is_ready() {
tracing::warn!(
timeout = ?self.watchdog_timeout,
"dropping resolution due to watchdog",
);
return Err(());
}
self.watchdog = Some(watchdog);
return Ok(Async::NotReady);
}
}

let up = try_ready!(self.discover.poll().map_err(|e| {
let e: Error = e.into();
Expand Down
7 changes: 5 additions & 2 deletions linkerd/proxy/discover/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
use linkerd2_error::Error;
use linkerd2_proxy_core::Resolve;
use std::fmt;
use std::time::Duration;

pub mod buffer;
pub mod from_resolve;
Expand All @@ -15,20 +16,22 @@ use self::make_endpoint::MakeEndpoint;
/// Layer configuration for building a buffered discovery service.
///
/// `layer` clones `resolve` into a `FromResolve`, wraps it with the supplied
/// endpoint maker, and passes the result to `Buffer::new` together with
/// `capacity` and `watchdog`.
#[derive(Clone, Debug)]
pub struct Layer<T, R> {
/// Buffer capacity forwarded to `Buffer::new`.
capacity: usize,
/// Watchdog timeout forwarded to `Buffer::new`.
watchdog: Duration,
/// Resolver cloned for each service produced by `layer`.
resolve: R,
/// Records the target type `T` without storing a value of that type.
_marker: std::marker::PhantomData<fn(T)>,
}

// === impl Layer ===

impl<T, R> Layer<T, R> {
pub fn new(capacity: usize, resolve: R) -> Self
pub fn new(capacity: usize, watchdog: Duration, resolve: R) -> Self
where
R: Resolve<T> + Clone,
R::Endpoint: fmt::Debug + Clone + PartialEq,
{
Self {
capacity,
watchdog,
resolve,
_marker: std::marker::PhantomData,
}
Expand All @@ -53,6 +56,6 @@ where
fn layer(&self, make_endpoint: M) -> Self::Service {
let make_discover =
MakeEndpoint::new(make_endpoint, FromResolve::new(self.resolve.clone()));
Buffer::new(self.capacity, make_discover)
Buffer::new(self.capacity, self.watchdog, make_discover)
}
}

0 comments on commit 588609b

Please sign in to comment.