Skip to content

Commit

Permalink
feat(server): Handle 100-continue
Browse files Browse the repository at this point in the history
  • Loading branch information
sfackler committed Jun 25, 2017
1 parent 1e31e11 commit a9b6752
Show file tree
Hide file tree
Showing 3 changed files with 69 additions and 19 deletions.
67 changes: 49 additions & 18 deletions src/http/conn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,13 @@ where I: AsyncRead + AsyncWrite,
}
}

fn can_write_continue(&self) -> bool {
match self.state.writing {
Writing::Continue(..) => true,
_ => false,
}
}

fn can_read_body(&self) -> bool {
match self.state.reading {
Reading::Body(..) => true,
Expand Down Expand Up @@ -106,6 +113,10 @@ where I: AsyncRead + AsyncWrite,
}
};
self.state.busy();
if head.expecting_continue() {
let msg = b"HTTP/1.1 100 Continue\r\n\r\n";
self.state.writing = Writing::Continue(Cursor::new(msg));
}
let wants_keep_alive = head.should_keep_alive();
self.state.keep_alive &= wants_keep_alive;
let (body, reading) = if decoder.is_eof() {
Expand Down Expand Up @@ -173,6 +184,7 @@ where I: AsyncRead + AsyncWrite,
}

match self.state.writing {
Writing::Continue(..) |
Writing::Body(..) |
Writing::Ending(..) => return,
Writing::Init |
Expand All @@ -192,6 +204,7 @@ where I: AsyncRead + AsyncWrite,

fn can_write_head(&self) -> bool {
match self.state.writing {
Writing::Continue(ref buf) if !buf.has_started() => true,
Writing::Init => true,
_ => false
}
Expand All @@ -200,6 +213,7 @@ where I: AsyncRead + AsyncWrite,
fn can_write_body(&self) -> bool {
match self.state.writing {
Writing::Body(..) => true,
Writing::Continue(..) |
Writing::Init |
Writing::Ending(..) |
Writing::KeepAlive |
Expand Down Expand Up @@ -296,6 +310,15 @@ where I: AsyncRead + AsyncWrite,
fn write_queued(&mut self) -> Poll<(), io::Error> {
trace!("Conn::write_queued()");
let state = match self.state.writing {
Writing::Continue(ref mut queued) => {
let n = self.io.buffer(queued.buf());
queued.consume(n);
if queued.is_written() {
Writing::Init
} else {
return Ok(Async::NotReady);
}
}
Writing::Body(ref mut encoder, ref mut queued) => {
let complete = if let Some(chunk) = queued.as_mut() {
let n = try_nb!(encoder.encode(&mut self.io, chunk.buf()));
Expand Down Expand Up @@ -355,24 +378,28 @@ where I: AsyncRead + AsyncWrite,
trace!("Conn::poll()");
self.state.read_task.take();

if self.is_read_closed() {
trace!("Conn::poll when closed");
Ok(Async::Ready(None))
} else if self.can_read_head() {
self.read_head()
} else if self.can_read_body() {
self.read_body()
.map(|async| async.map(|chunk| Some(Frame::Body {
chunk: chunk
})))
.or_else(|err| {
self.state.close_read();
Ok(Async::Ready(Some(Frame::Error { error: err.into() })))
})
} else {
trace!("poll when on keep-alive");
self.maybe_park_read();
Ok(Async::NotReady)
loop {
if self.is_read_closed() {
trace!("Conn::poll when closed");
return Ok(Async::Ready(None));
} else if self.can_read_head() {
return self.read_head();
} else if self.can_write_continue() {
try_nb!(self.flush());
} else if self.can_read_body() {
return self.read_body()
.map(|async| async.map(|chunk| Some(Frame::Body {
chunk: chunk
})))
.or_else(|err| {
self.state.close_read();
Ok(Async::Ready(Some(Frame::Error { error: err.into() })))
});
} else {
trace!("poll when on keep-alive");
self.maybe_park_read();
return Ok(Async::NotReady);
}
}
}
}
Expand Down Expand Up @@ -482,6 +509,7 @@ enum Reading {
}

enum Writing<B> {
Continue(Cursor<&'static [u8]>),
Init,
Body(Encoder, Option<Cursor<B>>),
Ending(Cursor<&'static [u8]>),
Expand All @@ -503,6 +531,9 @@ impl<B: AsRef<[u8]>, K: fmt::Debug> fmt::Debug for State<B, K> {
impl<B: AsRef<[u8]>> fmt::Debug for Writing<B> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match *self {
Writing::Continue(ref buf) => f.debug_tuple("Continue")
.field(buf)
.finish(),
Writing::Init => f.write_str("Init"),
Writing::Body(ref enc, ref queued) => f.debug_tuple("Body")
.field(enc)
Expand Down
4 changes: 4 additions & 0 deletions src/http/io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,10 @@ impl<T: AsRef<[u8]>> Cursor<T> {
}
}

pub fn has_started(&self) -> bool {
self.pos != 0
}

pub fn is_written(&self) -> bool {
trace!("Cursor::is_written pos = {}, len = {}", self.pos, self.bytes.as_ref().len());
self.pos >= self.bytes.as_ref().len()
Expand Down
17 changes: 16 additions & 1 deletion src/http/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use std::fmt;

use bytes::BytesMut;

use header::{Connection, ConnectionOption};
use header::{Connection, ConnectionOption, Expect};
use header::Headers;
use method::Method;
use status::StatusCode;
Expand Down Expand Up @@ -56,6 +56,10 @@ impl<S> MessageHead<S> {
pub fn should_keep_alive(&self) -> bool {
should_keep_alive(self.version, &self.headers)
}

pub fn expecting_continue(&self) -> bool {
expecting_continue(self.version, &self.headers)
}
}

impl ResponseHead {
Expand Down Expand Up @@ -119,6 +123,17 @@ pub fn should_keep_alive(version: HttpVersion, headers: &Headers) -> bool {
ret
}

/// Checks if a connection is expecting a `100 Continue` before sending its body.
#[inline]
pub fn expecting_continue(version: HttpVersion, headers: &Headers) -> bool {
let ret = match (version, headers.get::<Expect>()) {
(Http11, Some(&Expect::Continue)) => true,
_ => false
};
trace!("expecting_continue(version={:?}, header={:?} = {:?}", version, headers.get::<Expect>(), ret);
ret
}

#[derive(Debug)]
pub enum ServerTransaction {}

Expand Down

0 comments on commit a9b6752

Please sign in to comment.