Client connection leak with more than one body chunk #1479

Closed
unixpickle opened this issue Apr 4, 2018 · 10 comments

unixpickle commented Apr 4, 2018

In a web application where the server sends more than ~8062 bytes of data to the client, the client leaks its connection. This can lead to all sorts of resource-exhaustion errors. I have found that this correlates perfectly with Response::body() producing more than one Chunk.

The client/server programs below reproduce this issue. The server usually dies with the following error:

thread 'main' panicked at 'called `Result::unwrap()` on an `Err` value: Io(Error { repr: Os { code: 24, message: "Too many open files" } })', src/libcore/result.rs:916:5

This is likely related to commit ef40081 and issue #1397.

Update: this is using hyper version 0.11.24. I've added the Cargo.toml file below.

client.rs:

extern crate futures;
extern crate hyper;
extern crate tokio_core;

use futures::{Future, IntoFuture, Stream};
use hyper::{Chunk, Client, Error};
use tokio_core::reactor::Core;

fn main() {
    let mut core = Core::new().unwrap();
    let handle = core.handle();
    let client = Client::configure().keep_alive(true).build(&handle);
    loop {
        let res = client.get("http://127.0.0.1:8080/".parse().unwrap())
            .and_then(|resp| resp.body().fold((0usize, Vec::<u8>::new()), join_chunks));
        let (num_chunks, data) = core.run(res).unwrap();
        println!("got {} bytes ({} chunks)", data.len(), num_chunks);
    }
}

fn join_chunks(
    mut state: (usize, Vec<u8>),
    next: Chunk
) -> Box<Future<Item = (usize, Vec<u8>), Error = Error>> {
    state.1.extend(next.to_vec());
    Box::new(Ok((state.0 + 1, state.1)).into_future())
}

server.rs:

extern crate futures;
extern crate hyper;

use std::iter::repeat;

use futures::{Future, IntoFuture};
use hyper::{Error, Request, Response, StatusCode};
use hyper::header::ContentType;
use hyper::server::{Http, Service};

// If you set this to something like 100, it works!
// When I did a binary search, I found 8062 worked and 8063 didn't.
const PAYLOAD_SIZE: usize = 10000;

fn main() {
    Http::new()
        .bind(&"127.0.0.1:8080".parse().unwrap(), || Ok(MyService{}))
        .unwrap()
        .run()
        .unwrap();
}

struct MyService{}

impl Service for MyService {
    type Request = Request;
    type Response = Response;
    type Error = Error;
    type Future = Box<Future<Item = Self::Response, Error = Self::Error>>;

    fn call(&self, _: Request) -> Self::Future {
        Box::new(Ok(Response::new()
            .with_status(StatusCode::Ok)
            .with_header(ContentType("application/octet-stream".parse().unwrap()))
            .with_body(repeat(0u8).take(PAYLOAD_SIZE).collect::<Vec<u8>>())).into_future())
    }
}

Cargo.toml:

[package]
name = "reproduce"
version = "0.1.0"
authors = ["Alex Nichol <[email protected]>"]

[[bin]]
name = "reproduce-server"
path = "src/server.rs"

[[bin]]
name = "reproduce-client"
path = "src/client.rs"

[dependencies]
futures = "0.1"
hyper = "0.11"
tokio-core = "0.1"

unixpickle added a commit to unixpickle/squidtun that referenced this issue Apr 4, 2018:
"Without keepalives, we get too many sockets in TIME_WAIT. With keepalives, we are bumping into hyperium/hyper#1479"

lnicola (Contributor) commented Apr 4, 2018

Can you try with 7fe9710? I ran into what looks like the same issue and that commit fixed it for me.

unixpickle (Author) commented:

Tested with 7fe9710 and it worked!

lnicola (Contributor) commented Apr 4, 2018

@unixpickle FYI, hyper 0.11.25 is now out.

unixpickle (Author) commented:

Very speedy fix!

unixpickle (Author) commented:

I'm still not entirely sure what the problem was, though.

7fe9710 supposedly works because it delays EOF until the connection has been added back to the pool. This seems to imply that I could work around the problem on 0.11.24 by sleeping after reading the body. However, this is not the case. Here's an updated client that still does not work on 0.11.24:

extern crate futures;
extern crate hyper;
extern crate simple_logger;
extern crate tokio_core;

use std::thread::sleep;
use std::time::Duration;

use futures::{Future, IntoFuture, Stream};
use hyper::{Chunk, Client, Error};
use tokio_core::reactor::Core;

fn main() {
    let mut core = Core::new().unwrap();
    let handle = core.handle();
    let client = Client::configure().keep_alive(true).build(&handle);
    loop {
        let res = client.get("http://127.0.0.1:8080/".parse().unwrap())
            .and_then(|resp| resp.body().fold((0usize, Vec::<u8>::new()), join_chunks));
        let (num_chunks, data) = core.run(res).unwrap();
        println!("got {} bytes ({} chunks)", data.len(), num_chunks);
        sleep(Duration::from_millis(100));
    }
}

fn join_chunks(
    mut state: (usize, Vec<u8>),
    next: Chunk
) -> Box<Future<Item = (usize, Vec<u8>), Error = Error>> {
    state.1.extend(next.to_vec());
    Box::new(Ok((state.0 + 1, state.1)).into_future())
}

There must be something else I'm missing. For example, I don't see why this drop is necessary. Maybe @seanmonstar can shine some light on this.

seanmonstar (Member) commented:

Sleeping the thread won't fix it: the connection task (future) that was spawned needs to be polled again, and that can't happen if the Core isn't running.
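
A minimal sketch of that point, assuming tokio-core 0.1 and futures 0.1 (this is an illustration, not code from the thread): a future handed to Handle::spawn is only polled while the Core itself is being driven, so blocking the thread with sleep gives it no chance to run.

extern crate futures;
extern crate tokio_core;

use std::thread::sleep;
use std::time::Duration;

use futures::future;
use tokio_core::reactor::Core;

fn main() {
    let mut core = Core::new().unwrap();
    let handle = core.handle();

    // Queue a task on the event loop; nothing polls it yet.
    handle.spawn(future::lazy(|| {
        println!("spawned task ran");
        Ok::<(), ()>(())
    }));

    // Blocking the thread does not drive the Core, so the spawned task
    // still has not run when this sleep returns.
    sleep(Duration::from_secs(1));

    // Only driving the Core (turn or run) gives the spawned task a chance
    // to be polled.
    core.turn(Some(Duration::from_millis(100)));
}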

unixpickle (Author) commented:

Ah, that's very helpful. However, I'm still not sure I see the full picture.

Here's a modified version where the core runs forever, and the sleep is done using a future. The problem still happens on 0.11.24. In this case, the core is running forever, so @seanmonstar's previous explanation doesn't apply.

extern crate futures;
extern crate hyper;
extern crate simple_logger;
extern crate tokio_core;

use std::time::Duration;

use futures::{Future, IntoFuture, Stream};
use futures::stream::repeat;
use hyper::{Chunk, Client, Error};
use tokio_core::reactor::{Core, Timeout};

fn main() {
    let mut core = Core::new().unwrap();
    let handle = core.handle();
    let client = Client::configure().keep_alive(true).build(&handle);
    core.run(repeat::<(), Error>(()).for_each(move |_| {
        let local_handle = handle.clone();
        client.get("http://127.0.0.1:8080/".parse().unwrap())
            .and_then(|resp| resp.body().fold((0usize, Vec::<u8>::new()), join_chunks))
            .and_then(move |(num_chunks, data)| {
                println!("got {} bytes ({} chunks)", data.len(), num_chunks);
                Timeout::new(Duration::from_millis(100), &local_handle).unwrap()
                    .map_err(From::from).map(|_| ())
            })
    })).unwrap();
}

fn join_chunks(
    mut state: (usize, Vec<u8>),
    next: Chunk
) -> Box<Future<Item = (usize, Vec<u8>), Error = Error>> {
    state.1.extend(next.to_vec());
    Box::new(Ok((state.0 + 1, state.1)).into_future())
}

By the way, I have been testing for the problem by running netstat -an | grep 127 | wc -l.

seanmonstar (Member) commented:

I don't know exactly where the issue is, but it's fixed in 0.11.25, so I personally wouldn't expend much effort to look further XD

unixpickle (Author) commented:

I did some investigating, since I didn't think this should be left a mystery. I found that commit 7fe9710 fixed another bug in addition to the delayed EOF bug. The bug was in this block of code from 0.11.24 (in src/client/mod.rs):

if let Ok(Async::NotReady) = pooled.tx.poll_ready() {
    // If the executor doesn't have room, oh well. Things will likely
    // be blowing up soon, but this specific task isn't required.
    let _ = executor.execute(future::poll_fn(move || {
        pooled.tx.poll_ready().map_err(|_| ())
    }));
}

The problem here is that pooled.tx.poll_ready() is called once from one task, and then repeatedly from a new task. In general, I'm guessing futures do not support being rescheduled on a different task. In particular, the Giver does not handle being moved to a new task; it continues to wake up the original task (which can end before the connection is put back into the pool).

And, it hardly needs to be said: the bug doesn't happen with a single chunk because, the first time the executed poll_fn runs, the connection is already ready to be given back to the pool.

The EOF bug is one issue that needed to be fixed. My original reproduction fell victim to both bugs, but the later one with Timeouts only fell victim to the task rescheduling bug.
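
To see the task rescheduling pitfall in isolation, here is a sketch assuming futures 0.1 (illustrative only, not hyper's actual code; the type and method names are made up): a future that remembers the task from its first poll keeps notifying that original task, even after it has been moved onto a different one.

extern crate futures;

use futures::{Async, Future, Poll};
use futures::task::{self, Task};

// Completes once `done` is set, but only remembers the task that polled it
// first, mirroring how the Giver kept waking the original task.
struct StoresFirstTask {
    task: Option<Task>,
    done: bool,
}

impl StoresFirstTask {
    fn complete(&mut self) {
        self.done = true;
        // Wakes the remembered task. If the future was moved onto another
        // task after its first poll, that new task is never woken.
        if let Some(task) = self.task.take() {
            task.notify();
        }
    }
}

impl Future for StoresFirstTask {
    type Item = ();
    type Error = ();

    fn poll(&mut self) -> Poll<(), ()> {
        if self.done {
            return Ok(Async::Ready(()));
        }
        // Bug: only the first task to poll us is ever recorded.
        if self.task.is_none() {
            self.task = Some(task::current());
        }
        Ok(Async::NotReady)
    }
}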

If anybody is interested, here is a minimal patch on top of 0.11.24 that fixes the task rescheduling bug but not the EOF bug:

diff --git a/src/client/conn.rs b/src/client/conn.rs
index f6a139b5..ed5d7147 100644
--- a/src/client/conn.rs
+++ b/src/client/conn.rs
@@ -126,6 +126,10 @@ impl<B> SendRequest<B>
         self.dispatch.poll_ready()
     }
 
+    pub(super) fn is_wanting(&self) -> bool {
+        self.dispatch.is_wanting()
+    }
+
     pub(super) fn is_closed(&self) -> bool {
         self.dispatch.is_closed()
     }
diff --git a/src/client/dispatch.rs b/src/client/dispatch.rs
index efc720d8..d7943d9b 100644
--- a/src/client/dispatch.rs
+++ b/src/client/dispatch.rs
@@ -45,6 +45,10 @@ impl<T, U> Sender<T, U> {
         }
     }
 
+    pub fn is_wanting(&self) -> bool {
+        self.giver.is_wanting()
+    }
+
     pub fn is_closed(&self) -> bool {
         self.giver.is_canceled()
     }
diff --git a/src/client/mod.rs b/src/client/mod.rs
index eb8c10ea..e81ef5dc 100644
--- a/src/client/mod.rs
+++ b/src/client/mod.rs
@@ -253,7 +253,7 @@ where C: Connect,
                     // for a new request to start.
                     //
                     // It won't be ready if there is a body to stream.
-                    if let Ok(Async::NotReady) = pooled.tx.poll_ready() {
+                    if !pooled.tx.is_wanting() {
                         // If the executor doesn't have room, oh well. Things will likely
                         // be blowing up soon, but this specific task isn't required.
                         let _ = executor.execute(future::poll_fn(move || {
@@ -661,4 +661,3 @@ mod background {
         }
     }
 }
-
diff --git a/src/client/signal.rs b/src/client/signal.rs
index 2ddf67f7..7fd6bb44 100644
--- a/src/client/signal.rs
+++ b/src/client/signal.rs
@@ -88,6 +88,10 @@ impl Giver {
         }
     }
 
+    pub fn is_wanting(&self) -> bool {
+        self.inner.state.load(Ordering::SeqCst) == STATE_WANT
+    }
+
     pub fn is_canceled(&self) -> bool {
         self.inner.state.load(Ordering::SeqCst) == STATE_CLOSED
     }

seanmonstar (Member) commented:

Ah yes, I had noticed this as well a little while ago, and forgotten that the fix included that. Generally, implementations of Future should behave correctly if moved to different tasks.
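
A sketch of that rule, again assuming futures 0.1 (illustrative only; the names are made up): re-register the current task on every poll that returns NotReady, so that whichever task most recently polled the future is the one that gets notified.

extern crate futures;

use futures::{Async, Future, Poll};
use futures::task::{self, Task};

struct MovableFuture {
    task: Option<Task>,
    done: bool,
}

impl MovableFuture {
    fn complete(&mut self) {
        self.done = true;
        // Wake whichever task polled most recently.
        if let Some(task) = self.task.take() {
            task.notify();
        }
    }
}

impl Future for MovableFuture {
    type Item = ();
    type Error = ();

    fn poll(&mut self) -> Poll<(), ()> {
        if self.done {
            return Ok(Async::Ready(()));
        }
        // Re-capture the current task on every poll: if the future has been
        // moved to a different task since the last poll, that task is the
        // one that must be woken later.
        self.task = Some(task::current());
        Ok(Async::NotReady)
    }
}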
